| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032 |
- # Copyright 2009-2015 MongoDB, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you
- # may not use this file except in compliance with the License. You
- # may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied. See the License for the specific language governing
- # permissions and limitations under the License.
- """Tools for connecting to MongoDB.
- .. seealso:: :doc:`/examples/high_availability` for examples of connecting
- to replica sets or sets of mongos servers.
- To get a :class:`~pymongo.database.Database` instance from a
- :class:`MongoClient` use either dictionary-style or attribute-style
- access:
- .. doctest::
- >>> from pymongo import MongoClient
- >>> c = MongoClient()
- >>> c.test_database
- Database(MongoClient('localhost', 27017), u'test_database')
- >>> c['test-database']
- Database(MongoClient('localhost', 27017), u'test-database')
- """
- import contextlib
- import datetime
- import threading
- import warnings
- import weakref
- from collections import defaultdict
- from bson.py3compat import (integer_types,
- string_type)
- from pymongo import (common,
- database,
- message,
- periodic_executor,
- uri_parser)
- from pymongo.client_options import ClientOptions
- from pymongo.cursor_manager import CursorManager
- from pymongo.errors import (AutoReconnect,
- ConfigurationError,
- ConnectionFailure,
- InvalidOperation,
- InvalidURI,
- NetworkTimeout,
- NotMasterError,
- OperationFailure)
- from pymongo.read_preferences import ReadPreference
- from pymongo.server_selectors import (writable_preferred_server_selector,
- writable_server_selector)
- from pymongo.server_type import SERVER_TYPE
- from pymongo.topology import Topology
- from pymongo.topology_description import TOPOLOGY_TYPE
- from pymongo.settings import TopologySettings
- class MongoClient(common.BaseObject):
- HOST = "localhost"
- PORT = 27017
def __init__(
        self,
        host=None,
        port=None,
        document_class=dict,
        tz_aware=False,
        connect=True,
        **kwargs):
    """Client for a MongoDB instance, a replica set, or a set of mongoses.

    The client object is thread-safe and has connection-pooling built in.
    If an operation fails because of a network error,
    :class:`~pymongo.errors.ConnectionFailure` is raised and the client
    reconnects in the background. Application code should handle this
    exception (recognizing that the operation failed) and then continue to
    execute.

    The `host` parameter can be a full `mongodb URI
    <http://dochub.mongodb.org/core/connections>`_, in addition to
    a simple hostname. It can also be a list of hostnames or
    URIs. Any port specified in the host string(s) will override
    the `port` parameter. If multiple mongodb URIs containing
    database or auth information are passed, the last database,
    username, and password present will be used. For username and
    passwords reserved characters like ':', '/', '+' and '@' must be
    escaped following RFC 2396.

    :Parameters:
      - `host` (optional): hostname or IP address of the
        instance to connect to, or a mongodb URI, or a list of
        hostnames / mongodb URIs. If `host` is an IPv6 literal
        it must be enclosed in '[' and ']' characters following
        the RFC2732 URL syntax (e.g. '[::1]' for localhost)
      - `port` (optional): port number on which to connect
      - `document_class` (optional): default class to use for
        documents returned from queries on this client
      - `tz_aware` (optional): if ``True``,
        :class:`~datetime.datetime` instances returned as values
        in a document by this :class:`MongoClient` will be timezone
        aware (otherwise they will be naive)
      - `connect` (optional): if ``True`` (the default), immediately
        begin connecting to MongoDB in the background. Otherwise connect
        on the first operation.

      | **Other optional parameters can be passed as keyword arguments:**

      - `maxPoolSize` (optional): The maximum number of connections
        that the pool will open simultaneously. If this is set, operations
        will block if there are `maxPoolSize` outstanding connections
        from the pool. Defaults to 100.
      - `socketTimeoutMS`: (integer or None) Controls how long (in
        milliseconds) the driver will wait for a response after sending an
        ordinary (non-monitoring) database operation before concluding that
        a network error has occurred. Defaults to ``None`` (no timeout).
      - `connectTimeoutMS`: (integer or None) Controls how long (in
        milliseconds) the driver will wait during server monitoring when
        connecting a new socket to a server before concluding the server
        is unavailable. Defaults to ``20000`` (20 seconds).
      - `serverSelectionTimeoutMS`: (integer) Controls how long (in
        milliseconds) the driver will wait to find an available,
        appropriate server to carry out a database operation; while it is
        waiting, multiple server monitoring operations may be carried out,
        each controlled by `connectTimeoutMS`. Defaults to ``30000`` (30
        seconds).
      - `waitQueueTimeoutMS`: (integer or None) How long (in milliseconds)
        a thread will wait for a socket from the pool if the pool has no
        free sockets. Defaults to ``None`` (no timeout).
      - `waitQueueMultiple`: (integer or None) Multiplied by maxPoolSize
        to give the number of threads allowed to wait for a socket at one
        time. Defaults to ``None`` (no limit).
      - `socketKeepAlive`: (boolean) Whether to send periodic keep-alive
        packets on connected sockets. Defaults to ``False`` (do not send
        keep-alive packets).

      | **Write Concern options:**
      | (Only set if passed. No default values.)

      - `w`: (integer or string) If this is a replica set, write operations
        will block until they have been replicated to the specified number
        or tagged set of servers. `w=<int>` always includes the replica set
        primary (e.g. w=3 means write to the primary and wait until
        replicated to **two** secondaries). Passing w=0 **disables write
        acknowledgement** and all other write concern options.
      - `wtimeout`: (integer) Used in conjunction with `w`. Specify a value
        in milliseconds to control how long to wait for write propagation
        to complete. If replication does not complete in the given
        timeframe, a timeout exception is raised.
      - `j`: If ``True`` block until write operations have been committed
        to the journal. Cannot be used in combination with `fsync`. Prior
        to MongoDB 2.6 this option was ignored if the server was running
        without journaling. Starting with MongoDB 2.6 write operations will
        fail with an exception if this option is used when the server is
        running without journaling.
      - `fsync`: If ``True`` and the server is running without journaling,
        blocks until the server has synced all data files to disk. If the
        server is running with journaling, this acts the same as the `j`
        option, blocking until write operations have been committed to the
        journal. Cannot be used in combination with `j`.

      | **Replica set keyword arguments for connecting with a replica set
        - either directly or via a mongos:**

      - `replicaSet`: (string or None) The name of the replica set to
        connect to. The driver will verify that all servers it connects to
        match this name. Implies that the hosts specified are a seed list
        and the driver should attempt to find all members of the set.
        Defaults to ``None``.
      - `read_preference`: The read preference for this client. If
        connecting directly to a secondary then a read preference mode
        *other* than PRIMARY is required - otherwise all queries will throw
        :class:`~pymongo.errors.AutoReconnect` "not master".
        See :class:`~pymongo.read_preferences.ReadPreference` for all
        available read preference options. Defaults to ``PRIMARY``.

      | **SSL configuration:**

      - `ssl`: If ``True``, create the connection to the server using SSL.
        Defaults to ``False``.
      - `ssl_keyfile`: The private keyfile used to identify the local
        connection against mongod. If included with the ``certfile`` then
        only the ``ssl_certfile`` is needed. Implies ``ssl=True``.
        Defaults to ``None``.
      - `ssl_certfile`: The certificate file used to identify the local
        connection against mongod. Implies ``ssl=True``. Defaults to
        ``None``.
      - `ssl_cert_reqs`: Specifies whether a certificate is required from
        the other side of the connection, and whether it will be validated
        if provided. It must be one of the three values ``ssl.CERT_NONE``
        (certificates ignored), ``ssl.CERT_OPTIONAL``
        (not required, but validated if provided), or ``ssl.CERT_REQUIRED``
        (required and validated). If the value of this parameter is not
        ``ssl.CERT_NONE`` and a value is not provided for ``ssl_ca_certs``
        PyMongo will attempt to load system provided CA certificates.
        If the python version in use does not support loading system CA
        certificates then the ``ssl_ca_certs`` parameter must point
        to a file of CA certificates. Implies ``ssl=True``. Defaults to
        ``ssl.CERT_REQUIRED`` if not provided and ``ssl=True``.
      - `ssl_ca_certs`: The ca_certs file contains a set of concatenated
        "certification authority" certificates, which are used to validate
        certificates passed from the other end of the connection.
        Implies ``ssl=True``. Defaults to ``None``.
      - `ssl_match_hostname`: If ``True`` (the default), and
        `ssl_cert_reqs` is not ``ssl.CERT_NONE``, enables hostname
        verification using the :func:`~ssl.match_hostname` function from
        python's :mod:`~ssl` module. Think very carefully before setting
        this to ``False`` as that could make your application vulnerable to
        man-in-the-middle attacks.

    :Raises:
      - :class:`TypeError` if `port` is not an ``int``.
      - :class:`~pymongo.errors.InvalidURI` if a host string contains
        ``"://"`` but does not start with ``"mongodb://"``.
      - :class:`~pymongo.errors.ConfigurationError` if no seed host
        results from parsing `host`.

    .. mongodoc:: connections

    .. versionchanged:: 3.0
       :class:`~pymongo.mongo_client.MongoClient` is now the one and only
       client class for a standalone server, mongos, or replica set.
       It includes the functionality that had been split into
       :class:`~pymongo.mongo_client.MongoReplicaSetClient`: it can connect
       to a replica set, discover all its members, and monitor the set for
       stepdowns, elections, and reconfigs.

       The :class:`~pymongo.mongo_client.MongoClient` constructor no
       longer blocks while connecting to the server or servers, and it no
       longer raises :class:`~pymongo.errors.ConnectionFailure` if they
       are unavailable, nor :class:`~pymongo.errors.ConfigurationError`
       if the user's credentials are wrong. Instead, the constructor
       returns immediately and launches the connection process on
       background threads.

       Therefore the ``alive`` method is removed since it no longer
       provides meaningful information; even if the client is disconnected,
       it may discover a server in time to fulfill the next operation.

       In PyMongo 2.x, :class:`~pymongo.MongoClient` accepted a list of
       standalone MongoDB servers and used the first it could connect to::

           MongoClient(['host1.com:27017', 'host2.com:27017'])

       A list of multiple standalones is no longer supported; if multiple
       servers are listed they must be members of the same replica set, or
       mongoses in the same sharded cluster.

       The behavior for a list of mongoses is changed from "high
       availability" to "load balancing". Before, the client connected to
       the lowest-latency mongos in the list, and used it until a network
       error prompted it to re-evaluate all mongoses' latencies and
       reconnect to one of them. In PyMongo 3, the client monitors its
       network latency to all the mongoses continuously, and distributes
       operations evenly among those with the lowest latency. See
       :ref:`mongos-load-balancing` for more information.

       The ``connect`` option is added.

       The ``start_request``, ``in_request``, and ``end_request`` methods
       are removed, as well as the ``auto_start_request`` option.

       The ``copy_database`` method is removed, see the
       :doc:`copy_database examples </examples/copydb>` for alternatives.

       The :meth:`MongoClient.disconnect` method is removed; it was a
       synonym for :meth:`~pymongo.MongoClient.close`.

       :class:`~pymongo.mongo_client.MongoClient` no longer returns an
       instance of :class:`~pymongo.database.Database` for attribute names
       with leading underscores. You must use dict-style lookups instead::

           client['__my_database__']

       Not::

           client.__my_database__
    """
    # Fall back to the class-level defaults and normalize `host` to a list
    # so a single string and a list of strings share one code path below.
    if host is None:
        host = self.HOST
    if isinstance(host, string_type):
        host = [host]
    if port is None:
        port = self.PORT
    if not isinstance(port, int):
        raise TypeError("port must be an instance of int")

    # Accumulate seeds across every entry; for username, password and
    # database the last non-empty value seen wins.
    seeds = set()
    username = None
    password = None
    dbase = None
    opts = {}
    for entity in host:
        if "://" in entity:
            if entity.startswith("mongodb://"):
                res = uri_parser.parse_uri(entity, port, False)
                seeds.update(res["nodelist"])
                username = res["username"] or username
                password = res["password"] or password
                dbase = res["database"] or dbase
                # NOTE: options are replaced, not merged, so only the
                # options of the last URI parsed are kept.
                opts = res["options"]
            else:
                idx = entity.find("://")
                raise InvalidURI("Invalid URI scheme: "
                                 "%s" % (entity[:idx],))
        else:
            seeds.update(uri_parser.split_hosts(entity, port))
    if not seeds:
        raise ConfigurationError("need to specify at least one host")

    # _pool_class, _monitor_class, and _condition_class are for deep
    # customization of PyMongo, e.g. Motor.
    pool_class = kwargs.pop('_pool_class', None)
    monitor_class = kwargs.pop('_monitor_class', None)
    condition_class = kwargs.pop('_condition_class', None)

    # Keyword arguments are applied last, so they override any option of
    # the same key that came from a URI.
    kwargs['document_class'] = document_class
    kwargs['tz_aware'] = tz_aware
    opts.update(kwargs)
    self.__options = options = ClientOptions(
        username, password, dbase, opts)

    self.__default_database_name = dbase
    self.__lock = threading.Lock()
    self.__cursor_manager = CursorManager(self)
    self.__kill_cursors_queue = []

    # Cache of existing indexes used by ensure_index ops.
    self.__index_cache = {}

    super(MongoClient, self).__init__(options.codec_options,
                                      options.read_preference,
                                      options.write_concern)

    self.__all_credentials = {}
    creds = options.credentials
    if creds:
        self._cache_credentials(creds.source, creds)

    self._topology_settings = TopologySettings(
        seeds=seeds,
        replica_set_name=options.replica_set_name,
        pool_class=pool_class,
        pool_options=options.pool_options,
        monitor_class=monitor_class,
        condition_class=condition_class,
        local_threshold_ms=options.local_threshold_ms,
        server_selection_timeout=options.server_selection_timeout)

    self._topology = Topology(self._topology_settings)
    if connect:
        self._topology.open()

    # `target` looks up `self_ref` when it is called, not when it is
    # defined; `self_ref` is bound further down, before executor.open().
    def target():
        client = self_ref()
        if client is None:
            return False  # Stop the executor.
        MongoClient._process_kill_cursors_queue(client)
        return True

    executor = periodic_executor.PeriodicExecutor(
        condition_class=self._topology_settings.condition_class,
        interval=common.KILL_CURSOR_FREQUENCY,
        min_interval=0,
        target=target)

    # We strongly reference the executor and it weakly references us via
    # this closure. When the client is freed, stop the executor soon.
    self_ref = weakref.ref(self, executor.close)
    self._kill_cursors_executor = executor
    executor.open()
- def _cache_credentials(self, source, credentials, connect=False):
- """Save a set of authentication credentials.
- The credentials are used to login a socket whenever one is created.
- If `connect` is True, verify the credentials on the server first.
- """
- # Don't let other threads affect this call's data.
- all_credentials = self.__all_credentials.copy()
- if source in all_credentials:
- # Nothing to do if we already have these credentials.
- if credentials == all_credentials[source]:
- return
- raise OperationFailure('Another user is already authenticated '
- 'to this database. You must logout first.')
- if connect:
- server = self._get_topology().select_server(
- writable_preferred_server_selector)
- # get_socket() logs out of the database if logged in with old
- # credentials, and logs in with new ones.
- with server.get_socket(all_credentials) as sock_info:
- sock_info.authenticate(credentials)
- # If several threads run _cache_credentials at once, last one wins.
- self.__all_credentials[source] = credentials
- def _purge_credentials(self, source):
- """Purge credentials from the authentication cache."""
- self.__all_credentials.pop(source, None)
- def _cached(self, dbname, coll, index):
- """Test if `index` is cached."""
- cache = self.__index_cache
- now = datetime.datetime.utcnow()
- return (dbname in cache and
- coll in cache[dbname] and
- index in cache[dbname][coll] and
- now < cache[dbname][coll][index])
- def _cache_index(self, dbname, collection, index, cache_for):
- """Add an index to the index cache for ensure_index operations."""
- now = datetime.datetime.utcnow()
- expire = datetime.timedelta(seconds=cache_for) + now
- if database not in self.__index_cache:
- self.__index_cache[dbname] = {}
- self.__index_cache[dbname][collection] = {}
- self.__index_cache[dbname][collection][index] = expire
- elif collection not in self.__index_cache[dbname]:
- self.__index_cache[dbname][collection] = {}
- self.__index_cache[dbname][collection][index] = expire
- else:
- self.__index_cache[dbname][collection][index] = expire
- def _purge_index(self, database_name,
- collection_name=None, index_name=None):
- """Purge an index from the index cache.
- If `index_name` is None purge an entire collection.
- If `collection_name` is None purge an entire database.
- """
- if not database_name in self.__index_cache:
- return
- if collection_name is None:
- del self.__index_cache[database_name]
- return
- if not collection_name in self.__index_cache[database_name]:
- return
- if index_name is None:
- del self.__index_cache[database_name][collection_name]
- return
- if index_name in self.__index_cache[database_name][collection_name]:
- del self.__index_cache[database_name][collection_name][index_name]
def _server_property(self, attr_name, default=None):
    """Read one attribute from the current server's description.

    A server is selected with the writable-server selector and a
    selection timeout of 0. If selection raises
    :class:`~pymongo.errors.ConnectionFailure`, `default` is returned
    instead.

    Not threadsafe if used multiple times in a single method, since
    the server may change. In such cases, store a local reference to a
    ServerDescription first, then use its properties.

    :Parameters:
      - `attr_name`: name of the description attribute to read.
      - `default` (optional): value returned when no server is selected.
    """
    try:
        current = self._topology.select_server(
            writable_server_selector, server_selection_timeout=0)
        description = current.description
        return getattr(description, attr_name)
    except ConnectionFailure:
        return default
- @property
- def address(self):
- """(host, port) of the current standalone, primary, or mongos, or None.
- Accessing :attr:`address` raises :exc:`~.errors.InvalidOperation` if
- the client is load-balancing among mongoses, since there is no single
- address. Use :attr:`nodes` instead.
- .. versionadded:: 3.0
- """
- try:
- return self._topology.get_direct_or_primary()
- except InvalidOperation:
- # Only one case where Topology throws InvalidOperation.
- raise InvalidOperation(
- 'Cannot use "address" property when load balancing among'
- ' mongoses, use "nodes" instead.')
- @property
- def primary(self):
- """The (host, port) of the current primary of the replica set.
- Returns ``None`` if this client is not connected to a replica set,
- there is no primary, or this client was created without the
- `replicaSet` option.
- .. versionadded:: 3.0
- MongoClient gained this property in version 3.0 when
- MongoReplicaSetClient's functionality was merged in.
- """
- return self._topology.get_primary()
- @property
- def secondaries(self):
- """The secondary members known to this client.
- A sequence of (host, port) pairs. Empty if this client is not
- connected to a replica set, there are no visible secondaries, or this
- client was created without the `replicaSet` option.
- .. versionadded:: 3.0
- MongoClient gained this property in version 3.0 when
- MongoReplicaSetClient's functionality was merged in.
- """
- return self._topology.get_secondaries()
- @property
- def arbiters(self):
- """Arbiters in the replica set.
- A sequence of (host, port) pairs. Empty if this client is not
- connected to a replica set, there are no arbiters, or this client was
- created without the `replicaSet` option.
- """
- return self._topology.get_arbiters()
- @property
- def is_primary(self):
- """If this client if connected to a server that can accept writes.
- True if the current server is a standalone, mongos, or the primary of
- a replica set.
- """
- return self._server_property('is_writable', False)
@property
def is_mongos(self):
    """Whether this client is connected to mongos."""
    current_type = self._server_property('server_type')
    return current_type == SERVER_TYPE.Mongos
- @property
- def max_pool_size(self):
- """The maximum number of sockets the pool will open concurrently.
- When the pool has reached `max_pool_size`, operations block waiting for
- a socket to be returned to the pool. If ``waitQueueTimeoutMS`` is set,
- a blocked operation will raise :exc:`~pymongo.errors.ConnectionFailure`
- after a timeout. By default ``waitQueueTimeoutMS`` is not set.
- """
- return self.__options.pool_options.max_pool_size
- @property
- def nodes(self):
- """Set of all currently connected servers.
- .. warning:: When connected to a replica set the value of :attr:`nodes`
- can change over time as :class:`MongoClient`'s view of the replica
- set changes. :attr:`nodes` can also be an empty set when
- :class:`MongoClient` is first instantiated and hasn't yet connected
- to any servers, or a network partition causes it to lose connection
- to all servers.
- """
- description = self._topology.description
- return frozenset(s.address for s in description.known_servers)
@property
def max_bson_size(self):
    """The largest BSON object the connected server accepts in bytes.

    Falls back to ``common.MAX_BSON_SIZE`` (documented as 16MB) if not
    connected to a server.
    """
    fallback = common.MAX_BSON_SIZE
    return self._server_property('max_bson_size', fallback)
@property
def max_message_size(self):
    """The largest message the connected server accepts in bytes.

    Falls back to ``common.MAX_MESSAGE_SIZE`` (documented as 32MB) if not
    connected to a server.
    """
    fallback = common.MAX_MESSAGE_SIZE
    return self._server_property('max_message_size', fallback)
@property
def max_write_batch_size(self):
    """The maxWriteBatchSize reported by the server.

    Returns a default value when connected to server versions prior to
    MongoDB 2.6. ``common.MAX_WRITE_BATCH_SIZE`` is returned when there
    is no current server.
    """
    fallback = common.MAX_WRITE_BATCH_SIZE
    return self._server_property('max_write_batch_size', fallback)
- @property
- def local_threshold_ms(self):
- """The local threshold for this instance."""
- return self.__options.local_threshold_ms
- @property
- def server_selection_timeout(self):
- """The server selection timeout for this instance in seconds."""
- return self.__options.server_selection_timeout
def _is_writable(self):
    """Try to select a writable server and report whether one was found.

    Returns the selected server's ``is_writable`` flag, or ``False`` if
    selection raises :class:`~pymongo.errors.ConnectionFailure`.
    """
    # Starts monitors if necessary.
    topo = self._get_topology()
    try:
        selected = topo.select_server(writable_server_selector)
    except ConnectionFailure:
        return False
    # When directly connected to a secondary, arbiter, etc.,
    # select_server returns it, whatever the selector. Check
    # again if the server is writable.
    return selected.description.is_writable
- def close(self):
- """Disconnect from MongoDB.
- Close all sockets in the connection pools and stop the monitor threads.
- If this instance is used again it will be automatically re-opened and
- the threads restarted.
- """
- self._topology.close()
def set_cursor_manager(self, manager_class):
    """Set this client's cursor manager.

    `manager_class` is called with this client as its only argument, and
    the result becomes the client's cursor manager. A cursor manager
    handles closing cursors. Different managers can implement different
    policies in terms of when to actually kill a cursor that has
    been closed.

    Raises :class:`TypeError` if the object built from `manager_class` is
    not an instance of :class:`~pymongo.cursor_manager.CursorManager`.

    :Parameters:
      - `manager_class`: cursor manager to use

    .. versionchanged:: 3.0
       Undeprecated.
    """
    candidate = manager_class(self)
    if isinstance(candidate, CursorManager):
        self.__cursor_manager = candidate
        return
    raise TypeError("manager_class must be a subclass of "
                    "CursorManager")
- def _get_topology(self):
- """Get the internal :class:`~pymongo.topology.Topology` object.
- If this client was created with "connect=False", calling _get_topology
- launches the connection process in the background.
- """
- self._topology.open()
- return self._topology
@contextlib.contextmanager
def _get_socket(self, selector):
    """Context manager yielding a socket from a server chosen by `selector`.

    The socket is checked out with this client's cached credentials.
    Errors raised while the socket is in use are always re-raised:

    - :class:`~pymongo.errors.NetworkTimeout`: the server is left as is.
    - :class:`~pymongo.errors.NotMasterError`: the server is reset and an
      immediate check is requested.
    - any other :class:`~pymongo.errors.ConnectionFailure`: the server is
      reset without requesting a check.

    :Parameters:
      - `selector`: passed to the topology's ``select_server``.
    """
    server = self._get_topology().select_server(selector)
    # The order of the except clauses matters: the first matching clause
    # wins, so the more specific errors are listed before
    # ConnectionFailure.
    try:
        with server.get_socket(self.__all_credentials) as sock_info:
            yield sock_info
    except NetworkTimeout:
        # The socket has been closed. Don't reset the server.
        # Server Discovery And Monitoring Spec: "When an application
        # operation fails because of any network error besides a socket
        # timeout...."
        raise
    except NotMasterError:
        # "When the client sees a "not master" error it MUST replace the
        # server's description with type Unknown. It MUST request an
        # immediate check of the server."
        self._reset_server_and_request_check(server.description.address)
        raise
    except ConnectionFailure:
        # "Client MUST replace the server's description with type Unknown
        # ... MUST NOT request an immediate check of the server."
        self.__reset_server(server.description.address)
        raise
def _socket_for_writes(self):
    """Return the `_get_socket` context manager for a writable server."""
    socket_context = self._get_socket(writable_server_selector)
    return socket_context
@contextlib.contextmanager
def _socket_for_reads(self, read_preference):
    """Context manager yielding ``(sock_info, slave_ok)`` for a read.

    :Parameters:
      - `read_preference`: read preference used to select the server. A
        falsy value (such as ``None``) is treated as
        ``ReadPreference.PRIMARY``.

    ``slave_ok`` is true when the topology type is Single and the socket
    is not connected to a mongos, or when the effective read preference
    is not ``ReadPreference.PRIMARY``.
    """
    preference = read_preference or ReadPreference.PRIMARY
    # Get a socket for a server matching the read preference, and yield
    # sock_info, slave_ok. Server Selection Spec: "slaveOK must be sent to
    # mongods with topology type Single. If the server type is Mongos,
    # follow the rules for passing read preference to mongos, even for
    # topology type Single."
    # Thread safe: if the type is single it cannot change.
    topology = self._get_topology()
    single = topology.description.topology_type == TOPOLOGY_TYPE.Single
    # Select with the defaulted `preference`, not the raw argument, so a
    # caller passing None gets PRIMARY selection instead of handing None
    # to the topology as a selector.
    with self._get_socket(preference) as sock_info:
        slave_ok = (single and not sock_info.is_mongos) or (
            preference != ReadPreference.PRIMARY)
        yield sock_info, slave_ok
def _send_message_with_response(self, operation, read_preference=None,
                                exhaust=False, address=None):
    """Send a message to MongoDB and return a Response.

    :Parameters:
      - `operation`: a _Query or _GetMore object.
      - `read_preference` (optional): A ReadPreference. When falsy, the
        writable-server selector is used instead.
      - `exhaust` (optional): If True, the socket used stays checked out.
        It is returned along with its Pool in the Response.
      - `address` (optional): Optional address when sending a message
        to a specific server, used for getMore.

    Raises :class:`~pymongo.errors.AutoReconnect` if `address` is given
    but the topology has no server for it.
    """
    with self.__lock:
        # If needed, restart kill-cursors thread after a fork.
        self._kill_cursors_executor.open()

    topology = self._get_topology()
    if not address:
        chooser = read_preference or writable_server_selector
        server = topology.select_server(chooser)
    else:
        server = topology.select_server_by_address(address)
        if not server:
            raise AutoReconnect('server %s:%d no longer available'
                                % address)

    # A _Query's slaveOk bit is already set for queries with non-primary
    # read preference. If this is a direct connection to a mongod, override
    # and *always* set the slaveOk bit. See bullet point 2 in
    # server-selection.rst#topology-type-single.
    is_single = topology.description.topology_type == TOPOLOGY_TYPE.Single
    set_slave_ok = (
        is_single
        and server.description.server_type != SERVER_TYPE.Mongos)

    credentials = self.__all_credentials
    return self._reset_on_error(
        server,
        server.send_message_with_response,
        operation,
        set_slave_ok,
        credentials,
        exhaust)
- def _reset_on_error(self, server, func, *args, **kwargs):
- """Execute an operation. Reset the server on network error.
- Returns fn()'s return value on success. On error, clears the server's
- pool and marks the server Unknown.
- Re-raises any exception thrown by fn().
- """
- try:
- return func(*args, **kwargs)
- except NetworkTimeout:
- # The socket has been closed. Don't reset the server.
- raise
- except ConnectionFailure:
- self.__reset_server(server.description.address)
- raise
- def __reset_server(self, address):
- """Clear our connection pool for a server and mark it Unknown."""
- self._topology.reset_server(address)
- def _reset_server_and_request_check(self, address):
- """Clear our pool for a server, mark it Unknown, and check it soon."""
- self._topology.reset_server_and_request_check(address)
- def __eq__(self, other):
- if isinstance(other, self.__class__):
- return self.address == other.address
- return NotImplemented
- def __ne__(self, other):
- return not self == other
- def __repr__(self):
- server_descriptions = self._topology.description.server_descriptions()
- if len(server_descriptions) == 1:
- description, = server_descriptions.values()
- return "MongoClient(%r, %r)" % description.address
- else:
- return "MongoClient(%r)" % [
- "%s:%d" % address for address in server_descriptions]
- def __getattr__(self, name):
- """Get a database by name.
- Raises :class:`~pymongo.errors.InvalidName` if an invalid
- database name is used.
- :Parameters:
- - `name`: the name of the database to get
- """
- if name.startswith('_'):
- raise AttributeError(
- "MongoClient has no attribute %r. To access the %s"
- " database, use client[%r]." % (name, name, name))
- return self.__getitem__(name)
def __getitem__(self, name):
    """Get a database by name via subscript access (``client[name]``).

    Raises :class:`~pymongo.errors.InvalidName` if an invalid
    database name is used.

    :Parameters:
      - `name`: the name of the database to get
    """
    db = database.Database(self, name)
    return db
def close_cursor(self, cursor_id, address=None):
    """Close a single database cursor.

    Raises :class:`TypeError` if `cursor_id` is not an instance of
    ``(int, long)``. What closing the cursor actually means
    depends on this client's cursor manager, which receives both
    arguments.

    :Parameters:
      - `cursor_id`: id of cursor to close
      - `address` (optional): (host, port) pair of the cursor's server.
        If it is not provided, the client attempts to close the cursor on
        the primary or standalone, or a mongos server.

    .. versionchanged:: 3.0
       Added ``address`` parameter.
    """
    if isinstance(cursor_id, integer_types):
        manager = self.__cursor_manager
        manager.close(cursor_id, address)
        return

    raise TypeError("cursor_id must be an instance of (int, long)")
def kill_cursors(self, cursor_ids, address=None):
    """Schedule the given cursor ids to be killed soon.

    Raises :class:`TypeError` if `cursor_ids` is not an instance of
    ``list``.

    This method may be called from a :class:`~pymongo.cursor.Cursor`
    destructor during garbage collection, so it isn't safe to take a
    lock or do network I/O. Instead, the request is queued and the
    cursors are closed soon on a background thread.

    :Parameters:
      - `cursor_ids`: list of cursor ids to kill
      - `address` (optional): (host, port) pair of the cursor's server.
        If it is not provided, the client attempts to close the cursor on
        the primary or standalone, or a mongos server.

    .. versionchanged:: 3.0
       Now accepts an `address` argument. Schedules the cursors to be
       closed on a background thread instead of sending the message
       immediately.
    """
    if isinstance(cursor_ids, list):
        request = (address, cursor_ids)
        # Appending is "atomic", so no lock is needed.
        self.__kill_cursors_queue.append(request)
        return

    raise TypeError("cursor_ids must be a list")
- # This method is run periodically by a background thread.
- def _process_kill_cursors_queue(self):
- """Process any pending kill cursors requests."""
- address_to_cursor_ids = defaultdict(list)
- # Other threads or the GC may append to the queue concurrently.
- while True:
- try:
- address, cursor_ids = self.__kill_cursors_queue.pop()
- except IndexError:
- break
- address_to_cursor_ids[address].extend(cursor_ids)
- # Don't re-open topology if it's closed and there's no pending cursors.
- if address_to_cursor_ids:
- topology = self._get_topology()
- for address, cursor_ids in address_to_cursor_ids.items():
- try:
- if address:
- server = topology.select_server_by_address(address)
- else:
- # Application called close_cursor() with no address.
- server = topology.select_server(
- writable_server_selector)
- server.send_message(message.kill_cursors(cursor_ids),
- self.__all_credentials)
- except ConnectionFailure as exc:
- warnings.warn("couldn't close cursor on %s: %s"
- % (address, exc))
def server_info(self):
    """Get information about the MongoDB server we're connected to.

    Returns the result of running the ``buildinfo`` command on the
    ``admin`` database with a primary read preference.
    """
    admin_db = self.admin
    return admin_db.command(
        "buildinfo", read_preference=ReadPreference.PRIMARY)
def database_names(self):
    """Get a list of the names of all databases on the connected server.

    Runs ``listDatabases`` on the ``admin`` database with a primary read
    preference and collects the ``"name"`` of every entry under
    ``"databases"`` in the reply.
    """
    reply = self.admin.command(
        "listDatabases", read_preference=ReadPreference.PRIMARY)
    names = []
    for entry in reply["databases"]:
        names.append(entry["name"])
    return names
def drop_database(self, name_or_database):
    """Drop a database.

    Raises :class:`TypeError` if `name_or_database` is not an instance of
    :class:`basestring` (:class:`str` in python 3) or
    :class:`~pymongo.database.Database`.

    :Parameters:
      - `name_or_database`: the name of a database to drop, or a
        :class:`~pymongo.database.Database` instance representing the
        database to drop
    """
    # Reduce either accepted form to the plain database name.
    if isinstance(name_or_database, database.Database):
        name = name_or_database.name
    else:
        name = name_or_database

    if not isinstance(name, string_type):
        raise TypeError("name_or_database must be an instance "
                        "of %s or a Database" % (string_type.__name__,))

    self._purge_index(name)
    target = self[name]
    target.command(
        "dropDatabase", read_preference=ReadPreference.PRIMARY)
def get_default_database(self):
    """Get the database named in the MongoDB connection URI.

    >>> uri = 'mongodb://host/my_database'
    >>> client = MongoClient(uri)
    >>> db = client.get_default_database()
    >>> assert db.name == 'my_database'

    Useful in scripts where you want to choose which database to use
    based only on the URI in a configuration file.

    Raises :class:`ConfigurationError` when no default database name is
    set on this client.
    """
    default_name = self.__default_database_name
    if default_name is not None:
        return self[default_name]

    raise ConfigurationError('No default database defined')
def get_database(self, name, codec_options=None,
                 read_preference=None, write_concern=None):
    """Get a :class:`~pymongo.database.Database` with the given name and
    options.

    Useful for creating a :class:`~pymongo.database.Database` whose
    codec options, read preference, and/or write concern differ from
    those of this :class:`MongoClient`.

      >>> client.read_preference
      Primary()
      >>> db1 = client.test
      >>> db1.read_preference
      Primary()
      >>> from pymongo import ReadPreference
      >>> db2 = client.get_database(
      ...     'test', read_preference=ReadPreference.SECONDARY)
      >>> db2.read_preference
      Secondary(tag_sets=None)

    :Parameters:
      - `name`: The name of the database - a string.
      - `codec_options` (optional): An instance of
        :class:`~bson.codec_options.CodecOptions`. If ``None`` (the
        default) the :attr:`codec_options` of this :class:`MongoClient` is
        used.
      - `read_preference` (optional): The read preference to use. If
        ``None`` (the default) the :attr:`read_preference` of this
        :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
        for options.
      - `write_concern` (optional): An instance of
        :class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
        default) the :attr:`write_concern` of this :class:`MongoClient` is
        used.
    """
    # The options are forwarded positionally, in this exact order.
    db = database.Database(
        self, name, codec_options, read_preference, write_concern)
    return db
@property
def is_locked(self):
    """Is this server locked? While locked, all write operations
    are blocked, although read operations may still be allowed.
    Use :meth:`unlock` to unlock.

    The answer is the truthiness of the ``'fsyncLock'`` field in the
    result of ``current_op()`` on the ``admin`` database; a missing
    field counts as not locked.
    """
    status = self.admin.current_op()
    if status.get('fsyncLock', 0):
        return True
    return False
def fsync(self, **kwargs):
    """Flush all pending writes to datafiles.

    Runs the ``fsync`` command on the ``admin`` database with a primary
    read preference, forwarding `kwargs` as command options.

    :Parameters:

      Optional parameters can be passed as keyword arguments:

      - `lock`: If True lock the server to disallow writes.
      - `async`: If True don't block while synchronizing.

      .. warning:: `async` and `lock` can not be used together.

      .. warning:: MongoDB does not support the `async` option
                   on Windows and will raise an exception on that
                   platform.

      .. note:: ``async`` is a reserved word from Python 3.7 on, so
                there it has to be supplied as ``**{'async': True}``.
    """
    admin_db = self.admin
    admin_db.command(
        "fsync", read_preference=ReadPreference.PRIMARY, **kwargs)
def unlock(self):
    """Unlock a previously locked server.

    Done by issuing a ``find_one()`` against the ``$cmd.sys.unlock``
    collection of the ``admin`` database with a primary read preference.
    The query result is discarded.
    """
    self.admin.get_collection(
        "$cmd.sys.unlock",
        read_preference=ReadPreference.PRIMARY).find_one()
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.close()
- def __iter__(self):
- return self
- def __next__(self):
- raise TypeError("'MongoClient' object is not iterable")
- next = __next__
|