topology_description.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. # Copyright 2014-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Represent the topology of servers."""
  15. from collections import namedtuple
  16. from pymongo import common
  17. from pymongo.server_type import SERVER_TYPE
  18. from pymongo.errors import ConfigurationError
  19. from pymongo.server_description import ServerDescription
  20. TOPOLOGY_TYPE = namedtuple('TopologyType', ['Single', 'ReplicaSetNoPrimary',
  21. 'ReplicaSetWithPrimary', 'Sharded',
  22. 'Unknown'])(*range(5))
  23. class TopologyDescription(object):
  24. def __init__(
  25. self,
  26. topology_type,
  27. server_descriptions,
  28. replica_set_name,
  29. max_election_id):
  30. """Represent a topology of servers.
  31. :Parameters:
  32. - `topology_type`: initial type
  33. - `server_descriptions`: dict of (address, ServerDescription) for
  34. all seeds
  35. - `replica_set_name`: replica set name or None
  36. - `max_election_id`: greatest electionId seen from a primary, or None
  37. """
  38. self._topology_type = topology_type
  39. self._replica_set_name = replica_set_name
  40. self._server_descriptions = server_descriptions
  41. self._max_election_id = max_election_id
  42. # Is PyMongo compatible with all servers' wire protocols?
  43. self._incompatible_err = None
  44. for s in self._server_descriptions.values():
  45. # s.min/max_wire_version is the server's wire protocol.
  46. # MIN/MAX_SUPPORTED_WIRE_VERSION is what PyMongo supports.
  47. server_too_new = (
  48. # Server too new.
  49. s.min_wire_version is not None
  50. and s.min_wire_version > common.MAX_SUPPORTED_WIRE_VERSION)
  51. server_too_old = (
  52. # Server too old.
  53. s.max_wire_version is not None
  54. and s.max_wire_version < common.MIN_SUPPORTED_WIRE_VERSION)
  55. if server_too_new or server_too_old:
  56. self._incompatible_err = (
  57. "Server at %s:%d "
  58. "uses wire protocol versions %d through %d, "
  59. "but PyMongo only supports %d through %d"
  60. % (s.address[0], s.address[1],
  61. s.min_wire_version, s.max_wire_version,
  62. common.MIN_SUPPORTED_WIRE_VERSION,
  63. common.MAX_SUPPORTED_WIRE_VERSION))
  64. break
  65. def check_compatible(self):
  66. """Raise ConfigurationError if any server is incompatible.
  67. A server is incompatible if its wire protocol version range does not
  68. overlap with PyMongo's.
  69. """
  70. if self._incompatible_err:
  71. raise ConfigurationError(self._incompatible_err)
  72. def has_server(self, address):
  73. return address in self._server_descriptions
  74. def reset_server(self, address):
  75. """A copy of this description, with one server marked Unknown."""
  76. return updated_topology_description(self, ServerDescription(address))
  77. def reset(self):
  78. """A copy of this description, with all servers marked Unknown."""
  79. if self._topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
  80. topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
  81. else:
  82. topology_type = self._topology_type
  83. # The default ServerDescription's type is Unknown.
  84. sds = dict((address, ServerDescription(address))
  85. for address in self._server_descriptions)
  86. return TopologyDescription(
  87. topology_type,
  88. sds,
  89. self._replica_set_name,
  90. self._max_election_id)
  91. def server_descriptions(self):
  92. """Dict of (address, ServerDescription)."""
  93. return self._server_descriptions.copy()
  94. @property
  95. def topology_type(self):
  96. return self._topology_type
  97. @property
  98. def replica_set_name(self):
  99. """The replica set name."""
  100. return self._replica_set_name
  101. @property
  102. def max_election_id(self):
  103. """Greatest electionId seen from a primary, or None."""
  104. return self._max_election_id
  105. @property
  106. def known_servers(self):
  107. """List of Servers of types besides Unknown."""
  108. return [s for s in self._server_descriptions.values()
  109. if s.is_server_type_known]
  110. # If topology type is Unknown and we receive an ismaster response, what should
  111. # the new topology type be?
  112. _SERVER_TYPE_TO_TOPOLOGY_TYPE = {
  113. SERVER_TYPE.Mongos: TOPOLOGY_TYPE.Sharded,
  114. SERVER_TYPE.RSPrimary: TOPOLOGY_TYPE.ReplicaSetWithPrimary,
  115. SERVER_TYPE.RSSecondary: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
  116. SERVER_TYPE.RSArbiter: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
  117. SERVER_TYPE.RSOther: TOPOLOGY_TYPE.ReplicaSetNoPrimary,
  118. }
  119. def updated_topology_description(topology_description, server_description):
  120. """Return an updated copy of a TopologyDescription.
  121. :Parameters:
  122. - `topology_description`: the current TopologyDescription
  123. - `server_description`: a new ServerDescription that resulted from
  124. an ismaster call
  125. Called after attempting (successfully or not) to call ismaster on the
  126. server at server_description.address. Does not modify topology_description.
  127. """
  128. address = server_description.address
  129. # These values will be updated, if necessary, to form the new
  130. # TopologyDescription.
  131. topology_type = topology_description.topology_type
  132. set_name = topology_description.replica_set_name
  133. max_election_id = topology_description.max_election_id
  134. server_type = server_description.server_type
  135. # Don't mutate the original dict of server descriptions; copy it.
  136. sds = topology_description.server_descriptions()
  137. # Replace this server's description with the new one.
  138. sds[address] = server_description
  139. if topology_type == TOPOLOGY_TYPE.Single:
  140. # Single type never changes.
  141. return TopologyDescription(
  142. TOPOLOGY_TYPE.Single,
  143. sds,
  144. set_name,
  145. max_election_id)
  146. if topology_type == TOPOLOGY_TYPE.Unknown:
  147. if server_type == SERVER_TYPE.Standalone:
  148. sds.pop(address)
  149. elif server_type not in (SERVER_TYPE.Unknown, SERVER_TYPE.RSGhost):
  150. topology_type = _SERVER_TYPE_TO_TOPOLOGY_TYPE[server_type]
  151. if topology_type == TOPOLOGY_TYPE.Sharded:
  152. if server_type not in (SERVER_TYPE.Mongos, SERVER_TYPE.Unknown):
  153. sds.pop(address)
  154. elif topology_type == TOPOLOGY_TYPE.ReplicaSetNoPrimary:
  155. if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos):
  156. sds.pop(address)
  157. elif server_type == SERVER_TYPE.RSPrimary:
  158. topology_type, set_name, max_election_id = _update_rs_from_primary(
  159. sds, set_name, server_description, max_election_id)
  160. elif server_type in (
  161. SERVER_TYPE.RSSecondary,
  162. SERVER_TYPE.RSArbiter,
  163. SERVER_TYPE.RSOther):
  164. topology_type, set_name = _update_rs_no_primary_from_member(
  165. sds, set_name, server_description)
  166. elif topology_type == TOPOLOGY_TYPE.ReplicaSetWithPrimary:
  167. if server_type in (SERVER_TYPE.Standalone, SERVER_TYPE.Mongos):
  168. sds.pop(address)
  169. topology_type = _check_has_primary(sds)
  170. elif server_type == SERVER_TYPE.RSPrimary:
  171. topology_type, set_name, max_election_id = _update_rs_from_primary(
  172. sds, set_name, server_description, max_election_id)
  173. elif server_type in (
  174. SERVER_TYPE.RSSecondary,
  175. SERVER_TYPE.RSArbiter,
  176. SERVER_TYPE.RSOther):
  177. topology_type = _update_rs_with_primary_from_member(
  178. sds, set_name, server_description)
  179. else:
  180. # Server type is Unknown or RSGhost: did we just lose the primary?
  181. topology_type = _check_has_primary(sds)
  182. # Return updated copy.
  183. return TopologyDescription(topology_type, sds, set_name, max_election_id)
  184. def _update_rs_from_primary(
  185. sds,
  186. replica_set_name,
  187. server_description,
  188. max_election_id):
  189. """Update topology description from a primary's ismaster response.
  190. Pass in a dict of ServerDescriptions, current replica set name, the
  191. ServerDescription we are processing, and the TopologyDescription's
  192. max_election_id if any.
  193. Returns (new topology type, new replica_set_name, new max_election_id).
  194. """
  195. if replica_set_name is None:
  196. replica_set_name = server_description.replica_set_name
  197. elif replica_set_name != server_description.replica_set_name:
  198. # We found a primary but it doesn't have the replica_set_name
  199. # provided by the user.
  200. sds.pop(server_description.address)
  201. return _check_has_primary(sds), replica_set_name, max_election_id
  202. if server_description.election_id is not None:
  203. if max_election_id and max_election_id > server_description.election_id:
  204. # Stale primary, set to type Unknown.
  205. address = server_description.address
  206. sds[address] = ServerDescription(address)
  207. return _check_has_primary(sds), replica_set_name, max_election_id
  208. max_election_id = server_description.election_id
  209. # We've heard from the primary. Is it the same primary as before?
  210. for server in sds.values():
  211. if (server.server_type is SERVER_TYPE.RSPrimary
  212. and server.address != server_description.address):
  213. # Reset old primary's type to Unknown.
  214. sds[server.address] = ServerDescription(server.address)
  215. # There can be only one prior primary.
  216. break
  217. # Discover new hosts from this primary's response.
  218. for new_address in server_description.all_hosts:
  219. if new_address not in sds:
  220. sds[new_address] = ServerDescription(new_address)
  221. # Remove hosts not in the response.
  222. for addr in set(sds) - server_description.all_hosts:
  223. sds.pop(addr)
  224. # If the host list differs from the seed list, we may not have a primary
  225. # after all.
  226. return _check_has_primary(sds), replica_set_name, max_election_id
  227. def _update_rs_with_primary_from_member(
  228. sds,
  229. replica_set_name,
  230. server_description):
  231. """RS with known primary. Process a response from a non-primary.
  232. Pass in a dict of ServerDescriptions, current replica set name, and the
  233. ServerDescription we are processing.
  234. Returns new topology type.
  235. """
  236. assert replica_set_name is not None
  237. if replica_set_name != server_description.replica_set_name:
  238. sds.pop(server_description.address)
  239. # Had this member been the primary?
  240. return _check_has_primary(sds)
  241. def _update_rs_no_primary_from_member(
  242. sds,
  243. replica_set_name,
  244. server_description):
  245. """RS without known primary. Update from a non-primary's response.
  246. Pass in a dict of ServerDescriptions, current replica set name, and the
  247. ServerDescription we are processing.
  248. Returns (new topology type, new replica_set_name).
  249. """
  250. topology_type = TOPOLOGY_TYPE.ReplicaSetNoPrimary
  251. if replica_set_name is None:
  252. replica_set_name = server_description.replica_set_name
  253. elif replica_set_name != server_description.replica_set_name:
  254. sds.pop(server_description.address)
  255. return topology_type, replica_set_name
  256. # This isn't the primary's response, so don't remove any servers
  257. # it doesn't report. Only add new servers.
  258. for address in server_description.all_hosts:
  259. if address not in sds:
  260. sds[address] = ServerDescription(address)
  261. return topology_type, replica_set_name
  262. def _check_has_primary(sds):
  263. """Current topology type is ReplicaSetWithPrimary. Is primary still known?
  264. Pass in a dict of ServerDescriptions.
  265. Returns new topology type.
  266. """
  267. for s in sds.values():
  268. if s.server_type == SERVER_TYPE.RSPrimary:
  269. return TOPOLOGY_TYPE.ReplicaSetWithPrimary
  270. else:
  271. return TOPOLOGY_TYPE.ReplicaSetNoPrimary