http_websocket.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. """WebSocket protocol versions 13 and 8."""
  2. import asyncio
  3. import collections
  4. import json
  5. import random
  6. import re
  7. import sys
  8. import zlib
  9. from enum import IntEnum
  10. from struct import Struct
  11. from typing import Any, Callable, List, Optional, Tuple, Union
  12. from .base_protocol import BaseProtocol
  13. from .helpers import NO_EXTENSIONS
  14. from .streams import DataQueue
# Public names exported by ``from <this module> import *``.
__all__ = (
    "WS_CLOSED_MESSAGE",
    "WS_CLOSING_MESSAGE",
    "WS_KEY",
    "WebSocketReader",
    "WebSocketWriter",
    "WSMessage",
    "WebSocketError",
    "WSMsgType",
    "WSCloseCode",
)
  26. class WSCloseCode(IntEnum):
  27. OK = 1000
  28. GOING_AWAY = 1001
  29. PROTOCOL_ERROR = 1002
  30. UNSUPPORTED_DATA = 1003
  31. INVALID_TEXT = 1007
  32. POLICY_VIOLATION = 1008
  33. MESSAGE_TOO_BIG = 1009
  34. MANDATORY_EXTENSION = 1010
  35. INTERNAL_ERROR = 1011
  36. SERVICE_RESTART = 1012
  37. TRY_AGAIN_LATER = 1013
  38. ALLOWED_CLOSE_CODES = {int(i) for i in WSCloseCode}
class WSMsgType(IntEnum):
    """Message types: wire-level frame opcodes plus library-internal markers.

    Declaration order matters: the upper-case names are defined first and
    are therefore the canonical members; the lower-case names that follow
    reuse the same values and so become enum aliases of them.
    """

    # websocket spec types (values are the frame opcodes)
    CONTINUATION = 0x0
    TEXT = 0x1
    BINARY = 0x2
    PING = 0x9
    PONG = 0xA
    CLOSE = 0x8

    # aiohttp specific types (values above 0xF cannot appear as a 4-bit opcode)
    CLOSING = 0x100
    CLOSED = 0x101
    ERROR = 0x102

    # Lower-case aliases of the members above.
    text = TEXT
    binary = BINARY
    ping = PING
    pong = PONG
    close = CLOSE
    closing = CLOSING
    closed = CLOSED
    error = ERROR
# GUID constant used by the WebSocket opening handshake (RFC 6455).
WS_KEY = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

# Pre-compiled struct helpers; "!" selects network (big-endian) byte order.
UNPACK_LEN2 = Struct("!H").unpack_from  # 16-bit extended payload length
UNPACK_LEN3 = Struct("!Q").unpack_from  # 64-bit extended payload length
UNPACK_CLOSE_CODE = Struct("!H").unpack  # 16-bit close status code
PACK_LEN1 = Struct("!BB").pack  # header with a 7-bit payload length
PACK_LEN2 = Struct("!BBH").pack  # header with a 16-bit extended length
PACK_LEN3 = Struct("!BBQ").pack  # header with a 64-bit extended length
PACK_CLOSE_CODE = Struct("!H").pack  # 16-bit close status code

# Unmasked payloads longer than this are written separately from their
# header by the writer instead of being concatenated with it.
MSG_SIZE = 2 ** 14
# Default number of written bytes after which the writer drains.
DEFAULT_LIMIT = 2 ** 16
  69. _WSMessageBase = collections.namedtuple("_WSMessageBase", ["type", "data", "extra"])
  70. class WSMessage(_WSMessageBase):
  71. def json(self, *, loads: Callable[[Any], Any] = json.loads) -> Any:
  72. """Return parsed JSON data.
  73. .. versionadded:: 0.22
  74. """
  75. return loads(self.data)
# Shared, payload-less marker messages for the CLOSED and CLOSING types.
WS_CLOSED_MESSAGE = WSMessage(WSMsgType.CLOSED, None, None)
WS_CLOSING_MESSAGE = WSMessage(WSMsgType.CLOSING, None, None)
  78. class WebSocketError(Exception):
  79. """WebSocket protocol parser error."""
  80. def __init__(self, code: int, message: str) -> None:
  81. self.code = code
  82. super().__init__(code, message)
  83. def __str__(self) -> str:
  84. return self.args[1]
class WSHandshakeError(Exception):
    """WebSocket protocol handshake error.

    Raised by the extension-negotiation parser in this module when a
    ``permessage-deflate`` offer cannot be accepted.
    """
  87. native_byteorder = sys.byteorder
  88. # Used by _websocket_mask_python
  89. _XOR_TABLE = [bytes(a ^ b for a in range(256)) for b in range(256)]
  90. def _websocket_mask_python(mask: bytes, data: bytearray) -> None:
  91. """Websocket masking function.
  92. `mask` is a `bytes` object of length 4; `data` is a `bytearray`
  93. object of any length. The contents of `data` are masked with `mask`,
  94. as specified in section 5.3 of RFC 6455.
  95. Note that this function mutates the `data` argument.
  96. This pure-python implementation may be replaced by an optimized
  97. version when available.
  98. """
  99. assert isinstance(data, bytearray), data
  100. assert len(mask) == 4, mask
  101. if data:
  102. a, b, c, d = (_XOR_TABLE[n] for n in mask)
  103. data[::4] = data[::4].translate(a)
  104. data[1::4] = data[1::4].translate(b)
  105. data[2::4] = data[2::4].translate(c)
  106. data[3::4] = data[3::4].translate(d)
# Select the masking implementation: the pure-Python one when extensions
# are disabled or the compiled module cannot be imported, otherwise the
# Cython one.
if NO_EXTENSIONS:  # pragma: no cover
    _websocket_mask = _websocket_mask_python
else:
    try:
        from ._websocket import _websocket_mask_cython  # type: ignore

        _websocket_mask = _websocket_mask_cython
    except ImportError:  # pragma: no cover
        _websocket_mask = _websocket_mask_python

# Four-byte tail of a flushed deflate block.  The writer strips it from
# compressed messages and the reader appends it again before decompressing.
_WS_DEFLATE_TRAILING = bytes([0x00, 0x00, 0xFF, 0xFF])
  116. _WS_EXT_RE = re.compile(
  117. r"^(?:;\s*(?:"
  118. r"(server_no_context_takeover)|"
  119. r"(client_no_context_takeover)|"
  120. r"(server_max_window_bits(?:=(\d+))?)|"
  121. r"(client_max_window_bits(?:=(\d+))?)))*$"
  122. )
  123. _WS_EXT_RE_SPLIT = re.compile(r"permessage-deflate([^,]+)?")
  124. def ws_ext_parse(extstr: Optional[str], isserver: bool = False) -> Tuple[int, bool]:
  125. if not extstr:
  126. return 0, False
  127. compress = 0
  128. notakeover = False
  129. for ext in _WS_EXT_RE_SPLIT.finditer(extstr):
  130. defext = ext.group(1)
  131. # Return compress = 15 when get `permessage-deflate`
  132. if not defext:
  133. compress = 15
  134. break
  135. match = _WS_EXT_RE.match(defext)
  136. if match:
  137. compress = 15
  138. if isserver:
  139. # Server never fail to detect compress handshake.
  140. # Server does not need to send max wbit to client
  141. if match.group(4):
  142. compress = int(match.group(4))
  143. # Group3 must match if group4 matches
  144. # Compress wbit 8 does not support in zlib
  145. # If compress level not support,
  146. # CONTINUE to next extension
  147. if compress > 15 or compress < 9:
  148. compress = 0
  149. continue
  150. if match.group(1):
  151. notakeover = True
  152. # Ignore regex group 5 & 6 for client_max_window_bits
  153. break
  154. else:
  155. if match.group(6):
  156. compress = int(match.group(6))
  157. # Group5 must match if group6 matches
  158. # Compress wbit 8 does not support in zlib
  159. # If compress level not support,
  160. # FAIL the parse progress
  161. if compress > 15 or compress < 9:
  162. raise WSHandshakeError("Invalid window size")
  163. if match.group(2):
  164. notakeover = True
  165. # Ignore regex group 5 & 6 for client_max_window_bits
  166. break
  167. # Return Fail if client side and not match
  168. elif not isserver:
  169. raise WSHandshakeError("Extension for deflate not supported" + ext.group(1))
  170. return compress, notakeover
  171. def ws_ext_gen(
  172. compress: int = 15, isserver: bool = False, server_notakeover: bool = False
  173. ) -> str:
  174. # client_notakeover=False not used for server
  175. # compress wbit 8 does not support in zlib
  176. if compress < 9 or compress > 15:
  177. raise ValueError(
  178. "Compress wbits must between 9 and 15, " "zlib does not support wbits=8"
  179. )
  180. enabledext = ["permessage-deflate"]
  181. if not isserver:
  182. enabledext.append("client_max_window_bits")
  183. if compress < 15:
  184. enabledext.append("server_max_window_bits=" + str(compress))
  185. if server_notakeover:
  186. enabledext.append("server_no_context_takeover")
  187. # if client_notakeover:
  188. # enabledext.append('client_no_context_takeover')
  189. return "; ".join(enabledext)
class WSParserState(IntEnum):
    """Stages of the incremental frame parser, in the order they occur."""

    READ_HEADER = 1  # waiting for the two fixed header bytes
    READ_PAYLOAD_LENGTH = 2  # waiting for the (optional) extended length
    READ_PAYLOAD_MASK = 3  # waiting for the four mask bytes
    READ_PAYLOAD = 4  # collecting payload bytes
class WebSocketReader:
    """Incremental WebSocket frame parser that feeds messages into a queue.

    Raw bytes are passed to :meth:`feed_data`; complete frames are parsed,
    fragmented messages are reassembled (and decompressed when flagged) and
    the resulting ``WSMessage`` objects are pushed to ``queue``.
    """

    def __init__(
        self, queue: DataQueue[WSMessage], max_msg_size: int, compress: bool = True
    ) -> None:
        """Set up parser state.

        ``max_msg_size`` is the message size limit; a falsy value disables
        the check.  ``compress`` tells whether frames with the RSV1 bit set
        are accepted.
        """
        self.queue = queue
        self._max_msg_size = max_msg_size

        # First exception raised while parsing; once set, parsing stops.
        self._exc = None  # type: Optional[BaseException]
        # Payload accumulated so far for the message being reassembled.
        self._partial = bytearray()
        self._state = WSParserState.READ_HEADER

        # Opcode of the fragmented message in progress, if any.
        self._opcode = None  # type: Optional[int]
        # Header fields of the frame currently being parsed.
        self._frame_fin = False
        self._frame_opcode = None  # type: Optional[int]
        self._frame_payload = bytearray()

        # Bytes left over from the previous call that did not form a
        # complete header, length or mask field.
        self._tail = b""
        self._has_mask = False
        self._frame_mask = None  # type: Optional[bytes]
        self._payload_length = 0
        self._payload_length_flag = 0
        self._compressed = None  # type: Optional[bool]
        self._decompressobj = None  # type: Any  # zlib.decompressobj actually
        self._compress = compress

    def feed_eof(self) -> None:
        """Forward end-of-stream to the message queue."""
        self.queue.feed_eof()

    def feed_data(self, data: bytes) -> Tuple[bool, bytes]:
        """Parse *data*, converting any parsing failure into queue state.

        Returns ``(True, data)`` without parsing when an earlier call has
        already failed.  If parsing raises, the exception is remembered,
        set on the queue, and ``(True, b"")`` is returned.  Otherwise the
        result of :meth:`_feed_data` is returned.
        """
        # NOTE(review): the first tuple element presumably tells the caller
        # to stop feeding this parser -- confirm against the calling protocol.
        if self._exc:
            return True, data

        try:
            return self._feed_data(data)
        except Exception as exc:
            self._exc = exc
            self.queue.set_exception(exc)
            return True, b""

    def _feed_data(self, data: bytes) -> Tuple[bool, bytes]:
        """Parse *data* into frames and push completed messages to the queue.

        Raises WebSocketError on protocol violations, invalid UTF-8 and
        messages that hit the size limit.  Returns ``(False, b"")``.
        """
        for fin, opcode, payload, compressed in self.parse_frame(data):
            # The decompressor is created lazily on the first compressed
            # frame and then reused for later messages.
            if compressed and not self._decompressobj:
                self._decompressobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
            if opcode == WSMsgType.CLOSE:
                if len(payload) >= 2:
                    close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
                    # Codes below 3000 must be one of the known close codes;
                    # codes from 3000 upwards are accepted as-is.
                    if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            f"Invalid close code: {close_code}",
                        )
                    try:
                        close_message = payload[2:].decode("utf-8")
                    except UnicodeDecodeError as exc:
                        raise WebSocketError(
                            WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
                        ) from exc
                    msg = WSMessage(WSMsgType.CLOSE, close_code, close_message)
                elif payload:
                    # A one-byte payload cannot hold a close code.
                    raise WebSocketError(
                        WSCloseCode.PROTOCOL_ERROR,
                        f"Invalid close frame: {fin} {opcode} {payload!r}",
                    )
                else:
                    msg = WSMessage(WSMsgType.CLOSE, 0, "")

                self.queue.feed_data(msg, 0)

            elif opcode == WSMsgType.PING:
                self.queue.feed_data(
                    WSMessage(WSMsgType.PING, payload, ""), len(payload)
                )

            elif opcode == WSMsgType.PONG:
                self.queue.feed_data(
                    WSMessage(WSMsgType.PONG, payload, ""), len(payload)
                )

            elif (
                opcode not in (WSMsgType.TEXT, WSMsgType.BINARY)
                and self._opcode is None
            ):
                # Neither a new data message nor a continuation of one
                # that is in progress.
                raise WebSocketError(
                    WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}"
                )
            else:
                # load text/binary
                if not fin:
                    # got partial frame payload
                    if opcode != WSMsgType.CONTINUATION:
                        self._opcode = opcode
                    self._partial.extend(payload)
                    # NOTE(review): ">=" also rejects a message whose size
                    # equals the limit, although the error text says
                    # "exceeds" -- confirm which is intended.
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
                        raise WebSocketError(
                            WSCloseCode.MESSAGE_TOO_BIG,
                            "Message size {} exceeds limit {}".format(
                                len(self._partial), self._max_msg_size
                            ),
                        )
                else:
                    # If earlier fragments are buffered, the final frame
                    # must carry the continuation opcode.
                    if self._partial:
                        if opcode != WSMsgType.CONTINUATION:
                            raise WebSocketError(
                                WSCloseCode.PROTOCOL_ERROR,
                                "The opcode in non-fin frame is expected "
                                "to be zero, got {!r}".format(opcode),
                            )

                    if opcode == WSMsgType.CONTINUATION:
                        # The message type comes from its first fragment.
                        assert self._opcode is not None
                        opcode = self._opcode
                        self._opcode = None

                    self._partial.extend(payload)
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
                        raise WebSocketError(
                            WSCloseCode.MESSAGE_TOO_BIG,
                            "Message size {} exceeds limit {}".format(
                                len(self._partial), self._max_msg_size
                            ),
                        )

                    # Decompression must be done after all fragments of
                    # the message have been received.
                    if compressed:
                        self._partial.extend(_WS_DEFLATE_TRAILING)
                        # The second argument caps the decompressed output;
                        # input that was not consumed because of the cap is
                        # left in ``unconsumed_tail``.
                        payload_merged = self._decompressobj.decompress(
                            self._partial, self._max_msg_size
                        )
                        if self._decompressobj.unconsumed_tail:
                            # NOTE(review): ``left`` counts remaining
                            # *compressed* bytes, so the size reported below
                            # is not the real decompressed size.
                            left = len(self._decompressobj.unconsumed_tail)
                            raise WebSocketError(
                                WSCloseCode.MESSAGE_TOO_BIG,
                                "Decompressed message size {} exceeds limit {}".format(
                                    self._max_msg_size + left, self._max_msg_size
                                ),
                            )
                    else:
                        payload_merged = bytes(self._partial)

                    self._partial.clear()

                    if opcode == WSMsgType.TEXT:
                        try:
                            text = payload_merged.decode("utf-8")
                            self.queue.feed_data(
                                WSMessage(WSMsgType.TEXT, text, ""), len(text)
                            )
                        except UnicodeDecodeError as exc:
                            raise WebSocketError(
                                WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
                            ) from exc
                    else:
                        self.queue.feed_data(
                            WSMessage(WSMsgType.BINARY, payload_merged, ""),
                            len(payload_merged),
                        )

        return False, b""

    def parse_frame(
        self, buf: bytes
    ) -> List[Tuple[bool, Optional[int], bytearray, Optional[bool]]]:
        """Parse *buf* and return every frame completed by it.

        The result is a list of ``(fin, opcode, payload, compressed)``
        tuples, possibly empty.  Parsing is resumable: header, length and
        mask bytes that are still incomplete are kept in ``self._tail`` and
        partially received payload in ``self._frame_payload`` until a later
        call supplies the rest.  Payloads are returned unmasked.

        Raises WebSocketError for reserved bits that are not allowed,
        fragmented control frames and oversized control frames.
        """
        # NOTE(review): the payload buffer of a single frame grows to the
        # declared frame length without consulting ``_max_msg_size``;
        # the limit is only applied by the caller on completed frames.
        frames = []
        if self._tail:
            buf, self._tail = self._tail + buf, b""

        start_pos = 0
        buf_length = len(buf)

        while True:
            # read header
            if self._state == WSParserState.READ_HEADER:
                if buf_length - start_pos >= 2:
                    data = buf[start_pos : start_pos + 2]
                    start_pos += 2
                    first_byte, second_byte = data

                    fin = (first_byte >> 7) & 1
                    rsv1 = (first_byte >> 6) & 1
                    rsv2 = (first_byte >> 5) & 1
                    rsv3 = (first_byte >> 4) & 1
                    opcode = first_byte & 0xF

                    # frame-fin = %x0 ; more frames of this message follow
                    #           / %x1 ; final frame of this message
                    # frame-rsv1 = %x0 ;
                    #    1 bit, MUST be 0 unless negotiated otherwise
                    # frame-rsv2 = %x0 ;
                    #    1 bit, MUST be 0 unless negotiated otherwise
                    # frame-rsv3 = %x0 ;
                    #    1 bit, MUST be 0 unless negotiated otherwise
                    #
                    # rsv1 is tolerated only when compression is enabled,
                    # where it marks a compressed message.
                    if rsv2 or rsv3 or (rsv1 and not self._compress):
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Received frame with non-zero reserved bits",
                        )

                    # Opcodes above 0x7 are control frames.
                    if opcode > 0x7 and fin == 0:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Received fragmented control frame",
                        )

                    has_mask = (second_byte >> 7) & 1
                    length = second_byte & 0x7F

                    # Control frames MUST have a payload
                    # length of 125 bytes or less
                    if opcode > 0x7 and length > 125:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Control frame payload cannot be " "larger than 125 bytes",
                        )

                    # Take the compression flag from this frame if the
                    # previous frame was final or no frame was seen yet;
                    # otherwise this is a later fragment, which must not
                    # set rsv1.
                    # NOTE(review): ``_frame_fin`` still holds the previous
                    # frame's FIN bit here, and control frames always have
                    # FIN set, so a control frame interleaved in a
                    # fragmented compressed message makes the following
                    # continuation frame reset the flag -- confirm intent.
                    if self._frame_fin or self._compressed is None:
                        self._compressed = True if rsv1 else False
                    elif rsv1:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Received frame with non-zero reserved bits",
                        )

                    self._frame_fin = bool(fin)
                    self._frame_opcode = opcode
                    self._has_mask = bool(has_mask)
                    self._payload_length_flag = length
                    self._state = WSParserState.READ_PAYLOAD_LENGTH
                else:
                    break

            # read payload length
            if self._state == WSParserState.READ_PAYLOAD_LENGTH:
                length = self._payload_length_flag
                if length == 126:
                    # 126 announces a 16-bit extended length.
                    if buf_length - start_pos >= 2:
                        data = buf[start_pos : start_pos + 2]
                        start_pos += 2
                        length = UNPACK_LEN2(data)[0]
                        self._payload_length = length
                        self._state = (
                            WSParserState.READ_PAYLOAD_MASK
                            if self._has_mask
                            else WSParserState.READ_PAYLOAD
                        )
                    else:
                        break
                elif length > 126:
                    # 127 announces a 64-bit extended length.
                    if buf_length - start_pos >= 8:
                        data = buf[start_pos : start_pos + 8]
                        start_pos += 8
                        length = UNPACK_LEN3(data)[0]
                        self._payload_length = length
                        self._state = (
                            WSParserState.READ_PAYLOAD_MASK
                            if self._has_mask
                            else WSParserState.READ_PAYLOAD
                        )
                    else:
                        break
                else:
                    # The 7-bit value is the payload length itself.
                    self._payload_length = length
                    self._state = (
                        WSParserState.READ_PAYLOAD_MASK
                        if self._has_mask
                        else WSParserState.READ_PAYLOAD
                    )

            # read payload mask
            if self._state == WSParserState.READ_PAYLOAD_MASK:
                if buf_length - start_pos >= 4:
                    self._frame_mask = buf[start_pos : start_pos + 4]
                    start_pos += 4
                    self._state = WSParserState.READ_PAYLOAD
                else:
                    break

            if self._state == WSParserState.READ_PAYLOAD:
                length = self._payload_length
                payload = self._frame_payload

                chunk_len = buf_length - start_pos
                if length >= chunk_len:
                    # The rest of the buffer belongs to this frame.
                    self._payload_length = length - chunk_len
                    payload.extend(buf[start_pos:])
                    start_pos = buf_length
                else:
                    # The frame ends inside the buffer.
                    self._payload_length = 0
                    payload.extend(buf[start_pos : start_pos + length])
                    start_pos = start_pos + length

                if self._payload_length == 0:
                    # Frame complete: unmask in place and emit it.
                    if self._has_mask:
                        assert self._frame_mask is not None
                        _websocket_mask(self._frame_mask, payload)

                    frames.append(
                        (self._frame_fin, self._frame_opcode, payload, self._compressed)
                    )

                    self._frame_payload = bytearray()
                    self._state = WSParserState.READ_HEADER
                else:
                    break

        # Keep unparsed bytes for the next call.
        self._tail = buf[start_pos:]

        return frames
  475. class WebSocketWriter:
  476. def __init__(
  477. self,
  478. protocol: BaseProtocol,
  479. transport: asyncio.Transport,
  480. *,
  481. use_mask: bool = False,
  482. limit: int = DEFAULT_LIMIT,
  483. random: Any = random.Random(),
  484. compress: int = 0,
  485. notakeover: bool = False,
  486. ) -> None:
  487. self.protocol = protocol
  488. self.transport = transport
  489. self.use_mask = use_mask
  490. self.randrange = random.randrange
  491. self.compress = compress
  492. self.notakeover = notakeover
  493. self._closing = False
  494. self._limit = limit
  495. self._output_size = 0
  496. self._compressobj = None # type: Any # actually compressobj
  497. async def _send_frame(
  498. self, message: bytes, opcode: int, compress: Optional[int] = None
  499. ) -> None:
  500. """Send a frame over the websocket with message as its payload."""
  501. if self._closing and not (opcode & WSMsgType.CLOSE):
  502. raise ConnectionResetError("Cannot write to closing transport")
  503. rsv = 0
  504. # Only compress larger packets (disabled)
  505. # Does small packet needs to be compressed?
  506. # if self.compress and opcode < 8 and len(message) > 124:
  507. if (compress or self.compress) and opcode < 8:
  508. if compress:
  509. # Do not set self._compress if compressing is for this frame
  510. compressobj = zlib.compressobj(level=zlib.Z_BEST_SPEED, wbits=-compress)
  511. else: # self.compress
  512. if not self._compressobj:
  513. self._compressobj = zlib.compressobj(
  514. level=zlib.Z_BEST_SPEED, wbits=-self.compress
  515. )
  516. compressobj = self._compressobj
  517. message = compressobj.compress(message)
  518. message = message + compressobj.flush(
  519. zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH
  520. )
  521. if message.endswith(_WS_DEFLATE_TRAILING):
  522. message = message[:-4]
  523. rsv = rsv | 0x40
  524. msg_length = len(message)
  525. use_mask = self.use_mask
  526. if use_mask:
  527. mask_bit = 0x80
  528. else:
  529. mask_bit = 0
  530. if msg_length < 126:
  531. header = PACK_LEN1(0x80 | rsv | opcode, msg_length | mask_bit)
  532. elif msg_length < (1 << 16):
  533. header = PACK_LEN2(0x80 | rsv | opcode, 126 | mask_bit, msg_length)
  534. else:
  535. header = PACK_LEN3(0x80 | rsv | opcode, 127 | mask_bit, msg_length)
  536. if use_mask:
  537. mask = self.randrange(0, 0xFFFFFFFF)
  538. mask = mask.to_bytes(4, "big")
  539. message = bytearray(message)
  540. _websocket_mask(mask, message)
  541. self._write(header + mask + message)
  542. self._output_size += len(header) + len(mask) + len(message)
  543. else:
  544. if len(message) > MSG_SIZE:
  545. self._write(header)
  546. self._write(message)
  547. else:
  548. self._write(header + message)
  549. self._output_size += len(header) + len(message)
  550. if self._output_size > self._limit:
  551. self._output_size = 0
  552. await self.protocol._drain_helper()
  553. def _write(self, data: bytes) -> None:
  554. if self.transport is None or self.transport.is_closing():
  555. raise ConnectionResetError("Cannot write to closing transport")
  556. self.transport.write(data)
  557. async def pong(self, message: bytes = b"") -> None:
  558. """Send pong message."""
  559. if isinstance(message, str):
  560. message = message.encode("utf-8")
  561. await self._send_frame(message, WSMsgType.PONG)
  562. async def ping(self, message: bytes = b"") -> None:
  563. """Send ping message."""
  564. if isinstance(message, str):
  565. message = message.encode("utf-8")
  566. await self._send_frame(message, WSMsgType.PING)
  567. async def send(
  568. self,
  569. message: Union[str, bytes],
  570. binary: bool = False,
  571. compress: Optional[int] = None,
  572. ) -> None:
  573. """Send a frame over the websocket with message as its payload."""
  574. if isinstance(message, str):
  575. message = message.encode("utf-8")
  576. if binary:
  577. await self._send_frame(message, WSMsgType.BINARY, compress)
  578. else:
  579. await self._send_frame(message, WSMsgType.TEXT, compress)
  580. async def close(self, code: int = 1000, message: bytes = b"") -> None:
  581. """Close the websocket, sending the specified code and message."""
  582. if isinstance(message, str):
  583. message = message.encode("utf-8")
  584. try:
  585. await self._send_frame(
  586. PACK_CLOSE_CODE(code) + message, opcode=WSMsgType.CLOSE
  587. )
  588. finally:
  589. self._closing = True