| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- # Copyright 2015 MongoDB, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you
- # may not use this file except in compliance with the License. You
- # may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied. See the License for the specific language governing
- # permissions and limitations under the License.
- """Tools to monitor driver events.
- Use :func:`register` to register global listeners for specific events.
- Currently only command events are published. Listeners must be
- a subclass of :class:`CommandListener` and implement
- :meth:`~CommandListener.started`, :meth:`~CommandListener.succeeded`, and
- :meth:`~CommandListener.failed`.
- For example, a simple command logger might be implemented like this::
- import logging
- from pymongo import monitoring
- class CommandLogger(monitoring.CommandListener):
- def started(self, event):
- logging.info("Command {0.command_name} with request id "
- "{0.request_id} started on server "
- "{0.connection_id}".format(event))
- def succeeded(self, event):
- logging.info("Command {0.command_name} with request id "
- "{0.request_id} on server {0.connection_id} "
- "succeeded in {0.duration_micros} "
- "microseconds".format(event))
- def failed(self, event):
- logging.info("Command {0.command_name} with request id "
- "{0.request_id} on server {0.connection_id} "
- "failed in {0.duration_micros} "
- "microseconds".format(event))
- monitoring.register(CommandLogger())
- Event listeners can also be registered per instance of
- :class:`~pymongo.mongo_client.MongoClient`::
- client = MongoClient(event_listeners=[CommandLogger()])
- Note that previously registered global listeners are automatically included when
- configuring per client event listeners. Registering a new global listener will
- not add that listener to existing client instances.
- .. note:: Events are delivered **synchronously**. Application threads block
- waiting for event handlers (e.g. :meth:`~CommandListener.started`) to
- return. Care must be taken to ensure that your event handlers are efficient
- enough to not adversely affect overall application performance.
- .. warning:: The command documents published through this API are *not* copies.
- If you intend to modify them in any way you must copy them in your event
- handler first.
- """
- import sys
- import traceback
- from collections import namedtuple, Sequence
- _Listeners = namedtuple('Listeners', ('command_listeners',))
- _LISTENERS = _Listeners([])
- class CommandListener(object):
- """Abstract base class for command listeners."""
- def started(self, event):
- """Abstract method to handle CommandStartedEvent.
- :Parameters:
- - `event`: An instance of :class:`CommandStartedEvent`
- """
- raise NotImplementedError
- def succeeded(self, event):
- """Abstract method to handle CommandSucceededEvent.
- :Parameters:
- - `event`: An instance of :class:`CommandSucceededEvent`
- """
- raise NotImplementedError
- def failed(self, event):
- """Abstract method to handle CommandFailedEvent.
- :Parameters:
- - `event`: An instance of :class:`CommandFailedEvent`
- """
- raise NotImplementedError
- def _to_micros(dur):
- """Convert duration 'dur' to microseconds."""
- if hasattr(dur, 'total_seconds'):
- return int(dur.total_seconds() * 10e5)
- # Python 2.6
- return dur.microseconds + (dur.seconds + dur.days * 24 * 3600) * 1000000
- def _validate_event_listeners(option, listeners):
- """Validate event listeners"""
- if not isinstance(listeners, Sequence):
- raise TypeError("%s must be a list or tuple" % (option,))
- for listener in listeners:
- if not isinstance(listener, CommandListener):
- raise TypeError("Only subclasses of "
- "pymongo.monitoring.CommandListener are supported")
- return listeners
- def register(listener):
- """Register a global event listener.
- :Parameters:
- - `listener`: A subclass of :class:`CommandListener`.
- """
- _validate_event_listeners('listener', [listener])
- _LISTENERS.command_listeners.append(listener)
- def _handle_exception():
- """Print exceptions raised by subscribers to stderr."""
- # Heavily influenced by logging.Handler.handleError.
- # See note here:
- # https://docs.python.org/3.4/library/sys.html#sys.__stderr__
- if sys.stderr:
- einfo = sys.exc_info()
- try:
- traceback.print_exception(einfo[0], einfo[1], einfo[2],
- None, sys.stderr)
- except IOError:
- pass
- finally:
- del einfo
- # Note - to avoid bugs from forgetting which if these is all lowercase and
- # which are camelCase, and at the same time avoid having to add a test for
- # every command, use all lowercase here and test against command_name.lower().
- _SENSITIVE_COMMANDS = set(
- ["authenticate", "saslstart", "saslcontinue", "getnonce", "createuser",
- "updateuser", "copydbgetnonce", "copydbsaslstart", "copydb"])
- class _CommandEvent(object):
- """Base class for command events."""
- __slots__ = ("__cmd_name", "__rqst_id", "__conn_id", "__op_id")
- def __init__(self, command_name, request_id, connection_id, operation_id):
- self.__cmd_name = command_name
- self.__rqst_id = request_id
- self.__conn_id = connection_id
- self.__op_id = operation_id
- @property
- def command_name(self):
- """The command name."""
- return self.__cmd_name
- @property
- def request_id(self):
- """The request id for this operation."""
- return self.__rqst_id
- @property
- def connection_id(self):
- """The address (host, port) of the server this command was sent to."""
- return self.__conn_id
- @property
- def operation_id(self):
- """An id for this series of events or None."""
- return self.__op_id
- class CommandStartedEvent(_CommandEvent):
- """Event published when a command starts.
- :Parameters:
- - `command`: The command document.
- - `database_name`: The name of the database this command was run against.
- - `request_id`: The request id for this operation.
- - `connection_id`: The address (host, port) of the server this command
- was sent to.
- - `operation_id`: An optional identifier for a series of related events.
- """
- __slots__ = ("__cmd", "__db")
- def __init__(self, command, database_name, *args):
- if not command:
- raise ValueError("%r is not a valid command" % (command,))
- # Command name must be first key.
- command_name = next(iter(command))
- super(CommandStartedEvent, self).__init__(command_name, *args)
- if command_name.lower() in _SENSITIVE_COMMANDS:
- self.__cmd = {}
- else:
- self.__cmd = command
- self.__db = database_name
- @property
- def command(self):
- """The command document."""
- return self.__cmd
- @property
- def database_name(self):
- """The name of the database this command was run against."""
- return self.__db
- class CommandSucceededEvent(_CommandEvent):
- """Event published when a command succeeds.
- :Parameters:
- - `duration`: The command duration as a datetime.timedelta.
- - `reply`: The server reply document.
- - `command_name`: The command name.
- - `request_id`: The request id for this operation.
- - `connection_id`: The address (host, port) of the server this command
- was sent to.
- - `operation_id`: An optional identifier for a series of related events.
- """
- __slots__ = ("__duration_micros", "__reply")
- def __init__(self, duration, reply, command_name,
- request_id, connection_id, operation_id):
- super(CommandSucceededEvent, self).__init__(
- command_name, request_id, connection_id, operation_id)
- self.__duration_micros = _to_micros(duration)
- if command_name.lower() in _SENSITIVE_COMMANDS:
- self.__reply = {}
- else:
- self.__reply = reply
- @property
- def duration_micros(self):
- """The duration of this operation in microseconds."""
- return self.__duration_micros
- @property
- def reply(self):
- """The server failure document for this operation."""
- return self.__reply
- class CommandFailedEvent(_CommandEvent):
- """Event published when a command fails.
- :Parameters:
- - `duration`: The command duration as a datetime.timedelta.
- - `failure`: The server reply document.
- - `command_name`: The command name.
- - `request_id`: The request id for this operation.
- - `connection_id`: The address (host, port) of the server this command
- was sent to.
- - `operation_id`: An optional identifier for a series of related events.
- """
- __slots__ = ("__duration_micros", "__failure")
- def __init__(self, duration, failure, *args):
- super(CommandFailedEvent, self).__init__(*args)
- self.__duration_micros = _to_micros(duration)
- self.__failure = failure
- @property
- def duration_micros(self):
- """The duration of this operation in microseconds."""
- return self.__duration_micros
- @property
- def failure(self):
- """The server failure document for this operation."""
- return self.__failure
- class _EventListeners(object):
- """Configure event listeners for a client instance.
- Any event listeners registered globally are included by default.
- :Parameters:
- - `listeners`: A list of event listeners.
- """
- def __init__(self, listeners):
- self.__command_listeners = _LISTENERS.command_listeners[:]
- if listeners is not None:
- self.__command_listeners.extend(listeners)
- self.__enabled_for_commands = bool(self.__command_listeners)
- @property
- def enabled_for_commands(self):
- """Are any CommandListener instances registered?"""
- return self.__enabled_for_commands
- @property
- def event_listeners(self):
- """List of registered event listeners."""
- return self.__command_listeners[:]
- def publish_command_start(self, command, database_name,
- request_id, connection_id, op_id=None):
- """Publish a CommandStartedEvent to all command listeners.
- :Parameters:
- - `command`: The command document.
- - `database_name`: The name of the database this command was run
- against.
- - `request_id`: The request id for this operation.
- - `connection_id`: The address (host, port) of the server this
- command was sent to.
- - `op_id`: The (optional) operation id for this operation.
- """
- if op_id is None:
- op_id = request_id
- event = CommandStartedEvent(
- command, database_name, request_id, connection_id, op_id)
- for subscriber in self.__command_listeners:
- try:
- subscriber.started(event)
- except Exception:
- _handle_exception()
- def publish_command_success(self, duration, reply, command_name,
- request_id, connection_id, op_id=None):
- """Publish a CommandSucceededEvent to all command listeners.
- :Parameters:
- - `duration`: The command duration as a datetime.timedelta.
- - `reply`: The server reply document.
- - `command_name`: The command name.
- - `request_id`: The request id for this operation.
- - `connection_id`: The address (host, port) of the server this
- command was sent to.
- - `op_id`: The (optional) operation id for this operation.
- """
- if op_id is None:
- op_id = request_id
- event = CommandSucceededEvent(
- duration, reply, command_name, request_id, connection_id, op_id)
- for subscriber in self.__command_listeners:
- try:
- subscriber.succeeded(event)
- except Exception:
- _handle_exception()
- def publish_command_failure(self, duration, failure, command_name,
- request_id, connection_id, op_id=None):
- """Publish a CommandFailedEvent to all command listeners.
- :Parameters:
- - `duration`: The command duration as a datetime.timedelta.
- - `failure`: The server reply document or failure description
- document.
- - `command_name`: The command name.
- - `request_id`: The request id for this operation.
- - `connection_id`: The address (host, port) of the server this
- command was sent to.
- - `op_id`: The (optional) operation id for this operation.
- """
- if op_id is None:
- op_id = request_id
- event = CommandFailedEvent(
- duration, failure, command_name, request_id, connection_id, op_id)
- for subscriber in self.__command_listeners:
- try:
- subscriber.failed(event)
- except Exception:
- _handle_exception()
|