pool.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954
  1. #
  2. # Module providing the `Pool` class for managing a process pool
  3. #
  4. # multiprocessing/pool.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = ['Pool', 'ThreadPool']
  10. #
  11. # Imports
  12. #
  13. import collections
  14. import itertools
  15. import os
  16. import queue
  17. import threading
  18. import time
  19. import traceback
  20. import types
  21. import warnings
  22. # If threading is available then ThreadPool should be provided. Therefore
  23. # we avoid top-level imports which are liable to fail on some systems.
  24. from . import util
  25. from . import get_context, TimeoutError
  26. from .connection import wait
  27. #
  28. # Constants representing the state of a pool
  29. #
  30. INIT = "INIT"
  31. RUN = "RUN"
  32. CLOSE = "CLOSE"
  33. TERMINATE = "TERMINATE"
  34. #
  35. # Miscellaneous
  36. #
  37. job_counter = itertools.count()
  38. def mapstar(args):
  39. return list(map(*args))
  40. def starmapstar(args):
  41. return list(itertools.starmap(args[0], args[1]))
  42. #
  43. # Hack to embed stringification of remote traceback in local traceback
  44. #
  45. class RemoteTraceback(Exception):
  46. def __init__(self, tb):
  47. self.tb = tb
  48. def __str__(self):
  49. return self.tb
  50. class ExceptionWithTraceback:
  51. def __init__(self, exc, tb):
  52. tb = traceback.format_exception(type(exc), exc, tb)
  53. tb = ''.join(tb)
  54. self.exc = exc
  55. self.tb = '\n"""\n%s"""' % tb
  56. def __reduce__(self):
  57. return rebuild_exc, (self.exc, self.tb)
  58. def rebuild_exc(exc, tb):
  59. exc.__cause__ = RemoteTraceback(tb)
  60. return exc
  61. #
  62. # Code run by worker processes
  63. #
  64. class MaybeEncodingError(Exception):
  65. """Wraps possible unpickleable errors, so they can be
  66. safely sent through the socket."""
  67. def __init__(self, exc, value):
  68. self.exc = repr(exc)
  69. self.value = repr(value)
  70. super(MaybeEncodingError, self).__init__(self.exc, self.value)
  71. def __str__(self):
  72. return "Error sending result: '%s'. Reason: '%s'" % (self.value,
  73. self.exc)
  74. def __repr__(self):
  75. return "<%s: %s>" % (self.__class__.__name__, self)
  76. def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
  77. wrap_exception=False):
  78. if (maxtasks is not None) and not (isinstance(maxtasks, int)
  79. and maxtasks >= 1):
  80. raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
  81. put = outqueue.put
  82. get = inqueue.get
  83. if hasattr(inqueue, '_writer'):
  84. inqueue._writer.close()
  85. outqueue._reader.close()
  86. if initializer is not None:
  87. initializer(*initargs)
  88. completed = 0
  89. while maxtasks is None or (maxtasks and completed < maxtasks):
  90. try:
  91. task = get()
  92. except (EOFError, OSError):
  93. util.debug('worker got EOFError or OSError -- exiting')
  94. break
  95. if task is None:
  96. util.debug('worker got sentinel -- exiting')
  97. break
  98. job, i, func, args, kwds = task
  99. try:
  100. result = (True, func(*args, **kwds))
  101. except Exception as e:
  102. if wrap_exception and func is not _helper_reraises_exception:
  103. e = ExceptionWithTraceback(e, e.__traceback__)
  104. result = (False, e)
  105. try:
  106. put((job, i, result))
  107. except Exception as e:
  108. wrapped = MaybeEncodingError(e, result[1])
  109. util.debug("Possible encoding error while sending result: %s" % (
  110. wrapped))
  111. put((job, i, (False, wrapped)))
  112. task = job = result = func = args = kwds = None
  113. completed += 1
  114. util.debug('worker exiting after %d tasks' % completed)
  115. def _helper_reraises_exception(ex):
  116. 'Pickle-able helper function for use by _guarded_task_generation.'
  117. raise ex
  118. #
  119. # Class representing a process pool
  120. #
  121. class _PoolCache(dict):
  122. """
  123. Class that implements a cache for the Pool class that will notify
  124. the pool management threads every time the cache is emptied. The
  125. notification is done by the use of a queue that is provided when
  126. instantiating the cache.
  127. """
  128. def __init__(self, /, *args, notifier=None, **kwds):
  129. self.notifier = notifier
  130. super().__init__(*args, **kwds)
  131. def __delitem__(self, item):
  132. super().__delitem__(item)
  133. # Notify that the cache is empty. This is important because the
  134. # pool keeps maintaining workers until the cache gets drained. This
  135. # eliminates a race condition in which a task is finished after the
  136. # the pool's _handle_workers method has enter another iteration of the
  137. # loop. In this situation, the only event that can wake up the pool
  138. # is the cache to be emptied (no more tasks available).
  139. if not self:
  140. self.notifier.put(None)
  141. class Pool(object):
  142. '''
  143. Class which supports an async version of applying functions to arguments.
  144. '''
  145. _wrap_exception = True
  146. @staticmethod
  147. def Process(ctx, *args, **kwds):
  148. return ctx.Process(*args, **kwds)
  149. def __init__(self, processes=None, initializer=None, initargs=(),
  150. maxtasksperchild=None, context=None):
  151. # Attributes initialized early to make sure that they exist in
  152. # __del__() if __init__() raises an exception
  153. self._pool = []
  154. self._state = INIT
  155. self._ctx = context or get_context()
  156. self._setup_queues()
  157. self._taskqueue = queue.SimpleQueue()
  158. # The _change_notifier queue exist to wake up self._handle_workers()
  159. # when the cache (self._cache) is empty or when there is a change in
  160. # the _state variable of the thread that runs _handle_workers.
  161. self._change_notifier = self._ctx.SimpleQueue()
  162. self._cache = _PoolCache(notifier=self._change_notifier)
  163. self._maxtasksperchild = maxtasksperchild
  164. self._initializer = initializer
  165. self._initargs = initargs
  166. if processes is None:
  167. processes = os.cpu_count() or 1
  168. if processes < 1:
  169. raise ValueError("Number of processes must be at least 1")
  170. if initializer is not None and not callable(initializer):
  171. raise TypeError('initializer must be a callable')
  172. self._processes = processes
  173. try:
  174. self._repopulate_pool()
  175. except Exception:
  176. for p in self._pool:
  177. if p.exitcode is None:
  178. p.terminate()
  179. for p in self._pool:
  180. p.join()
  181. raise
  182. sentinels = self._get_sentinels()
  183. self._worker_handler = threading.Thread(
  184. target=Pool._handle_workers,
  185. args=(self._cache, self._taskqueue, self._ctx, self.Process,
  186. self._processes, self._pool, self._inqueue, self._outqueue,
  187. self._initializer, self._initargs, self._maxtasksperchild,
  188. self._wrap_exception, sentinels, self._change_notifier)
  189. )
  190. self._worker_handler.daemon = True
  191. self._worker_handler._state = RUN
  192. self._worker_handler.start()
  193. self._task_handler = threading.Thread(
  194. target=Pool._handle_tasks,
  195. args=(self._taskqueue, self._quick_put, self._outqueue,
  196. self._pool, self._cache)
  197. )
  198. self._task_handler.daemon = True
  199. self._task_handler._state = RUN
  200. self._task_handler.start()
  201. self._result_handler = threading.Thread(
  202. target=Pool._handle_results,
  203. args=(self._outqueue, self._quick_get, self._cache)
  204. )
  205. self._result_handler.daemon = True
  206. self._result_handler._state = RUN
  207. self._result_handler.start()
  208. self._terminate = util.Finalize(
  209. self, self._terminate_pool,
  210. args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
  211. self._change_notifier, self._worker_handler, self._task_handler,
  212. self._result_handler, self._cache),
  213. exitpriority=15
  214. )
  215. self._state = RUN
  216. # Copy globals as function locals to make sure that they are available
  217. # during Python shutdown when the Pool is destroyed.
  218. def __del__(self, _warn=warnings.warn, RUN=RUN):
  219. if self._state == RUN:
  220. _warn(f"unclosed running multiprocessing pool {self!r}",
  221. ResourceWarning, source=self)
  222. if getattr(self, '_change_notifier', None) is not None:
  223. self._change_notifier.put(None)
  224. def __repr__(self):
  225. cls = self.__class__
  226. return (f'<{cls.__module__}.{cls.__qualname__} '
  227. f'state={self._state} '
  228. f'pool_size={len(self._pool)}>')
  229. def _get_sentinels(self):
  230. task_queue_sentinels = [self._outqueue._reader]
  231. self_notifier_sentinels = [self._change_notifier._reader]
  232. return [*task_queue_sentinels, *self_notifier_sentinels]
  233. @staticmethod
  234. def _get_worker_sentinels(workers):
  235. return [worker.sentinel for worker in
  236. workers if hasattr(worker, "sentinel")]
  237. @staticmethod
  238. def _join_exited_workers(pool):
  239. """Cleanup after any worker processes which have exited due to reaching
  240. their specified lifetime. Returns True if any workers were cleaned up.
  241. """
  242. cleaned = False
  243. for i in reversed(range(len(pool))):
  244. worker = pool[i]
  245. if worker.exitcode is not None:
  246. # worker exited
  247. util.debug('cleaning up worker %d' % i)
  248. worker.join()
  249. cleaned = True
  250. del pool[i]
  251. return cleaned
  252. def _repopulate_pool(self):
  253. return self._repopulate_pool_static(self._ctx, self.Process,
  254. self._processes,
  255. self._pool, self._inqueue,
  256. self._outqueue, self._initializer,
  257. self._initargs,
  258. self._maxtasksperchild,
  259. self._wrap_exception)
  260. @staticmethod
  261. def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
  262. outqueue, initializer, initargs,
  263. maxtasksperchild, wrap_exception):
  264. """Bring the number of pool processes up to the specified number,
  265. for use after reaping workers which have exited.
  266. """
  267. for i in range(processes - len(pool)):
  268. w = Process(ctx, target=worker,
  269. args=(inqueue, outqueue,
  270. initializer,
  271. initargs, maxtasksperchild,
  272. wrap_exception))
  273. w.name = w.name.replace('Process', 'PoolWorker')
  274. w.daemon = True
  275. w.start()
  276. pool.append(w)
  277. util.debug('added worker')
  278. @staticmethod
  279. def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
  280. initializer, initargs, maxtasksperchild,
  281. wrap_exception):
  282. """Clean up any exited workers and start replacements for them.
  283. """
  284. if Pool._join_exited_workers(pool):
  285. Pool._repopulate_pool_static(ctx, Process, processes, pool,
  286. inqueue, outqueue, initializer,
  287. initargs, maxtasksperchild,
  288. wrap_exception)
  289. def _setup_queues(self):
  290. self._inqueue = self._ctx.SimpleQueue()
  291. self._outqueue = self._ctx.SimpleQueue()
  292. self._quick_put = self._inqueue._writer.send
  293. self._quick_get = self._outqueue._reader.recv
  294. def _check_running(self):
  295. if self._state != RUN:
  296. raise ValueError("Pool not running")
  297. def apply(self, func, args=(), kwds={}):
  298. '''
  299. Equivalent of `func(*args, **kwds)`.
  300. Pool must be running.
  301. '''
  302. return self.apply_async(func, args, kwds).get()
  303. def map(self, func, iterable, chunksize=None):
  304. '''
  305. Apply `func` to each element in `iterable`, collecting the results
  306. in a list that is returned.
  307. '''
  308. return self._map_async(func, iterable, mapstar, chunksize).get()
  309. def starmap(self, func, iterable, chunksize=None):
  310. '''
  311. Like `map()` method but the elements of the `iterable` are expected to
  312. be iterables as well and will be unpacked as arguments. Hence
  313. `func` and (a, b) becomes func(a, b).
  314. '''
  315. return self._map_async(func, iterable, starmapstar, chunksize).get()
  316. def starmap_async(self, func, iterable, chunksize=None, callback=None,
  317. error_callback=None):
  318. '''
  319. Asynchronous version of `starmap()` method.
  320. '''
  321. return self._map_async(func, iterable, starmapstar, chunksize,
  322. callback, error_callback)
  323. def _guarded_task_generation(self, result_job, func, iterable):
  324. '''Provides a generator of tasks for imap and imap_unordered with
  325. appropriate handling for iterables which throw exceptions during
  326. iteration.'''
  327. try:
  328. i = -1
  329. for i, x in enumerate(iterable):
  330. yield (result_job, i, func, (x,), {})
  331. except Exception as e:
  332. yield (result_job, i+1, _helper_reraises_exception, (e,), {})
  333. def imap(self, func, iterable, chunksize=1):
  334. '''
  335. Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
  336. '''
  337. self._check_running()
  338. if chunksize == 1:
  339. result = IMapIterator(self)
  340. self._taskqueue.put(
  341. (
  342. self._guarded_task_generation(result._job, func, iterable),
  343. result._set_length
  344. ))
  345. return result
  346. else:
  347. if chunksize < 1:
  348. raise ValueError(
  349. "Chunksize must be 1+, not {0:n}".format(
  350. chunksize))
  351. task_batches = Pool._get_tasks(func, iterable, chunksize)
  352. result = IMapIterator(self)
  353. self._taskqueue.put(
  354. (
  355. self._guarded_task_generation(result._job,
  356. mapstar,
  357. task_batches),
  358. result._set_length
  359. ))
  360. return (item for chunk in result for item in chunk)
  361. def imap_unordered(self, func, iterable, chunksize=1):
  362. '''
  363. Like `imap()` method but ordering of results is arbitrary.
  364. '''
  365. self._check_running()
  366. if chunksize == 1:
  367. result = IMapUnorderedIterator(self)
  368. self._taskqueue.put(
  369. (
  370. self._guarded_task_generation(result._job, func, iterable),
  371. result._set_length
  372. ))
  373. return result
  374. else:
  375. if chunksize < 1:
  376. raise ValueError(
  377. "Chunksize must be 1+, not {0!r}".format(chunksize))
  378. task_batches = Pool._get_tasks(func, iterable, chunksize)
  379. result = IMapUnorderedIterator(self)
  380. self._taskqueue.put(
  381. (
  382. self._guarded_task_generation(result._job,
  383. mapstar,
  384. task_batches),
  385. result._set_length
  386. ))
  387. return (item for chunk in result for item in chunk)
  388. def apply_async(self, func, args=(), kwds={}, callback=None,
  389. error_callback=None):
  390. '''
  391. Asynchronous version of `apply()` method.
  392. '''
  393. self._check_running()
  394. result = ApplyResult(self, callback, error_callback)
  395. self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
  396. return result
  397. def map_async(self, func, iterable, chunksize=None, callback=None,
  398. error_callback=None):
  399. '''
  400. Asynchronous version of `map()` method.
  401. '''
  402. return self._map_async(func, iterable, mapstar, chunksize, callback,
  403. error_callback)
  404. def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
  405. error_callback=None):
  406. '''
  407. Helper function to implement map, starmap and their async counterparts.
  408. '''
  409. self._check_running()
  410. if not hasattr(iterable, '__len__'):
  411. iterable = list(iterable)
  412. if chunksize is None:
  413. chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
  414. if extra:
  415. chunksize += 1
  416. if len(iterable) == 0:
  417. chunksize = 0
  418. task_batches = Pool._get_tasks(func, iterable, chunksize)
  419. result = MapResult(self, chunksize, len(iterable), callback,
  420. error_callback=error_callback)
  421. self._taskqueue.put(
  422. (
  423. self._guarded_task_generation(result._job,
  424. mapper,
  425. task_batches),
  426. None
  427. )
  428. )
  429. return result
  430. @staticmethod
  431. def _wait_for_updates(sentinels, change_notifier, timeout=None):
  432. wait(sentinels, timeout=timeout)
  433. while not change_notifier.empty():
  434. change_notifier.get()
  435. @classmethod
  436. def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
  437. pool, inqueue, outqueue, initializer, initargs,
  438. maxtasksperchild, wrap_exception, sentinels,
  439. change_notifier):
  440. thread = threading.current_thread()
  441. # Keep maintaining workers until the cache gets drained, unless the pool
  442. # is terminated.
  443. while thread._state == RUN or (cache and thread._state != TERMINATE):
  444. cls._maintain_pool(ctx, Process, processes, pool, inqueue,
  445. outqueue, initializer, initargs,
  446. maxtasksperchild, wrap_exception)
  447. current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
  448. cls._wait_for_updates(current_sentinels, change_notifier)
  449. # send sentinel to stop workers
  450. taskqueue.put(None)
  451. util.debug('worker handler exiting')
  452. @staticmethod
  453. def _handle_tasks(taskqueue, put, outqueue, pool, cache):
  454. thread = threading.current_thread()
  455. for taskseq, set_length in iter(taskqueue.get, None):
  456. task = None
  457. try:
  458. # iterating taskseq cannot fail
  459. for task in taskseq:
  460. if thread._state != RUN:
  461. util.debug('task handler found thread._state != RUN')
  462. break
  463. try:
  464. put(task)
  465. except Exception as e:
  466. job, idx = task[:2]
  467. try:
  468. cache[job]._set(idx, (False, e))
  469. except KeyError:
  470. pass
  471. else:
  472. if set_length:
  473. util.debug('doing set_length()')
  474. idx = task[1] if task else -1
  475. set_length(idx + 1)
  476. continue
  477. break
  478. finally:
  479. task = taskseq = job = None
  480. else:
  481. util.debug('task handler got sentinel')
  482. try:
  483. # tell result handler to finish when cache is empty
  484. util.debug('task handler sending sentinel to result handler')
  485. outqueue.put(None)
  486. # tell workers there is no more work
  487. util.debug('task handler sending sentinel to workers')
  488. for p in pool:
  489. put(None)
  490. except OSError:
  491. util.debug('task handler got OSError when sending sentinels')
  492. util.debug('task handler exiting')
  493. @staticmethod
  494. def _handle_results(outqueue, get, cache):
  495. thread = threading.current_thread()
  496. while 1:
  497. try:
  498. task = get()
  499. except (OSError, EOFError):
  500. util.debug('result handler got EOFError/OSError -- exiting')
  501. return
  502. if thread._state != RUN:
  503. assert thread._state == TERMINATE, "Thread not in TERMINATE"
  504. util.debug('result handler found thread._state=TERMINATE')
  505. break
  506. if task is None:
  507. util.debug('result handler got sentinel')
  508. break
  509. job, i, obj = task
  510. try:
  511. cache[job]._set(i, obj)
  512. except KeyError:
  513. pass
  514. task = job = obj = None
  515. while cache and thread._state != TERMINATE:
  516. try:
  517. task = get()
  518. except (OSError, EOFError):
  519. util.debug('result handler got EOFError/OSError -- exiting')
  520. return
  521. if task is None:
  522. util.debug('result handler ignoring extra sentinel')
  523. continue
  524. job, i, obj = task
  525. try:
  526. cache[job]._set(i, obj)
  527. except KeyError:
  528. pass
  529. task = job = obj = None
  530. if hasattr(outqueue, '_reader'):
  531. util.debug('ensuring that outqueue is not full')
  532. # If we don't make room available in outqueue then
  533. # attempts to add the sentinel (None) to outqueue may
  534. # block. There is guaranteed to be no more than 2 sentinels.
  535. try:
  536. for i in range(10):
  537. if not outqueue._reader.poll():
  538. break
  539. get()
  540. except (OSError, EOFError):
  541. pass
  542. util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
  543. len(cache), thread._state)
  544. @staticmethod
  545. def _get_tasks(func, it, size):
  546. it = iter(it)
  547. while 1:
  548. x = tuple(itertools.islice(it, size))
  549. if not x:
  550. return
  551. yield (func, x)
  552. def __reduce__(self):
  553. raise NotImplementedError(
  554. 'pool objects cannot be passed between processes or pickled'
  555. )
  556. def close(self):
  557. util.debug('closing pool')
  558. if self._state == RUN:
  559. self._state = CLOSE
  560. self._worker_handler._state = CLOSE
  561. self._change_notifier.put(None)
  562. def terminate(self):
  563. util.debug('terminating pool')
  564. self._state = TERMINATE
  565. self._terminate()
  566. def join(self):
  567. util.debug('joining pool')
  568. if self._state == RUN:
  569. raise ValueError("Pool is still running")
  570. elif self._state not in (CLOSE, TERMINATE):
  571. raise ValueError("In unknown state")
  572. self._worker_handler.join()
  573. self._task_handler.join()
  574. self._result_handler.join()
  575. for p in self._pool:
  576. p.join()
  577. @staticmethod
  578. def _help_stuff_finish(inqueue, task_handler, size):
  579. # task_handler may be blocked trying to put items on inqueue
  580. util.debug('removing tasks from inqueue until task handler finished')
  581. inqueue._rlock.acquire()
  582. while task_handler.is_alive() and inqueue._reader.poll():
  583. inqueue._reader.recv()
  584. time.sleep(0)
  585. @classmethod
  586. def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
  587. worker_handler, task_handler, result_handler, cache):
  588. # this is guaranteed to only be called once
  589. util.debug('finalizing pool')
  590. # Notify that the worker_handler state has been changed so the
  591. # _handle_workers loop can be unblocked (and exited) in order to
  592. # send the finalization sentinel all the workers.
  593. worker_handler._state = TERMINATE
  594. change_notifier.put(None)
  595. task_handler._state = TERMINATE
  596. util.debug('helping task handler/workers to finish')
  597. cls._help_stuff_finish(inqueue, task_handler, len(pool))
  598. if (not result_handler.is_alive()) and (len(cache) != 0):
  599. raise AssertionError(
  600. "Cannot have cache with result_hander not alive")
  601. result_handler._state = TERMINATE
  602. change_notifier.put(None)
  603. outqueue.put(None) # sentinel
  604. # We must wait for the worker handler to exit before terminating
  605. # workers because we don't want workers to be restarted behind our back.
  606. util.debug('joining worker handler')
  607. if threading.current_thread() is not worker_handler:
  608. worker_handler.join()
  609. # Terminate workers which haven't already finished.
  610. if pool and hasattr(pool[0], 'terminate'):
  611. util.debug('terminating workers')
  612. for p in pool:
  613. if p.exitcode is None:
  614. p.terminate()
  615. util.debug('joining task handler')
  616. if threading.current_thread() is not task_handler:
  617. task_handler.join()
  618. util.debug('joining result handler')
  619. if threading.current_thread() is not result_handler:
  620. result_handler.join()
  621. if pool and hasattr(pool[0], 'terminate'):
  622. util.debug('joining pool workers')
  623. for p in pool:
  624. if p.is_alive():
  625. # worker has not yet exited
  626. util.debug('cleaning up worker %d' % p.pid)
  627. p.join()
  628. def __enter__(self):
  629. self._check_running()
  630. return self
  631. def __exit__(self, exc_type, exc_val, exc_tb):
  632. self.terminate()
  633. #
  634. # Class whose instances are returned by `Pool.apply_async()`
  635. #
  636. class ApplyResult(object):
  637. def __init__(self, pool, callback, error_callback):
  638. self._pool = pool
  639. self._event = threading.Event()
  640. self._job = next(job_counter)
  641. self._cache = pool._cache
  642. self._callback = callback
  643. self._error_callback = error_callback
  644. self._cache[self._job] = self
  645. def ready(self):
  646. return self._event.is_set()
  647. def successful(self):
  648. if not self.ready():
  649. raise ValueError("{0!r} not ready".format(self))
  650. return self._success
  651. def wait(self, timeout=None):
  652. self._event.wait(timeout)
  653. def get(self, timeout=None):
  654. self.wait(timeout)
  655. if not self.ready():
  656. raise TimeoutError
  657. if self._success:
  658. return self._value
  659. else:
  660. raise self._value
  661. def _set(self, i, obj):
  662. self._success, self._value = obj
  663. if self._callback and self._success:
  664. self._callback(self._value)
  665. if self._error_callback and not self._success:
  666. self._error_callback(self._value)
  667. self._event.set()
  668. del self._cache[self._job]
  669. self._pool = None
  670. __class_getitem__ = classmethod(types.GenericAlias)
  671. AsyncResult = ApplyResult # create alias -- see #17805
  672. #
  673. # Class whose instances are returned by `Pool.map_async()`
  674. #
  675. class MapResult(ApplyResult):
  676. def __init__(self, pool, chunksize, length, callback, error_callback):
  677. ApplyResult.__init__(self, pool, callback,
  678. error_callback=error_callback)
  679. self._success = True
  680. self._value = [None] * length
  681. self._chunksize = chunksize
  682. if chunksize <= 0:
  683. self._number_left = 0
  684. self._event.set()
  685. del self._cache[self._job]
  686. else:
  687. self._number_left = length//chunksize + bool(length % chunksize)
  688. def _set(self, i, success_result):
  689. self._number_left -= 1
  690. success, result = success_result
  691. if success and self._success:
  692. self._value[i*self._chunksize:(i+1)*self._chunksize] = result
  693. if self._number_left == 0:
  694. if self._callback:
  695. self._callback(self._value)
  696. del self._cache[self._job]
  697. self._event.set()
  698. self._pool = None
  699. else:
  700. if not success and self._success:
  701. # only store first exception
  702. self._success = False
  703. self._value = result
  704. if self._number_left == 0:
  705. # only consider the result ready once all jobs are done
  706. if self._error_callback:
  707. self._error_callback(self._value)
  708. del self._cache[self._job]
  709. self._event.set()
  710. self._pool = None
#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):
    '''
    Iterator over the results of `Pool.imap()`, returned in task order.

    Results arrive through `_set(i, obj)` with their task index and are
    buffered until every earlier index has arrived; `_set_length()` records
    the total number of tasks once the task handler knows it.
    '''

    def __init__(self, pool):
        self._pool = pool
        # Guards the fields below; next() waits on it.
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        # In-order (success, value) pairs ready to be returned by next().
        self._items = collections.deque()
        # Next index expected in order == number of results moved to _items.
        self._index = 0
        # Total number of tasks; None until _set_length() is called.
        self._length = None
        # Results that arrived ahead of _index, keyed by task index.
        self._unsorted = {}
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        '''
        Return the next result in order, waiting up to `timeout` seconds
        (indefinitely if None) for it.

        Raises StopIteration once all results have been returned,
        TimeoutError if no result became available while waiting, and
        re-raises the task's exception if it failed.
        '''
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                # Wait once for _set/_set_length to notify, then retry.
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    # Python 3 iterator protocol; `next` stays public because it accepts a
    # timeout.
    __next__ = next                    # XXX

    def _set(self, i, obj):
        with self._cond:
            if self._index == i:
                # Expected result: deliver it plus any buffered successors.
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                # Arrived early: park it until its predecessors are in.
                self._unsorted[i] = obj

            if self._index == self._length:
                # Everything delivered: unregister from the pool's cache.
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        with self._cond:
            self._length = length
            if self._index == self._length:
                # All results already delivered: wake next() so it can
                # raise StopIteration, and unregister.
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None
  770. #
  771. # Class whose instances are returned by `Pool.imap_unordered()`
  772. #
  773. class IMapUnorderedIterator(IMapIterator):
  774. def _set(self, i, obj):
  775. with self._cond:
  776. self._items.append(obj)
  777. self._index += 1
  778. self._cond.notify()
  779. if self._index == self._length:
  780. del self._cache[self._job]
  781. self._pool = None
  782. #
  783. #
  784. #
  785. class ThreadPool(Pool):
  786. _wrap_exception = False
  787. @staticmethod
  788. def Process(ctx, *args, **kwds):
  789. from .dummy import Process
  790. return Process(*args, **kwds)
  791. def __init__(self, processes=None, initializer=None, initargs=()):
  792. Pool.__init__(self, processes, initializer, initargs)
  793. def _setup_queues(self):
  794. self._inqueue = queue.SimpleQueue()
  795. self._outqueue = queue.SimpleQueue()
  796. self._quick_put = self._inqueue.put
  797. self._quick_get = self._outqueue.get
  798. def _get_sentinels(self):
  799. return [self._change_notifier._reader]
  800. @staticmethod
  801. def _get_worker_sentinels(workers):
  802. return []
  803. @staticmethod
  804. def _help_stuff_finish(inqueue, task_handler, size):
  805. # drain inqueue, and put sentinels at its head to make workers finish
  806. try:
  807. while True:
  808. inqueue.get(block=False)
  809. except queue.Empty:
  810. pass
  811. for i in range(size):
  812. inqueue.put(None)
  813. def _wait_for_updates(self, sentinels, change_notifier, timeout):
  814. time.sleep(timeout)