|
- #
- # Module which deals with pickling of objects.
- #
- # multiprocessing/reduction.py
- #
- # Copyright (c) 2006-2008, R Oudkerk
- # Licensed to PSF under a Contributor Agreement.
- #
- from abc import ABCMeta
- import copyreg
- import functools
- import io
- import os
- import pickle
- import socket
- import sys
- from . import context
- __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
# True on Windows; elsewhere True only when the socket module exposes
# CMSG_LEN, SCM_RIGHTS and socket.sendmsg.
HAVE_SEND_HANDLE = sys.platform == 'win32' or all(
    hasattr(owner, attr)
    for owner, attr in (
        (socket, 'CMSG_LEN'),
        (socket, 'SCM_RIGHTS'),
        (socket.socket, 'sendmsg'),
    )
)
- #
- # Pickler subclass
- #
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Every instance gets its own dispatch_table: a copy of
    copyreg.dispatch_table overlaid with the reducers added through
    register().
    '''
    # Class-level mapping of type -> reduce function, filled by register().
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args, **kwds):
        '''Create a pickler.

        Positional and keyword arguments are forwarded unchanged to
        pickle.Pickler, so keyword forms such as protocol=2 work too.
        '''
        super().__init__(*args, **kwds)
        # Build the per-instance table after the base initialiser has run.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class and return the result.

        The result is the view returned by io.BytesIO.getbuffer(), not a
        bytes copy.
        '''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    loads = pickle.loads


register = ForkingPickler.register
def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.

    Pickles obj to file with a ForkingPickler created for the given
    protocol.
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
- #
- # Platform specific definitions
- #
- if sys.platform == 'win32':
- # Windows
- __all__ += ['DupHandle', 'duplicate', 'steal_handle']
- import _winapi
def duplicate(handle, target_process=None, inheritable=False,
              *, source_process=None):
    '''Duplicate a handle.  (target_process is a handle not a pid!)

    source_process and target_process both default to the current
    process when left as None.
    '''
    this_process = _winapi.GetCurrentProcess()
    src = this_process if source_process is None else source_process
    dst = this_process if target_process is None else target_process
    return _winapi.DuplicateHandle(
        src, handle, dst, 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
def steal_handle(source_pid, handle):
    '''Steal a handle from process identified by source_pid.

    The source process is opened with PROCESS_DUP_HANDLE and its process
    handle is always closed again, whether or not duplication succeeds.
    '''
    owner = _winapi.OpenProcess(
        _winapi.PROCESS_DUP_HANDLE, False, source_pid)
    try:
        options = (_winapi.DUPLICATE_SAME_ACCESS |
                   _winapi.DUPLICATE_CLOSE_SOURCE)
        return _winapi.DuplicateHandle(
            owner, handle, _winapi.GetCurrentProcess(), 0, False, options)
    finally:
        _winapi.CloseHandle(owner)
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection.

    The handle is wrapped in a DupHandle created for destination_pid and
    that wrapper is what gets sent through conn.
    '''
    conn.send(
        DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid))
def recv_handle(conn):
    '''Receive a handle over a local connection.

    Returns the result of calling detach() on the object read from conn.
    '''
    wrapper = conn.recv()
    return wrapper.detach()
class DupHandle(object):
    '''Picklable wrapper for a handle.'''

    def __init__(self, handle, access, pid=None):
        '''Duplicate handle into the process identified by pid.

        When pid is None the handle is duplicated within the current
        process and the receiving process is left to steal it.
        '''
        if pid is None:
            pid = os.getpid()
        target = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(), handle, target,
                access, False, 0)
        finally:
            # The process handle is only needed for the duplication itself.
            _winapi.CloseHandle(target)
        self._access = access
        self._pid = pid

    def detach(self):
        '''Get the handle.  This should only be called once.'''
        if self._pid != os.getpid():
            # Another process owns the handle: steal it from that process.
            owner = _winapi.OpenProcess(
                _winapi.PROCESS_DUP_HANDLE, False, self._pid)
            try:
                return _winapi.DuplicateHandle(
                    owner, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(owner)
        # The handle has already been duplicated for this process.
        return self._handle
- else:
- # Unix
- __all__ += ['DupFd', 'sendfds', 'recvfds']
- import array
# On MacOSX the receiver acknowledges each batch of fds -- see Issue14669
ACKNOWLEDGE = sys.platform == 'darwin'


def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.

    The one-byte payload carries len(fds) modulo 256; the descriptors
    themselves travel as SCM_RIGHTS ancillary data.  When ACKNOWLEDGE is
    set, a single b'A' byte is read back and RuntimeError is raised if
    anything else arrives.
    '''
    packed = array.array('i', fds)
    count_byte = bytes([len(packed) % 256])
    rights = (socket.SOL_SOCKET, socket.SCM_RIGHTS, packed)
    sock.sendmsg([count_byte], [rights])
    if ACKNOWLEDGE:
        if sock.recv(1) != b'A':
            raise RuntimeError('did not receive acknowledgement of fd')
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the number of descriptors the ancillary buffer is sized for.
    Returns the received descriptors as a list.

    Raises EOFError when neither payload nor ancillary data arrives,
    RuntimeError when the ancillary data is not exactly one usable
    SCM_RIGHTS item, and AssertionError when the number of descriptors
    disagrees with the count byte in the payload.
    '''
    fds = array.array('i')
    ancbuf_size = socket.CMSG_SPACE(fds.itemsize * size)
    msg, ancdata, _flags, _addr = sock.recvmsg(1, ancbuf_size)
    if not (msg or ancdata):
        raise EOFError
    try:
        if ACKNOWLEDGE:
            sock.send(b'A')
        n_items = len(ancdata)
        if n_items != 1:
            raise RuntimeError('received %d items of ancdata' % n_items)
        level, kind, payload = ancdata[0]
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            if len(payload) % fds.itemsize:
                raise ValueError
            fds.frombytes(payload)
            # The payload byte is the sender's count modulo 256.
            if len(fds) % 256 != msg[0]:
                raise AssertionError(
                    "Len is {0:n} but msg[0] is {1!r}".format(
                        len(fds), msg[0]))
            return list(fds)
    except (ValueError, IndexError):
        # Malformed data is reported through the RuntimeError below.
        pass
    raise RuntimeError('Invalid data received')
def send_handle(conn, handle, destination_pid):
    '''Send a handle over a local connection.

    A temporary AF_UNIX stream socket is built from conn.fileno() and
    closed again once the handle has been sent.  destination_pid is not
    used by this implementation.
    '''
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        sendfds(sock, [handle])
def recv_handle(conn):
    '''Receive a handle over a local connection.

    A temporary AF_UNIX stream socket is built from conn.fileno(), one
    descriptor is read from it and returned, and the temporary socket is
    closed.
    '''
    sock = socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    with sock:
        received = recvfds(sock, 1)
        return received[0]
def DupFd(fd):
    '''Return a wrapper for an fd.

    If context.get_spawning_popen() returns an object, the wrapper comes
    from that object; otherwise resource_sharer.DupFd is used when
    HAVE_SEND_HANDLE is true.  Raises ValueError when neither applies.
    '''
    spawning = context.get_spawning_popen()
    if spawning is not None:
        return spawning.DupFd(spawning.duplicate_for_child(fd))
    if not HAVE_SEND_HANDLE:
        raise ValueError('SCM_RIGHTS appears not to be available')
    from . import resource_sharer
    return resource_sharer.DupFd(fd)
- #
- # Try making some callable types picklable
- #
def _reduce_method(m):
    '''Reduce a method to a getattr() call that looks it up by name.

    The lookup target is m.__self__, or m.__class__ when m.__self__ is
    None.
    '''
    owner = m.__class__ if m.__self__ is None else m.__self__
    return getattr, (owner, m.__func__.__name__)
class _C:
    # Helper class: type(_C().f) is the type that gets registered with
    # _reduce_method.
    def f(self):
        pass
- register(type(_C().f), _reduce_method)
def _reduce_method_descriptor(m):
    '''Reduce a method descriptor to a getattr() call on its defining class.'''
    owner, attr = m.__objclass__, m.__name__
    return getattr, (owner, attr)
- register(type(list.append), _reduce_method_descriptor)
- register(type(int.__add__), _reduce_method_descriptor)
def _reduce_partial(p):
    '''Reduce a functools.partial to (_rebuild_partial, state).

    state is (func, args, keywords), with an empty dict standing in for
    falsy keywords.
    '''
    state = (p.func, p.args, p.keywords or {})
    return _rebuild_partial, state


def _rebuild_partial(func, args, keywords):
    '''Recreate a functools.partial from the pieces saved by _reduce_partial.'''
    rebuilt = functools.partial(func, *args, **keywords)
    return rebuilt
- register(functools.partial, _reduce_partial)
- #
- # Make sockets picklable
- #
if sys.platform == 'win32':
    def _reduce_socket(s):
        '''Reduce a socket by wrapping it in a resource_sharer.DupSocket.'''
        from .resource_sharer import DupSocket
        wrapped = DupSocket(s)
        return _rebuild_socket, (wrapped,)

    def _rebuild_socket(ds):
        '''Return whatever the wrapper's detach() yields.'''
        return ds.detach()

    register(socket.socket, _reduce_socket)

else:
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus family, type and proto.'''
        state = (DupFd(s.fileno()), s.family, s.type, s.proto)
        return _rebuild_socket, state

    def _rebuild_socket(df, family, type, proto):
        '''Build a socket around the descriptor returned by df.detach().'''
        return socket.socket(family, type, proto, fileno=df.detach())

    register(socket.socket, _reduce_socket)
class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.'''

    # Module-level objects re-exposed as class attributes.
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    # Platform-specific helpers.
    if sys.platform == 'win32':
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle
    else:
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd

    # Reduce/rebuild helpers.
    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register the module's reducers through the module-level register().

        Positional arguments are accepted but not used.
        '''
        reducers = (
            (type(_C().f), _reduce_method),
            (type(list.append), _reduce_method_descriptor),
            (type(int.__add__), _reduce_method_descriptor),
            (functools.partial, _reduce_partial),
            (socket.socket, _reduce_socket),
        )
        for kind, reducer in reducers:
            register(kind, reducer)
|