context.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import os
  2. import sys
  3. import threading
  4. from . import process
  5. from . import reduction
  6. __all__ = ()
  7. #
  8. # Exceptions
  9. #
  10. class ProcessError(Exception):
  11. pass
  12. class BufferTooShort(ProcessError):
  13. pass
  14. class TimeoutError(ProcessError):
  15. pass
  16. class AuthenticationError(ProcessError):
  17. pass
  18. #
  19. # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
  20. #
  21. class BaseContext(object):
  22. ProcessError = ProcessError
  23. BufferTooShort = BufferTooShort
  24. TimeoutError = TimeoutError
  25. AuthenticationError = AuthenticationError
  26. current_process = staticmethod(process.current_process)
  27. parent_process = staticmethod(process.parent_process)
  28. active_children = staticmethod(process.active_children)
  29. def cpu_count(self):
  30. '''Returns the number of CPUs in the system'''
  31. num = os.cpu_count()
  32. if num is None:
  33. raise NotImplementedError('cannot determine number of cpus')
  34. else:
  35. return num
  36. def Manager(self):
  37. '''Returns a manager associated with a running server process
  38. The managers methods such as `Lock()`, `Condition()` and `Queue()`
  39. can be used to create shared objects.
  40. '''
  41. from .managers import SyncManager
  42. m = SyncManager(ctx=self.get_context())
  43. m.start()
  44. return m
  45. def Pipe(self, duplex=True):
  46. '''Returns two connection object connected by a pipe'''
  47. from .connection import Pipe
  48. return Pipe(duplex)
  49. def Lock(self):
  50. '''Returns a non-recursive lock object'''
  51. from .synchronize import Lock
  52. return Lock(ctx=self.get_context())
  53. def RLock(self):
  54. '''Returns a recursive lock object'''
  55. from .synchronize import RLock
  56. return RLock(ctx=self.get_context())
  57. def Condition(self, lock=None):
  58. '''Returns a condition object'''
  59. from .synchronize import Condition
  60. return Condition(lock, ctx=self.get_context())
  61. def Semaphore(self, value=1):
  62. '''Returns a semaphore object'''
  63. from .synchronize import Semaphore
  64. return Semaphore(value, ctx=self.get_context())
  65. def BoundedSemaphore(self, value=1):
  66. '''Returns a bounded semaphore object'''
  67. from .synchronize import BoundedSemaphore
  68. return BoundedSemaphore(value, ctx=self.get_context())
  69. def Event(self):
  70. '''Returns an event object'''
  71. from .synchronize import Event
  72. return Event(ctx=self.get_context())
  73. def Barrier(self, parties, action=None, timeout=None):
  74. '''Returns a barrier object'''
  75. from .synchronize import Barrier
  76. return Barrier(parties, action, timeout, ctx=self.get_context())
  77. def Queue(self, maxsize=0):
  78. '''Returns a queue object'''
  79. from .queues import Queue
  80. return Queue(maxsize, ctx=self.get_context())
  81. def JoinableQueue(self, maxsize=0):
  82. '''Returns a queue object'''
  83. from .queues import JoinableQueue
  84. return JoinableQueue(maxsize, ctx=self.get_context())
  85. def SimpleQueue(self):
  86. '''Returns a queue object'''
  87. from .queues import SimpleQueue
  88. return SimpleQueue(ctx=self.get_context())
  89. def Pool(self, processes=None, initializer=None, initargs=(),
  90. maxtasksperchild=None):
  91. '''Returns a process pool object'''
  92. from .pool import Pool
  93. return Pool(processes, initializer, initargs, maxtasksperchild,
  94. context=self.get_context())
  95. def RawValue(self, typecode_or_type, *args):
  96. '''Returns a shared object'''
  97. from .sharedctypes import RawValue
  98. return RawValue(typecode_or_type, *args)
  99. def RawArray(self, typecode_or_type, size_or_initializer):
  100. '''Returns a shared array'''
  101. from .sharedctypes import RawArray
  102. return RawArray(typecode_or_type, size_or_initializer)
  103. def Value(self, typecode_or_type, *args, lock=True):
  104. '''Returns a synchronized shared object'''
  105. from .sharedctypes import Value
  106. return Value(typecode_or_type, *args, lock=lock,
  107. ctx=self.get_context())
  108. def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
  109. '''Returns a synchronized shared array'''
  110. from .sharedctypes import Array
  111. return Array(typecode_or_type, size_or_initializer, lock=lock,
  112. ctx=self.get_context())
  113. def freeze_support(self):
  114. '''Check whether this is a fake forked process in a frozen executable.
  115. If so then run code specified by commandline and exit.
  116. '''
  117. if sys.platform == 'win32' and getattr(sys, 'frozen', False):
  118. from .spawn import freeze_support
  119. freeze_support()
  120. def get_logger(self):
  121. '''Return package logger -- if it does not already exist then
  122. it is created.
  123. '''
  124. from .util import get_logger
  125. return get_logger()
  126. def log_to_stderr(self, level=None):
  127. '''Turn on logging and add a handler which prints to stderr'''
  128. from .util import log_to_stderr
  129. return log_to_stderr(level)
  130. def allow_connection_pickling(self):
  131. '''Install support for sending connections and sockets
  132. between processes
  133. '''
  134. # This is undocumented. In previous versions of multiprocessing
  135. # its only effect was to make socket objects inheritable on Windows.
  136. from . import connection
  137. def set_executable(self, executable):
  138. '''Sets the path to a python.exe or pythonw.exe binary used to run
  139. child processes instead of sys.executable when using the 'spawn'
  140. start method. Useful for people embedding Python.
  141. '''
  142. from .spawn import set_executable
  143. set_executable(executable)
  144. def set_forkserver_preload(self, module_names):
  145. '''Set list of module names to try to load in forkserver process.
  146. This is really just a hint.
  147. '''
  148. from .forkserver import set_forkserver_preload
  149. set_forkserver_preload(module_names)
  150. def get_context(self, method=None):
  151. if method is None:
  152. return self
  153. try:
  154. ctx = _concrete_contexts[method]
  155. except KeyError:
  156. raise ValueError('cannot find context for %r' % method) from None
  157. ctx._check_available()
  158. return ctx
  159. def get_start_method(self, allow_none=False):
  160. return self._name
  161. def set_start_method(self, method, force=False):
  162. raise ValueError('cannot set start method of concrete context')
  163. @property
  164. def reducer(self):
  165. '''Controls how objects will be reduced to a form that can be
  166. shared with other processes.'''
  167. return globals().get('reduction')
  168. @reducer.setter
  169. def reducer(self, reduction):
  170. globals()['reduction'] = reduction
  171. def _check_available(self):
  172. pass
  173. #
  174. # Type of default context -- underlying context can be set at most once
  175. #
  176. class Process(process.BaseProcess):
  177. _start_method = None
  178. @staticmethod
  179. def _Popen(process_obj):
  180. return _default_context.get_context().Process._Popen(process_obj)
  181. class DefaultContext(BaseContext):
  182. Process = Process
  183. def __init__(self, context):
  184. self._default_context = context
  185. self._actual_context = None
  186. def get_context(self, method=None):
  187. if method is None:
  188. if self._actual_context is None:
  189. self._actual_context = self._default_context
  190. return self._actual_context
  191. else:
  192. return super().get_context(method)
  193. def set_start_method(self, method, force=False):
  194. if self._actual_context is not None and not force:
  195. raise RuntimeError('context has already been set')
  196. if method is None and force:
  197. self._actual_context = None
  198. return
  199. self._actual_context = self.get_context(method)
  200. def get_start_method(self, allow_none=False):
  201. if self._actual_context is None:
  202. if allow_none:
  203. return None
  204. self._actual_context = self._default_context
  205. return self._actual_context._name
  206. def get_all_start_methods(self):
  207. if sys.platform == 'win32':
  208. return ['spawn']
  209. else:
  210. methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn']
  211. if reduction.HAVE_SEND_HANDLE:
  212. methods.append('forkserver')
  213. return methods
  214. #
  215. # Context types for fixed start method
  216. #
  217. if sys.platform != 'win32':
  218. class ForkProcess(process.BaseProcess):
  219. _start_method = 'fork'
  220. @staticmethod
  221. def _Popen(process_obj):
  222. from .popen_fork import Popen
  223. return Popen(process_obj)
  224. class SpawnProcess(process.BaseProcess):
  225. _start_method = 'spawn'
  226. @staticmethod
  227. def _Popen(process_obj):
  228. from .popen_spawn_posix import Popen
  229. return Popen(process_obj)
  230. class ForkServerProcess(process.BaseProcess):
  231. _start_method = 'forkserver'
  232. @staticmethod
  233. def _Popen(process_obj):
  234. from .popen_forkserver import Popen
  235. return Popen(process_obj)
  236. class ForkContext(BaseContext):
  237. _name = 'fork'
  238. Process = ForkProcess
  239. class SpawnContext(BaseContext):
  240. _name = 'spawn'
  241. Process = SpawnProcess
  242. class ForkServerContext(BaseContext):
  243. _name = 'forkserver'
  244. Process = ForkServerProcess
  245. def _check_available(self):
  246. if not reduction.HAVE_SEND_HANDLE:
  247. raise ValueError('forkserver start method not available')
  248. _concrete_contexts = {
  249. 'fork': ForkContext(),
  250. 'spawn': SpawnContext(),
  251. 'forkserver': ForkServerContext(),
  252. }
  253. if sys.platform == 'darwin':
  254. # bpo-33725: running arbitrary code after fork() is no longer reliable
  255. # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
  256. _default_context = DefaultContext(_concrete_contexts['spawn'])
  257. else:
  258. _default_context = DefaultContext(_concrete_contexts['fork'])
  259. else:
  260. class SpawnProcess(process.BaseProcess):
  261. _start_method = 'spawn'
  262. @staticmethod
  263. def _Popen(process_obj):
  264. from .popen_spawn_win32 import Popen
  265. return Popen(process_obj)
  266. class SpawnContext(BaseContext):
  267. _name = 'spawn'
  268. Process = SpawnProcess
  269. _concrete_contexts = {
  270. 'spawn': SpawnContext(),
  271. }
  272. _default_context = DefaultContext(_concrete_contexts['spawn'])
  273. #
  274. # Force the start method
  275. #
  276. def _force_start_method(method):
  277. _default_context._actual_context = _concrete_contexts[method]
  278. #
  279. # Check that the current thread is spawning a child process
  280. #
  281. _tls = threading.local()
  282. def get_spawning_popen():
  283. return getattr(_tls, 'spawning_popen', None)
  284. def set_spawning_popen(popen):
  285. _tls.spawning_popen = popen
  286. def assert_spawning(obj):
  287. if get_spawning_popen() is None:
  288. raise RuntimeError(
  289. '%s objects should only be shared between processes'
  290. ' through inheritance' % type(obj).__name__
  291. )