read_preferences.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. # Copyright 2012-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Utilities for choosing which member of a replica set to read from."""
try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Mapping
except ImportError:
    # Interpreters that predate the ``collections.abc`` module.
    from collections import Mapping

from pymongo.errors import ConfigurationError
from pymongo.server_selectors import (member_with_tags_server_selector,
                                      secondary_with_tags_server_selector,
                                      writable_server_selector)
  20. _PRIMARY = 0
  21. _PRIMARY_PREFERRED = 1
  22. _SECONDARY = 2
  23. _SECONDARY_PREFERRED = 3
  24. _NEAREST = 4
  25. _MONGOS_MODES = (
  26. 'primary',
  27. 'primaryPreferred',
  28. 'secondary',
  29. 'secondaryPreferred',
  30. 'nearest',
  31. )
  32. def _validate_tag_sets(tag_sets):
  33. """Validate tag sets for a MongoReplicaSetClient.
  34. """
  35. if tag_sets is None:
  36. return tag_sets
  37. if not isinstance(tag_sets, list):
  38. raise TypeError((
  39. "Tag sets %r invalid, must be a list") % (tag_sets,))
  40. if len(tag_sets) == 0:
  41. raise ValueError((
  42. "Tag sets %r invalid, must be None or contain at least one set of"
  43. " tags") % (tag_sets,))
  44. for tags in tag_sets:
  45. if not isinstance(tags, Mapping):
  46. raise TypeError(
  47. "Tag set %r invalid, must be an instance of dict, "
  48. "bson.son.SON or other type that inherits from "
  49. "collection.Mapping" % (tags,))
  50. return tag_sets
  51. class _ServerMode(object):
  52. """Base class for all read preferences.
  53. """
  54. __slots__ = ("__mongos_mode", "__mode", "__tag_sets")
  55. def __init__(self, mode, tag_sets=None):
  56. if mode == _PRIMARY and tag_sets is not None:
  57. raise ConfigurationError("Read preference primary "
  58. "cannot be combined with tags")
  59. self.__mongos_mode = _MONGOS_MODES[mode]
  60. self.__mode = mode
  61. self.__tag_sets = _validate_tag_sets(tag_sets)
  62. @property
  63. def name(self):
  64. """The name of this read preference.
  65. """
  66. return self.__class__.__name__
  67. @property
  68. def document(self):
  69. """Read preference as a document.
  70. """
  71. if self.__tag_sets in (None, [{}]):
  72. return {'mode': self.__mongos_mode}
  73. return {'mode': self.__mongos_mode, 'tags': self.__tag_sets}
  74. @property
  75. def mode(self):
  76. """The mode of this read preference instance.
  77. """
  78. return self.__mode
  79. @property
  80. def tag_sets(self):
  81. """Set ``tag_sets`` to a list of dictionaries like [{'dc': 'ny'}] to
  82. read only from members whose ``dc`` tag has the value ``"ny"``.
  83. To specify a priority-order for tag sets, provide a list of
  84. tag sets: ``[{'dc': 'ny'}, {'dc': 'la'}, {}]``. A final, empty tag
  85. set, ``{}``, means "read from any member that matches the mode,
  86. ignoring tags." MongoReplicaSetClient tries each set of tags in turn
  87. until it finds a set of tags with at least one matching member.
  88. .. seealso:: `Data-Center Awareness
  89. <http://www.mongodb.org/display/DOCS/Data+Center+Awareness>`_
  90. """
  91. return list(self.__tag_sets) if self.__tag_sets else [{}]
  92. def __repr__(self):
  93. return "%s(tag_sets=%r)" % (
  94. self.name, self.__tag_sets)
  95. def __eq__(self, other):
  96. if isinstance(other, _ServerMode):
  97. return (self.mode == other.mode and
  98. self.tag_sets == other.tag_sets)
  99. raise NotImplementedError
  100. def __ne__(self, other):
  101. return not self == other
  102. def __getstate__(self):
  103. """Return value of object for pickling.
  104. Needed explicitly because __slots__() defined.
  105. """
  106. return {'mode': self.__mode, 'tag_sets': self.__tag_sets}
  107. def __setstate__(self, value):
  108. """Restore from pickling."""
  109. self.__mode = value['mode']
  110. self.__mongos_mode = _MONGOS_MODES[self.__mode]
  111. self.__tag_sets = _validate_tag_sets(value['tag_sets'])
  112. class Primary(_ServerMode):
  113. """Primary read preference.
  114. * When directly connected to one mongod queries are allowed if the server
  115. is standalone or a replica set primary.
  116. * When connected to a mongos queries are sent to the primary of a shard.
  117. * When connected to a replica set queries are sent to the primary of
  118. the replica set.
  119. """
  120. def __init__(self):
  121. super(Primary, self).__init__(_PRIMARY)
  122. def __call__(self, server_descriptions):
  123. """Return matching ServerDescriptions from a list."""
  124. return writable_server_selector(server_descriptions)
  125. def __repr__(self):
  126. return "Primary()"
  127. def __eq__(self, other):
  128. if isinstance(other, _ServerMode):
  129. return other.mode == _PRIMARY
  130. raise NotImplementedError
  131. class PrimaryPreferred(_ServerMode):
  132. """PrimaryPreferred read preference.
  133. * When directly connected to one mongod queries are allowed to standalone
  134. servers, to a replica set primary, or to replica set secondaries.
  135. * When connected to a mongos queries are sent to the primary of a shard if
  136. available, otherwise a shard secondary.
  137. * When connected to a replica set queries are sent to the primary if
  138. available, otherwise a secondary.
  139. :Parameters:
  140. - `tag_sets`: The :attr:`~tag_sets` to use if the primary is not
  141. available.
  142. """
  143. def __init__(self, tag_sets=None):
  144. super(PrimaryPreferred, self).__init__(_PRIMARY_PREFERRED, tag_sets)
  145. def __call__(self, server_descriptions):
  146. """Return matching ServerDescriptions from a list."""
  147. writable_servers = writable_server_selector(server_descriptions)
  148. if writable_servers:
  149. return writable_servers
  150. else:
  151. return secondary_with_tags_server_selector(
  152. self.tag_sets,
  153. server_descriptions)
  154. class Secondary(_ServerMode):
  155. """Secondary read preference.
  156. * When directly connected to one mongod queries are allowed to standalone
  157. servers, to a replica set primary, or to replica set secondaries.
  158. * When connected to a mongos queries are distributed among shard
  159. secondaries. An error is raised if no secondaries are available.
  160. * When connected to a replica set queries are distributed among
  161. secondaries. An error is raised if no secondaries are available.
  162. :Parameters:
  163. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
  164. """
  165. def __init__(self, tag_sets=None):
  166. super(Secondary, self).__init__(_SECONDARY, tag_sets)
  167. def __call__(self, server_descriptions):
  168. """Return matching ServerDescriptions from a list."""
  169. return secondary_with_tags_server_selector(
  170. self.tag_sets,
  171. server_descriptions)
  172. class SecondaryPreferred(_ServerMode):
  173. """SecondaryPreferred read preference.
  174. * When directly connected to one mongod queries are allowed to standalone
  175. servers, to a replica set primary, or to replica set secondaries.
  176. * When connected to a mongos queries are distributed among shard
  177. secondaries, or the shard primary if no secondary is available.
  178. * When connected to a replica set queries are distributed among
  179. secondaries, or the primary if no secondary is available.
  180. :Parameters:
  181. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
  182. """
  183. def __init__(self, tag_sets=None):
  184. super(SecondaryPreferred, self).__init__(_SECONDARY_PREFERRED, tag_sets)
  185. def __call__(self, server_descriptions):
  186. """Return matching ServerDescriptions from a list."""
  187. secondaries = secondary_with_tags_server_selector(
  188. self.tag_sets,
  189. server_descriptions)
  190. if secondaries:
  191. return secondaries
  192. else:
  193. return writable_server_selector(server_descriptions)
  194. class Nearest(_ServerMode):
  195. """Nearest read preference.
  196. * When directly connected to one mongod queries are allowed to standalone
  197. servers, to a replica set primary, or to replica set secondaries.
  198. * When connected to a mongos queries are distributed among all members of
  199. a shard.
  200. * When connected to a replica set queries are distributed among all
  201. members.
  202. :Parameters:
  203. - `tag_sets`: The :attr:`~tag_sets` to use with this read_preference
  204. """
  205. def __init__(self, tag_sets=None):
  206. super(Nearest, self).__init__(_NEAREST, tag_sets)
  207. def __call__(self, server_descriptions):
  208. """Return matching ServerDescriptions from a list."""
  209. return member_with_tags_server_selector(
  210. self.tag_sets or [{}],
  211. server_descriptions)
  212. _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
  213. Secondary, SecondaryPreferred, Nearest)
  214. def make_read_preference(mode, tag_sets):
  215. if mode == _PRIMARY:
  216. if tag_sets not in (None, [{}]):
  217. raise ConfigurationError("Read preference primary "
  218. "cannot be combined with tags")
  219. return Primary()
  220. return _ALL_READ_PREFERENCES[mode](tag_sets)
  221. _MODES = (
  222. 'PRIMARY',
  223. 'PRIMARY_PREFERRED',
  224. 'SECONDARY',
  225. 'SECONDARY_PREFERRED',
  226. 'NEAREST',
  227. )
  228. class ReadPreference(object):
  229. """An enum that defines the read preference modes supported by PyMongo.
  230. See :doc:`/examples/high_availability` for code examples.
  231. A read preference is used in three cases:
  232. :class:`~pymongo.mongo_client.MongoClient` connected to a single mongod:
  233. - ``PRIMARY``: Queries are allowed if the server is standalone or a replica
  234. set primary.
  235. - All other modes allow queries to standalone servers, to a replica set
  236. primary, or to replica set secondaries.
  237. :class:`~pymongo.mongo_client.MongoClient` initialized with the
  238. ``replicaSet`` option:
  239. - ``PRIMARY``: Read from the primary. This is the default, and provides the
  240. strongest consistency. If no primary is available, raise
  241. :class:`~pymongo.errors.AutoReconnect`.
  242. - ``PRIMARY_PREFERRED``: Read from the primary if available, or if there is
  243. none, read from a secondary.
  244. - ``SECONDARY``: Read from a secondary. If no secondary is available,
  245. raise :class:`~pymongo.errors.AutoReconnect`.
  246. - ``SECONDARY_PREFERRED``: Read from a secondary if available, otherwise
  247. from the primary.
  248. - ``NEAREST``: Read from any member.
  249. :class:`~pymongo.mongo_client.MongoClient` connected to a mongos, with a
  250. sharded cluster of replica sets:
  251. - ``PRIMARY``: Read from the primary of the shard, or raise
  252. :class:`~pymongo.errors.OperationFailure` if there is none.
  253. This is the default.
  254. - ``PRIMARY_PREFERRED``: Read from the primary of the shard, or if there is
  255. none, read from a secondary of the shard.
  256. - ``SECONDARY``: Read from a secondary of the shard, or raise
  257. :class:`~pymongo.errors.OperationFailure` if there is none.
  258. - ``SECONDARY_PREFERRED``: Read from a secondary of the shard if available,
  259. otherwise from the shard primary.
  260. - ``NEAREST``: Read from any shard member.
  261. """
  262. PRIMARY = Primary()
  263. PRIMARY_PREFERRED = PrimaryPreferred()
  264. SECONDARY = Secondary()
  265. SECONDARY_PREFERRED = SecondaryPreferred()
  266. NEAREST = Nearest()
  267. def read_pref_mode_from_name(name):
  268. """Get the read preference mode from mongos/uri name.
  269. """
  270. return _MONGOS_MODES.index(name)
  271. class MovingAverage(object):
  272. """Tracks an exponentially-weighted moving average."""
  273. def __init__(self):
  274. self.average = None
  275. def add_sample(self, sample):
  276. if sample < 0:
  277. # Likely system time change while waiting for ismaster response
  278. # and not using time.monotonic. Ignore it, the next one will
  279. # probably be valid.
  280. return
  281. if self.average is None:
  282. self.average = sample
  283. else:
  284. # The Server Selection Spec requires an exponentially weighted
  285. # average with alpha = 0.2.
  286. self.average = 0.8 * self.average + 0.2 * sample
  287. def get(self):
  288. """Get the calculated average, or None if no samples yet."""
  289. return self.average
  290. def reset(self):
  291. self.average = None