thread_util.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # Copyright 2012-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Utilities for multi-threading support."""
  15. import threading
  16. try:
  17. from time import monotonic as _time
  18. except ImportError:
  19. from time import time as _time
  20. from pymongo.monotonic import time as _time
  21. from pymongo.errors import ExceededMaxWaiters
  22. ### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
  23. class Semaphore:
  24. # After Tim Peters' semaphore class, but not quite the same (no maximum)
  25. def __init__(self, value=1):
  26. if value < 0:
  27. raise ValueError("semaphore initial value must be >= 0")
  28. self._cond = threading.Condition(threading.Lock())
  29. self._value = value
  30. def acquire(self, blocking=True, timeout=None):
  31. if not blocking and timeout is not None:
  32. raise ValueError("can't specify timeout for non-blocking acquire")
  33. rc = False
  34. endtime = None
  35. self._cond.acquire()
  36. while self._value == 0:
  37. if not blocking:
  38. break
  39. if timeout is not None:
  40. if endtime is None:
  41. endtime = _time() + timeout
  42. else:
  43. timeout = endtime - _time()
  44. if timeout <= 0:
  45. break
  46. self._cond.wait(timeout)
  47. else:
  48. self._value = self._value - 1
  49. rc = True
  50. self._cond.release()
  51. return rc
  52. __enter__ = acquire
  53. def release(self):
  54. self._cond.acquire()
  55. self._value = self._value + 1
  56. self._cond.notify()
  57. self._cond.release()
  58. def __exit__(self, t, v, tb):
  59. self.release()
  60. @property
  61. def counter(self):
  62. return self._value
  63. class BoundedSemaphore(Semaphore):
  64. """Semaphore that checks that # releases is <= # acquires"""
  65. def __init__(self, value=1):
  66. Semaphore.__init__(self, value)
  67. self._initial_value = value
  68. def release(self):
  69. if self._value >= self._initial_value:
  70. raise ValueError("Semaphore released too many times")
  71. return Semaphore.release(self)
  72. ### End backport from CPython 3.2
  73. class DummySemaphore(object):
  74. def __init__(self, value=None):
  75. pass
  76. def acquire(self, blocking=True, timeout=None):
  77. return True
  78. def release(self):
  79. pass
  80. class MaxWaitersBoundedSemaphore(object):
  81. def __init__(self, semaphore_class, value=1, max_waiters=1):
  82. self.waiter_semaphore = semaphore_class(max_waiters)
  83. self.semaphore = semaphore_class(value)
  84. def acquire(self, blocking=True, timeout=None):
  85. if not self.waiter_semaphore.acquire(False):
  86. raise ExceededMaxWaiters()
  87. try:
  88. return self.semaphore.acquire(blocking, timeout)
  89. finally:
  90. self.waiter_semaphore.release()
  91. def __getattr__(self, name):
  92. return getattr(self.semaphore, name)
  93. class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
  94. def __init__(self, value=1, max_waiters=1):
  95. MaxWaitersBoundedSemaphore.__init__(
  96. self, BoundedSemaphore, value, max_waiters)
  97. def create_semaphore(max_size, max_waiters):
  98. if max_size is None:
  99. return DummySemaphore()
  100. else:
  101. if max_waiters is None:
  102. return BoundedSemaphore(max_size)
  103. else:
  104. return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)
  105. class Event(object):
  106. """Copy of standard threading.Event, but uses a custom condition class.
  107. Allows async frameworks to override monitors' synchronization behavior
  108. with TopologySettings.condition_class.
  109. Copied from CPython's threading.py at hash c7960cc9.
  110. """
  111. def __init__(self, condition_class):
  112. self._cond = condition_class(threading.Lock())
  113. self._flag = False
  114. def is_set(self):
  115. """Return true if and only if the internal flag is true."""
  116. return self._flag
  117. isSet = is_set
  118. def set(self):
  119. """Set the internal flag to true.
  120. All threads waiting for it to become true are awakened. Threads
  121. that call wait() once the flag is true will not block at all.
  122. """
  123. self._cond.acquire()
  124. try:
  125. self._flag = True
  126. self._cond.notify_all()
  127. finally:
  128. self._cond.release()
  129. def clear(self):
  130. """Reset the internal flag to false.
  131. Subsequently, threads calling wait() will block until set() is called to
  132. set the internal flag to true again.
  133. """
  134. self._cond.acquire()
  135. try:
  136. self._flag = False
  137. finally:
  138. self._cond.release()
  139. def wait(self, timeout=None):
  140. """Block until the internal flag is true.
  141. If the internal flag is true on entry, return immediately. Otherwise,
  142. block until another thread calls set() to set the flag to true, or until
  143. the optional timeout occurs.
  144. When the timeout argument is present and not None, it should be a
  145. floating point number specifying a timeout for the operation in seconds
  146. (or fractions thereof).
  147. This method returns the internal flag on exit, so it will always return
  148. True except if a timeout is given and the operation times out.
  149. """
  150. self._cond.acquire()
  151. try:
  152. signaled = self._flag
  153. if not signaled:
  154. signaled = self._cond.wait(timeout)
  155. return signaled
  156. finally:
  157. self._cond.release()