mongo_client.py 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032
  1. # Copyright 2009-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Tools for connecting to MongoDB.
  15. .. seealso:: :doc:`/examples/high_availability` for examples of connecting
  16. to replica sets or sets of mongos servers.
  17. To get a :class:`~pymongo.database.Database` instance from a
  18. :class:`MongoClient` use either dictionary-style or attribute-style
  19. access:
  20. .. doctest::
  21. >>> from pymongo import MongoClient
  22. >>> c = MongoClient()
  23. >>> c.test_database
  24. Database(MongoClient('localhost', 27017), u'test_database')
  25. >>> c['test-database']
  26. Database(MongoClient('localhost', 27017), u'test-database')
  27. """
  28. import contextlib
  29. import datetime
  30. import threading
  31. import warnings
  32. import weakref
  33. from collections import defaultdict
  34. from bson.py3compat import (integer_types,
  35. string_type)
  36. from pymongo import (common,
  37. database,
  38. message,
  39. periodic_executor,
  40. uri_parser)
  41. from pymongo.client_options import ClientOptions
  42. from pymongo.cursor_manager import CursorManager
  43. from pymongo.errors import (AutoReconnect,
  44. ConfigurationError,
  45. ConnectionFailure,
  46. InvalidOperation,
  47. InvalidURI,
  48. NetworkTimeout,
  49. NotMasterError,
  50. OperationFailure)
  51. from pymongo.read_preferences import ReadPreference
  52. from pymongo.server_selectors import (writable_preferred_server_selector,
  53. writable_server_selector)
  54. from pymongo.server_type import SERVER_TYPE
  55. from pymongo.topology import Topology
  56. from pymongo.topology_description import TOPOLOGY_TYPE
  57. from pymongo.settings import TopologySettings
  58. class MongoClient(common.BaseObject):
  59. HOST = "localhost"
  60. PORT = 27017
  def __init__(
          self,
          host=None,
          port=None,
          document_class=dict,
          tz_aware=False,
          connect=True,
          **kwargs):
      """Client for a MongoDB instance, a replica set, or a set of mongoses.

      The client object is thread-safe and has connection-pooling built in.
      If an operation fails because of a network error,
      :class:`~pymongo.errors.ConnectionFailure` is raised and the client
      reconnects in the background. Application code should handle this
      exception (recognizing that the operation failed) and then continue to
      execute.

      The `host` parameter can be a full `mongodb URI
      <http://dochub.mongodb.org/core/connections>`_, in addition to
      a simple hostname. It can also be a list of hostnames or
      URIs. Any port specified in the host string(s) will override
      the `port` parameter. If multiple mongodb URIs containing
      database or auth information are passed, the last database,
      username, and password present will be used. For username and
      passwords reserved characters like ':', '/', '+' and '@' must be
      escaped following RFC 2396.

      :Parameters:
        - `host` (optional): hostname or IP address of the
          instance to connect to, or a mongodb URI, or a list of
          hostnames / mongodb URIs. If `host` is an IPv6 literal
          it must be enclosed in '[' and ']' characters following
          the RFC2732 URL syntax (e.g. '[::1]' for localhost)
        - `port` (optional): port number on which to connect
        - `document_class` (optional): default class to use for
          documents returned from queries on this client
        - `tz_aware` (optional): if ``True``,
          :class:`~datetime.datetime` instances returned as values
          in a document by this :class:`MongoClient` will be timezone
          aware (otherwise they will be naive)
        - `connect` (optional): if ``True`` (the default), immediately
          begin connecting to MongoDB in the background. Otherwise connect
          on the first operation.

        | **Other optional parameters can be passed as keyword arguments:**

        - `maxPoolSize` (optional): The maximum number of connections
          that the pool will open simultaneously. If this is set, operations
          will block if there are `maxPoolSize` outstanding connections
          from the pool. Defaults to 100.
        - `socketTimeoutMS`: (integer or None) Controls how long (in
          milliseconds) the driver will wait for a response after sending an
          ordinary (non-monitoring) database operation before concluding that
          a network error has occurred. Defaults to ``None`` (no timeout).
        - `connectTimeoutMS`: (integer or None) Controls how long (in
          milliseconds) the driver will wait during server monitoring when
          connecting a new socket to a server before concluding the server
          is unavailable. Defaults to ``20000`` (20 seconds).
        - `serverSelectionTimeoutMS`: (integer) Controls how long (in
          milliseconds) the driver will wait to find an available,
          appropriate server to carry out a database operation; while it is
          waiting, multiple server monitoring operations may be carried out,
          each controlled by `connectTimeoutMS`. Defaults to ``30000`` (30
          seconds).
        - `waitQueueTimeoutMS`: (integer or None) How long (in milliseconds)
          a thread will wait for a socket from the pool if the pool has no
          free sockets. Defaults to ``None`` (no timeout).
        - `waitQueueMultiple`: (integer or None) Multiplied by maxPoolSize
          to give the number of threads allowed to wait for a socket at one
          time. Defaults to ``None`` (no limit).
        - `socketKeepAlive`: (boolean) Whether to send periodic keep-alive
          packets on connected sockets. Defaults to ``False`` (do not send
          keep-alive packets).

        | **Write Concern options:**
        | (Only set if passed. No default values.)

        - `w`: (integer or string) If this is a replica set, write operations
          will block until they have been replicated to the specified number
          or tagged set of servers. `w=<int>` always includes the replica set
          primary (e.g. w=3 means write to the primary and wait until
          replicated to **two** secondaries). Passing w=0 **disables write
          acknowledgement** and all other write concern options.
        - `wtimeout`: (integer) Used in conjunction with `w`. Specify a value
          in milliseconds to control how long to wait for write propagation
          to complete. If replication does not complete in the given
          timeframe, a timeout exception is raised.
        - `j`: If ``True`` block until write operations have been committed
          to the journal. Cannot be used in combination with `fsync`. Prior
          to MongoDB 2.6 this option was ignored if the server was running
          without journaling. Starting with MongoDB 2.6 write operations will
          fail with an exception if this option is used when the server is
          running without journaling.
        - `fsync`: If ``True`` and the server is running without journaling,
          blocks until the server has synced all data files to disk. If the
          server is running with journaling, this acts the same as the `j`
          option, blocking until write operations have been committed to the
          journal. Cannot be used in combination with `j`.

        | **Replica set keyword arguments for connecting with a replica set
          - either directly or via a mongos:**

        - `replicaSet`: (string or None) The name of the replica set to
          connect to. The driver will verify that all servers it connects to
          match this name. Implies that the hosts specified are a seed list
          and the driver should attempt to find all members of the set.
          Defaults to ``None``.
        - `read_preference`: The read preference for this client. If
          connecting directly to a secondary then a read preference mode
          *other* than PRIMARY is required - otherwise all queries will throw
          :class:`~pymongo.errors.AutoReconnect` "not master".
          See :class:`~pymongo.read_preferences.ReadPreference` for all
          available read preference options. Defaults to ``PRIMARY``.

        | **SSL configuration:**

        - `ssl`: If ``True``, create the connection to the server using SSL.
          Defaults to ``False``.
        - `ssl_keyfile`: The private keyfile used to identify the local
          connection against mongod. If included with the ``certfile`` then
          only the ``ssl_certfile`` is needed. Implies ``ssl=True``.
          Defaults to ``None``.
        - `ssl_certfile`: The certificate file used to identify the local
          connection against mongod. Implies ``ssl=True``. Defaults to
          ``None``.
        - `ssl_cert_reqs`: Specifies whether a certificate is required from
          the other side of the connection, and whether it will be validated
          if provided. It must be one of the three values ``ssl.CERT_NONE``
          (certificates ignored), ``ssl.CERT_OPTIONAL``
          (not required, but validated if provided), or ``ssl.CERT_REQUIRED``
          (required and validated). If the value of this parameter is not
          ``ssl.CERT_NONE`` and a value is not provided for ``ssl_ca_certs``
          PyMongo will attempt to load system provided CA certificates.
          If the python version in use does not support loading system CA
          certificates then the ``ssl_ca_certs`` parameter must point
          to a file of CA certificates. Implies ``ssl=True``. Defaults to
          ``ssl.CERT_REQUIRED`` if not provided and ``ssl=True``.
        - `ssl_ca_certs`: The ca_certs file contains a set of concatenated
          "certification authority" certificates, which are used to validate
          certificates passed from the other end of the connection.
          Implies ``ssl=True``. Defaults to ``None``.
        - `ssl_match_hostname`: If ``True`` (the default), and
          `ssl_cert_reqs` is not ``ssl.CERT_NONE``, enables hostname
          verification using the :func:`~ssl.match_hostname` function from
          python's :mod:`~ssl` module. Think very carefully before setting
          this to ``False`` as that could make your application vulnerable to
          man-in-the-middle attacks.

      :Raises:
        - :class:`TypeError` if `port` is not an ``int``.
        - :class:`~pymongo.errors.InvalidURI` if a host entry contains
          ``://`` but does not start with ``mongodb://``.
        - :class:`~pymongo.errors.ConfigurationError` if no seed host
          results from parsing `host`.

      .. mongodoc:: connections

      .. versionchanged:: 3.0
         :class:`~pymongo.mongo_client.MongoClient` is now the one and only
         client class for a standalone server, mongos, or replica set.
         It includes the functionality that had been split into
         :class:`~pymongo.mongo_client.MongoReplicaSetClient`: it can connect
         to a replica set, discover all its members, and monitor the set for
         stepdowns, elections, and reconfigs.

         The :class:`~pymongo.mongo_client.MongoClient` constructor no
         longer blocks while connecting to the server or servers, and it no
         longer raises :class:`~pymongo.errors.ConnectionFailure` if they
         are unavailable, nor :class:`~pymongo.errors.ConfigurationError`
         if the user's credentials are wrong. Instead, the constructor
         returns immediately and launches the connection process on
         background threads.

         Therefore the ``alive`` method is removed since it no longer
         provides meaningful information; even if the client is disconnected,
         it may discover a server in time to fulfill the next operation.

         In PyMongo 2.x, :class:`~pymongo.MongoClient` accepted a list of
         standalone MongoDB servers and used the first it could connect to::

             MongoClient(['host1.com:27017', 'host2.com:27017'])

         A list of multiple standalones is no longer supported; if multiple
         servers are listed they must be members of the same replica set, or
         mongoses in the same sharded cluster.

         The behavior for a list of mongoses is changed from "high
         availability" to "load balancing". Before, the client connected to
         the lowest-latency mongos in the list, and used it until a network
         error prompted it to re-evaluate all mongoses' latencies and
         reconnect to one of them. In PyMongo 3, the client monitors its
         network latency to all the mongoses continuously, and distributes
         operations evenly among those with the lowest latency. See
         :ref:`mongos-load-balancing` for more information.

         The ``connect`` option is added.

         The ``start_request``, ``in_request``, and ``end_request`` methods
         are removed, as well as the ``auto_start_request`` option.

         The ``copy_database`` method is removed, see the
         :doc:`copy_database examples </examples/copydb>` for alternatives.

         The :meth:`MongoClient.disconnect` method is removed; it was a
         synonym for :meth:`~pymongo.MongoClient.close`.

         :class:`~pymongo.mongo_client.MongoClient` no longer returns an
         instance of :class:`~pymongo.database.Database` for attribute names
         with leading underscores. You must use dict-style lookups instead::

             client['__my_database__']

         Not::

             client.__my_database__
      """
      # Fall back to the class-level defaults and normalize a single host
      # string to a one-element list so the loop below handles both forms.
      if host is None:
          host = self.HOST
      if isinstance(host, string_type):
          host = [host]
      if port is None:
          port = self.PORT
      if not isinstance(port, int):
          raise TypeError("port must be an instance of int")

      seeds = set()
      username = None
      password = None
      dbase = None
      opts = {}
      for entity in host:
          if "://" in entity:
              if entity.startswith("mongodb://"):
                  res = uri_parser.parse_uri(entity, port, False)
                  seeds.update(res["nodelist"])
                  # Keep the previous value when this URI does not supply
                  # one, so the last non-empty value wins.
                  username = res["username"] or username
                  password = res["password"] or password
                  dbase = res["database"] or dbase
                  # Options are replaced wholesale by each parsed URI.
                  opts = res["options"]
              else:
                  idx = entity.find("://")
                  raise InvalidURI("Invalid URI scheme: "
                                   "%s" % (entity[:idx],))
          else:
              seeds.update(uri_parser.split_hosts(entity, port))
      if not seeds:
          raise ConfigurationError("need to specify at least one host")

      # _pool_class, _monitor_class, and _condition_class are for deep
      # customization of PyMongo, e.g. Motor.
      pool_class = kwargs.pop('_pool_class', None)
      monitor_class = kwargs.pop('_monitor_class', None)
      condition_class = kwargs.pop('_condition_class', None)

      kwargs['document_class'] = document_class
      kwargs['tz_aware'] = tz_aware
      # Keyword arguments override options parsed from the URI.
      opts.update(kwargs)
      self.__options = options = ClientOptions(
          username, password, dbase, opts)

      self.__default_database_name = dbase
      self.__lock = threading.Lock()
      self.__cursor_manager = CursorManager(self)
      self.__kill_cursors_queue = []

      # Cache of existing indexes used by ensure_index ops.
      self.__index_cache = {}

      super(MongoClient, self).__init__(options.codec_options,
                                        options.read_preference,
                                        options.write_concern)

      self.__all_credentials = {}
      creds = options.credentials
      if creds:
          self._cache_credentials(creds.source, creds)

      self._topology_settings = TopologySettings(
          seeds=seeds,
          replica_set_name=options.replica_set_name,
          pool_class=pool_class,
          pool_options=options.pool_options,
          monitor_class=monitor_class,
          condition_class=condition_class,
          local_threshold_ms=options.local_threshold_ms,
          server_selection_timeout=options.server_selection_timeout)

      self._topology = Topology(self._topology_settings)
      if connect:
          self._topology.open()

      # `target` reads `self_ref` from the enclosing scope; it is bound
      # below, before the executor is opened.
      def target():
          client = self_ref()
          if client is None:
              return False  # Stop the executor.
          MongoClient._process_kill_cursors_queue(client)
          return True

      executor = periodic_executor.PeriodicExecutor(
          condition_class=self._topology_settings.condition_class,
          interval=common.KILL_CURSOR_FREQUENCY,
          min_interval=0,
          target=target)

      # We strongly reference the executor and it weakly references us via
      # this closure. When the client is freed, stop the executor soon.
      self_ref = weakref.ref(self, executor.close)
      self._kill_cursors_executor = executor
      executor.open()
  324. def _cache_credentials(self, source, credentials, connect=False):
  325. """Save a set of authentication credentials.
  326. The credentials are used to login a socket whenever one is created.
  327. If `connect` is True, verify the credentials on the server first.
  328. """
  329. # Don't let other threads affect this call's data.
  330. all_credentials = self.__all_credentials.copy()
  331. if source in all_credentials:
  332. # Nothing to do if we already have these credentials.
  333. if credentials == all_credentials[source]:
  334. return
  335. raise OperationFailure('Another user is already authenticated '
  336. 'to this database. You must logout first.')
  337. if connect:
  338. server = self._get_topology().select_server(
  339. writable_preferred_server_selector)
  340. # get_socket() logs out of the database if logged in with old
  341. # credentials, and logs in with new ones.
  342. with server.get_socket(all_credentials) as sock_info:
  343. sock_info.authenticate(credentials)
  344. # If several threads run _cache_credentials at once, last one wins.
  345. self.__all_credentials[source] = credentials
  346. def _purge_credentials(self, source):
  347. """Purge credentials from the authentication cache."""
  348. self.__all_credentials.pop(source, None)
  349. def _cached(self, dbname, coll, index):
  350. """Test if `index` is cached."""
  351. cache = self.__index_cache
  352. now = datetime.datetime.utcnow()
  353. return (dbname in cache and
  354. coll in cache[dbname] and
  355. index in cache[dbname][coll] and
  356. now < cache[dbname][coll][index])
  357. def _cache_index(self, dbname, collection, index, cache_for):
  358. """Add an index to the index cache for ensure_index operations."""
  359. now = datetime.datetime.utcnow()
  360. expire = datetime.timedelta(seconds=cache_for) + now
  361. if database not in self.__index_cache:
  362. self.__index_cache[dbname] = {}
  363. self.__index_cache[dbname][collection] = {}
  364. self.__index_cache[dbname][collection][index] = expire
  365. elif collection not in self.__index_cache[dbname]:
  366. self.__index_cache[dbname][collection] = {}
  367. self.__index_cache[dbname][collection][index] = expire
  368. else:
  369. self.__index_cache[dbname][collection][index] = expire
  370. def _purge_index(self, database_name,
  371. collection_name=None, index_name=None):
  372. """Purge an index from the index cache.
  373. If `index_name` is None purge an entire collection.
  374. If `collection_name` is None purge an entire database.
  375. """
  376. if not database_name in self.__index_cache:
  377. return
  378. if collection_name is None:
  379. del self.__index_cache[database_name]
  380. return
  381. if not collection_name in self.__index_cache[database_name]:
  382. return
  383. if index_name is None:
  384. del self.__index_cache[database_name][collection_name]
  385. return
  386. if index_name in self.__index_cache[database_name][collection_name]:
  387. del self.__index_cache[database_name][collection_name][index_name]
  def _server_property(self, attr_name, default=None):
      """Read `attr_name` from the description of the current server.

      The server is selected with the writable-server selector and a
      server selection timeout of 0. If selection raises
      :class:`~pymongo.errors.ConnectionFailure`, `default` is returned.

      Not threadsafe if used multiple times in a single method, since
      the server may change. In such cases, store a local reference to a
      ServerDescription first, then use its properties.
      """
      try:
          current = self._topology.select_server(
              writable_server_selector, server_selection_timeout=0)
          value = getattr(current.description, attr_name)
      except ConnectionFailure:
          value = default
      return value
  401. @property
  402. def address(self):
  403. """(host, port) of the current standalone, primary, or mongos, or None.
  404. Accessing :attr:`address` raises :exc:`~.errors.InvalidOperation` if
  405. the client is load-balancing among mongoses, since there is no single
  406. address. Use :attr:`nodes` instead.
  407. .. versionadded:: 3.0
  408. """
  409. try:
  410. return self._topology.get_direct_or_primary()
  411. except InvalidOperation:
  412. # Only one case where Topology throws InvalidOperation.
  413. raise InvalidOperation(
  414. 'Cannot use "address" property when load balancing among'
  415. ' mongoses, use "nodes" instead.')
  416. @property
  417. def primary(self):
  418. """The (host, port) of the current primary of the replica set.
  419. Returns ``None`` if this client is not connected to a replica set,
  420. there is no primary, or this client was created without the
  421. `replicaSet` option.
  422. .. versionadded:: 3.0
  423. MongoClient gained this property in version 3.0 when
  424. MongoReplicaSetClient's functionality was merged in.
  425. """
  426. return self._topology.get_primary()
  427. @property
  428. def secondaries(self):
  429. """The secondary members known to this client.
  430. A sequence of (host, port) pairs. Empty if this client is not
  431. connected to a replica set, there are no visible secondaries, or this
  432. client was created without the `replicaSet` option.
  433. .. versionadded:: 3.0
  434. MongoClient gained this property in version 3.0 when
  435. MongoReplicaSetClient's functionality was merged in.
  436. """
  437. return self._topology.get_secondaries()
  438. @property
  439. def arbiters(self):
  440. """Arbiters in the replica set.
  441. A sequence of (host, port) pairs. Empty if this client is not
  442. connected to a replica set, there are no arbiters, or this client was
  443. created without the `replicaSet` option.
  444. """
  445. return self._topology.get_arbiters()
  446. @property
  447. def is_primary(self):
  448. """If this client if connected to a server that can accept writes.
  449. True if the current server is a standalone, mongos, or the primary of
  450. a replica set.
  451. """
  452. return self._server_property('is_writable', False)
  @property
  def is_mongos(self):
      """Whether this client is connected to a mongos."""
      server_type = self._server_property('server_type')
      return server_type == SERVER_TYPE.Mongos
  458. @property
  459. def max_pool_size(self):
  460. """The maximum number of sockets the pool will open concurrently.
  461. When the pool has reached `max_pool_size`, operations block waiting for
  462. a socket to be returned to the pool. If ``waitQueueTimeoutMS`` is set,
  463. a blocked operation will raise :exc:`~pymongo.errors.ConnectionFailure`
  464. after a timeout. By default ``waitQueueTimeoutMS`` is not set.
  465. """
  466. return self.__options.pool_options.max_pool_size
  467. @property
  468. def nodes(self):
  469. """Set of all currently connected servers.
  470. .. warning:: When connected to a replica set the value of :attr:`nodes`
  471. can change over time as :class:`MongoClient`'s view of the replica
  472. set changes. :attr:`nodes` can also be an empty set when
  473. :class:`MongoClient` is first instantiated and hasn't yet connected
  474. to any servers, or a network partition causes it to lose connection
  475. to all servers.
  476. """
  477. description = self._topology.description
  478. return frozenset(s.address for s in description.known_servers)
  @property
  def max_bson_size(self):
      """The largest BSON object the connected server accepts, in bytes.

      Falls back to ``common.MAX_BSON_SIZE`` (documented as 16MB) when
      not connected to a server.
      """
      fallback = common.MAX_BSON_SIZE
      return self._server_property('max_bson_size', fallback)
  @property
  def max_message_size(self):
      """The largest message the connected server accepts, in bytes.

      Falls back to ``common.MAX_MESSAGE_SIZE`` (documented as 32MB) when
      not connected to a server.
      """
      fallback = common.MAX_MESSAGE_SIZE
      return self._server_property('max_message_size', fallback)
  @property
  def max_write_batch_size(self):
      """The maxWriteBatchSize reported by the server.

      Returns a default value when connected to server versions prior to
      MongoDB 2.6. ``common.MAX_WRITE_BATCH_SIZE`` is also returned when
      no server is available.
      """
      fallback = common.MAX_WRITE_BATCH_SIZE
      return self._server_property('max_write_batch_size', fallback)
  500. @property
  501. def local_threshold_ms(self):
  502. """The local threshold for this instance."""
  503. return self.__options.local_threshold_ms
  504. @property
  505. def server_selection_timeout(self):
  506. """The server selection timeout for this instance in seconds."""
  507. return self.__options.server_selection_timeout
  def _is_writable(self):
      """Try to select a writable server and report whether it is writable.

      Returns False if server selection raises
      :class:`~pymongo.errors.ConnectionFailure`.
      """
      topology = self._get_topology()  # Starts monitors if necessary.
      try:
          server = topology.select_server(writable_server_selector)
          # When directly connected to a secondary, arbiter, etc.,
          # select_server returns it, whatever the selector. Check
          # again if the server is writable.
          writable = server.description.is_writable
      except ConnectionFailure:
          writable = False
      return writable
  520. def close(self):
  521. """Disconnect from MongoDB.
  522. Close all sockets in the connection pools and stop the monitor threads.
  523. If this instance is used again it will be automatically re-opened and
  524. the threads restarted.
  525. """
  526. self._topology.close()
  def set_cursor_manager(self, manager_class):
      """Install a new cursor manager for this client.

      `manager_class` is called with this client as its only argument.
      :class:`TypeError` is raised if the result is not an instance of
      :class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
      handles closing cursors. Different managers can implement different
      policies in terms of when to actually kill a cursor that has
      been closed.

      :Parameters:
        - `manager_class`: cursor manager to use

      .. versionchanged:: 3.0
         Undeprecated.
      """
      candidate = manager_class(self)
      if isinstance(candidate, CursorManager):
          self.__cursor_manager = candidate
          return

      raise TypeError("manager_class must be a subclass of "
                      "CursorManager")
  544. def _get_topology(self):
  545. """Get the internal :class:`~pymongo.topology.Topology` object.
  546. If this client was created with "connect=False", calling _get_topology
  547. launches the connection process in the background.
  548. """
  549. self._topology.open()
  550. return self._topology
  @contextlib.contextmanager
  def _get_socket(self, selector):
      """Context manager yielding a socket for a server chosen by `selector`.

      The socket is checked out with this client's cached credentials.
      Errors raised while the socket is in use are always re-raised;
      depending on the error type the server is first reset:

      - :class:`~pymongo.errors.NetworkTimeout`: no reset.
      - :class:`~pymongo.errors.NotMasterError`: reset the server and
        request an immediate check.
      - any other :class:`~pymongo.errors.ConnectionFailure`: reset the
        server without requesting a check.
      """
      server = self._get_topology().select_server(selector)
      try:
          with server.get_socket(self.__all_credentials) as sock_info:
              yield sock_info
      except NetworkTimeout:
          # The socket has been closed. Don't reset the server.
          # Server Discovery And Monitoring Spec: "When an application
          # operation fails because of any network error besides a socket
          # timeout...."
          raise
      except NotMasterError:
          # "When the client sees a "not master" error it MUST replace the
          # server's description with type Unknown. It MUST request an
          # immediate check of the server."
          self._reset_server_and_request_check(server.description.address)
          raise
      except ConnectionFailure:
          # "Client MUST replace the server's description with type Unknown
          # ... MUST NOT request an immediate check of the server."
          self.__reset_server(server.description.address)
          raise
  def _socket_for_writes(self):
      """Return a socket context manager for a writable server."""
      selector = writable_server_selector
      return self._get_socket(selector)
  @contextlib.contextmanager
  def _socket_for_reads(self, read_preference):
      """Context manager yielding ``(sock_info, slave_ok)`` for a read.

      :Parameters:
        - `read_preference`: the read preference used to select a server;
          a falsy value (e.g. ``None``) means ``ReadPreference.PRIMARY``.

      ``slave_ok`` is true when the topology type is Single and the socket
      is not connected to a mongos, or when the effective read preference
      is not PRIMARY.
      """
      preference = read_preference or ReadPreference.PRIMARY
      # Get a socket for a server matching the read preference, and yield
      # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
      # mongods with topology type Single. If the server type is Mongos,
      # follow the rules for passing read preference to mongos, even for
      # topology type Single."
      # Thread safe: if the type is single it cannot change.
      topology = self._get_topology()
      single = topology.description.topology_type == TOPOLOGY_TYPE.Single
      # Select with the effective preference, not the raw argument, so a
      # falsy `read_preference` is never handed on as the selector. This
      # matches the fallback applied in _send_message_with_response and
      # keeps selection consistent with the slave_ok computation below.
      with self._get_socket(preference) as sock_info:
          slave_ok = (single and not sock_info.is_mongos) or (
              preference != ReadPreference.PRIMARY)
          yield sock_info, slave_ok
  def _send_message_with_response(self, operation, read_preference=None,
                                  exhaust=False, address=None):
      """Send `operation` to a selected server and return its Response.

      :Parameters:
        - `operation`: a _Query or _GetMore object.
        - `read_preference` (optional): A ReadPreference.
        - `exhaust` (optional): If True, the socket used stays checked out.
          It is returned along with its Pool in the Response.
        - `address` (optional): Optional address when sending a message
          to a specific server, used for getMore.

      Raises :class:`~pymongo.errors.AutoReconnect` if `address` is given
      but no server is found for it.
      """
      with self.__lock:
          # If needed, restart kill-cursors thread after a fork.
          self._kill_cursors_executor.open()

      topo = self._get_topology()
      if not address:
          # No specific server requested: select by read preference, or a
          # writable server when no preference was given.
          chosen_selector = read_preference or writable_server_selector
          server = topo.select_server(chosen_selector)
      else:
          server = topo.select_server_by_address(address)
          if not server:
              raise AutoReconnect('server %s:%d no longer available'
                                  % address)

      # A _Query's slaveOk bit is already set for queries with non-primary
      # read preference. If this is a direct connection to a mongod, override
      # and *always* set the slaveOk bit. See bullet point 2 in
      # server-selection.rst#topology-type-single.
      is_single = topo.description.topology_type == TOPOLOGY_TYPE.Single
      force_slave_ok = (
          is_single
          and server.description.server_type != SERVER_TYPE.Mongos)

      return self._reset_on_error(
          server,
          server.send_message_with_response,
          operation,
          force_slave_ok,
          self.__all_credentials,
          exhaust)
  628. def _reset_on_error(self, server, func, *args, **kwargs):
  629. """Execute an operation. Reset the server on network error.
  630. Returns fn()'s return value on success. On error, clears the server's
  631. pool and marks the server Unknown.
  632. Re-raises any exception thrown by fn().
  633. """
  634. try:
  635. return func(*args, **kwargs)
  636. except NetworkTimeout:
  637. # The socket has been closed. Don't reset the server.
  638. raise
  639. except ConnectionFailure:
  640. self.__reset_server(server.description.address)
  641. raise
  642. def __reset_server(self, address):
  643. """Clear our connection pool for a server and mark it Unknown."""
  644. self._topology.reset_server(address)
  645. def _reset_server_and_request_check(self, address):
  646. """Clear our pool for a server, mark it Unknown, and check it soon."""
  647. self._topology.reset_server_and_request_check(address)
  648. def __eq__(self, other):
  649. if isinstance(other, self.__class__):
  650. return self.address == other.address
  651. return NotImplemented
  652. def __ne__(self, other):
  653. return not self == other
  654. def __repr__(self):
  655. server_descriptions = self._topology.description.server_descriptions()
  656. if len(server_descriptions) == 1:
  657. description, = server_descriptions.values()
  658. return "MongoClient(%r, %r)" % description.address
  659. else:
  660. return "MongoClient(%r)" % [
  661. "%s:%d" % address for address in server_descriptions]
  662. def __getattr__(self, name):
  663. """Get a database by name.
  664. Raises :class:`~pymongo.errors.InvalidName` if an invalid
  665. database name is used.
  666. :Parameters:
  667. - `name`: the name of the database to get
  668. """
  669. if name.startswith('_'):
  670. raise AttributeError(
  671. "MongoClient has no attribute %r. To access the %s"
  672. " database, use client[%r]." % (name, name, name))
  673. return self.__getitem__(name)
  def __getitem__(self, name):
      """Get a database by subscript access, as in ``client[name]``.

      Raises :class:`~pymongo.errors.InvalidName` if an invalid
      database name is used.

      :Parameters:
        - `name`: the name of the database to get
      """
      db = database.Database(self, name)
      return db
  def close_cursor(self, cursor_id, address=None):
      """Close a single database cursor.

      Raises :class:`TypeError` if `cursor_id` is not an instance of
      ``(int, long)``. What closing the cursor actually means
      depends on this client's cursor manager.

      :Parameters:
        - `cursor_id`: id of cursor to close
        - `address` (optional): (host, port) pair of the cursor's server.
          If it is not provided, the client attempts to close the cursor on
          the primary or standalone, or a mongos server.

      .. versionchanged:: 3.0
         Added ``address`` parameter.
      """
      if isinstance(cursor_id, integer_types):
          # Closing is delegated to the configured cursor manager.
          self.__cursor_manager.close(cursor_id, address)
          return

      raise TypeError("cursor_id must be an instance of (int, long)")
  def kill_cursors(self, cursor_ids, address=None):
      """Send a kill cursors message soon with the given ids.

      Raises :class:`TypeError` if `cursor_ids` is not an instance of
      ``list``.

      This method may be called from a :class:`~pymongo.cursor.Cursor`
      destructor during garbage collection, so it isn't safe to take a
      lock or do network I/O. Instead, we schedule the cursor to be closed
      soon on a background thread.

      :Parameters:
        - `cursor_ids`: list of cursor ids to kill
        - `address` (optional): (host, port) pair of the cursor's server.
          If it is not provided, the client attempts to close the cursor on
          the primary or standalone, or a mongos server.

      .. versionchanged:: 3.0
         Now accepts an `address` argument. Schedules the cursors to be
         closed on a background thread instead of sending the message
         immediately.
      """
      if isinstance(cursor_ids, list):
          # "Atomic", needs no lock.
          pending = (address, cursor_ids)
          self.__kill_cursors_queue.append(pending)
          return

      raise TypeError("cursor_ids must be a list")
  720. # This method is run periodically by a background thread.
  721. def _process_kill_cursors_queue(self):
  722. """Process any pending kill cursors requests."""
  723. address_to_cursor_ids = defaultdict(list)
  724. # Other threads or the GC may append to the queue concurrently.
  725. while True:
  726. try:
  727. address, cursor_ids = self.__kill_cursors_queue.pop()
  728. except IndexError:
  729. break
  730. address_to_cursor_ids[address].extend(cursor_ids)
  731. # Don't re-open topology if it's closed and there's no pending cursors.
  732. if address_to_cursor_ids:
  733. topology = self._get_topology()
  734. for address, cursor_ids in address_to_cursor_ids.items():
  735. try:
  736. if address:
  737. server = topology.select_server_by_address(address)
  738. else:
  739. # Application called close_cursor() with no address.
  740. server = topology.select_server(
  741. writable_server_selector)
  742. server.send_message(message.kill_cursors(cursor_ids),
  743. self.__all_credentials)
  744. except ConnectionFailure as exc:
  745. warnings.warn("couldn't close cursor on %s: %s"
  746. % (address, exc))
  def server_info(self):
      """Get information about the MongoDB server we're connected to.

      Runs the ``buildinfo`` command on the ``admin`` database with a
      primary read preference and returns the command's result.
      """
      admin = self.admin
      return admin.command("buildinfo",
                           read_preference=ReadPreference.PRIMARY)
  def database_names(self):
      """Get a list of the names of all databases on the connected server.

      Runs ``listDatabases`` on the ``admin`` database with a primary read
      preference and collects the ``"name"`` of each listed database.
      """
      reply = self.admin.command(
          "listDatabases", read_preference=ReadPreference.PRIMARY)
      names = []
      for entry in reply["databases"]:
          names.append(entry["name"])
      return names
  def drop_database(self, name_or_database):
      """Drop a database.

      Raises :class:`TypeError` if `name_or_database` is not an instance of
      :class:`basestring` (:class:`str` in python 3) or
      :class:`~pymongo.database.Database`.

      :Parameters:
        - `name_or_database`: the name of a database to drop, or a
          :class:`~pymongo.database.Database` instance representing the
          database to drop
      """
      if isinstance(name_or_database, database.Database):
          name = name_or_database.name
      else:
          name = name_or_database

      if not isinstance(name, string_type):
          raise TypeError("name_or_database must be an instance "
                          "of %s or a Database" % (string_type.__name__,))

      self._purge_index(name)
      target = self[name]
      target.command("dropDatabase",
                     read_preference=ReadPreference.PRIMARY)
  def get_default_database(self):
      """Get the database named in the MongoDB connection URI.

      >>> uri = 'mongodb://host/my_database'
      >>> client = MongoClient(uri)
      >>> db = client.get_default_database()
      >>> assert db.name == 'my_database'

      Useful in scripts where you want to choose which database to use
      based only on the URI in a configuration file.

      Raises ``ConfigurationError`` when no default database name is set.
      """
      if self.__default_database_name is not None:
          return self[self.__default_database_name]

      raise ConfigurationError('No default database defined')
  def get_database(self, name, codec_options=None,
                   read_preference=None, write_concern=None):
      """Get a :class:`~pymongo.database.Database` with the given name and
      options.

      Useful for creating a :class:`~pymongo.database.Database` with
      different codec options, read preference, and/or write concern from
      this :class:`MongoClient`.

        >>> client.read_preference
        Primary()
        >>> db1 = client.test
        >>> db1.read_preference
        Primary()
        >>> from pymongo import ReadPreference
        >>> db2 = client.get_database(
        ...     'test', read_preference=ReadPreference.SECONDARY)
        >>> db2.read_preference
        Secondary(tag_sets=None)

      :Parameters:
        - `name`: The name of the database - a string.
        - `codec_options` (optional): An instance of
          :class:`~bson.codec_options.CodecOptions`. If ``None`` (the
          default) the :attr:`codec_options` of this :class:`MongoClient` is
          used.
        - `read_preference` (optional): The read preference to use. If
          ``None`` (the default) the :attr:`read_preference` of this
          :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
          for options.
        - `write_concern` (optional): An instance of
          :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
          default) the :attr:`write_concern` of this :class:`MongoClient` is
          used.
      """
      # Options are forwarded positionally, in the order Database is called
      # with elsewhere in this method's original form.
      options = (codec_options, read_preference, write_concern)
      return database.Database(self, name, *options)
  @property
  def is_locked(self):
      """Is this server locked? While locked, all write operations
      are blocked, although read operations may still be allowed.
      Use :meth:`unlock` to unlock.

      Reads the ``fsyncLock`` field of ``admin.current_op()`` and returns
      its truth value (``False`` when the field is absent).
      """
      current = self.admin.current_op()
      fsync_lock = current.get('fsyncLock', 0)
      return True if fsync_lock else False
  def fsync(self, **kwargs):
      """Flush all pending writes to datafiles.

      :Parameters:

        Optional parameters can be passed as keyword arguments:

        - `lock`: If True lock the server to disallow writes.
        - `async`: If True don't block while synchronizing.

        .. warning:: `async` and `lock` can not be used together.

        .. warning:: MongoDB does not support the `async` option
                     on Windows and will raise an exception on that
                     platform.
      """
      # The keyword arguments are passed through unchanged to the "fsync"
      # command, which runs on the admin database with a primary read
      # preference.
      admin = self.admin
      admin.command("fsync",
                    read_preference=ReadPreference.PRIMARY, **kwargs)
  def unlock(self):
      """Unlock a previously locked server.

      Issues a ``find_one()`` on the ``admin`` database's
      ``$cmd.sys.unlock`` collection with a primary read preference.
      """
      unlock_collection = self.admin.get_collection(
          "$cmd.sys.unlock", read_preference=ReadPreference.PRIMARY)
      unlock_collection.find_one()
  849. def __enter__(self):
  850. return self
  851. def __exit__(self, exc_type, exc_val, exc_tb):
  852. self.close()
  853. def __iter__(self):
  854. return self
  855. def __next__(self):
  856. raise TypeError("'MongoClient' object is not iterable")
  857. next = __next__