forkserver.py 12 KB


  1. import errno
  2. import os
  3. import selectors
  4. import signal
  5. import socket
  6. import struct
  7. import sys
  8. import threading
  9. import warnings
  10. from . import connection
  11. from . import process
  12. from .context import reduction
  13. from . import resource_tracker
  14. from . import spawn
  15. from . import util
  16. __all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
  17. 'set_forkserver_preload']
  18. #
  19. #
  20. #
  21. MAXFDS_TO_SEND = 256
  22. SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
  23. #
  24. # Forkserver class
  25. #
  26. class ForkServer(object):
  27. def __init__(self):
  28. self._forkserver_address = None
  29. self._forkserver_alive_fd = None
  30. self._forkserver_pid = None
  31. self._inherited_fds = None
  32. self._lock = threading.Lock()
  33. self._preload_modules = ['__main__']
  34. def _stop(self):
  35. # Method used by unit tests to stop the server
  36. with self._lock:
  37. self._stop_unlocked()
  38. def _stop_unlocked(self):
  39. if self._forkserver_pid is None:
  40. return
  41. # close the "alive" file descriptor asks the server to stop
  42. os.close(self._forkserver_alive_fd)
  43. self._forkserver_alive_fd = None
  44. os.waitpid(self._forkserver_pid, 0)
  45. self._forkserver_pid = None
  46. if not util.is_abstract_socket_namespace(self._forkserver_address):
  47. os.unlink(self._forkserver_address)
  48. self._forkserver_address = None
  49. def set_forkserver_preload(self, modules_names):
  50. '''Set list of module names to try to load in forkserver process.'''
  51. if not all(type(mod) is str for mod in self._preload_modules):
  52. raise TypeError('module_names must be a list of strings')
  53. self._preload_modules = modules_names
  54. def get_inherited_fds(self):
  55. '''Return list of fds inherited from parent process.
  56. This returns None if the current process was not started by fork
  57. server.
  58. '''
  59. return self._inherited_fds
  60. def connect_to_new_process(self, fds):
  61. '''Request forkserver to create a child process.
  62. Returns a pair of fds (status_r, data_w). The calling process can read
  63. the child process's pid and (eventually) its returncode from status_r.
  64. The calling process should write to data_w the pickled preparation and
  65. process data.
  66. '''
  67. self.ensure_running()
  68. if len(fds) + 4 >= MAXFDS_TO_SEND:
  69. raise ValueError('too many fds')
  70. with socket.socket(socket.AF_UNIX) as client:
  71. client.connect(self._forkserver_address)
  72. parent_r, child_w = os.pipe()
  73. child_r, parent_w = os.pipe()
  74. allfds = [child_r, child_w, self._forkserver_alive_fd,
  75. resource_tracker.getfd()]
  76. allfds += fds
  77. try:
  78. reduction.sendfds(client, allfds)
  79. return parent_r, parent_w
  80. except:
  81. os.close(parent_r)
  82. os.close(parent_w)
  83. raise
  84. finally:
  85. os.close(child_r)
  86. os.close(child_w)
  87. def ensure_running(self):
  88. '''Make sure that a fork server is running.
  89. This can be called from any process. Note that usually a child
  90. process will just reuse the forkserver started by its parent, so
  91. ensure_running() will do nothing.
  92. '''
  93. with self._lock:
  94. resource_tracker.ensure_running()
  95. if self._forkserver_pid is not None:
  96. # forkserver was launched before, is it still running?
  97. pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
  98. if not pid:
  99. # still alive
  100. return
  101. # dead, launch it again
  102. os.close(self._forkserver_alive_fd)
  103. self._forkserver_address = None
  104. self._forkserver_alive_fd = None
  105. self._forkserver_pid = None
  106. cmd = ('from multiprocessing.forkserver import main; ' +
  107. 'main(%d, %d, %r, **%r)')
  108. if self._preload_modules:
  109. desired_keys = {'main_path', 'sys_path'}
  110. data = spawn.get_preparation_data('ignore')
  111. data = {x: y for x, y in data.items() if x in desired_keys}
  112. else:
  113. data = {}
  114. with socket.socket(socket.AF_UNIX) as listener:
  115. address = connection.arbitrary_address('AF_UNIX')
  116. listener.bind(address)
  117. if not util.is_abstract_socket_namespace(address):
  118. os.chmod(address, 0o600)
  119. listener.listen()
  120. # all client processes own the write end of the "alive" pipe;
  121. # when they all terminate the read end becomes ready.
  122. alive_r, alive_w = os.pipe()
  123. try:
  124. fds_to_pass = [listener.fileno(), alive_r]
  125. cmd %= (listener.fileno(), alive_r, self._preload_modules,
  126. data)
  127. exe = spawn.get_executable()
  128. args = [exe] + util._args_from_interpreter_flags()
  129. args += ['-c', cmd]
  130. pid = util.spawnv_passfds(exe, args, fds_to_pass)
  131. except:
  132. os.close(alive_w)
  133. raise
  134. finally:
  135. os.close(alive_r)
  136. self._forkserver_address = address
  137. self._forkserver_alive_fd = alive_w
  138. self._forkserver_pid = pid
  139. #
  140. #
  141. #
  142. def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
  143. '''Run forkserver.'''
  144. if preload:
  145. if '__main__' in preload and main_path is not None:
  146. process.current_process()._inheriting = True
  147. try:
  148. spawn.import_main_path(main_path)
  149. finally:
  150. del process.current_process()._inheriting
  151. for modname in preload:
  152. try:
  153. __import__(modname)
  154. except ImportError:
  155. pass
  156. util._close_stdin()
  157. sig_r, sig_w = os.pipe()
  158. os.set_blocking(sig_r, False)
  159. os.set_blocking(sig_w, False)
  160. def sigchld_handler(*_unused):
  161. # Dummy signal handler, doesn't do anything
  162. pass
  163. handlers = {
  164. # unblocking SIGCHLD allows the wakeup fd to notify our event loop
  165. signal.SIGCHLD: sigchld_handler,
  166. # protect the process from ^C
  167. signal.SIGINT: signal.SIG_IGN,
  168. }
  169. old_handlers = {sig: signal.signal(sig, val)
  170. for (sig, val) in handlers.items()}
  171. # calling os.write() in the Python signal handler is racy
  172. signal.set_wakeup_fd(sig_w)
  173. # map child pids to client fds
  174. pid_to_fd = {}
  175. with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
  176. selectors.DefaultSelector() as selector:
  177. _forkserver._forkserver_address = listener.getsockname()
  178. selector.register(listener, selectors.EVENT_READ)
  179. selector.register(alive_r, selectors.EVENT_READ)
  180. selector.register(sig_r, selectors.EVENT_READ)
  181. while True:
  182. try:
  183. while True:
  184. rfds = [key.fileobj for (key, events) in selector.select()]
  185. if rfds:
  186. break
  187. if alive_r in rfds:
  188. # EOF because no more client processes left
  189. assert os.read(alive_r, 1) == b'', "Not at EOF?"
  190. raise SystemExit
  191. if sig_r in rfds:
  192. # Got SIGCHLD
  193. os.read(sig_r, 65536) # exhaust
  194. while True:
  195. # Scan for child processes
  196. try:
  197. pid, sts = os.waitpid(-1, os.WNOHANG)
  198. except ChildProcessError:
  199. break
  200. if pid == 0:
  201. break
  202. child_w = pid_to_fd.pop(pid, None)
  203. if child_w is not None:
  204. returncode = os.waitstatus_to_exitcode(sts)
  205. # Send exit code to client process
  206. try:
  207. write_signed(child_w, returncode)
  208. except BrokenPipeError:
  209. # client vanished
  210. pass
  211. os.close(child_w)
  212. else:
  213. # This shouldn't happen really
  214. warnings.warn('forkserver: waitpid returned '
  215. 'unexpected pid %d' % pid)
  216. if listener in rfds:
  217. # Incoming fork request
  218. with listener.accept()[0] as s:
  219. # Receive fds from client
  220. fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
  221. if len(fds) > MAXFDS_TO_SEND:
  222. raise RuntimeError(
  223. "Too many ({0:n}) fds to send".format(
  224. len(fds)))
  225. child_r, child_w, *fds = fds
  226. s.close()
  227. pid = os.fork()
  228. if pid == 0:
  229. # Child
  230. code = 1
  231. try:
  232. listener.close()
  233. selector.close()
  234. unused_fds = [alive_r, child_w, sig_r, sig_w]
  235. unused_fds.extend(pid_to_fd.values())
  236. code = _serve_one(child_r, fds,
  237. unused_fds,
  238. old_handlers)
  239. except Exception:
  240. sys.excepthook(*sys.exc_info())
  241. sys.stderr.flush()
  242. finally:
  243. os._exit(code)
  244. else:
  245. # Send pid to client process
  246. try:
  247. write_signed(child_w, pid)
  248. except BrokenPipeError:
  249. # client vanished
  250. pass
  251. pid_to_fd[pid] = child_w
  252. os.close(child_r)
  253. for fd in fds:
  254. os.close(fd)
  255. except OSError as e:
  256. if e.errno != errno.ECONNABORTED:
  257. raise
  258. def _serve_one(child_r, fds, unused_fds, handlers):
  259. # close unnecessary stuff and reset signal handlers
  260. signal.set_wakeup_fd(-1)
  261. for sig, val in handlers.items():
  262. signal.signal(sig, val)
  263. for fd in unused_fds:
  264. os.close(fd)
  265. (_forkserver._forkserver_alive_fd,
  266. resource_tracker._resource_tracker._fd,
  267. *_forkserver._inherited_fds) = fds
  268. # Run process object received over pipe
  269. parent_sentinel = os.dup(child_r)
  270. code = spawn._main(child_r, parent_sentinel)
  271. return code
  272. #
  273. # Read and write signed numbers
  274. #
  275. def read_signed(fd):
  276. data = b''
  277. length = SIGNED_STRUCT.size
  278. while len(data) < length:
  279. s = os.read(fd, length - len(data))
  280. if not s:
  281. raise EOFError('unexpected EOF')
  282. data += s
  283. return SIGNED_STRUCT.unpack(data)[0]
  284. def write_signed(fd, n):
  285. msg = SIGNED_STRUCT.pack(n)
  286. while msg:
  287. nbytes = os.write(fd, msg)
  288. if nbytes == 0:
  289. raise RuntimeError('should not get here')
  290. msg = msg[nbytes:]
  291. #
  292. #
  293. #
  294. _forkserver = ForkServer()
  295. ensure_running = _forkserver.ensure_running
  296. get_inherited_fds = _forkserver.get_inherited_fds
  297. connect_to_new_process = _forkserver.connect_to_new_process
  298. set_forkserver_preload = _forkserver.set_forkserver_preload