monitor.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
# Copyright 2014-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

"""Class to monitor a MongoDB server on a background thread."""
  15. import weakref
  16. from bson.codec_options import DEFAULT_CODEC_OPTIONS
  17. from pymongo import common, helpers, message, periodic_executor
  18. from pymongo.server_type import SERVER_TYPE
  19. from pymongo.ismaster import IsMaster
  20. from pymongo.monotonic import time as _time
  21. from pymongo.read_preferences import MovingAverage
  22. from pymongo.server_description import ServerDescription
  23. class Monitor(object):
  24. def __init__(
  25. self,
  26. server_description,
  27. topology,
  28. pool,
  29. topology_settings):
  30. """Class to monitor a MongoDB server on a background thread.
  31. Pass an initial ServerDescription, a Topology, a Pool, and
  32. TopologySettings.
  33. The Topology is weakly referenced. The Pool must be exclusive to this
  34. Monitor.
  35. """
  36. self._server_description = server_description
  37. self._pool = pool
  38. self._settings = topology_settings
  39. self._avg_round_trip_time = MovingAverage()
  40. # We strongly reference the executor and it weakly references us via
  41. # this closure. When the monitor is freed, stop the executor soon.
  42. def target():
  43. monitor = self_ref()
  44. if monitor is None:
  45. return False # Stop the executor.
  46. Monitor._run(monitor)
  47. return True
  48. executor = periodic_executor.PeriodicExecutor(
  49. condition_class=self._settings.condition_class,
  50. interval=common.HEARTBEAT_FREQUENCY,
  51. min_interval=common.MIN_HEARTBEAT_INTERVAL,
  52. target=target)
  53. self._executor = executor
  54. # Avoid cycles. When self or topology is freed, stop executor soon.
  55. self_ref = weakref.ref(self, executor.close)
  56. self._topology = weakref.proxy(topology, executor.close)
  57. def open(self):
  58. """Start monitoring, or restart after a fork.
  59. Multiple calls have no effect.
  60. """
  61. self._executor.open()
  62. def close(self):
  63. """Close and stop monitoring.
  64. open() restarts the monitor after closing.
  65. """
  66. self._executor.close()
  67. # Increment the pool_id and maybe close the socket. If the executor
  68. # thread has the socket checked out, it will be closed when checked in.
  69. self._pool.reset()
  70. def join(self, timeout=None):
  71. self._executor.join(timeout)
  72. def request_check(self):
  73. """If the monitor is sleeping, wake and check the server soon."""
  74. self._executor.wake()
  75. def _run(self):
  76. try:
  77. self._server_description = self._check_with_retry()
  78. self._topology.on_change(self._server_description)
  79. except ReferenceError:
  80. # Topology was garbage-collected.
  81. self.close()
  82. def _check_with_retry(self):
  83. """Call ismaster once or twice. Reset server's pool on error.
  84. Returns a ServerDescription.
  85. """
  86. # According to the spec, if an ismaster call fails we reset the
  87. # server's pool. If a server was once connected, change its type
  88. # to Unknown only after retrying once.
  89. address = self._server_description.address
  90. retry = self._server_description.server_type != SERVER_TYPE.Unknown
  91. try:
  92. return self._check_once()
  93. except ReferenceError:
  94. raise
  95. except Exception as error:
  96. self._topology.reset_pool(address)
  97. default = ServerDescription(address, error=error)
  98. if not retry:
  99. self._avg_round_trip_time.reset()
  100. # Server type defaults to Unknown.
  101. return default
  102. # Try a second and final time. If it fails return original error.
  103. try:
  104. return self._check_once()
  105. except ReferenceError:
  106. raise
  107. except Exception:
  108. self._avg_round_trip_time.reset()
  109. return default
  110. def _check_once(self):
  111. """A single attempt to call ismaster.
  112. Returns a ServerDescription, or raises an exception.
  113. """
  114. with self._pool.get_socket({}) as sock_info:
  115. response, round_trip_time = self._check_with_socket(sock_info)
  116. self._avg_round_trip_time.add_sample(round_trip_time)
  117. sd = ServerDescription(
  118. address=self._server_description.address,
  119. ismaster=response,
  120. round_trip_time=self._avg_round_trip_time.get())
  121. return sd
  122. def _check_with_socket(self, sock_info):
  123. """Return (IsMaster, round_trip_time).
  124. Can raise ConnectionFailure or OperationFailure.
  125. """
  126. start = _time()
  127. request_id, msg, max_doc_size = message.query(
  128. 0, 'admin.$cmd', 0, -1, {'ismaster': 1},
  129. None, DEFAULT_CODEC_OPTIONS)
  130. # TODO: use sock_info.command()
  131. sock_info.send_message(msg, max_doc_size)
  132. raw_response = sock_info.receive_message(1, request_id)
  133. result = helpers._unpack_response(raw_response)
  134. return IsMaster(result['data'][0]), _time() - start