monitoring.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. # Copyright 2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you
  4. # may not use this file except in compliance with the License. You
  5. # may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied. See the License for the specific language governing
  13. # permissions and limitations under the License.
  14. """Tools to monitor driver events.
  15. Use :func:`register` to register global listeners for specific events.
  16. Currently only command events are published. Listeners must be
  17. a subclass of :class:`CommandListener` and implement
  18. :meth:`~CommandListener.started`, :meth:`~CommandListener.succeeded`, and
  19. :meth:`~CommandListener.failed`.
  20. For example, a simple command logger might be implemented like this::
  21. import logging
  22. from pymongo import monitoring
  23. class CommandLogger(monitoring.CommandListener):
  24. def started(self, event):
  25. logging.info("Command {0.command_name} with request id "
  26. "{0.request_id} started on server "
  27. "{0.connection_id}".format(event))
  28. def succeeded(self, event):
  29. logging.info("Command {0.command_name} with request id "
  30. "{0.request_id} on server {0.connection_id} "
  31. "succeeded in {0.duration_micros} "
  32. "microseconds".format(event))
  33. def failed(self, event):
  34. logging.info("Command {0.command_name} with request id "
  35. "{0.request_id} on server {0.connection_id} "
  36. "failed in {0.duration_micros} "
  37. "microseconds".format(event))
  38. monitoring.register(CommandLogger())
  39. Event listeners can also be registered per instance of
  40. :class:`~pymongo.mongo_client.MongoClient`::
  41. client = MongoClient(event_listeners=[CommandLogger()])
  42. Note that previously registered global listeners are automatically included when
  43. configuring per client event listeners. Registering a new global listener will
  44. not add that listener to existing client instances.
  45. .. note:: Events are delivered **synchronously**. Application threads block
  46. waiting for event handlers (e.g. :meth:`~CommandListener.started`) to
  47. return. Care must be taken to ensure that your event handlers are efficient
  48. enough to not adversely affect overall application performance.
  49. .. warning:: The command documents published through this API are *not* copies.
  50. If you intend to modify them in any way you must copy them in your event
  51. handler first.
  52. """
  53. import sys
  54. import traceback
  55. from collections import namedtuple, Sequence
  56. _Listeners = namedtuple('Listeners', ('command_listeners',))
  57. _LISTENERS = _Listeners([])
  58. class CommandListener(object):
  59. """Abstract base class for command listeners."""
  60. def started(self, event):
  61. """Abstract method to handle CommandStartedEvent.
  62. :Parameters:
  63. - `event`: An instance of :class:`CommandStartedEvent`
  64. """
  65. raise NotImplementedError
  66. def succeeded(self, event):
  67. """Abstract method to handle CommandSucceededEvent.
  68. :Parameters:
  69. - `event`: An instance of :class:`CommandSucceededEvent`
  70. """
  71. raise NotImplementedError
  72. def failed(self, event):
  73. """Abstract method to handle CommandFailedEvent.
  74. :Parameters:
  75. - `event`: An instance of :class:`CommandFailedEvent`
  76. """
  77. raise NotImplementedError
  78. def _to_micros(dur):
  79. """Convert duration 'dur' to microseconds."""
  80. if hasattr(dur, 'total_seconds'):
  81. return int(dur.total_seconds() * 10e5)
  82. # Python 2.6
  83. return dur.microseconds + (dur.seconds + dur.days * 24 * 3600) * 1000000
  84. def _validate_event_listeners(option, listeners):
  85. """Validate event listeners"""
  86. if not isinstance(listeners, Sequence):
  87. raise TypeError("%s must be a list or tuple" % (option,))
  88. for listener in listeners:
  89. if not isinstance(listener, CommandListener):
  90. raise TypeError("Only subclasses of "
  91. "pymongo.monitoring.CommandListener are supported")
  92. return listeners
  93. def register(listener):
  94. """Register a global event listener.
  95. :Parameters:
  96. - `listener`: A subclass of :class:`CommandListener`.
  97. """
  98. _validate_event_listeners('listener', [listener])
  99. _LISTENERS.command_listeners.append(listener)
  100. def _handle_exception():
  101. """Print exceptions raised by subscribers to stderr."""
  102. # Heavily influenced by logging.Handler.handleError.
  103. # See note here:
  104. # https://docs.python.org/3.4/library/sys.html#sys.__stderr__
  105. if sys.stderr:
  106. einfo = sys.exc_info()
  107. try:
  108. traceback.print_exception(einfo[0], einfo[1], einfo[2],
  109. None, sys.stderr)
  110. except IOError:
  111. pass
  112. finally:
  113. del einfo
  114. # Note - to avoid bugs from forgetting which if these is all lowercase and
  115. # which are camelCase, and at the same time avoid having to add a test for
  116. # every command, use all lowercase here and test against command_name.lower().
  117. _SENSITIVE_COMMANDS = set(
  118. ["authenticate", "saslstart", "saslcontinue", "getnonce", "createuser",
  119. "updateuser", "copydbgetnonce", "copydbsaslstart", "copydb"])
  120. class _CommandEvent(object):
  121. """Base class for command events."""
  122. __slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id")
  123. def __init__(self, command_name, request_id, connection_id, operation_id):
  124. self.__cmd_name = command_name
  125. self.__rqst_id = request_id
  126. self.__conn_id = connection_id
  127. self.__op_id = operation_id
  128. @property
  129. def command_name(self):
  130. """The command name."""
  131. return self.__cmd_name
  132. @property
  133. def request_id(self):
  134. """The request id for this operation."""
  135. return self.__rqst_id
  136. @property
  137. def connection_id(self):
  138. """The address (host, port) of the server this command was sent to."""
  139. return self.__conn_id
  140. @property
  141. def operation_id(self):
  142. """An id for this series of events or None."""
  143. return self.__op_id
  144. class CommandStartedEvent(_CommandEvent):
  145. """Event published when a command starts.
  146. :Parameters:
  147. - `command`: The command document.
  148. - `database_name`: The name of the database this command was run against.
  149. - `request_id`: The request id for this operation.
  150. - `connection_id`: The address (host, port) of the server this command
  151. was sent to.
  152. - `operation_id`: An optional identifier for a series of related events.
  153. """
  154. __slots__ = ("__cmd", "__db")
  155. def __init__(self, command, database_name, *args):
  156. if not command:
  157. raise ValueError("%r is not a valid command" % (command,))
  158. # Command name must be first key.
  159. command_name = next(iter(command))
  160. super(CommandStartedEvent, self).__init__(command_name, *args)
  161. if command_name.lower() in _SENSITIVE_COMMANDS:
  162. self.__cmd = {}
  163. else:
  164. self.__cmd = command
  165. self.__db = database_name
  166. @property
  167. def command(self):
  168. """The command document."""
  169. return self.__cmd
  170. @property
  171. def database_name(self):
  172. """The name of the database this command was run against."""
  173. return self.__db
  174. class CommandSucceededEvent(_CommandEvent):
  175. """Event published when a command succeeds.
  176. :Parameters:
  177. - `duration`: The command duration as a datetime.timedelta.
  178. - `reply`: The server reply document.
  179. - `command_name`: The command name.
  180. - `request_id`: The request id for this operation.
  181. - `connection_id`: The address (host, port) of the server this command
  182. was sent to.
  183. - `operation_id`: An optional identifier for a series of related events.
  184. """
  185. __slots__ = ("__duration_micros", "__reply")
  186. def __init__(self, duration, reply, command_name,
  187. request_id, connection_id, operation_id):
  188. super(CommandSucceededEvent, self).__init__(
  189. command_name, request_id, connection_id, operation_id)
  190. self.__duration_micros = _to_micros(duration)
  191. if command_name.lower() in _SENSITIVE_COMMANDS:
  192. self.__reply = {}
  193. else:
  194. self.__reply = reply
  195. @property
  196. def duration_micros(self):
  197. """The duration of this operation in microseconds."""
  198. return self.__duration_micros
  199. @property
  200. def reply(self):
  201. """The server failure document for this operation."""
  202. return self.__reply
  203. class CommandFailedEvent(_CommandEvent):
  204. """Event published when a command fails.
  205. :Parameters:
  206. - `duration`: The command duration as a datetime.timedelta.
  207. - `failure`: The server reply document.
  208. - `command_name`: The command name.
  209. - `request_id`: The request id for this operation.
  210. - `connection_id`: The address (host, port) of the server this command
  211. was sent to.
  212. - `operation_id`: An optional identifier for a series of related events.
  213. """
  214. __slots__ = ("__duration_micros", "__failure")
  215. def __init__(self, duration, failure, *args):
  216. super(CommandFailedEvent, self).__init__(*args)
  217. self.__duration_micros = _to_micros(duration)
  218. self.__failure = failure
  219. @property
  220. def duration_micros(self):
  221. """The duration of this operation in microseconds."""
  222. return self.__duration_micros
  223. @property
  224. def failure(self):
  225. """The server failure document for this operation."""
  226. return self.__failure
  227. class _EventListeners(object):
  228. """Configure event listeners for a client instance.
  229. Any event listeners registered globally are included by default.
  230. :Parameters:
  231. - `listeners`: A list of event listeners.
  232. """
  233. def __init__(self, listeners):
  234. self.__command_listeners = _LISTENERS.command_listeners[:]
  235. if listeners is not None:
  236. self.__command_listeners.extend(listeners)
  237. self.__enabled_for_commands = bool(self.__command_listeners)
  238. @property
  239. def enabled_for_commands(self):
  240. """Are any CommandListener instances registered?"""
  241. return self.__enabled_for_commands
  242. @property
  243. def event_listeners(self):
  244. """List of registered event listeners."""
  245. return self.__command_listeners[:]
  246. def publish_command_start(self, command, database_name,
  247. request_id, connection_id, op_id=None):
  248. """Publish a CommandStartedEvent to all command listeners.
  249. :Parameters:
  250. - `command`: The command document.
  251. - `database_name`: The name of the database this command was run
  252. against.
  253. - `request_id`: The request id for this operation.
  254. - `connection_id`: The address (host, port) of the server this
  255. command was sent to.
  256. - `op_id`: The (optional) operation id for this operation.
  257. """
  258. if op_id is None:
  259. op_id = request_id
  260. event = CommandStartedEvent(
  261. command, database_name, request_id, connection_id, op_id)
  262. for subscriber in self.__command_listeners:
  263. try:
  264. subscriber.started(event)
  265. except Exception:
  266. _handle_exception()
  267. def publish_command_success(self, duration, reply, command_name,
  268. request_id, connection_id, op_id=None):
  269. """Publish a CommandSucceededEvent to all command listeners.
  270. :Parameters:
  271. - `duration`: The command duration as a datetime.timedelta.
  272. - `reply`: The server reply document.
  273. - `command_name`: The command name.
  274. - `request_id`: The request id for this operation.
  275. - `connection_id`: The address (host, port) of the server this
  276. command was sent to.
  277. - `op_id`: The (optional) operation id for this operation.
  278. """
  279. if op_id is None:
  280. op_id = request_id
  281. event = CommandSucceededEvent(
  282. duration, reply, command_name, request_id, connection_id, op_id)
  283. for subscriber in self.__command_listeners:
  284. try:
  285. subscriber.succeeded(event)
  286. except Exception:
  287. _handle_exception()
  288. def publish_command_failure(self, duration, failure, command_name,
  289. request_id, connection_id, op_id=None):
  290. """Publish a CommandFailedEvent to all command listeners.
  291. :Parameters:
  292. - `duration`: The command duration as a datetime.timedelta.
  293. - `failure`: The server reply document or failure description
  294. document.
  295. - `command_name`: The command name.
  296. - `request_id`: The request id for this operation.
  297. - `connection_id`: The address (host, port) of the server this
  298. command was sent to.
  299. - `op_id`: The (optional) operation id for this operation.
  300. """
  301. if op_id is None:
  302. op_id = request_id
  303. event = CommandFailedEvent(
  304. duration, failure, command_name, request_id, connection_id, op_id)
  305. for subscriber in self.__command_listeners:
  306. try:
  307. subscriber.failed(event)
  308. except Exception:
  309. _handle_exception()