process.py 12 KB


  1. #
  2. # Module providing the `Process` class which emulates `threading.Thread`
  3. #
  4. # multiprocessing/process.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = ['BaseProcess', 'current_process', 'active_children',
  10. 'parent_process']
  11. #
  12. # Imports
  13. #
  14. import os
  15. import sys
  16. import signal
  17. import itertools
  18. import threading
  19. from _weakrefset import WeakSet
  20. #
  21. #
  22. #
  23. try:
  24. ORIGINAL_DIR = os.path.abspath(os.getcwd())
  25. except OSError:
  26. ORIGINAL_DIR = None
  27. #
  28. # Public functions
  29. #
  30. def current_process():
  31. '''
  32. Return process object representing the current process
  33. '''
  34. return _current_process
  35. def active_children():
  36. '''
  37. Return list of process objects corresponding to live child processes
  38. '''
  39. _cleanup()
  40. return list(_children)
  41. def parent_process():
  42. '''
  43. Return process object representing the parent process
  44. '''
  45. return _parent_process
  46. #
  47. #
  48. #
  49. def _cleanup():
  50. # check for processes which have finished
  51. for p in list(_children):
  52. if (child_popen := p._popen) and child_popen.poll() is not None:
  53. _children.discard(p)
  54. #
  55. # The `Process` class
  56. #
  57. class BaseProcess(object):
  58. '''
  59. Process objects represent activity that is run in a separate process
  60. The class is analogous to `threading.Thread`
  61. '''
  62. def _Popen(self):
  63. raise NotImplementedError
  64. def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
  65. *, daemon=None):
  66. assert group is None, 'group argument must be None for now'
  67. count = next(_process_counter)
  68. self._identity = _current_process._identity + (count,)
  69. self._config = _current_process._config.copy()
  70. self._parent_pid = os.getpid()
  71. self._parent_name = _current_process.name
  72. self._popen = None
  73. self._closed = False
  74. self._target = target
  75. self._args = tuple(args)
  76. self._kwargs = dict(kwargs)
  77. self._name = name or type(self).__name__ + '-' + \
  78. ':'.join(str(i) for i in self._identity)
  79. if daemon is not None:
  80. self.daemon = daemon
  81. _dangling.add(self)
  82. def _check_closed(self):
  83. if self._closed:
  84. raise ValueError("process object is closed")
  85. def run(self):
  86. '''
  87. Method to be run in sub-process; can be overridden in sub-class
  88. '''
  89. if self._target:
  90. self._target(*self._args, **self._kwargs)
  91. def start(self):
  92. '''
  93. Start child process
  94. '''
  95. self._check_closed()
  96. assert self._popen is None, 'cannot start a process twice'
  97. assert self._parent_pid == os.getpid(), \
  98. 'can only start a process object created by current process'
  99. assert not _current_process._config.get('daemon'), \
  100. 'daemonic processes are not allowed to have children'
  101. _cleanup()
  102. self._popen = self._Popen(self)
  103. self._sentinel = self._popen.sentinel
  104. # Avoid a refcycle if the target function holds an indirect
  105. # reference to the process object (see bpo-30775)
  106. del self._target, self._args, self._kwargs
  107. _children.add(self)
  108. def terminate(self):
  109. '''
  110. Terminate process; sends SIGTERM signal or uses TerminateProcess()
  111. '''
  112. self._check_closed()
  113. self._popen.terminate()
  114. def kill(self):
  115. '''
  116. Terminate process; sends SIGKILL signal or uses TerminateProcess()
  117. '''
  118. self._check_closed()
  119. self._popen.kill()
  120. def join(self, timeout=None):
  121. '''
  122. Wait until child process terminates
  123. '''
  124. self._check_closed()
  125. assert self._parent_pid == os.getpid(), 'can only join a child process'
  126. assert self._popen is not None, 'can only join a started process'
  127. res = self._popen.wait(timeout)
  128. if res is not None:
  129. _children.discard(self)
  130. def is_alive(self):
  131. '''
  132. Return whether process is alive
  133. '''
  134. self._check_closed()
  135. if self is _current_process:
  136. return True
  137. assert self._parent_pid == os.getpid(), 'can only test a child process'
  138. if self._popen is None:
  139. return False
  140. returncode = self._popen.poll()
  141. if returncode is None:
  142. return True
  143. else:
  144. _children.discard(self)
  145. return False
  146. def close(self):
  147. '''
  148. Close the Process object.
  149. This method releases resources held by the Process object. It is
  150. an error to call this method if the child process is still running.
  151. '''
  152. if self._popen is not None:
  153. if self._popen.poll() is None:
  154. raise ValueError("Cannot close a process while it is still running. "
  155. "You should first call join() or terminate().")
  156. self._popen.close()
  157. self._popen = None
  158. del self._sentinel
  159. _children.discard(self)
  160. self._closed = True
  161. @property
  162. def name(self):
  163. return self._name
  164. @name.setter
  165. def name(self, name):
  166. assert isinstance(name, str), 'name must be a string'
  167. self._name = name
  168. @property
  169. def daemon(self):
  170. '''
  171. Return whether process is a daemon
  172. '''
  173. return self._config.get('daemon', False)
  174. @daemon.setter
  175. def daemon(self, daemonic):
  176. '''
  177. Set whether process is a daemon
  178. '''
  179. assert self._popen is None, 'process has already started'
  180. self._config['daemon'] = daemonic
  181. @property
  182. def authkey(self):
  183. return self._config['authkey']
  184. @authkey.setter
  185. def authkey(self, authkey):
  186. '''
  187. Set authorization key of process
  188. '''
  189. self._config['authkey'] = AuthenticationString(authkey)
  190. @property
  191. def exitcode(self):
  192. '''
  193. Return exit code of process or `None` if it has yet to stop
  194. '''
  195. self._check_closed()
  196. if self._popen is None:
  197. return self._popen
  198. return self._popen.poll()
  199. @property
  200. def ident(self):
  201. '''
  202. Return identifier (PID) of process or `None` if it has yet to start
  203. '''
  204. self._check_closed()
  205. if self is _current_process:
  206. return os.getpid()
  207. else:
  208. return self._popen and self._popen.pid
  209. pid = ident
  210. @property
  211. def sentinel(self):
  212. '''
  213. Return a file descriptor (Unix) or handle (Windows) suitable for
  214. waiting for process termination.
  215. '''
  216. self._check_closed()
  217. try:
  218. return self._sentinel
  219. except AttributeError:
  220. raise ValueError("process not started") from None
  221. def __repr__(self):
  222. exitcode = None
  223. if self is _current_process:
  224. status = 'started'
  225. elif self._closed:
  226. status = 'closed'
  227. elif self._parent_pid != os.getpid():
  228. status = 'unknown'
  229. elif self._popen is None:
  230. status = 'initial'
  231. else:
  232. exitcode = self._popen.poll()
  233. if exitcode is not None:
  234. status = 'stopped'
  235. else:
  236. status = 'started'
  237. info = [type(self).__name__, 'name=%r' % self._name]
  238. if self._popen is not None:
  239. info.append('pid=%s' % self._popen.pid)
  240. info.append('parent=%s' % self._parent_pid)
  241. info.append(status)
  242. if exitcode is not None:
  243. exitcode = _exitcode_to_name.get(exitcode, exitcode)
  244. info.append('exitcode=%s' % exitcode)
  245. if self.daemon:
  246. info.append('daemon')
  247. return '<%s>' % ' '.join(info)
  248. ##
  249. def _bootstrap(self, parent_sentinel=None):
  250. from . import util, context
  251. global _current_process, _parent_process, _process_counter, _children
  252. try:
  253. if self._start_method is not None:
  254. context._force_start_method(self._start_method)
  255. _process_counter = itertools.count(1)
  256. _children = set()
  257. util._close_stdin()
  258. old_process = _current_process
  259. _current_process = self
  260. _parent_process = _ParentProcess(
  261. self._parent_name, self._parent_pid, parent_sentinel)
  262. if threading._HAVE_THREAD_NATIVE_ID:
  263. threading.main_thread()._set_native_id()
  264. try:
  265. self._after_fork()
  266. finally:
  267. # delay finalization of the old process object until after
  268. # _run_after_forkers() is executed
  269. del old_process
  270. util.info('child process calling self.run()')
  271. try:
  272. self.run()
  273. exitcode = 0
  274. finally:
  275. util._exit_function()
  276. except SystemExit as e:
  277. if e.code is None:
  278. exitcode = 0
  279. elif isinstance(e.code, int):
  280. exitcode = e.code
  281. else:
  282. sys.stderr.write(str(e.code) + '\n')
  283. exitcode = 1
  284. except:
  285. exitcode = 1
  286. import traceback
  287. sys.stderr.write('Process %s:\n' % self.name)
  288. traceback.print_exc()
  289. finally:
  290. threading._shutdown()
  291. util.info('process exiting with exitcode %d' % exitcode)
  292. util._flush_std_streams()
  293. return exitcode
  294. @staticmethod
  295. def _after_fork():
  296. from . import util
  297. util._finalizer_registry.clear()
  298. util._run_after_forkers()
  299. #
  300. # We subclass bytes to avoid accidental transmission of auth keys over network
  301. #
  302. class AuthenticationString(bytes):
  303. def __reduce__(self):
  304. from .context import get_spawning_popen
  305. if get_spawning_popen() is None:
  306. raise TypeError(
  307. 'Pickling an AuthenticationString object is '
  308. 'disallowed for security reasons'
  309. )
  310. return AuthenticationString, (bytes(self),)
  311. #
  312. # Create object representing the parent process
  313. #
  314. class _ParentProcess(BaseProcess):
  315. def __init__(self, name, pid, sentinel):
  316. self._identity = ()
  317. self._name = name
  318. self._pid = pid
  319. self._parent_pid = None
  320. self._popen = None
  321. self._closed = False
  322. self._sentinel = sentinel
  323. self._config = {}
  324. def is_alive(self):
  325. from multiprocessing.connection import wait
  326. return not wait([self._sentinel], timeout=0)
  327. @property
  328. def ident(self):
  329. return self._pid
  330. def join(self, timeout=None):
  331. '''
  332. Wait until parent process terminates
  333. '''
  334. from multiprocessing.connection import wait
  335. wait([self._sentinel], timeout=timeout)
  336. pid = ident
  337. #
  338. # Create object representing the main process
  339. #
  340. class _MainProcess(BaseProcess):
  341. def __init__(self):
  342. self._identity = ()
  343. self._name = 'MainProcess'
  344. self._parent_pid = None
  345. self._popen = None
  346. self._closed = False
  347. self._config = {'authkey': AuthenticationString(os.urandom(32)),
  348. 'semprefix': '/mp'}
  349. # Note that some versions of FreeBSD only allow named
  350. # semaphores to have names of up to 14 characters. Therefore
  351. # we choose a short prefix.
  352. #
  353. # On MacOSX in a sandbox it may be necessary to use a
  354. # different prefix -- see #19478.
  355. #
  356. # Everything in self._config will be inherited by descendant
  357. # processes.
  358. def close(self):
  359. pass
  360. _parent_process = None
  361. _current_process = _MainProcess()
  362. _process_counter = itertools.count(1)
  363. _children = set()
  364. del _MainProcess
  365. #
  366. # Give names to some return codes
  367. #
  368. _exitcode_to_name = {}
  369. for name, signum in list(signal.__dict__.items()):
  370. if name[:3]=='SIG' and '_' not in name:
  371. _exitcode_to_name[-signum] = f'-{name}'
  372. del name, signum
  373. # For debug and leak testing
  374. _dangling = WeakSet()