- # Copyright 2012-2015 MongoDB, Inc.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Utilities for multi-threading support."""
- import threading
- try:
-     from time import monotonic as _time
- except ImportError:
-     from time import time as _time
- # NOTE: this import rebinds _time, so the fallback above is effectively unused.
- from pymongo.monotonic import time as _time
- from pymongo.errors import ExceededMaxWaiters
- ### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
- class Semaphore:
-     # After Tim Peters' semaphore class, but not quite the same (no maximum)
-     def __init__(self, value=1):
-         if value < 0:
-             raise ValueError("semaphore initial value must be >= 0")
-         self._cond = threading.Condition(threading.Lock())
-         self._value = value
-     def acquire(self, blocking=True, timeout=None):
-         if not blocking and timeout is not None:
-             raise ValueError("can't specify timeout for non-blocking acquire")
-         rc = False
-         endtime = None
-         self._cond.acquire()
-         while self._value == 0:
-             if not blocking:
-                 break
-             if timeout is not None:
-                 if endtime is None:
-                     endtime = _time() + timeout
-                 else:
-                     timeout = endtime - _time()
-                     if timeout <= 0:
-                         break
-             self._cond.wait(timeout)
-         else:
-             self._value = self._value - 1
-             rc = True
-         self._cond.release()
-         return rc
-     __enter__ = acquire
-     def release(self):
-         self._cond.acquire()
-         self._value = self._value + 1
-         self._cond.notify()
-         self._cond.release()
-     def __exit__(self, t, v, tb):
-         self.release()
-     @property
-     def counter(self):
-         return self._value
- class BoundedSemaphore(Semaphore):
-     """Semaphore that checks that # releases is <= # acquires"""
-     def __init__(self, value=1):
-         Semaphore.__init__(self, value)
-         self._initial_value = value
-     def release(self):
-         if self._value >= self._initial_value:
-             raise ValueError("Semaphore released too many times")
-         return Semaphore.release(self)
- ### End backport from CPython 3.2
- class DummySemaphore(object):
-     """No-op semaphore: acquire() always succeeds and release() does nothing."""
-     def __init__(self, value=None):
-         pass
-     def acquire(self, blocking=True, timeout=None):
-         return True
-     def release(self):
-         pass
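- class MaxWaitersBoundedSemaphore(object):
-     """Semaphore wrapper that caps how many threads may be in acquire() at once.
-     Raises ExceededMaxWaiters when max_waiters callers are already acquiring.
-     """
-     def __init__(self, semaphore_class, value=1, max_waiters=1):
-         self.waiter_semaphore = semaphore_class(max_waiters)
-         self.semaphore = semaphore_class(value)
-     def acquire(self, blocking=True, timeout=None):
-         # Take a waiter slot without blocking; fail fast if none are left.
-         if not self.waiter_semaphore.acquire(False):
-             raise ExceededMaxWaiters()
-         try:
-             return self.semaphore.acquire(blocking, timeout)
-         finally:
-             # The waiter slot is held only for the duration of the acquire call.
-             self.waiter_semaphore.release()
-     def __getattr__(self, name):
-         # Delegate everything else (release, counter, ...) to the wrapped semaphore.
-         return getattr(self.semaphore, name)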
- class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
-     def __init__(self, value=1, max_waiters=1):
-         MaxWaitersBoundedSemaphore.__init__(
-             self, BoundedSemaphore, value, max_waiters)
- def create_semaphore(max_size, max_waiters):
-     """Return a semaphore matching the given limits (None means unlimited)."""
-     if max_size is None:
-         return DummySemaphore()
-     else:
-         if max_waiters is None:
-             return BoundedSemaphore(max_size)
-         else:
-             return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)
- class Event(object):
-     """Copy of standard threading.Event, but uses a custom condition class.
-     Allows async frameworks to override monitors' synchronization behavior
-     with TopologySettings.condition_class.
-     Copied from CPython's threading.py at hash c7960cc9.
-     """
-     def __init__(self, condition_class):
-         self._cond = condition_class(threading.Lock())
-         self._flag = False
-     def is_set(self):
-         """Return true if and only if the internal flag is true."""
-         return self._flag
-     isSet = is_set
-     def set(self):
-         """Set the internal flag to true.
-         All threads waiting for it to become true are awakened. Threads
-         that call wait() once the flag is true will not block at all.
-         """
-         self._cond.acquire()
-         try:
-             self._flag = True
-             self._cond.notify_all()
-         finally:
-             self._cond.release()
-     def clear(self):
-         """Reset the internal flag to false.
-         Subsequently, threads calling wait() will block until set() is called to
-         set the internal flag to true again.
-         """
-         self._cond.acquire()
-         try:
-             self._flag = False
-         finally:
-             self._cond.release()
-     def wait(self, timeout=None):
-         """Block until the internal flag is true.
-         If the internal flag is true on entry, return immediately. Otherwise,
-         block until another thread calls set() to set the flag to true, or until
-         the optional timeout occurs.
-         When the timeout argument is present and not None, it should be a
-         floating point number specifying a timeout for the operation in seconds
-         (or fractions thereof).
-         This method returns the internal flag on exit, so it will always return
-         True except if a timeout is given and the operation times out.
-         """
-         self._cond.acquire()
-         try:
-             signaled = self._flag
-             if not signaled:
-                 signaled = self._cond.wait(timeout)
-             return signaled
-         finally:
-             self._cond.release()
|
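The max-waiters pattern used by MaxWaitersBoundedSemaphore in the file above can be tried out with a minimal sketch like the one below. It is rebuilt on the standard library's threading.BoundedSemaphore so that it runs without pymongo, and it assumes Python 3, where acquire() accepts a timeout. The names TooManyWaiters and MaxWaitersSemaphore are hypothetical stand-ins for ExceededMaxWaiters and the class above; they are not part of pymongo's API.

```python
import threading


class TooManyWaiters(Exception):
    """Hypothetical stand-in for pymongo.errors.ExceededMaxWaiters."""


class MaxWaitersSemaphore:
    """Bounded semaphore that also caps how many threads may wait on it."""

    def __init__(self, value=1, max_waiters=1):
        self._waiters = threading.BoundedSemaphore(max_waiters)
        self._slots = threading.BoundedSemaphore(value)

    def acquire(self, blocking=True, timeout=None):
        # Claim a waiter ticket without blocking; fail fast if none are left.
        if not self._waiters.acquire(False):
            raise TooManyWaiters()
        try:
            return self._slots.acquire(blocking, timeout)
        finally:
            # The ticket is held only while this call is in progress.
            self._waiters.release()

    def release(self):
        # Raises ValueError if released more times than acquired.
        self._slots.release()


sem = MaxWaitersSemaphore(value=1, max_waiters=1)
first = sem.acquire()                # True: takes the only slot
second = sem.acquire(timeout=0.05)   # False: no slot frees up before the timeout
sem.release()
third = sem.acquire(blocking=False)  # True: the slot is free again
```

Because the waiter ticket is returned in the finally clause, a timed-out acquire does not permanently consume waiter capacity. TooManyWaiters is raised only when another thread is already inside acquire() and holds the last ticket.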