#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
import os
import itertools
import sys
import weakref
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags

from . import process
  18. __all__ = [
  19. 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
  20. 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
  21. 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
  22. 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
  23. ]
  24. #
  25. # Logging
  26. #
  27. NOTSET = 0
  28. SUBDEBUG = 5
  29. DEBUG = 10
  30. INFO = 20
  31. SUBWARNING = 25
  32. LOGGER_NAME = 'multiprocessing'
  33. DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
  34. _logger = None
  35. _log_to_stderr = False
  36. def sub_debug(msg, *args):
  37. if _logger:
  38. _logger.log(SUBDEBUG, msg, *args)
  39. def debug(msg, *args):
  40. if _logger:
  41. _logger.log(DEBUG, msg, *args)
  42. def info(msg, *args):
  43. if _logger:
  44. _logger.log(INFO, msg, *args)
  45. def sub_warning(msg, *args):
  46. if _logger:
  47. _logger.log(SUBWARNING, msg, *args)
  48. def get_logger():
  49. '''
  50. Returns logger used by multiprocessing
  51. '''
  52. global _logger
  53. import logging
  54. logging._acquireLock()
  55. try:
  56. if not _logger:
  57. _logger = logging.getLogger(LOGGER_NAME)
  58. _logger.propagate = 0
  59. # XXX multiprocessing should cleanup before logging
  60. if hasattr(atexit, 'unregister'):
  61. atexit.unregister(_exit_function)
  62. atexit.register(_exit_function)
  63. else:
  64. atexit._exithandlers.remove((_exit_function, (), {}))
  65. atexit._exithandlers.append((_exit_function, (), {}))
  66. finally:
  67. logging._releaseLock()
  68. return _logger
  69. def log_to_stderr(level=None):
  70. '''
  71. Turn on logging and add a handler which prints to stderr
  72. '''
  73. global _log_to_stderr
  74. import logging
  75. logger = get_logger()
  76. formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
  77. handler = logging.StreamHandler()
  78. handler.setFormatter(formatter)
  79. logger.addHandler(handler)
  80. if level:
  81. logger.setLevel(level)
  82. _log_to_stderr = True
  83. return _logger
  84. # Abstract socket support
  85. def _platform_supports_abstract_sockets():
  86. if sys.platform == "linux":
  87. return True
  88. if hasattr(sys, 'getandroidapilevel'):
  89. return True
  90. return False
  91. def is_abstract_socket_namespace(address):
  92. if not address:
  93. return False
  94. if isinstance(address, bytes):
  95. return address[0] == 0
  96. elif isinstance(address, str):
  97. return address[0] == "\0"
  98. raise TypeError(f'address type of {address!r} unrecognized')
  99. abstract_sockets_supported = _platform_supports_abstract_sockets()
  100. #
  101. # Function returning a temp directory which will be removed on exit
  102. #
  103. def _remove_temp_dir(rmtree, tempdir):
  104. rmtree(tempdir)
  105. current_process = process.current_process()
  106. # current_process() can be None if the finalizer is called
  107. # late during Python finalization
  108. if current_process is not None:
  109. current_process._config['tempdir'] = None
  110. def get_temp_dir():
  111. # get name of a temp directory which will be automatically cleaned up
  112. tempdir = process.current_process()._config.get('tempdir')
  113. if tempdir is None:
  114. import shutil, tempfile
  115. tempdir = tempfile.mkdtemp(prefix='pymp-')
  116. info('created temp directory %s', tempdir)
  117. # keep a strong reference to shutil.rmtree(), since the finalizer
  118. # can be called late during Python shutdown
  119. Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
  120. exitpriority=-100)
  121. process.current_process()._config['tempdir'] = tempdir
  122. return tempdir
  123. #
  124. # Support for reinitialization of objects when bootstrapping a child process
  125. #
  126. _afterfork_registry = weakref.WeakValueDictionary()
  127. _afterfork_counter = itertools.count()
  128. def _run_after_forkers():
  129. items = list(_afterfork_registry.items())
  130. items.sort()
  131. for (index, ident, func), obj in items:
  132. try:
  133. func(obj)
  134. except Exception as e:
  135. info('after forker raised exception %s', e)
  136. def register_after_fork(obj, func):
  137. _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
  138. #
  139. # Finalization using weakrefs
  140. #
  141. _finalizer_registry = {}
  142. _finalizer_counter = itertools.count()
  143. class Finalize(object):
  144. '''
  145. Class which supports object finalization using weakrefs
  146. '''
  147. def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
  148. if (exitpriority is not None) and not isinstance(exitpriority,int):
  149. raise TypeError(
  150. "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
  151. exitpriority, type(exitpriority)))
  152. if obj is not None:
  153. self._weakref = weakref.ref(obj, self)
  154. elif exitpriority is None:
  155. raise ValueError("Without object, exitpriority cannot be None")
  156. self._callback = callback
  157. self._args = args
  158. self._kwargs = kwargs or {}
  159. self._key = (exitpriority, next(_finalizer_counter))
  160. self._pid = os.getpid()
  161. _finalizer_registry[self._key] = self
  162. def __call__(self, wr=None,
  163. # Need to bind these locally because the globals can have
  164. # been cleared at shutdown
  165. _finalizer_registry=_finalizer_registry,
  166. sub_debug=sub_debug, getpid=os.getpid):
  167. '''
  168. Run the callback unless it has already been called or cancelled
  169. '''
  170. try:
  171. del _finalizer_registry[self._key]
  172. except KeyError:
  173. sub_debug('finalizer no longer registered')
  174. else:
  175. if self._pid != getpid():
  176. sub_debug('finalizer ignored because different process')
  177. res = None
  178. else:
  179. sub_debug('finalizer calling %s with args %s and kwargs %s',
  180. self._callback, self._args, self._kwargs)
  181. res = self._callback(*self._args, **self._kwargs)
  182. self._weakref = self._callback = self._args = \
  183. self._kwargs = self._key = None
  184. return res
  185. def cancel(self):
  186. '''
  187. Cancel finalization of the object
  188. '''
  189. try:
  190. del _finalizer_registry[self._key]
  191. except KeyError:
  192. pass
  193. else:
  194. self._weakref = self._callback = self._args = \
  195. self._kwargs = self._key = None
  196. def still_active(self):
  197. '''
  198. Return whether this finalizer is still waiting to invoke callback
  199. '''
  200. return self._key in _finalizer_registry
  201. def __repr__(self):
  202. try:
  203. obj = self._weakref()
  204. except (AttributeError, TypeError):
  205. obj = None
  206. if obj is None:
  207. return '<%s object, dead>' % self.__class__.__name__
  208. x = '<%s object, callback=%s' % (
  209. self.__class__.__name__,
  210. getattr(self._callback, '__name__', self._callback))
  211. if self._args:
  212. x += ', args=' + str(self._args)
  213. if self._kwargs:
  214. x += ', kwargs=' + str(self._kwargs)
  215. if self._key[0] is not None:
  216. x += ', exitpriority=' + str(self._key[0])
  217. return x + '>'
  218. def _run_finalizers(minpriority=None):
  219. '''
  220. Run all finalizers whose exit priority is not None and at least minpriority
  221. Finalizers with highest priority are called first; finalizers with
  222. the same priority will be called in reverse order of creation.
  223. '''
  224. if _finalizer_registry is None:
  225. # This function may be called after this module's globals are
  226. # destroyed. See the _exit_function function in this module for more
  227. # notes.
  228. return
  229. if minpriority is None:
  230. f = lambda p : p[0] is not None
  231. else:
  232. f = lambda p : p[0] is not None and p[0] >= minpriority
  233. # Careful: _finalizer_registry may be mutated while this function
  234. # is running (either by a GC run or by another thread).
  235. # list(_finalizer_registry) should be atomic, while
  236. # list(_finalizer_registry.items()) is not.
  237. keys = [key for key in list(_finalizer_registry) if f(key)]
  238. keys.sort(reverse=True)
  239. for key in keys:
  240. finalizer = _finalizer_registry.get(key)
  241. # key may have been removed from the registry
  242. if finalizer is not None:
  243. sub_debug('calling %s', finalizer)
  244. try:
  245. finalizer()
  246. except Exception:
  247. import traceback
  248. traceback.print_exc()
  249. if minpriority is None:
  250. _finalizer_registry.clear()
  251. #
  252. # Clean up on exit
  253. #
  254. def is_exiting():
  255. '''
  256. Returns true if the process is shutting down
  257. '''
  258. return _exiting or _exiting is None
  259. _exiting = False
  260. def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
  261. active_children=process.active_children,
  262. current_process=process.current_process):
  263. # We hold on to references to functions in the arglist due to the
  264. # situation described below, where this function is called after this
  265. # module's globals are destroyed.
  266. global _exiting
  267. if not _exiting:
  268. _exiting = True
  269. info('process shutting down')
  270. debug('running all "atexit" finalizers with priority >= 0')
  271. _run_finalizers(0)
  272. if current_process() is not None:
  273. # We check if the current process is None here because if
  274. # it's None, any call to ``active_children()`` will raise
  275. # an AttributeError (active_children winds up trying to
  276. # get attributes from util._current_process). One
  277. # situation where this can happen is if someone has
  278. # manipulated sys.modules, causing this module to be
  279. # garbage collected. The destructor for the module type
  280. # then replaces all values in the module dict with None.
  281. # For instance, after setuptools runs a test it replaces
  282. # sys.modules with a copy created earlier. See issues
  283. # #9775 and #15881. Also related: #4106, #9205, and
  284. # #9207.
  285. for p in active_children():
  286. if p.daemon:
  287. info('calling terminate() for daemon %s', p.name)
  288. p._popen.terminate()
  289. for p in active_children():
  290. info('calling join() for process %s', p.name)
  291. p.join()
  292. debug('running the remaining "atexit" finalizers')
  293. _run_finalizers()
  294. atexit.register(_exit_function)
  295. #
  296. # Some fork aware types
  297. #
  298. class ForkAwareThreadLock(object):
  299. def __init__(self):
  300. self._lock = threading.Lock()
  301. self.acquire = self._lock.acquire
  302. self.release = self._lock.release
  303. register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)
  304. def _at_fork_reinit(self):
  305. self._lock._at_fork_reinit()
  306. def __enter__(self):
  307. return self._lock.__enter__()
  308. def __exit__(self, *args):
  309. return self._lock.__exit__(*args)
  310. class ForkAwareLocal(threading.local):
  311. def __init__(self):
  312. register_after_fork(self, lambda obj : obj.__dict__.clear())
  313. def __reduce__(self):
  314. return type(self), ()
  315. #
  316. # Close fds except those specified
  317. #
  318. try:
  319. MAXFD = os.sysconf("SC_OPEN_MAX")
  320. except Exception:
  321. MAXFD = 256
  322. def close_all_fds_except(fds):
  323. fds = list(fds) + [-1, MAXFD]
  324. fds.sort()
  325. assert fds[-1] == MAXFD, 'fd too large'
  326. for i in range(len(fds) - 1):
  327. os.closerange(fds[i]+1, fds[i+1])
  328. #
  329. # Close sys.stdin and replace stdin with os.devnull
  330. #
  331. def _close_stdin():
  332. if sys.stdin is None:
  333. return
  334. try:
  335. sys.stdin.close()
  336. except (OSError, ValueError):
  337. pass
  338. try:
  339. fd = os.open(os.devnull, os.O_RDONLY)
  340. try:
  341. sys.stdin = open(fd, encoding="utf-8", closefd=False)
  342. except:
  343. os.close(fd)
  344. raise
  345. except (OSError, ValueError):
  346. pass
  347. #
  348. # Flush standard streams, if any
  349. #
  350. def _flush_std_streams():
  351. try:
  352. sys.stdout.flush()
  353. except (AttributeError, ValueError):
  354. pass
  355. try:
  356. sys.stderr.flush()
  357. except (AttributeError, ValueError):
  358. pass
  359. #
  360. # Start a program with only specified fds kept open
  361. #
  362. def spawnv_passfds(path, args, passfds):
  363. import _posixsubprocess
  364. import subprocess
  365. passfds = tuple(sorted(map(int, passfds)))
  366. errpipe_read, errpipe_write = os.pipe()
  367. try:
  368. return _posixsubprocess.fork_exec(
  369. args, [path], True, passfds, None, None,
  370. -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
  371. False, False, -1, None, None, None, -1, None,
  372. subprocess._USE_VFORK)
  373. finally:
  374. os.close(errpipe_read)
  375. os.close(errpipe_write)
  376. def close_fds(*fds):
  377. """Close each file descriptor given as an argument"""
  378. for fd in fds:
  379. os.close(fd)
  380. def _cleanup_tests():
  381. """Cleanup multiprocessing resources when multiprocessing tests
  382. completed."""
  383. from test import support
  384. # cleanup multiprocessing
  385. process._cleanup()
  386. # Stop the ForkServer process if it's running
  387. from multiprocessing import forkserver
  388. forkserver._forkserver._stop()
  389. # Stop the ResourceTracker process if it's running
  390. from multiprocessing import resource_tracker
  391. resource_tracker._resource_tracker._stop()
  392. # bpo-37421: Explicitly call _run_finalizers() to remove immediately
  393. # temporary directories created by multiprocessing.util.get_temp_dir().
  394. _run_finalizers()
  395. support.gc_collect()
  396. support.reap_children()