synchronize.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #
  2. # Module implementing synchronization primitives
  3. #
  4. # multiprocessing/synchronize.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = [
  10. 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
  11. ]
  12. import threading
  13. import sys
  14. import tempfile
  15. import _multiprocessing
  16. import time
  17. from . import context
  18. from . import process
  19. from . import util
  20. # Try to import the mp.synchronize module cleanly, if it fails
  21. # raise ImportError for platforms lacking a working sem_open implementation.
  22. # See issue 3770
  23. try:
  24. from _multiprocessing import SemLock, sem_unlink
  25. except (ImportError):
  26. raise ImportError("This platform lacks a functioning sem_open" +
  27. " implementation, therefore, the required" +
  28. " synchronization primitives needed will not" +
  29. " function, see issue 3770.")
  30. #
  31. # Constants
  32. #
  33. RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
  34. SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
  35. #
  36. # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
  37. #
  38. class SemLock(object):
  39. _rand = tempfile._RandomNameSequence()
  40. def __init__(self, kind, value, maxvalue, *, ctx):
  41. if ctx is None:
  42. ctx = context._default_context.get_context()
  43. name = ctx.get_start_method()
  44. unlink_now = sys.platform == 'win32' or name == 'fork'
  45. for i in range(100):
  46. try:
  47. sl = self._semlock = _multiprocessing.SemLock(
  48. kind, value, maxvalue, self._make_name(),
  49. unlink_now)
  50. except FileExistsError:
  51. pass
  52. else:
  53. break
  54. else:
  55. raise FileExistsError('cannot find name for semaphore')
  56. util.debug('created semlock with handle %s' % sl.handle)
  57. self._make_methods()
  58. if sys.platform != 'win32':
  59. def _after_fork(obj):
  60. obj._semlock._after_fork()
  61. util.register_after_fork(self, _after_fork)
  62. if self._semlock.name is not None:
  63. # We only get here if we are on Unix with forking
  64. # disabled. When the object is garbage collected or the
  65. # process shuts down we unlink the semaphore name
  66. from .resource_tracker import register
  67. register(self._semlock.name, "semaphore")
  68. util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
  69. exitpriority=0)
  70. @staticmethod
  71. def _cleanup(name):
  72. from .resource_tracker import unregister
  73. sem_unlink(name)
  74. unregister(name, "semaphore")
  75. def _make_methods(self):
  76. self.acquire = self._semlock.acquire
  77. self.release = self._semlock.release
  78. def __enter__(self):
  79. return self._semlock.__enter__()
  80. def __exit__(self, *args):
  81. return self._semlock.__exit__(*args)
  82. def __getstate__(self):
  83. context.assert_spawning(self)
  84. sl = self._semlock
  85. if sys.platform == 'win32':
  86. h = context.get_spawning_popen().duplicate_for_child(sl.handle)
  87. else:
  88. h = sl.handle
  89. return (h, sl.kind, sl.maxvalue, sl.name)
  90. def __setstate__(self, state):
  91. self._semlock = _multiprocessing.SemLock._rebuild(*state)
  92. util.debug('recreated blocker with handle %r' % state[0])
  93. self._make_methods()
  94. @staticmethod
  95. def _make_name():
  96. return '%s-%s' % (process.current_process()._config['semprefix'],
  97. next(SemLock._rand))
  98. #
  99. # Semaphore
  100. #
  101. class Semaphore(SemLock):
  102. def __init__(self, value=1, *, ctx):
  103. SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
  104. def get_value(self):
  105. return self._semlock._get_value()
  106. def __repr__(self):
  107. try:
  108. value = self._semlock._get_value()
  109. except Exception:
  110. value = 'unknown'
  111. return '<%s(value=%s)>' % (self.__class__.__name__, value)
  112. #
  113. # Bounded semaphore
  114. #
  115. class BoundedSemaphore(Semaphore):
  116. def __init__(self, value=1, *, ctx):
  117. SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
  118. def __repr__(self):
  119. try:
  120. value = self._semlock._get_value()
  121. except Exception:
  122. value = 'unknown'
  123. return '<%s(value=%s, maxvalue=%s)>' % \
  124. (self.__class__.__name__, value, self._semlock.maxvalue)
  125. #
  126. # Non-recursive lock
  127. #
  128. class Lock(SemLock):
  129. def __init__(self, *, ctx):
  130. SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  131. def __repr__(self):
  132. try:
  133. if self._semlock._is_mine():
  134. name = process.current_process().name
  135. if threading.current_thread().name != 'MainThread':
  136. name += '|' + threading.current_thread().name
  137. elif self._semlock._get_value() == 1:
  138. name = 'None'
  139. elif self._semlock._count() > 0:
  140. name = 'SomeOtherThread'
  141. else:
  142. name = 'SomeOtherProcess'
  143. except Exception:
  144. name = 'unknown'
  145. return '<%s(owner=%s)>' % (self.__class__.__name__, name)
  146. #
  147. # Recursive lock
  148. #
  149. class RLock(SemLock):
  150. def __init__(self, *, ctx):
  151. SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
  152. def __repr__(self):
  153. try:
  154. if self._semlock._is_mine():
  155. name = process.current_process().name
  156. if threading.current_thread().name != 'MainThread':
  157. name += '|' + threading.current_thread().name
  158. count = self._semlock._count()
  159. elif self._semlock._get_value() == 1:
  160. name, count = 'None', 0
  161. elif self._semlock._count() > 0:
  162. name, count = 'SomeOtherThread', 'nonzero'
  163. else:
  164. name, count = 'SomeOtherProcess', 'nonzero'
  165. except Exception:
  166. name, count = 'unknown', 'unknown'
  167. return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
  168. #
  169. # Condition variable
  170. #
  171. class Condition(object):
  172. def __init__(self, lock=None, *, ctx):
  173. self._lock = lock or ctx.RLock()
  174. self._sleeping_count = ctx.Semaphore(0)
  175. self._woken_count = ctx.Semaphore(0)
  176. self._wait_semaphore = ctx.Semaphore(0)
  177. self._make_methods()
  178. def __getstate__(self):
  179. context.assert_spawning(self)
  180. return (self._lock, self._sleeping_count,
  181. self._woken_count, self._wait_semaphore)
  182. def __setstate__(self, state):
  183. (self._lock, self._sleeping_count,
  184. self._woken_count, self._wait_semaphore) = state
  185. self._make_methods()
  186. def __enter__(self):
  187. return self._lock.__enter__()
  188. def __exit__(self, *args):
  189. return self._lock.__exit__(*args)
  190. def _make_methods(self):
  191. self.acquire = self._lock.acquire
  192. self.release = self._lock.release
  193. def __repr__(self):
  194. try:
  195. num_waiters = (self._sleeping_count._semlock._get_value() -
  196. self._woken_count._semlock._get_value())
  197. except Exception:
  198. num_waiters = 'unknown'
  199. return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
  200. def wait(self, timeout=None):
  201. assert self._lock._semlock._is_mine(), \
  202. 'must acquire() condition before using wait()'
  203. # indicate that this thread is going to sleep
  204. self._sleeping_count.release()
  205. # release lock
  206. count = self._lock._semlock._count()
  207. for i in range(count):
  208. self._lock.release()
  209. try:
  210. # wait for notification or timeout
  211. return self._wait_semaphore.acquire(True, timeout)
  212. finally:
  213. # indicate that this thread has woken
  214. self._woken_count.release()
  215. # reacquire lock
  216. for i in range(count):
  217. self._lock.acquire()
  218. def notify(self, n=1):
  219. assert self._lock._semlock._is_mine(), 'lock is not owned'
  220. assert not self._wait_semaphore.acquire(
  221. False), ('notify: Should not have been able to acquire '
  222. + '_wait_semaphore')
  223. # to take account of timeouts since last notify*() we subtract
  224. # woken_count from sleeping_count and rezero woken_count
  225. while self._woken_count.acquire(False):
  226. res = self._sleeping_count.acquire(False)
  227. assert res, ('notify: Bug in sleeping_count.acquire'
  228. + '- res should not be False')
  229. sleepers = 0
  230. while sleepers < n and self._sleeping_count.acquire(False):
  231. self._wait_semaphore.release() # wake up one sleeper
  232. sleepers += 1
  233. if sleepers:
  234. for i in range(sleepers):
  235. self._woken_count.acquire() # wait for a sleeper to wake
  236. # rezero wait_semaphore in case some timeouts just happened
  237. while self._wait_semaphore.acquire(False):
  238. pass
  239. def notify_all(self):
  240. self.notify(n=sys.maxsize)
  241. def wait_for(self, predicate, timeout=None):
  242. result = predicate()
  243. if result:
  244. return result
  245. if timeout is not None:
  246. endtime = time.monotonic() + timeout
  247. else:
  248. endtime = None
  249. waittime = None
  250. while not result:
  251. if endtime is not None:
  252. waittime = endtime - time.monotonic()
  253. if waittime <= 0:
  254. break
  255. self.wait(waittime)
  256. result = predicate()
  257. return result
  258. #
  259. # Event
  260. #
  261. class Event(object):
  262. def __init__(self, *, ctx):
  263. self._cond = ctx.Condition(ctx.Lock())
  264. self._flag = ctx.Semaphore(0)
  265. def is_set(self):
  266. with self._cond:
  267. if self._flag.acquire(False):
  268. self._flag.release()
  269. return True
  270. return False
  271. def set(self):
  272. with self._cond:
  273. self._flag.acquire(False)
  274. self._flag.release()
  275. self._cond.notify_all()
  276. def clear(self):
  277. with self._cond:
  278. self._flag.acquire(False)
  279. def wait(self, timeout=None):
  280. with self._cond:
  281. if self._flag.acquire(False):
  282. self._flag.release()
  283. else:
  284. self._cond.wait(timeout)
  285. if self._flag.acquire(False):
  286. self._flag.release()
  287. return True
  288. return False
  289. #
  290. # Barrier
  291. #
  292. class Barrier(threading.Barrier):
  293. def __init__(self, parties, action=None, timeout=None, *, ctx):
  294. import struct
  295. from .heap import BufferWrapper
  296. wrapper = BufferWrapper(struct.calcsize('i') * 2)
  297. cond = ctx.Condition()
  298. self.__setstate__((parties, action, timeout, cond, wrapper))
  299. self._state = 0
  300. self._count = 0
  301. def __setstate__(self, state):
  302. (self._parties, self._action, self._timeout,
  303. self._cond, self._wrapper) = state
  304. self._array = self._wrapper.create_memoryview().cast('i')
  305. def __getstate__(self):
  306. return (self._parties, self._action, self._timeout,
  307. self._cond, self._wrapper)
  308. @property
  309. def _state(self):
  310. return self._array[0]
  311. @_state.setter
  312. def _state(self, value):
  313. self._array[0] = value
  314. @property
  315. def _count(self):
  316. return self._array[1]
  317. @_count.setter
  318. def _count(self, value):
  319. self._array[1] = value