selector_events.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097
  1. """Event loop using a selector and related classes.
  2. A selector is a "notify-when-ready" multiplexer. For a subclass which
  3. also includes support for signal handling, see the unix_events sub-module.
  4. """
  5. __all__ = 'BaseSelectorEventLoop',
  6. import collections
  7. import errno
  8. import functools
  9. import selectors
  10. import socket
  11. import warnings
  12. import weakref
  13. try:
  14. import ssl
  15. except ImportError: # pragma: no cover
  16. ssl = None
  17. from . import base_events
  18. from . import constants
  19. from . import events
  20. from . import futures
  21. from . import protocols
  22. from . import sslproto
  23. from . import transports
  24. from . import trsock
  25. from .log import logger
  26. def _test_selector_event(selector, fd, event):
  27. # Test if the selector is monitoring 'event' events
  28. # for the file descriptor 'fd'.
  29. try:
  30. key = selector.get_key(fd)
  31. except KeyError:
  32. return False
  33. else:
  34. return bool(key.events & event)
  35. class BaseSelectorEventLoop(base_events.BaseEventLoop):
  36. """Selector event loop.
  37. See events.EventLoop for API specification.
  38. """
  39. def __init__(self, selector=None):
  40. super().__init__()
  41. if selector is None:
  42. selector = selectors.DefaultSelector()
  43. logger.debug('Using selector: %s', selector.__class__.__name__)
  44. self._selector = selector
  45. self._make_self_pipe()
  46. self._transports = weakref.WeakValueDictionary()
  47. def _make_socket_transport(self, sock, protocol, waiter=None, *,
  48. extra=None, server=None):
  49. return _SelectorSocketTransport(self, sock, protocol, waiter,
  50. extra, server)
  51. def _make_ssl_transport(
  52. self, rawsock, protocol, sslcontext, waiter=None,
  53. *, server_side=False, server_hostname=None,
  54. extra=None, server=None,
  55. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  56. ssl_protocol = sslproto.SSLProtocol(
  57. self, protocol, sslcontext, waiter,
  58. server_side, server_hostname,
  59. ssl_handshake_timeout=ssl_handshake_timeout)
  60. _SelectorSocketTransport(self, rawsock, ssl_protocol,
  61. extra=extra, server=server)
  62. return ssl_protocol._app_transport
  63. def _make_datagram_transport(self, sock, protocol,
  64. address=None, waiter=None, extra=None):
  65. return _SelectorDatagramTransport(self, sock, protocol,
  66. address, waiter, extra)
  67. def close(self):
  68. if self.is_running():
  69. raise RuntimeError("Cannot close a running event loop")
  70. if self.is_closed():
  71. return
  72. self._close_self_pipe()
  73. super().close()
  74. if self._selector is not None:
  75. self._selector.close()
  76. self._selector = None
  77. def _close_self_pipe(self):
  78. self._remove_reader(self._ssock.fileno())
  79. self._ssock.close()
  80. self._ssock = None
  81. self._csock.close()
  82. self._csock = None
  83. self._internal_fds -= 1
  84. def _make_self_pipe(self):
  85. # A self-socket, really. :-)
  86. self._ssock, self._csock = socket.socketpair()
  87. self._ssock.setblocking(False)
  88. self._csock.setblocking(False)
  89. self._internal_fds += 1
  90. self._add_reader(self._ssock.fileno(), self._read_from_self)
  91. def _process_self_data(self, data):
  92. pass
  93. def _read_from_self(self):
  94. while True:
  95. try:
  96. data = self._ssock.recv(4096)
  97. if not data:
  98. break
  99. self._process_self_data(data)
  100. except InterruptedError:
  101. continue
  102. except BlockingIOError:
  103. break
  104. def _write_to_self(self):
  105. # This may be called from a different thread, possibly after
  106. # _close_self_pipe() has been called or even while it is
  107. # running. Guard for self._csock being None or closed. When
  108. # a socket is closed, send() raises OSError (with errno set to
  109. # EBADF, but let's not rely on the exact error code).
  110. csock = self._csock
  111. if csock is None:
  112. return
  113. try:
  114. csock.send(b'\0')
  115. except OSError:
  116. if self._debug:
  117. logger.debug("Fail to write a null byte into the "
  118. "self-pipe socket",
  119. exc_info=True)
  120. def _start_serving(self, protocol_factory, sock,
  121. sslcontext=None, server=None, backlog=100,
  122. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  123. self._add_reader(sock.fileno(), self._accept_connection,
  124. protocol_factory, sock, sslcontext, server, backlog,
  125. ssl_handshake_timeout)
  126. def _accept_connection(
  127. self, protocol_factory, sock,
  128. sslcontext=None, server=None, backlog=100,
  129. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  130. # This method is only called once for each event loop tick where the
  131. # listening socket has triggered an EVENT_READ. There may be multiple
  132. # connections waiting for an .accept() so it is called in a loop.
  133. # See https://bugs.python.org/issue27906 for more details.
  134. for _ in range(backlog):
  135. try:
  136. conn, addr = sock.accept()
  137. if self._debug:
  138. logger.debug("%r got a new connection from %r: %r",
  139. server, addr, conn)
  140. conn.setblocking(False)
  141. except (BlockingIOError, InterruptedError, ConnectionAbortedError):
  142. # Early exit because the socket accept buffer is empty.
  143. return None
  144. except OSError as exc:
  145. # There's nowhere to send the error, so just log it.
  146. if exc.errno in (errno.EMFILE, errno.ENFILE,
  147. errno.ENOBUFS, errno.ENOMEM):
  148. # Some platforms (e.g. Linux keep reporting the FD as
  149. # ready, so we remove the read handler temporarily.
  150. # We'll try again in a while.
  151. self.call_exception_handler({
  152. 'message': 'socket.accept() out of system resource',
  153. 'exception': exc,
  154. 'socket': trsock.TransportSocket(sock),
  155. })
  156. self._remove_reader(sock.fileno())
  157. self.call_later(constants.ACCEPT_RETRY_DELAY,
  158. self._start_serving,
  159. protocol_factory, sock, sslcontext, server,
  160. backlog, ssl_handshake_timeout)
  161. else:
  162. raise # The event loop will catch, log and ignore it.
  163. else:
  164. extra = {'peername': addr}
  165. accept = self._accept_connection2(
  166. protocol_factory, conn, extra, sslcontext, server,
  167. ssl_handshake_timeout)
  168. self.create_task(accept)
  169. async def _accept_connection2(
  170. self, protocol_factory, conn, extra,
  171. sslcontext=None, server=None,
  172. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  173. protocol = None
  174. transport = None
  175. try:
  176. protocol = protocol_factory()
  177. waiter = self.create_future()
  178. if sslcontext:
  179. transport = self._make_ssl_transport(
  180. conn, protocol, sslcontext, waiter=waiter,
  181. server_side=True, extra=extra, server=server,
  182. ssl_handshake_timeout=ssl_handshake_timeout)
  183. else:
  184. transport = self._make_socket_transport(
  185. conn, protocol, waiter=waiter, extra=extra,
  186. server=server)
  187. try:
  188. await waiter
  189. except BaseException:
  190. transport.close()
  191. raise
  192. # It's now up to the protocol to handle the connection.
  193. except (SystemExit, KeyboardInterrupt):
  194. raise
  195. except BaseException as exc:
  196. if self._debug:
  197. context = {
  198. 'message':
  199. 'Error on transport creation for incoming connection',
  200. 'exception': exc,
  201. }
  202. if protocol is not None:
  203. context['protocol'] = protocol
  204. if transport is not None:
  205. context['transport'] = transport
  206. self.call_exception_handler(context)
  207. def _ensure_fd_no_transport(self, fd):
  208. fileno = fd
  209. if not isinstance(fileno, int):
  210. try:
  211. fileno = int(fileno.fileno())
  212. except (AttributeError, TypeError, ValueError):
  213. # This code matches selectors._fileobj_to_fd function.
  214. raise ValueError(f"Invalid file object: {fd!r}") from None
  215. try:
  216. transport = self._transports[fileno]
  217. except KeyError:
  218. pass
  219. else:
  220. if not transport.is_closing():
  221. raise RuntimeError(
  222. f'File descriptor {fd!r} is used by transport '
  223. f'{transport!r}')
  224. def _add_reader(self, fd, callback, *args):
  225. self._check_closed()
  226. handle = events.Handle(callback, args, self, None)
  227. try:
  228. key = self._selector.get_key(fd)
  229. except KeyError:
  230. self._selector.register(fd, selectors.EVENT_READ,
  231. (handle, None))
  232. else:
  233. mask, (reader, writer) = key.events, key.data
  234. self._selector.modify(fd, mask | selectors.EVENT_READ,
  235. (handle, writer))
  236. if reader is not None:
  237. reader.cancel()
  238. return handle
  239. def _remove_reader(self, fd):
  240. if self.is_closed():
  241. return False
  242. try:
  243. key = self._selector.get_key(fd)
  244. except KeyError:
  245. return False
  246. else:
  247. mask, (reader, writer) = key.events, key.data
  248. mask &= ~selectors.EVENT_READ
  249. if not mask:
  250. self._selector.unregister(fd)
  251. else:
  252. self._selector.modify(fd, mask, (None, writer))
  253. if reader is not None:
  254. reader.cancel()
  255. return True
  256. else:
  257. return False
  258. def _add_writer(self, fd, callback, *args):
  259. self._check_closed()
  260. handle = events.Handle(callback, args, self, None)
  261. try:
  262. key = self._selector.get_key(fd)
  263. except KeyError:
  264. self._selector.register(fd, selectors.EVENT_WRITE,
  265. (None, handle))
  266. else:
  267. mask, (reader, writer) = key.events, key.data
  268. self._selector.modify(fd, mask | selectors.EVENT_WRITE,
  269. (reader, handle))
  270. if writer is not None:
  271. writer.cancel()
  272. return handle
  273. def _remove_writer(self, fd):
  274. """Remove a writer callback."""
  275. if self.is_closed():
  276. return False
  277. try:
  278. key = self._selector.get_key(fd)
  279. except KeyError:
  280. return False
  281. else:
  282. mask, (reader, writer) = key.events, key.data
  283. # Remove both writer and connector.
  284. mask &= ~selectors.EVENT_WRITE
  285. if not mask:
  286. self._selector.unregister(fd)
  287. else:
  288. self._selector.modify(fd, mask, (reader, None))
  289. if writer is not None:
  290. writer.cancel()
  291. return True
  292. else:
  293. return False
  294. def add_reader(self, fd, callback, *args):
  295. """Add a reader callback."""
  296. self._ensure_fd_no_transport(fd)
  297. self._add_reader(fd, callback, *args)
  298. def remove_reader(self, fd):
  299. """Remove a reader callback."""
  300. self._ensure_fd_no_transport(fd)
  301. return self._remove_reader(fd)
  302. def add_writer(self, fd, callback, *args):
  303. """Add a writer callback.."""
  304. self._ensure_fd_no_transport(fd)
  305. self._add_writer(fd, callback, *args)
  306. def remove_writer(self, fd):
  307. """Remove a writer callback."""
  308. self._ensure_fd_no_transport(fd)
  309. return self._remove_writer(fd)
  310. async def sock_recv(self, sock, n):
  311. """Receive data from the socket.
  312. The return value is a bytes object representing the data received.
  313. The maximum amount of data to be received at once is specified by
  314. nbytes.
  315. """
  316. base_events._check_ssl_socket(sock)
  317. if self._debug and sock.gettimeout() != 0:
  318. raise ValueError("the socket must be non-blocking")
  319. try:
  320. return sock.recv(n)
  321. except (BlockingIOError, InterruptedError):
  322. pass
  323. fut = self.create_future()
  324. fd = sock.fileno()
  325. self._ensure_fd_no_transport(fd)
  326. handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
  327. fut.add_done_callback(
  328. functools.partial(self._sock_read_done, fd, handle=handle))
  329. return await fut
  330. def _sock_read_done(self, fd, fut, handle=None):
  331. if handle is None or not handle.cancelled():
  332. self.remove_reader(fd)
  333. def _sock_recv(self, fut, sock, n):
  334. # _sock_recv() can add itself as an I/O callback if the operation can't
  335. # be done immediately. Don't use it directly, call sock_recv().
  336. if fut.done():
  337. return
  338. try:
  339. data = sock.recv(n)
  340. except (BlockingIOError, InterruptedError):
  341. return # try again next time
  342. except (SystemExit, KeyboardInterrupt):
  343. raise
  344. except BaseException as exc:
  345. fut.set_exception(exc)
  346. else:
  347. fut.set_result(data)
  348. async def sock_recv_into(self, sock, buf):
  349. """Receive data from the socket.
  350. The received data is written into *buf* (a writable buffer).
  351. The return value is the number of bytes written.
  352. """
  353. base_events._check_ssl_socket(sock)
  354. if self._debug and sock.gettimeout() != 0:
  355. raise ValueError("the socket must be non-blocking")
  356. try:
  357. return sock.recv_into(buf)
  358. except (BlockingIOError, InterruptedError):
  359. pass
  360. fut = self.create_future()
  361. fd = sock.fileno()
  362. self._ensure_fd_no_transport(fd)
  363. handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
  364. fut.add_done_callback(
  365. functools.partial(self._sock_read_done, fd, handle=handle))
  366. return await fut
  367. def _sock_recv_into(self, fut, sock, buf):
  368. # _sock_recv_into() can add itself as an I/O callback if the operation
  369. # can't be done immediately. Don't use it directly, call
  370. # sock_recv_into().
  371. if fut.done():
  372. return
  373. try:
  374. nbytes = sock.recv_into(buf)
  375. except (BlockingIOError, InterruptedError):
  376. return # try again next time
  377. except (SystemExit, KeyboardInterrupt):
  378. raise
  379. except BaseException as exc:
  380. fut.set_exception(exc)
  381. else:
  382. fut.set_result(nbytes)
  383. async def sock_sendall(self, sock, data):
  384. """Send data to the socket.
  385. The socket must be connected to a remote socket. This method continues
  386. to send data from data until either all data has been sent or an
  387. error occurs. None is returned on success. On error, an exception is
  388. raised, and there is no way to determine how much data, if any, was
  389. successfully processed by the receiving end of the connection.
  390. """
  391. base_events._check_ssl_socket(sock)
  392. if self._debug and sock.gettimeout() != 0:
  393. raise ValueError("the socket must be non-blocking")
  394. try:
  395. n = sock.send(data)
  396. except (BlockingIOError, InterruptedError):
  397. n = 0
  398. if n == len(data):
  399. # all data sent
  400. return
  401. fut = self.create_future()
  402. fd = sock.fileno()
  403. self._ensure_fd_no_transport(fd)
  404. # use a trick with a list in closure to store a mutable state
  405. handle = self._add_writer(fd, self._sock_sendall, fut, sock,
  406. memoryview(data), [n])
  407. fut.add_done_callback(
  408. functools.partial(self._sock_write_done, fd, handle=handle))
  409. return await fut
  410. def _sock_sendall(self, fut, sock, view, pos):
  411. if fut.done():
  412. # Future cancellation can be scheduled on previous loop iteration
  413. return
  414. start = pos[0]
  415. try:
  416. n = sock.send(view[start:])
  417. except (BlockingIOError, InterruptedError):
  418. return
  419. except (SystemExit, KeyboardInterrupt):
  420. raise
  421. except BaseException as exc:
  422. fut.set_exception(exc)
  423. return
  424. start += n
  425. if start == len(view):
  426. fut.set_result(None)
  427. else:
  428. pos[0] = start
  429. async def sock_connect(self, sock, address):
  430. """Connect to a remote socket at address.
  431. This method is a coroutine.
  432. """
  433. base_events._check_ssl_socket(sock)
  434. if self._debug and sock.gettimeout() != 0:
  435. raise ValueError("the socket must be non-blocking")
  436. if sock.family == socket.AF_INET or (
  437. base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
  438. resolved = await self._ensure_resolved(
  439. address, family=sock.family, type=sock.type, proto=sock.proto,
  440. loop=self,
  441. )
  442. _, _, _, _, address = resolved[0]
  443. fut = self.create_future()
  444. self._sock_connect(fut, sock, address)
  445. return await fut
  446. def _sock_connect(self, fut, sock, address):
  447. fd = sock.fileno()
  448. try:
  449. sock.connect(address)
  450. except (BlockingIOError, InterruptedError):
  451. # Issue #23618: When the C function connect() fails with EINTR, the
  452. # connection runs in background. We have to wait until the socket
  453. # becomes writable to be notified when the connection succeed or
  454. # fails.
  455. self._ensure_fd_no_transport(fd)
  456. handle = self._add_writer(
  457. fd, self._sock_connect_cb, fut, sock, address)
  458. fut.add_done_callback(
  459. functools.partial(self._sock_write_done, fd, handle=handle))
  460. except (SystemExit, KeyboardInterrupt):
  461. raise
  462. except BaseException as exc:
  463. fut.set_exception(exc)
  464. else:
  465. fut.set_result(None)
  466. def _sock_write_done(self, fd, fut, handle=None):
  467. if handle is None or not handle.cancelled():
  468. self.remove_writer(fd)
  469. def _sock_connect_cb(self, fut, sock, address):
  470. if fut.done():
  471. return
  472. try:
  473. err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  474. if err != 0:
  475. # Jump to any except clause below.
  476. raise OSError(err, f'Connect call failed {address}')
  477. except (BlockingIOError, InterruptedError):
  478. # socket is still registered, the callback will be retried later
  479. pass
  480. except (SystemExit, KeyboardInterrupt):
  481. raise
  482. except BaseException as exc:
  483. fut.set_exception(exc)
  484. else:
  485. fut.set_result(None)
  486. async def sock_accept(self, sock):
  487. """Accept a connection.
  488. The socket must be bound to an address and listening for connections.
  489. The return value is a pair (conn, address) where conn is a new socket
  490. object usable to send and receive data on the connection, and address
  491. is the address bound to the socket on the other end of the connection.
  492. """
  493. base_events._check_ssl_socket(sock)
  494. if self._debug and sock.gettimeout() != 0:
  495. raise ValueError("the socket must be non-blocking")
  496. fut = self.create_future()
  497. self._sock_accept(fut, sock)
  498. return await fut
  499. def _sock_accept(self, fut, sock):
  500. fd = sock.fileno()
  501. try:
  502. conn, address = sock.accept()
  503. conn.setblocking(False)
  504. except (BlockingIOError, InterruptedError):
  505. self._ensure_fd_no_transport(fd)
  506. handle = self._add_reader(fd, self._sock_accept, fut, sock)
  507. fut.add_done_callback(
  508. functools.partial(self._sock_read_done, fd, handle=handle))
  509. except (SystemExit, KeyboardInterrupt):
  510. raise
  511. except BaseException as exc:
  512. fut.set_exception(exc)
  513. else:
  514. fut.set_result((conn, address))
  515. async def _sendfile_native(self, transp, file, offset, count):
  516. del self._transports[transp._sock_fd]
  517. resume_reading = transp.is_reading()
  518. transp.pause_reading()
  519. await transp._make_empty_waiter()
  520. try:
  521. return await self.sock_sendfile(transp._sock, file, offset, count,
  522. fallback=False)
  523. finally:
  524. transp._reset_empty_waiter()
  525. if resume_reading:
  526. transp.resume_reading()
  527. self._transports[transp._sock_fd] = transp
  528. def _process_events(self, event_list):
  529. for key, mask in event_list:
  530. fileobj, (reader, writer) = key.fileobj, key.data
  531. if mask & selectors.EVENT_READ and reader is not None:
  532. if reader._cancelled:
  533. self._remove_reader(fileobj)
  534. else:
  535. self._add_callback(reader)
  536. if mask & selectors.EVENT_WRITE and writer is not None:
  537. if writer._cancelled:
  538. self._remove_writer(fileobj)
  539. else:
  540. self._add_callback(writer)
  541. def _stop_serving(self, sock):
  542. self._remove_reader(sock.fileno())
  543. sock.close()
  544. class _SelectorTransport(transports._FlowControlMixin,
  545. transports.Transport):
  546. max_size = 256 * 1024 # Buffer size passed to recv().
  547. _buffer_factory = bytearray # Constructs initial value for self._buffer.
  548. # Attribute used in the destructor: it must be set even if the constructor
  549. # is not called (see _SelectorSslTransport which may start by raising an
  550. # exception)
  551. _sock = None
  552. def __init__(self, loop, sock, protocol, extra=None, server=None):
  553. super().__init__(extra, loop)
  554. self._extra['socket'] = trsock.TransportSocket(sock)
  555. try:
  556. self._extra['sockname'] = sock.getsockname()
  557. except OSError:
  558. self._extra['sockname'] = None
  559. if 'peername' not in self._extra:
  560. try:
  561. self._extra['peername'] = sock.getpeername()
  562. except socket.error:
  563. self._extra['peername'] = None
  564. self._sock = sock
  565. self._sock_fd = sock.fileno()
  566. self._protocol_connected = False
  567. self.set_protocol(protocol)
  568. self._server = server
  569. self._buffer = self._buffer_factory()
  570. self._conn_lost = 0 # Set when call to connection_lost scheduled.
  571. self._closing = False # Set when close() called.
  572. if self._server is not None:
  573. self._server._attach()
  574. loop._transports[self._sock_fd] = self
  575. def __repr__(self):
  576. info = [self.__class__.__name__]
  577. if self._sock is None:
  578. info.append('closed')
  579. elif self._closing:
  580. info.append('closing')
  581. info.append(f'fd={self._sock_fd}')
  582. # test if the transport was closed
  583. if self._loop is not None and not self._loop.is_closed():
  584. polling = _test_selector_event(self._loop._selector,
  585. self._sock_fd, selectors.EVENT_READ)
  586. if polling:
  587. info.append('read=polling')
  588. else:
  589. info.append('read=idle')
  590. polling = _test_selector_event(self._loop._selector,
  591. self._sock_fd,
  592. selectors.EVENT_WRITE)
  593. if polling:
  594. state = 'polling'
  595. else:
  596. state = 'idle'
  597. bufsize = self.get_write_buffer_size()
  598. info.append(f'write=<{state}, bufsize={bufsize}>')
  599. return '<{}>'.format(' '.join(info))
  600. def abort(self):
  601. self._force_close(None)
  602. def set_protocol(self, protocol):
  603. self._protocol = protocol
  604. self._protocol_connected = True
  605. def get_protocol(self):
  606. return self._protocol
  607. def is_closing(self):
  608. return self._closing
  609. def close(self):
  610. if self._closing:
  611. return
  612. self._closing = True
  613. self._loop._remove_reader(self._sock_fd)
  614. if not self._buffer:
  615. self._conn_lost += 1
  616. self._loop._remove_writer(self._sock_fd)
  617. self._loop.call_soon(self._call_connection_lost, None)
  618. def __del__(self, _warn=warnings.warn):
  619. if self._sock is not None:
  620. _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  621. self._sock.close()
  622. def _fatal_error(self, exc, message='Fatal error on transport'):
  623. # Should be called from exception handler only.
  624. if isinstance(exc, OSError):
  625. if self._loop.get_debug():
  626. logger.debug("%r: %s", self, message, exc_info=True)
  627. else:
  628. self._loop.call_exception_handler({
  629. 'message': message,
  630. 'exception': exc,
  631. 'transport': self,
  632. 'protocol': self._protocol,
  633. })
  634. self._force_close(exc)
  635. def _force_close(self, exc):
  636. if self._conn_lost:
  637. return
  638. if self._buffer:
  639. self._buffer.clear()
  640. self._loop._remove_writer(self._sock_fd)
  641. if not self._closing:
  642. self._closing = True
  643. self._loop._remove_reader(self._sock_fd)
  644. self._conn_lost += 1
  645. self._loop.call_soon(self._call_connection_lost, exc)
  646. def _call_connection_lost(self, exc):
  647. try:
  648. if self._protocol_connected:
  649. self._protocol.connection_lost(exc)
  650. finally:
  651. self._sock.close()
  652. self._sock = None
  653. self._protocol = None
  654. self._loop = None
  655. server = self._server
  656. if server is not None:
  657. server._detach()
  658. self._server = None
  659. def get_write_buffer_size(self):
  660. return len(self._buffer)
  661. def _add_reader(self, fd, callback, *args):
  662. if self._closing:
  663. return
  664. self._loop._add_reader(fd, callback, *args)
  665. class _SelectorSocketTransport(_SelectorTransport):
  666. _start_tls_compatible = True
  667. _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
  668. def __init__(self, loop, sock, protocol, waiter=None,
  669. extra=None, server=None):
  670. self._read_ready_cb = None
  671. super().__init__(loop, sock, protocol, extra, server)
  672. self._eof = False
  673. self._paused = False
  674. self._empty_waiter = None
  675. # Disable the Nagle algorithm -- small writes will be
  676. # sent without waiting for the TCP ACK. This generally
  677. # decreases the latency (in some cases significantly.)
  678. base_events._set_nodelay(self._sock)
  679. self._loop.call_soon(self._protocol.connection_made, self)
  680. # only start reading when connection_made() has been called
  681. self._loop.call_soon(self._add_reader,
  682. self._sock_fd, self._read_ready)
  683. if waiter is not None:
  684. # only wake up the waiter when connection_made() has been called
  685. self._loop.call_soon(futures._set_result_unless_cancelled,
  686. waiter, None)
  687. def set_protocol(self, protocol):
  688. if isinstance(protocol, protocols.BufferedProtocol):
  689. self._read_ready_cb = self._read_ready__get_buffer
  690. else:
  691. self._read_ready_cb = self._read_ready__data_received
  692. super().set_protocol(protocol)
  693. def is_reading(self):
  694. return not self._paused and not self._closing
  695. def pause_reading(self):
  696. if self._closing or self._paused:
  697. return
  698. self._paused = True
  699. self._loop._remove_reader(self._sock_fd)
  700. if self._loop.get_debug():
  701. logger.debug("%r pauses reading", self)
  702. def resume_reading(self):
  703. if self._closing or not self._paused:
  704. return
  705. self._paused = False
  706. self._add_reader(self._sock_fd, self._read_ready)
  707. if self._loop.get_debug():
  708. logger.debug("%r resumes reading", self)
  709. def _read_ready(self):
  710. self._read_ready_cb()
  711. def _read_ready__get_buffer(self):
  712. if self._conn_lost:
  713. return
  714. try:
  715. buf = self._protocol.get_buffer(-1)
  716. if not len(buf):
  717. raise RuntimeError('get_buffer() returned an empty buffer')
  718. except (SystemExit, KeyboardInterrupt):
  719. raise
  720. except BaseException as exc:
  721. self._fatal_error(
  722. exc, 'Fatal error: protocol.get_buffer() call failed.')
  723. return
  724. try:
  725. nbytes = self._sock.recv_into(buf)
  726. except (BlockingIOError, InterruptedError):
  727. return
  728. except (SystemExit, KeyboardInterrupt):
  729. raise
  730. except BaseException as exc:
  731. self._fatal_error(exc, 'Fatal read error on socket transport')
  732. return
  733. if not nbytes:
  734. self._read_ready__on_eof()
  735. return
  736. try:
  737. self._protocol.buffer_updated(nbytes)
  738. except (SystemExit, KeyboardInterrupt):
  739. raise
  740. except BaseException as exc:
  741. self._fatal_error(
  742. exc, 'Fatal error: protocol.buffer_updated() call failed.')
  743. def _read_ready__data_received(self):
  744. if self._conn_lost:
  745. return
  746. try:
  747. data = self._sock.recv(self.max_size)
  748. except (BlockingIOError, InterruptedError):
  749. return
  750. except (SystemExit, KeyboardInterrupt):
  751. raise
  752. except BaseException as exc:
  753. self._fatal_error(exc, 'Fatal read error on socket transport')
  754. return
  755. if not data:
  756. self._read_ready__on_eof()
  757. return
  758. try:
  759. self._protocol.data_received(data)
  760. except (SystemExit, KeyboardInterrupt):
  761. raise
  762. except BaseException as exc:
  763. self._fatal_error(
  764. exc, 'Fatal error: protocol.data_received() call failed.')
  765. def _read_ready__on_eof(self):
  766. if self._loop.get_debug():
  767. logger.debug("%r received EOF", self)
  768. try:
  769. keep_open = self._protocol.eof_received()
  770. except (SystemExit, KeyboardInterrupt):
  771. raise
  772. except BaseException as exc:
  773. self._fatal_error(
  774. exc, 'Fatal error: protocol.eof_received() call failed.')
  775. return
  776. if keep_open:
  777. # We're keeping the connection open so the
  778. # protocol can write more, but we still can't
  779. # receive more, so remove the reader callback.
  780. self._loop._remove_reader(self._sock_fd)
  781. else:
  782. self.close()
  783. def write(self, data):
  784. if not isinstance(data, (bytes, bytearray, memoryview)):
  785. raise TypeError(f'data argument must be a bytes-like object, '
  786. f'not {type(data).__name__!r}')
  787. if self._eof:
  788. raise RuntimeError('Cannot call write() after write_eof()')
  789. if self._empty_waiter is not None:
  790. raise RuntimeError('unable to write; sendfile is in progress')
  791. if not data:
  792. return
  793. if self._conn_lost:
  794. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  795. logger.warning('socket.send() raised exception.')
  796. self._conn_lost += 1
  797. return
  798. if not self._buffer:
  799. # Optimization: try to send now.
  800. try:
  801. n = self._sock.send(data)
  802. except (BlockingIOError, InterruptedError):
  803. pass
  804. except (SystemExit, KeyboardInterrupt):
  805. raise
  806. except BaseException as exc:
  807. self._fatal_error(exc, 'Fatal write error on socket transport')
  808. return
  809. else:
  810. data = data[n:]
  811. if not data:
  812. return
  813. # Not all was written; register write handler.
  814. self._loop._add_writer(self._sock_fd, self._write_ready)
  815. # Add it to the buffer.
  816. self._buffer.extend(data)
  817. self._maybe_pause_protocol()
  818. def _write_ready(self):
  819. assert self._buffer, 'Data should not be empty'
  820. if self._conn_lost:
  821. return
  822. try:
  823. n = self._sock.send(self._buffer)
  824. except (BlockingIOError, InterruptedError):
  825. pass
  826. except (SystemExit, KeyboardInterrupt):
  827. raise
  828. except BaseException as exc:
  829. self._loop._remove_writer(self._sock_fd)
  830. self._buffer.clear()
  831. self._fatal_error(exc, 'Fatal write error on socket transport')
  832. if self._empty_waiter is not None:
  833. self._empty_waiter.set_exception(exc)
  834. else:
  835. if n:
  836. del self._buffer[:n]
  837. self._maybe_resume_protocol() # May append to buffer.
  838. if not self._buffer:
  839. self._loop._remove_writer(self._sock_fd)
  840. if self._empty_waiter is not None:
  841. self._empty_waiter.set_result(None)
  842. if self._closing:
  843. self._call_connection_lost(None)
  844. elif self._eof:
  845. self._sock.shutdown(socket.SHUT_WR)
  846. def write_eof(self):
  847. if self._closing or self._eof:
  848. return
  849. self._eof = True
  850. if not self._buffer:
  851. self._sock.shutdown(socket.SHUT_WR)
  852. def can_write_eof(self):
  853. return True
  854. def _call_connection_lost(self, exc):
  855. super()._call_connection_lost(exc)
  856. if self._empty_waiter is not None:
  857. self._empty_waiter.set_exception(
  858. ConnectionError("Connection is closed by peer"))
  859. def _make_empty_waiter(self):
  860. if self._empty_waiter is not None:
  861. raise RuntimeError("Empty waiter is already set")
  862. self._empty_waiter = self._loop.create_future()
  863. if not self._buffer:
  864. self._empty_waiter.set_result(None)
  865. return self._empty_waiter
  866. def _reset_empty_waiter(self):
  867. self._empty_waiter = None
  868. class _SelectorDatagramTransport(_SelectorTransport):
  869. _buffer_factory = collections.deque
  870. def __init__(self, loop, sock, protocol, address=None,
  871. waiter=None, extra=None):
  872. super().__init__(loop, sock, protocol, extra)
  873. self._address = address
  874. self._loop.call_soon(self._protocol.connection_made, self)
  875. # only start reading when connection_made() has been called
  876. self._loop.call_soon(self._add_reader,
  877. self._sock_fd, self._read_ready)
  878. if waiter is not None:
  879. # only wake up the waiter when connection_made() has been called
  880. self._loop.call_soon(futures._set_result_unless_cancelled,
  881. waiter, None)
  882. def get_write_buffer_size(self):
  883. return sum(len(data) for data, _ in self._buffer)
  884. def _read_ready(self):
  885. if self._conn_lost:
  886. return
  887. try:
  888. data, addr = self._sock.recvfrom(self.max_size)
  889. except (BlockingIOError, InterruptedError):
  890. pass
  891. except OSError as exc:
  892. self._protocol.error_received(exc)
  893. except (SystemExit, KeyboardInterrupt):
  894. raise
  895. except BaseException as exc:
  896. self._fatal_error(exc, 'Fatal read error on datagram transport')
  897. else:
  898. self._protocol.datagram_received(data, addr)
  899. def sendto(self, data, addr=None):
  900. if not isinstance(data, (bytes, bytearray, memoryview)):
  901. raise TypeError(f'data argument must be a bytes-like object, '
  902. f'not {type(data).__name__!r}')
  903. if not data:
  904. return
  905. if self._address:
  906. if addr not in (None, self._address):
  907. raise ValueError(
  908. f'Invalid address: must be None or {self._address}')
  909. addr = self._address
  910. if self._conn_lost and self._address:
  911. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  912. logger.warning('socket.send() raised exception.')
  913. self._conn_lost += 1
  914. return
  915. if not self._buffer:
  916. # Attempt to send it right away first.
  917. try:
  918. if self._extra['peername']:
  919. self._sock.send(data)
  920. else:
  921. self._sock.sendto(data, addr)
  922. return
  923. except (BlockingIOError, InterruptedError):
  924. self._loop._add_writer(self._sock_fd, self._sendto_ready)
  925. except OSError as exc:
  926. self._protocol.error_received(exc)
  927. return
  928. except (SystemExit, KeyboardInterrupt):
  929. raise
  930. except BaseException as exc:
  931. self._fatal_error(
  932. exc, 'Fatal write error on datagram transport')
  933. return
  934. # Ensure that what we buffer is immutable.
  935. self._buffer.append((bytes(data), addr))
  936. self._maybe_pause_protocol()
  937. def _sendto_ready(self):
  938. while self._buffer:
  939. data, addr = self._buffer.popleft()
  940. try:
  941. if self._extra['peername']:
  942. self._sock.send(data)
  943. else:
  944. self._sock.sendto(data, addr)
  945. except (BlockingIOError, InterruptedError):
  946. self._buffer.appendleft((data, addr)) # Try again later.
  947. break
  948. except OSError as exc:
  949. self._protocol.error_received(exc)
  950. return
  951. except (SystemExit, KeyboardInterrupt):
  952. raise
  953. except BaseException as exc:
  954. self._fatal_error(
  955. exc, 'Fatal write error on datagram transport')
  956. return
  957. self._maybe_resume_protocol() # May append to buffer.
  958. if not self._buffer:
  959. self._loop._remove_writer(self._sock_fd)
  960. if self._closing:
  961. self._call_connection_lost(None)