resource_sharer.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
#
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process connects
# to the resource sharer, sends the identifier and its pid, and then receives
# the resource.
#
  10. import os
  11. import signal
  12. import socket
  13. import sys
  14. import threading
  15. from . import process
  16. from .context import reduction
  17. from . import util
  18. __all__ = ['stop']
  19. if sys.platform == 'win32':
  20. __all__ += ['DupSocket']
  21. class DupSocket(object):
  22. '''Picklable wrapper for a socket.'''
  23. def __init__(self, sock):
  24. new_sock = sock.dup()
  25. def send(conn, pid):
  26. share = new_sock.share(pid)
  27. conn.send_bytes(share)
  28. self._id = _resource_sharer.register(send, new_sock.close)
  29. def detach(self):
  30. '''Get the socket. This should only be called once.'''
  31. with _resource_sharer.get_connection(self._id) as conn:
  32. share = conn.recv_bytes()
  33. return socket.fromshare(share)
  34. else:
  35. __all__ += ['DupFd']
  36. class DupFd(object):
  37. '''Wrapper for fd which can be used at any time.'''
  38. def __init__(self, fd):
  39. new_fd = os.dup(fd)
  40. def send(conn, pid):
  41. reduction.send_handle(conn, new_fd, pid)
  42. def close():
  43. os.close(new_fd)
  44. self._id = _resource_sharer.register(send, close)
  45. def detach(self):
  46. '''Get the fd. This should only be called once.'''
  47. with _resource_sharer.get_connection(self._id) as conn:
  48. return reduction.recv_handle(conn)
  49. class _ResourceSharer(object):
  50. '''Manager for resources using background thread.'''
  51. def __init__(self):
  52. self._key = 0
  53. self._cache = {}
  54. self._lock = threading.Lock()
  55. self._listener = None
  56. self._address = None
  57. self._thread = None
  58. util.register_after_fork(self, _ResourceSharer._afterfork)
  59. def register(self, send, close):
  60. '''Register resource, returning an identifier.'''
  61. with self._lock:
  62. if self._address is None:
  63. self._start()
  64. self._key += 1
  65. self._cache[self._key] = (send, close)
  66. return (self._address, self._key)
  67. @staticmethod
  68. def get_connection(ident):
  69. '''Return connection from which to receive identified resource.'''
  70. from .connection import Client
  71. address, key = ident
  72. c = Client(address, authkey=process.current_process().authkey)
  73. c.send((key, os.getpid()))
  74. return c
  75. def stop(self, timeout=None):
  76. '''Stop the background thread and clear registered resources.'''
  77. from .connection import Client
  78. with self._lock:
  79. if self._address is not None:
  80. c = Client(self._address,
  81. authkey=process.current_process().authkey)
  82. c.send(None)
  83. c.close()
  84. self._thread.join(timeout)
  85. if self._thread.is_alive():
  86. util.sub_warning('_ResourceSharer thread did '
  87. 'not stop when asked')
  88. self._listener.close()
  89. self._thread = None
  90. self._address = None
  91. self._listener = None
  92. for key, (send, close) in self._cache.items():
  93. close()
  94. self._cache.clear()
  95. def _afterfork(self):
  96. for key, (send, close) in self._cache.items():
  97. close()
  98. self._cache.clear()
  99. self._lock._at_fork_reinit()
  100. if self._listener is not None:
  101. self._listener.close()
  102. self._listener = None
  103. self._address = None
  104. self._thread = None
  105. def _start(self):
  106. from .connection import Listener
  107. assert self._listener is None, "Already have Listener"
  108. util.debug('starting listener and thread for sending handles')
  109. self._listener = Listener(authkey=process.current_process().authkey)
  110. self._address = self._listener.address
  111. t = threading.Thread(target=self._serve)
  112. t.daemon = True
  113. t.start()
  114. self._thread = t
  115. def _serve(self):
  116. if hasattr(signal, 'pthread_sigmask'):
  117. signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
  118. while 1:
  119. try:
  120. with self._listener.accept() as conn:
  121. msg = conn.recv()
  122. if msg is None:
  123. break
  124. key, destination_pid = msg
  125. send, close = self._cache.pop(key)
  126. try:
  127. send(conn, destination_pid)
  128. finally:
  129. close()
  130. except:
  131. if not util.is_exiting():
  132. sys.excepthook(*sys.exc_info())
  133. _resource_sharer = _ResourceSharer()
  134. stop = _resource_sharer.stop