  #
  # Module which deals with pickling of objects.
  #
  # multiprocessing/reduction.py
  #
  # Copyright (c) 2006-2008, R Oudkerk
  # Licensed to PSF under a Contributor Agreement.
  #
  9. from abc import ABCMeta
  10. import copyreg
  11. import functools
  12. import io
  13. import os
  14. import pickle
  15. import socket
  16. import sys
  17. from . import context
  # Names exported on every platform; the platform-specific section of this
  # module appends further names to this list.
  __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
  # True when handles/descriptors can be passed between processes: always on
  # Windows, otherwise only if the socket module exposes the pieces needed to
  # send SCM_RIGHTS ancillary data.
  HAVE_SEND_HANDLE = (sys.platform == 'win32' or
                      (hasattr(socket, 'CMSG_LEN') and
                       hasattr(socket, 'SCM_RIGHTS') and
                       hasattr(socket.socket, 'sendmsg')))
  #
  # Pickler subclass
  #

  class ForkingPickler(pickle.Pickler):
      '''Pickler subclass used by multiprocessing.

      Every instance gets its own dispatch table: a copy of
      copyreg.dispatch_table overlaid with the reducers that were added
      through register().
      '''
      # Reducers added via register(); shared by all instances.
      _extra_reducers = {}
      # Reference to the global copyreg table; copied per instance in __init__.
      _copyreg_dispatch_table = copyreg.dispatch_table

      def __init__(self, *args, **kwds):
          '''Create a pickler; all arguments are forwarded to pickle.Pickler.

          Keyword arguments (for example protocol=2) are forwarded as well
          as positional ones.
          '''
          super().__init__(*args, **kwds)
          # Build the table after the base initialiser so that reducers
          # registered up to this point are honoured by this instance.
          self.dispatch_table = self._copyreg_dispatch_table.copy()
          self.dispatch_table.update(self._extra_reducers)

      @classmethod
      def register(cls, type, reduce):
          '''Register a reduce function for a type.

          The mapping is stored on the class, so it only affects picklers
          created after this call.
          '''
          cls._extra_reducers[type] = reduce

      @classmethod
      def dumps(cls, obj, protocol=None):
          '''Pickle obj with this class and return the data as a memoryview.'''
          buf = io.BytesIO()
          cls(buf, protocol).dump(obj)
          # getbuffer() exposes the BytesIO contents without copying them.
          return buf.getbuffer()

      # Unpickling needs no special handling, so reuse pickle.loads directly.
      loads = pickle.loads
  # Module-level shortcut for ForkingPickler.register.
  register = ForkingPickler.register
  def dump(obj, file, protocol=None):
      '''Replacement for pickle.dump() using ForkingPickler.

      Pickles obj to the writable binary file object using the given
      protocol (None selects the pickler's default).
      '''
      ForkingPickler(file, protocol).dump(obj)
  #
  # Platform specific definitions
  #

  if sys.platform == 'win32':
      # Windows
      __all__ += ['DupHandle', 'duplicate', 'steal_handle']
      import _winapi

      def duplicate(handle, target_process=None, inheritable=False,
                    *, source_process=None):
          '''Duplicate a handle.  (target_process is a handle not a pid!)

          source_process and target_process both default to the current
          process.  Returns the new handle, created with the same access
          rights as the original.
          '''
          current_process = _winapi.GetCurrentProcess()
          if source_process is None:
              source_process = current_process
          if target_process is None:
              target_process = current_process
          return _winapi.DuplicateHandle(
              source_process, handle, target_process,
              0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)

      def steal_handle(source_pid, handle):
          '''Steal a handle from process identified by source_pid.

          The handle is duplicated into the current process and closed in
          the source process (DUPLICATE_CLOSE_SOURCE).
          '''
          source_process_handle = _winapi.OpenProcess(
              _winapi.PROCESS_DUP_HANDLE, False, source_pid)
          try:
              return _winapi.DuplicateHandle(
                  source_process_handle, handle,
                  _winapi.GetCurrentProcess(), 0, False,
                  _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
          finally:
              # Always release the process handle opened above.
              _winapi.CloseHandle(source_process_handle)

      def send_handle(conn, handle, destination_pid):
          '''Send a handle over a local connection.'''
          # NOTE(review): DUPLICATE_SAME_ACCESS is passed as DupHandle's
          # `access` argument here -- kept as is; confirm this is intended.
          dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
          conn.send(dh)

      def recv_handle(conn):
          '''Receive a handle over a local connection.'''
          return conn.recv().detach()

      class DupHandle(object):
          '''Picklable wrapper for a handle.'''

          def __init__(self, handle, access, pid=None):
              '''Duplicate handle into the process identified by pid.

              With pid=None the duplicate is made in the current process.
              '''
              if pid is None:
                  # We just duplicate the handle in the current process and
                  # let the receiving process steal the handle.
                  pid = os.getpid()
              proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
              try:
                  self._handle = _winapi.DuplicateHandle(
                      _winapi.GetCurrentProcess(),
                      handle, proc, access, False, 0)
              finally:
                  _winapi.CloseHandle(proc)
              self._access = access
              self._pid = pid

          def detach(self):
              '''Get the handle.  This should only be called once.'''
              # retrieve handle from process which currently owns it
              if self._pid == os.getpid():
                  # The handle has already been duplicated for this process.
                  return self._handle
              # We must steal the handle from the process whose pid is self._pid.
              proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                         self._pid)
              try:
                  return _winapi.DuplicateHandle(
                      proc, self._handle, _winapi.GetCurrentProcess(),
                      self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
              finally:
                  _winapi.CloseHandle(proc)

  else:
      # Unix
      __all__ += ['DupFd', 'sendfds', 'recvfds']
      import array

      # On MacOSX we should acknowledge receipt of fds -- see Issue14669
      ACKNOWLEDGE = sys.platform == 'darwin'

      def sendfds(sock, fds):
          '''Send an array of fds over an AF_UNIX socket.

          One data byte holding len(fds) % 256 is sent with the descriptors
          so the receiver can check the count.  Raises RuntimeError if an
          acknowledgement is expected (see ACKNOWLEDGE) but not received.
          '''
          fds = array.array('i', fds)
          msg = bytes([len(fds) % 256])
          sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
          if ACKNOWLEDGE and sock.recv(1) != b'A':
              raise RuntimeError('did not receive acknowledgement of fd')

      def recvfds(sock, size):
          '''Receive an array of fds over an AF_UNIX socket.

          size is the number of descriptors the ancillary buffer is sized
          for.  Returns the received descriptors as a list of ints.

          Raises EOFError if neither data nor ancillary data arrived,
          AssertionError if the count byte disagrees with the number of
          descriptors, and RuntimeError for any other malformed message.
          '''
          a = array.array('i')
          bytes_size = a.itemsize * size
          msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
          if not msg and not ancdata:
              raise EOFError
          try:
              if ACKNOWLEDGE:
                  sock.send(b'A')
              if len(ancdata) != 1:
                  raise RuntimeError('received %d items of ancdata' %
                                     len(ancdata))
              cmsg_level, cmsg_type, cmsg_data = ancdata[0]
              if (cmsg_level == socket.SOL_SOCKET and
                      cmsg_type == socket.SCM_RIGHTS):
                  if len(cmsg_data) % a.itemsize != 0:
                      raise ValueError
                  a.frombytes(cmsg_data)
                  if len(a) % 256 != msg[0]:
                      raise AssertionError(
                          "Len is {0:n} but msg[0] is {1!r}".format(
                              len(a), msg[0]))
                  return list(a)
          except (ValueError, IndexError):
              # Deliberately fall through: malformed payloads are reported
              # with the generic RuntimeError below.
              pass
          raise RuntimeError('Invalid data received')

      def send_handle(conn, handle, destination_pid):
          '''Send a handle over a local connection.

          destination_pid is not used by this implementation.
          '''
          with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
              sendfds(s, [handle])

      def recv_handle(conn):
          '''Receive a handle over a local connection.'''
          with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
              return recvfds(s, 1)[0]

      def DupFd(fd):
          '''Return a wrapper for an fd.

          Uses the spawning Popen object when one is active, otherwise the
          resource sharer if handle passing is available.  Raises
          ValueError when neither mechanism can be used.
          '''
          popen_obj = context.get_spawning_popen()
          if popen_obj is not None:
              return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
          elif HAVE_SEND_HANDLE:
              # Imported here rather than at module level, as in the original.
              from . import resource_sharer
              return resource_sharer.DupFd(fd)
          else:
              raise ValueError('SCM_RIGHTS appears not to be available')
  #
  # Try making some callable types picklable
  #

  def _reduce_method(m):
      '''Reduce a bound method to a getattr() call on its owner.

      Returns (getattr, (owner, name)), where owner is m.__self__, or
      m.__class__ when m.__self__ is None.
      '''
      name = m.__func__.__name__
      if m.__self__ is None:
          return getattr, (m.__class__, name)
      return getattr, (m.__self__, name)
  class _C:
      '''Throwaway class used only to obtain the bound-method type.'''
      def f(self):
          pass

  # type(_C().f) is the type of a bound method of a Python-level class.
  register(type(_C().f), _reduce_method)
  def _reduce_method_descriptor(m):
      '''Reduce a method descriptor to a getattr() call on its defining class.

      Returns (getattr, (m.__objclass__, m.__name__)).
      '''
      return getattr, (m.__objclass__, m.__name__)
  # Register the reducer for the types of list.append and int.__add__.
  register(type(list.append), _reduce_method_descriptor)
  register(type(int.__add__), _reduce_method_descriptor)
  def _reduce_partial(p):
      '''Reduce a functools.partial to (_rebuild_partial, (func, args, keywords)).'''
      # Substitute an empty dict when the partial carries no keywords so
      # that _rebuild_partial can always unpack it with **.
      return _rebuild_partial, (p.func, p.args, p.keywords or {})

  def _rebuild_partial(func, args, keywords):
      '''Recreate a functools.partial from the pieces produced by _reduce_partial.'''
      return functools.partial(func, *args, **keywords)
  # Make functools.partial objects picklable through ForkingPickler.
  register(functools.partial, _reduce_partial)
  #
  # Make sockets picklable
  #

  if sys.platform == 'win32':
      def _reduce_socket(s):
          '''Reduce a socket by wrapping it in a resource_sharer.DupSocket.'''
          # Imported here rather than at module level, as in the original.
          from .resource_sharer import DupSocket
          return _rebuild_socket, (DupSocket(s),)

      def _rebuild_socket(ds):
          '''Return the result of detaching the received DupSocket wrapper.'''
          return ds.detach()

      register(socket.socket, _reduce_socket)

  else:
      def _reduce_socket(s):
          '''Reduce a socket to a DupFd wrapper plus its family, type and proto.'''
          df = DupFd(s.fileno())
          return _rebuild_socket, (df, s.family, s.type, s.proto)

      def _rebuild_socket(df, family, type, proto):
          '''Rebuild a socket around the descriptor obtained from df.detach().'''
          fd = df.detach()
          return socket.socket(family, type, proto, fileno=fd)

      register(socket.socket, _reduce_socket)
  class AbstractReducer(metaclass=ABCMeta):
      '''Abstract base class for use in implementing a Reduction class
      suitable for use in replacing the standard reduction mechanism
      used in multiprocessing.'''
      # Re-export the module-level API as class attributes.
      ForkingPickler = ForkingPickler
      register = register
      dump = dump
      send_handle = send_handle
      recv_handle = recv_handle

      # Only the helpers defined for the running platform exist.
      if sys.platform == 'win32':
          steal_handle = steal_handle
          duplicate = duplicate
          DupHandle = DupHandle
      else:
          sendfds = sendfds
          recvfds = recvfds
          DupFd = DupFd

      _reduce_method = _reduce_method
      _reduce_method_descriptor = _reduce_method_descriptor
      _rebuild_partial = _rebuild_partial
      _reduce_socket = _reduce_socket
      _rebuild_socket = _rebuild_socket

      def __init__(self, *args):
          '''Register the module's standard reducers; args are ignored.'''
          # Bare names inside a method resolve to module globals, not to the
          # class attributes above, so these calls use the module-level
          # register() and reducer functions.
          register(type(_C().f), _reduce_method)
          register(type(list.append), _reduce_method_descriptor)
          register(type(int.__add__), _reduce_method_descriptor)
          register(functools.partial, _reduce_partial)
          register(socket.socket, _reduce_socket)
  239. register(socket.socket, _reduce_socket)