| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- # Copyright 2012-2015 MongoDB, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Utilities for choosing which member of a replica set to read from."""
- from collections import Mapping
- from pymongo.errors import ConfigurationError
- from pymongo.server_selectors import (member_with_tags_server_selector,
- secondary_with_tags_server_selector,
- writable_server_selector)
# Integer codes for the read preference modes. Each code is also used as an
# index into _MONGOS_MODES (and into _ALL_READ_PREFERENCES), so the numbering
# and the ordering of those tuples must stay aligned.
(_PRIMARY,
 _PRIMARY_PREFERRED,
 _SECONDARY,
 _SECONDARY_PREFERRED,
 _NEAREST) = range(5)

# Mode names in their mongos/URI spelling, positioned by mode code.
_MONGOS_MODES = ('primary', 'primaryPreferred',
                 'secondary', 'secondaryPreferred',
                 'nearest')
def _validate_tag_sets(tag_sets):
    """Validate tag sets for a MongoReplicaSetClient.

    :Parameters:
      - `tag_sets`: ``None``, or a non-empty list whose items are all
        mappings (dict, bson.son.SON or any other ``Mapping``).

    Returns `tag_sets` itself, unchanged, when it is valid.

    Raises :exc:`TypeError` if `tag_sets` is not a list or if one of its
    items is not a mapping, and :exc:`ValueError` if the list is empty.
    """
    if tag_sets is None:
        return tag_sets

    if not isinstance(tag_sets, list):
        raise TypeError((
            "Tag sets %r invalid, must be a list") % (tag_sets,))

    # An empty list is rejected: "no tags" is spelled None (or [{}]).
    if not tag_sets:
        raise ValueError((
            "Tag sets %r invalid, must be None or contain at least one set of"
            " tags") % (tag_sets,))

    for tags in tag_sets:
        if not isinstance(tags, Mapping):
            raise TypeError(
                "Tag set %r invalid, must be an instance of dict, "
                "bson.son.SON or other type that inherits from "
                "collections.Mapping" % (tags,))

    return tag_sets
class _ServerMode(object):
    """Base class for all read preferences.

    Holds the integer mode, the matching mongos mode name and the validated
    tag sets. Instances compare equal when both mode and tag sets match.
    """

    __slots__ = ("__mongos_mode", "__mode", "__tag_sets")

    def __init__(self, mode, tag_sets=None):
        """Create a read preference.

        :Parameters:
          - `mode`: integer mode code, used as an index into
            ``_MONGOS_MODES``.
          - `tag_sets`: ``None`` or a list of tag-set mappings; validated
            with ``_validate_tag_sets``.

        Raises :exc:`~pymongo.errors.ConfigurationError` when the primary
        mode is combined with any tag sets.
        """
        if mode == _PRIMARY and tag_sets is not None:
            raise ConfigurationError("Read preference primary "
                                     "cannot be combined with tags")
        self.__mongos_mode = _MONGOS_MODES[mode]
        self.__mode = mode
        self.__tag_sets = _validate_tag_sets(tag_sets)

    @property
    def name(self):
        """The name of this read preference.
        """
        return self.__class__.__name__

    @property
    def document(self):
        """Read preference as a document.
        """
        # No tags and the single empty tag set both mean "ignore tags", so
        # the 'tags' field is omitted in either case.
        if self.__tag_sets in (None, [{}]):
            return {'mode': self.__mongos_mode}
        return {'mode': self.__mongos_mode, 'tags': self.__tag_sets}

    @property
    def mode(self):
        """The mode of this read preference instance.
        """
        return self.__mode

    @property
    def tag_sets(self):
        """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
        read only from members whose ``dc`` tag has the value ``"ny"``.
        To specify a priority-order for tag sets, provide a list of
        tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
        set, ``{}``, means "read from any member that matches the mode,
        ignoring tags." MongoReplicaSetClient tries each set of tags in turn
        until it finds a set of tags with at least one matching member.

           .. seealso:: `Data-Center Awareness
               <http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
        """
        # Always hand back a fresh list; fall back to [{}] when no tags
        # were configured.
        return list(self.__tag_sets) if self.__tag_sets else [{}]

    def __repr__(self):
        return "%s(tag_sets=%r)" % (
            self.name, self.__tag_sets)

    def __eq__(self, other):
        """Compare mode and tag sets with another read preference.

        Returns ``NotImplemented`` for operands that are not read
        preferences, so that ``pref == 5`` evaluates to ``False`` through
        the normal comparison fallback instead of raising.
        """
        if isinstance(other, _ServerMode):
            return (self.mode == other.mode and
                    self.tag_sets == other.tag_sets)
        return NotImplemented

    def __ne__(self, other):
        return not self == other

    def __getstate__(self):
        """Return value of object for pickling.

        Needed explicitly because __slots__() defined.
        """
        return {'mode': self.__mode, 'tag_sets': self.__tag_sets}

    def __setstate__(self, value):
        """Restore from pickling."""
        self.__mode = value['mode']
        # The mongos name is not pickled; it is derived from the mode again.
        self.__mongos_mode = _MONGOS_MODES[self.__mode]
        self.__tag_sets = _validate_tag_sets(value['tag_sets'])
class Primary(_ServerMode):
    """Primary read preference.

    * When directly connected to one mongod queries are allowed if the server
      is standalone or a replica set primary.
    * When connected to a mongos queries are sent to the primary of a shard.
    * When connected to a replica set queries are sent to the primary of
      the replica set.
    """

    def __init__(self):
        # Primary never carries tag sets, so only the mode is passed on.
        super(Primary, self).__init__(_PRIMARY)

    def __call__(self, server_descriptions):
        """Return matching ServerDescriptions from a list."""
        return writable_server_selector(server_descriptions)

    def __repr__(self):
        return "Primary()"

    def __eq__(self, other):
        """Equal to any read preference whose mode is primary.

        Returns ``NotImplemented`` for operands that are not read
        preferences, so comparing with an unrelated object yields ``False``
        through the normal comparison fallback instead of raising.
        """
        if isinstance(other, _ServerMode):
            return other.mode == _PRIMARY
        return NotImplemented
class PrimaryPreferred(_ServerMode):
    """PrimaryPreferred read preference.

    * When directly connected to one mongod queries are allowed to standalone
      servers, to a replica set primary, or to replica set secondaries.
    * When connected to a mongos queries are sent to the primary of a shard if
      available, otherwise a shard secondary.
    * When connected to a replica set queries are sent to the primary if
      available, otherwise a secondary.

    :Parameters:
      - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
        available.
    """

    def __init__(self, tag_sets=None):
        super(PrimaryPreferred, self).__init__(
            mode=_PRIMARY_PREFERRED, tag_sets=tag_sets)

    def __call__(self, server_descriptions):
        """Return matching ServerDescriptions from a list."""
        writable = writable_server_selector(server_descriptions)
        if not writable:
            # Nothing writable was selected: fall back to tagged secondaries.
            return secondary_with_tags_server_selector(
                self.tag_sets, server_descriptions)
        return writable
class Secondary(_ServerMode):
    """Secondary read preference.

    * When directly connected to one mongod queries are allowed to standalone
      servers, to a replica set primary, or to replica set secondaries.
    * When connected to a mongos queries are distributed among shard
      secondaries. An error is raised if no secondaries are available.
    * When connected to a replica set queries are distributed among
      secondaries. An error is raised if no secondaries are available.

    :Parameters:
      - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
    """

    def __init__(self, tag_sets=None):
        super(Secondary, self).__init__(mode=_SECONDARY, tag_sets=tag_sets)

    def __call__(self, server_descriptions):
        """Return matching ServerDescriptions from a list."""
        wanted_tags = self.tag_sets
        return secondary_with_tags_server_selector(
            wanted_tags, server_descriptions)
class SecondaryPreferred(_ServerMode):
    """SecondaryPreferred read preference.

    * When directly connected to one mongod queries are allowed to standalone
      servers, to a replica set primary, or to replica set secondaries.
    * When connected to a mongos queries are distributed among shard
      secondaries, or the shard primary if no secondary is available.
    * When connected to a replica set queries are distributed among
      secondaries, or the primary if no secondary is available.

    :Parameters:
      - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
    """

    def __init__(self, tag_sets=None):
        super(SecondaryPreferred, self).__init__(
            mode=_SECONDARY_PREFERRED, tag_sets=tag_sets)

    def __call__(self, server_descriptions):
        """Return matching ServerDescriptions from a list."""
        tagged_secondaries = secondary_with_tags_server_selector(
            self.tag_sets, server_descriptions)
        if not tagged_secondaries:
            # No tagged secondary was selected: fall back to writable servers.
            return writable_server_selector(server_descriptions)
        return tagged_secondaries
class Nearest(_ServerMode):
    """Nearest read preference.

    * When directly connected to one mongod queries are allowed to standalone
      servers, to a replica set primary, or to replica set secondaries.
    * When connected to a mongos queries are distributed among all members of
      a shard.
    * When connected to a replica set queries are distributed among all
      members.

    :Parameters:
      - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
    """

    def __init__(self, tag_sets=None):
        super(Nearest, self).__init__(mode=_NEAREST, tag_sets=tag_sets)

    def __call__(self, server_descriptions):
        """Return matching ServerDescriptions from a list."""
        # An empty tag_sets value is replaced by the match-anything [{}].
        wanted_tags = self.tag_sets or [{}]
        return member_with_tags_server_selector(
            wanted_tags, server_descriptions)
# Read preference classes positioned by their integer mode code;
# make_read_preference() indexes this tuple with the mode.
_ALL_READ_PREFERENCES = (
    Primary,
    PrimaryPreferred,
    Secondary,
    SecondaryPreferred,
    Nearest,
)
def make_read_preference(mode, tag_sets):
    """Build the read preference instance for an integer `mode`.

    :Parameters:
      - `mode`: integer mode code, used as an index into
        ``_ALL_READ_PREFERENCES``.
      - `tag_sets`: tag sets handed to the selected class. For the primary
        mode only ``None`` or ``[{}]`` is accepted.

    Raises :exc:`~pymongo.errors.ConfigurationError` when the primary mode
    is combined with any other tag sets.
    """
    if mode == _PRIMARY:
        # Primary takes no tags; None and [{}] both count as "no tags".
        if tag_sets in (None, [{}]):
            return Primary()
        raise ConfigurationError("Read preference primary "
                                 "cannot be combined with tags")

    preference_class = _ALL_READ_PREFERENCES[mode]
    return preference_class(tag_sets)
# Upper-case mode names, listed in the same order as the integer mode codes;
# they match the attribute names of the ReadPreference class.
_MODES = ('PRIMARY', 'PRIMARY_PREFERRED',
          'SECONDARY', 'SECONDARY_PREFERRED',
          'NEAREST')
class ReadPreference(object):
    """An enum that defines the read preference modes supported by PyMongo.

    See :doc:`/examples/high_availability` for code examples.

    A read preference is used in three cases:

    :class:`~pymongo.mongo_client.MongoClient` connected to a single mongod:

    - ``PRIMARY``: Queries are allowed if the server is standalone or a replica
      set primary.
    - All other modes allow queries to standalone servers, to a replica set
      primary, or to replica set secondaries.

    :class:`~pymongo.mongo_client.MongoClient` initialized with the
    ``replicaSet`` option:

    - ``PRIMARY``: Read from the primary. This is the default, and provides the
      strongest consistency. If no primary is available, raise
      :class:`~pymongo.errors.AutoReconnect`.
    - ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
      none, read from a secondary.
    - ``SECONDARY``: Read from a secondary. If no secondary is available,
      raise :class:`~pymongo.errors.AutoReconnect`.
    - ``SECONDARY_PREFERRED``: Read from a secondary if available, otherwise
      from the primary.
    - ``NEAREST``: Read from any member.

    :class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
    sharded cluster of replica sets:

    - ``PRIMARY``: Read from the primary of the shard, or raise
      :class:`~pymongo.errors.OperationFailure` if there is none.
      This is the default.
    - ``PRIMARY_PREFERRED``: Read from the primary of the shard, or if there is
      none, read from a secondary of the shard.
    - ``SECONDARY``: Read from a secondary of the shard, or raise
      :class:`~pymongo.errors.OperationFailure` if there is none.
    - ``SECONDARY_PREFERRED``: Read from a secondary of the shard if available,
      otherwise from the shard primary.
    - ``NEAREST``: Read from any shard member.
    """
    # One instance per mode, built without tag sets when this class body
    # runs; every access to an attribute returns that same object.
    PRIMARY = Primary()
    PRIMARY_PREFERRED = PrimaryPreferred()
    SECONDARY = Secondary()
    SECONDARY_PREFERRED = SecondaryPreferred()
    NEAREST = Nearest()
def read_pref_mode_from_name(name):
    """Get the read preference mode from mongos/uri name.

    The mode is the position of `name` in ``_MONGOS_MODES``; a name that is
    not listed there makes ``tuple.index`` raise :exc:`ValueError`.
    """
    mode = _MONGOS_MODES.index(name)
    return mode
class MovingAverage(object):
    """Tracks an exponentially-weighted moving average."""

    def __init__(self):
        # None means "no sample recorded yet".
        self.average = None

    def add_sample(self, sample):
        """Fold `sample` into the average; negative samples are ignored."""
        if sample < 0:
            # Likely system time change while waiting for ismaster response
            # and not using time.monotonic. Ignore it, the next one will
            # probably be valid.
            return

        previous = self.average
        if previous is None:
            # The first sample seeds the average as-is.
            self.average = sample
            return

        # The Server Selection Spec requires an exponentially weighted
        # average with alpha = 0.2.
        self.average = 0.8 * previous + 0.2 * sample

    def get(self):
        """Get the calculated average, or None if no samples yet."""
        return self.average

    def reset(self):
        """Forget all samples so that get() returns None again."""
        self.average = None
|