client_proto.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import asyncio
  2. from contextlib import suppress
  3. from typing import Any, Optional, Tuple
  4. from .base_protocol import BaseProtocol
  5. from .client_exceptions import (
  6. ClientOSError,
  7. ClientPayloadError,
  8. ServerDisconnectedError,
  9. ServerTimeoutError,
  10. )
  11. from .helpers import BaseTimerContext
  12. from .http import HttpResponseParser, RawResponseMessage
  13. from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
  14. class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]):
  15. """Helper class to adapt between Protocol and StreamReader."""
  16. def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
  17. BaseProtocol.__init__(self, loop=loop)
  18. DataQueue.__init__(self, loop)
  19. self._should_close = False
  20. self._payload = None
  21. self._skip_payload = False
  22. self._payload_parser = None
  23. self._timer = None
  24. self._tail = b""
  25. self._upgraded = False
  26. self._parser = None # type: Optional[HttpResponseParser]
  27. self._read_timeout = None # type: Optional[float]
  28. self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle]
  29. @property
  30. def upgraded(self) -> bool:
  31. return self._upgraded
  32. @property
  33. def should_close(self) -> bool:
  34. if self._payload is not None and not self._payload.is_eof() or self._upgraded:
  35. return True
  36. return (
  37. self._should_close
  38. or self._upgraded
  39. or self.exception() is not None
  40. or self._payload_parser is not None
  41. or len(self) > 0
  42. or bool(self._tail)
  43. )
  44. def force_close(self) -> None:
  45. self._should_close = True
  46. def close(self) -> None:
  47. transport = self.transport
  48. if transport is not None:
  49. transport.close()
  50. self.transport = None
  51. self._payload = None
  52. self._drop_timeout()
  53. def is_connected(self) -> bool:
  54. return self.transport is not None and not self.transport.is_closing()
  55. def connection_lost(self, exc: Optional[BaseException]) -> None:
  56. self._drop_timeout()
  57. if self._payload_parser is not None:
  58. with suppress(Exception):
  59. self._payload_parser.feed_eof()
  60. uncompleted = None
  61. if self._parser is not None:
  62. try:
  63. uncompleted = self._parser.feed_eof()
  64. except Exception:
  65. if self._payload is not None:
  66. self._payload.set_exception(
  67. ClientPayloadError("Response payload is not completed")
  68. )
  69. if not self.is_eof():
  70. if isinstance(exc, OSError):
  71. exc = ClientOSError(*exc.args)
  72. if exc is None:
  73. exc = ServerDisconnectedError(uncompleted)
  74. # assigns self._should_close to True as side effect,
  75. # we do it anyway below
  76. self.set_exception(exc)
  77. self._should_close = True
  78. self._parser = None
  79. self._payload = None
  80. self._payload_parser = None
  81. self._reading_paused = False
  82. super().connection_lost(exc)
  83. def eof_received(self) -> None:
  84. # should call parser.feed_eof() most likely
  85. self._drop_timeout()
  86. def pause_reading(self) -> None:
  87. super().pause_reading()
  88. self._drop_timeout()
  89. def resume_reading(self) -> None:
  90. super().resume_reading()
  91. self._reschedule_timeout()
  92. def set_exception(self, exc: BaseException) -> None:
  93. self._should_close = True
  94. self._drop_timeout()
  95. super().set_exception(exc)
  96. def set_parser(self, parser: Any, payload: Any) -> None:
  97. # TODO: actual types are:
  98. # parser: WebSocketReader
  99. # payload: FlowControlDataQueue
  100. # but they are not generi enough
  101. # Need an ABC for both types
  102. self._payload = payload
  103. self._payload_parser = parser
  104. self._drop_timeout()
  105. if self._tail:
  106. data, self._tail = self._tail, b""
  107. self.data_received(data)
  108. def set_response_params(
  109. self,
  110. *,
  111. timer: Optional[BaseTimerContext] = None,
  112. skip_payload: bool = False,
  113. read_until_eof: bool = False,
  114. auto_decompress: bool = True,
  115. read_timeout: Optional[float] = None,
  116. read_bufsize: int = 2 ** 16
  117. ) -> None:
  118. self._skip_payload = skip_payload
  119. self._read_timeout = read_timeout
  120. self._reschedule_timeout()
  121. self._parser = HttpResponseParser(
  122. self,
  123. self._loop,
  124. read_bufsize,
  125. timer=timer,
  126. payload_exception=ClientPayloadError,
  127. response_with_body=not skip_payload,
  128. read_until_eof=read_until_eof,
  129. auto_decompress=auto_decompress,
  130. )
  131. if self._tail:
  132. data, self._tail = self._tail, b""
  133. self.data_received(data)
  134. def _drop_timeout(self) -> None:
  135. if self._read_timeout_handle is not None:
  136. self._read_timeout_handle.cancel()
  137. self._read_timeout_handle = None
  138. def _reschedule_timeout(self) -> None:
  139. timeout = self._read_timeout
  140. if self._read_timeout_handle is not None:
  141. self._read_timeout_handle.cancel()
  142. if timeout:
  143. self._read_timeout_handle = self._loop.call_later(
  144. timeout, self._on_read_timeout
  145. )
  146. else:
  147. self._read_timeout_handle = None
  148. def _on_read_timeout(self) -> None:
  149. exc = ServerTimeoutError("Timeout on reading data from socket")
  150. self.set_exception(exc)
  151. if self._payload is not None:
  152. self._payload.set_exception(exc)
  153. def data_received(self, data: bytes) -> None:
  154. self._reschedule_timeout()
  155. if not data:
  156. return
  157. # custom payload parser
  158. if self._payload_parser is not None:
  159. eof, tail = self._payload_parser.feed_data(data)
  160. if eof:
  161. self._payload = None
  162. self._payload_parser = None
  163. if tail:
  164. self.data_received(tail)
  165. return
  166. else:
  167. if self._upgraded or self._parser is None:
  168. # i.e. websocket connection, websocket parser is not set yet
  169. self._tail += data
  170. else:
  171. # parse http messages
  172. try:
  173. messages, upgraded, tail = self._parser.feed_data(data)
  174. except BaseException as exc:
  175. if self.transport is not None:
  176. # connection.release() could be called BEFORE
  177. # data_received(), the transport is already
  178. # closed in this case
  179. self.transport.close()
  180. # should_close is True after the call
  181. self.set_exception(exc)
  182. return
  183. self._upgraded = upgraded
  184. payload = None
  185. for message, payload in messages:
  186. if message.should_close:
  187. self._should_close = True
  188. self._payload = payload
  189. if self._skip_payload or message.code in (204, 304):
  190. self.feed_data((message, EMPTY_PAYLOAD), 0) # type: ignore
  191. else:
  192. self.feed_data((message, payload), 0)
  193. if payload is not None:
  194. # new message(s) was processed
  195. # register timeout handler unsubscribing
  196. # either on end-of-stream or immediately for
  197. # EMPTY_PAYLOAD
  198. if payload is not EMPTY_PAYLOAD:
  199. payload.on_eof(self._drop_timeout)
  200. else:
  201. self._drop_timeout()
  202. if tail:
  203. if upgraded:
  204. self.data_received(tail)
  205. else:
  206. self._tail = tail