synchronize.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. #
  2. # Module implementing synchronization primitives
  3. #
  4. # multiprocessing/synchronize.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = [
  10. 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
  11. ]
  12. import threading
  13. import sys
  14. import tempfile
  15. import _multiprocessing
  16. import time
  17. from . import context
  18. from . import process
  19. from . import util
# Try to import the mp.synchronize module cleanly; if that fails, raise
# ImportError on platforms that lack a working sem_open implementation.
# See issue 3770.
  23. try:
  24. from _multiprocessing import SemLock, sem_unlink
  25. except (ImportError):
  26. raise ImportError("This platform lacks a functioning sem_open" +
  27. " implementation, therefore, the required" +
  28. " synchronization primitives needed will not" +
  29. " function, see issue 3770.")
  30. #
  31. # Constants
  32. #
  33. RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
  34. SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
  35. #
  36. # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
  37. #
  38. class SemLock(object):
  39. _rand = tempfile._RandomNameSequence()
  40. def __init__(self, kind, value, maxvalue, *, ctx):
  41. if ctx is None:
  42. ctx = context._default_context.get_context()
  43. self._is_fork_ctx = ctx.get_start_method() == 'fork'
  44. unlink_now = sys.platform == 'win32' or self._is_fork_ctx
  45. for i in range(100):
  46. try:
  47. sl = self._semlock = _multiprocessing.SemLock(
  48. kind, value, maxvalue, self._make_name(),
  49. unlink_now)
  50. except FileExistsError:
  51. pass
  52. else:
  53. break
  54. else:
  55. raise FileExistsError('cannot find name for semaphore')
  56. util.debug('created semlock with handle %s' % sl.handle)
  57. self._make_methods()
  58. if sys.platform != 'win32':
  59. def _after_fork(obj):
  60. obj._semlock._after_fork()
  61. util.register_after_fork(self, _after_fork)
  62. if self._semlock.name is not None:
  63. # We only get here if we are on Unix with forking
  64. # disabled. When the object is garbage collected or the
  65. # process shuts down we unlink the semaphore name
  66. from .resource_tracker import register
  67. register(self._semlock.name, "semaphore")
  68. util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
  69. exitpriority=0)
  70. @staticmethod
  71. def _cleanup(name):
  72. from .resource_tracker import unregister
  73. sem_unlink(name)
  74. unregister(name, "semaphore")
  75. def _make_methods(self):
  76. self.acquire = self._semlock.acquire
  77. self.release = self._semlock.release
  78. def __enter__(self):
  79. return self._semlock.__enter__()
  80. def __exit__(self, *args):
  81. return self._semlock.__exit__(*args)
  82. def __getstate__(self):
  83. context.assert_spawning(self)
  84. sl = self._semlock
  85. if sys.platform == 'win32':
  86. h = context.get_spawning_popen().duplicate_for_child(sl.handle)
  87. else:
  88. if self._is_fork_ctx:
  89. raise RuntimeError('A SemLock created in a fork context is being '
  90. 'shared with a process in a spawn context. This is '
  91. 'not supported. Please use the same context to create '
  92. 'multiprocessing objects and Process.')
  93. h = sl.handle
  94. return (h, sl.kind, sl.maxvalue, sl.name)
  95. def __setstate__(self, state):
  96. self._semlock = _multiprocessing.SemLock._rebuild(*state)
  97. util.debug('recreated blocker with handle %r' % state[0])
  98. self._make_methods()
  99. # Ensure that deserialized SemLock can be serialized again (gh-108520).
  100. self._is_fork_ctx = False
  101. @staticmethod
  102. def _make_name():
  103. return '%s-%s' % (process.current_process()._config['semprefix'],
  104. next(SemLock._rand))
  105. #
  106. # Semaphore
  107. #
  108. class Semaphore(SemLock):
  109. def __init__(self, value=1, *, ctx):
  110. SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
  111. def get_value(self):
  112. return self._semlock._get_value()
  113. def __repr__(self):
  114. try:
  115. value = self._semlock._get_value()
  116. except Exception:
  117. value = 'unknown'
  118. return '<%s(value=%s)>' % (self.__class__.__name__, value)
  119. #
  120. # Bounded semaphore
  121. #
  122. class BoundedSemaphore(Semaphore):
  123. def __init__(self, value=1, *, ctx):
  124. SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
  125. def __repr__(self):
  126. try:
  127. value = self._semlock._get_value()
  128. except Exception:
  129. value = 'unknown'
  130. return '<%s(value=%s, maxvalue=%s)>' % \
  131. (self.__class__.__name__, value, self._semlock.maxvalue)
  132. #
  133. # Non-recursive lock
  134. #
  135. class Lock(SemLock):
  136. def __init__(self, *, ctx):
  137. SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  138. def __repr__(self):
  139. try:
  140. if self._semlock._is_mine():
  141. name = process.current_process().name
  142. if threading.current_thread().name != 'MainThread':
  143. name += '|' + threading.current_thread().name
  144. elif self._semlock._get_value() == 1:
  145. name = 'None'
  146. elif self._semlock._count() > 0:
  147. name = 'SomeOtherThread'
  148. else:
  149. name = 'SomeOtherProcess'
  150. except Exception:
  151. name = 'unknown'
  152. return '<%s(owner=%s)>' % (self.__class__.__name__, name)
  153. #
  154. # Recursive lock
  155. #
  156. class RLock(SemLock):
  157. def __init__(self, *, ctx):
  158. SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
  159. def __repr__(self):
  160. try:
  161. if self._semlock._is_mine():
  162. name = process.current_process().name
  163. if threading.current_thread().name != 'MainThread':
  164. name += '|' + threading.current_thread().name
  165. count = self._semlock._count()
  166. elif self._semlock._get_value() == 1:
  167. name, count = 'None', 0
  168. elif self._semlock._count() > 0:
  169. name, count = 'SomeOtherThread', 'nonzero'
  170. else:
  171. name, count = 'SomeOtherProcess', 'nonzero'
  172. except Exception:
  173. name, count = 'unknown', 'unknown'
  174. return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
  175. #
  176. # Condition variable
  177. #
  178. class Condition(object):
  179. def __init__(self, lock=None, *, ctx):
  180. self._lock = lock or ctx.RLock()
  181. self._sleeping_count = ctx.Semaphore(0)
  182. self._woken_count = ctx.Semaphore(0)
  183. self._wait_semaphore = ctx.Semaphore(0)
  184. self._make_methods()
  185. def __getstate__(self):
  186. context.assert_spawning(self)
  187. return (self._lock, self._sleeping_count,
  188. self._woken_count, self._wait_semaphore)
  189. def __setstate__(self, state):
  190. (self._lock, self._sleeping_count,
  191. self._woken_count, self._wait_semaphore) = state
  192. self._make_methods()
  193. def __enter__(self):
  194. return self._lock.__enter__()
  195. def __exit__(self, *args):
  196. return self._lock.__exit__(*args)
  197. def _make_methods(self):
  198. self.acquire = self._lock.acquire
  199. self.release = self._lock.release
  200. def __repr__(self):
  201. try:
  202. num_waiters = (self._sleeping_count._semlock._get_value() -
  203. self._woken_count._semlock._get_value())
  204. except Exception:
  205. num_waiters = 'unknown'
  206. return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
  207. def wait(self, timeout=None):
  208. assert self._lock._semlock._is_mine(), \
  209. 'must acquire() condition before using wait()'
  210. # indicate that this thread is going to sleep
  211. self._sleeping_count.release()
  212. # release lock
  213. count = self._lock._semlock._count()
  214. for i in range(count):
  215. self._lock.release()
  216. try:
  217. # wait for notification or timeout
  218. return self._wait_semaphore.acquire(True, timeout)
  219. finally:
  220. # indicate that this thread has woken
  221. self._woken_count.release()
  222. # reacquire lock
  223. for i in range(count):
  224. self._lock.acquire()
  225. def notify(self, n=1):
  226. assert self._lock._semlock._is_mine(), 'lock is not owned'
  227. assert not self._wait_semaphore.acquire(
  228. False), ('notify: Should not have been able to acquire '
  229. + '_wait_semaphore')
  230. # to take account of timeouts since last notify*() we subtract
  231. # woken_count from sleeping_count and rezero woken_count
  232. while self._woken_count.acquire(False):
  233. res = self._sleeping_count.acquire(False)
  234. assert res, ('notify: Bug in sleeping_count.acquire'
  235. + '- res should not be False')
  236. sleepers = 0
  237. while sleepers < n and self._sleeping_count.acquire(False):
  238. self._wait_semaphore.release() # wake up one sleeper
  239. sleepers += 1
  240. if sleepers:
  241. for i in range(sleepers):
  242. self._woken_count.acquire() # wait for a sleeper to wake
  243. # rezero wait_semaphore in case some timeouts just happened
  244. while self._wait_semaphore.acquire(False):
  245. pass
  246. def notify_all(self):
  247. self.notify(n=sys.maxsize)
  248. def wait_for(self, predicate, timeout=None):
  249. result = predicate()
  250. if result:
  251. return result
  252. if timeout is not None:
  253. endtime = time.monotonic() + timeout
  254. else:
  255. endtime = None
  256. waittime = None
  257. while not result:
  258. if endtime is not None:
  259. waittime = endtime - time.monotonic()
  260. if waittime <= 0:
  261. break
  262. self.wait(waittime)
  263. result = predicate()
  264. return result
  265. #
  266. # Event
  267. #
  268. class Event(object):
  269. def __init__(self, *, ctx):
  270. self._cond = ctx.Condition(ctx.Lock())
  271. self._flag = ctx.Semaphore(0)
  272. def is_set(self):
  273. with self._cond:
  274. if self._flag.acquire(False):
  275. self._flag.release()
  276. return True
  277. return False
  278. def set(self):
  279. with self._cond:
  280. self._flag.acquire(False)
  281. self._flag.release()
  282. self._cond.notify_all()
  283. def clear(self):
  284. with self._cond:
  285. self._flag.acquire(False)
  286. def wait(self, timeout=None):
  287. with self._cond:
  288. if self._flag.acquire(False):
  289. self._flag.release()
  290. else:
  291. self._cond.wait(timeout)
  292. if self._flag.acquire(False):
  293. self._flag.release()
  294. return True
  295. return False
  296. def __repr__(self) -> str:
  297. set_status = 'set' if self.is_set() else 'unset'
  298. return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"
  299. #
  300. # Barrier
  301. #
  302. class Barrier(threading.Barrier):
  303. def __init__(self, parties, action=None, timeout=None, *, ctx):
  304. import struct
  305. from .heap import BufferWrapper
  306. wrapper = BufferWrapper(struct.calcsize('i') * 2)
  307. cond = ctx.Condition()
  308. self.__setstate__((parties, action, timeout, cond, wrapper))
  309. self._state = 0
  310. self._count = 0
  311. def __setstate__(self, state):
  312. (self._parties, self._action, self._timeout,
  313. self._cond, self._wrapper) = state
  314. self._array = self._wrapper.create_memoryview().cast('i')
  315. def __getstate__(self):
  316. return (self._parties, self._action, self._timeout,
  317. self._cond, self._wrapper)
  318. @property
  319. def _state(self):
  320. return self._array[0]
  321. @_state.setter
  322. def _state(self, value):
  323. self._array[0] = value
  324. @property
  325. def _count(self):
  326. return self._array[1]
  327. @_count.setter
  328. def _count(self, value):
  329. self._array[1] = value