subprocess.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
# Public names of this module: the two coroutine factories defined below.
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
  2. import subprocess
  3. import warnings
  4. from . import events
  5. from . import protocols
  6. from . import streams
  7. from . import tasks
  8. from .log import logger
# Re-export the special redirection markers of the standard ``subprocess``
# module so callers can pass them as stdin/stdout/stderr without importing
# ``subprocess`` themselves.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
  12. class SubprocessStreamProtocol(streams.FlowControlMixin,
  13. protocols.SubprocessProtocol):
  14. """Like StreamReaderProtocol, but for a subprocess."""
  15. def __init__(self, limit, loop):
  16. super().__init__(loop=loop)
  17. self._limit = limit
  18. self.stdin = self.stdout = self.stderr = None
  19. self._transport = None
  20. self._process_exited = False
  21. self._pipe_fds = []
  22. self._stdin_closed = self._loop.create_future()
  23. def __repr__(self):
  24. info = [self.__class__.__name__]
  25. if self.stdin is not None:
  26. info.append(f'stdin={self.stdin!r}')
  27. if self.stdout is not None:
  28. info.append(f'stdout={self.stdout!r}')
  29. if self.stderr is not None:
  30. info.append(f'stderr={self.stderr!r}')
  31. return '<{}>'.format(' '.join(info))
  32. def connection_made(self, transport):
  33. self._transport = transport
  34. stdout_transport = transport.get_pipe_transport(1)
  35. if stdout_transport is not None:
  36. self.stdout = streams.StreamReader(limit=self._limit,
  37. loop=self._loop)
  38. self.stdout.set_transport(stdout_transport)
  39. self._pipe_fds.append(1)
  40. stderr_transport = transport.get_pipe_transport(2)
  41. if stderr_transport is not None:
  42. self.stderr = streams.StreamReader(limit=self._limit,
  43. loop=self._loop)
  44. self.stderr.set_transport(stderr_transport)
  45. self._pipe_fds.append(2)
  46. stdin_transport = transport.get_pipe_transport(0)
  47. if stdin_transport is not None:
  48. self.stdin = streams.StreamWriter(stdin_transport,
  49. protocol=self,
  50. reader=None,
  51. loop=self._loop)
  52. def pipe_data_received(self, fd, data):
  53. if fd == 1:
  54. reader = self.stdout
  55. elif fd == 2:
  56. reader = self.stderr
  57. else:
  58. reader = None
  59. if reader is not None:
  60. reader.feed_data(data)
  61. def pipe_connection_lost(self, fd, exc):
  62. if fd == 0:
  63. pipe = self.stdin
  64. if pipe is not None:
  65. pipe.close()
  66. self.connection_lost(exc)
  67. if exc is None:
  68. self._stdin_closed.set_result(None)
  69. else:
  70. self._stdin_closed.set_exception(exc)
  71. return
  72. if fd == 1:
  73. reader = self.stdout
  74. elif fd == 2:
  75. reader = self.stderr
  76. else:
  77. reader = None
  78. if reader is not None:
  79. if exc is None:
  80. reader.feed_eof()
  81. else:
  82. reader.set_exception(exc)
  83. if fd in self._pipe_fds:
  84. self._pipe_fds.remove(fd)
  85. self._maybe_close_transport()
  86. def process_exited(self):
  87. self._process_exited = True
  88. self._maybe_close_transport()
  89. def _maybe_close_transport(self):
  90. if len(self._pipe_fds) == 0 and self._process_exited:
  91. self._transport.close()
  92. self._transport = None
  93. def _get_close_waiter(self, stream):
  94. if stream is self.stdin:
  95. return self._stdin_closed
  96. class Process:
  97. def __init__(self, transport, protocol, loop):
  98. self._transport = transport
  99. self._protocol = protocol
  100. self._loop = loop
  101. self.stdin = protocol.stdin
  102. self.stdout = protocol.stdout
  103. self.stderr = protocol.stderr
  104. self.pid = transport.get_pid()
  105. def __repr__(self):
  106. return f'<{self.__class__.__name__} {self.pid}>'
  107. @property
  108. def returncode(self):
  109. return self._transport.get_returncode()
  110. async def wait(self):
  111. """Wait until the process exit and return the process return code."""
  112. return await self._transport._wait()
  113. def send_signal(self, signal):
  114. self._transport.send_signal(signal)
  115. def terminate(self):
  116. self._transport.terminate()
  117. def kill(self):
  118. self._transport.kill()
  119. async def _feed_stdin(self, input):
  120. debug = self._loop.get_debug()
  121. self.stdin.write(input)
  122. if debug:
  123. logger.debug(
  124. '%r communicate: feed stdin (%s bytes)', self, len(input))
  125. try:
  126. await self.stdin.drain()
  127. except (BrokenPipeError, ConnectionResetError) as exc:
  128. # communicate() ignores BrokenPipeError and ConnectionResetError
  129. if debug:
  130. logger.debug('%r communicate: stdin got %r', self, exc)
  131. if debug:
  132. logger.debug('%r communicate: close stdin', self)
  133. self.stdin.close()
  134. async def _noop(self):
  135. return None
  136. async def _read_stream(self, fd):
  137. transport = self._transport.get_pipe_transport(fd)
  138. if fd == 2:
  139. stream = self.stderr
  140. else:
  141. assert fd == 1
  142. stream = self.stdout
  143. if self._loop.get_debug():
  144. name = 'stdout' if fd == 1 else 'stderr'
  145. logger.debug('%r communicate: read %s', self, name)
  146. output = await stream.read()
  147. if self._loop.get_debug():
  148. name = 'stdout' if fd == 1 else 'stderr'
  149. logger.debug('%r communicate: close %s', self, name)
  150. transport.close()
  151. return output
  152. async def communicate(self, input=None):
  153. if input is not None:
  154. stdin = self._feed_stdin(input)
  155. else:
  156. stdin = self._noop()
  157. if self.stdout is not None:
  158. stdout = self._read_stream(1)
  159. else:
  160. stdout = self._noop()
  161. if self.stderr is not None:
  162. stderr = self._read_stream(2)
  163. else:
  164. stderr = self._noop()
  165. stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr,
  166. loop=self._loop)
  167. await self.wait()
  168. return (stdout, stderr)
  169. async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
  170. loop=None, limit=streams._DEFAULT_LIMIT,
  171. **kwds):
  172. if loop is None:
  173. loop = events.get_event_loop()
  174. else:
  175. warnings.warn("The loop argument is deprecated since Python 3.8 "
  176. "and scheduled for removal in Python 3.10.",
  177. DeprecationWarning,
  178. stacklevel=2
  179. )
  180. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  181. loop=loop)
  182. transport, protocol = await loop.subprocess_shell(
  183. protocol_factory,
  184. cmd, stdin=stdin, stdout=stdout,
  185. stderr=stderr, **kwds)
  186. return Process(transport, protocol, loop)
  187. async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
  188. stderr=None, loop=None,
  189. limit=streams._DEFAULT_LIMIT, **kwds):
  190. if loop is None:
  191. loop = events.get_event_loop()
  192. else:
  193. warnings.warn("The loop argument is deprecated since Python 3.8 "
  194. "and scheduled for removal in Python 3.10.",
  195. DeprecationWarning,
  196. stacklevel=2
  197. )
  198. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  199. loop=loop)
  200. transport, protocol = await loop.subprocess_exec(
  201. protocol_factory,
  202. program, *args,
  203. stdin=stdin, stdout=stdout,
  204. stderr=stderr, **kwds)
  205. return Process(transport, protocol, loop)