receiver.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. 'use strict'
  2. const { Writable } = require('stream')
  3. const diagnosticsChannel = require('diagnostics_channel')
  4. const { parserStates, opcodes, states, emptyBuffer } = require('./constants')
  5. const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
  6. const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
  7. const { WebsocketFrameSend } = require('./frame')
  // This code was influenced by ws released under the MIT license.
  // Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  // Copyright (c) 2013 Arnout Kazemier and contributors
  // Copyright (c) 2016 Luigi Pinca and contributors
  // Diagnostics channels that ByteParser publishes to when a ping or pong
  // frame is received. The payload of each published message is
  // `{ payload: <frame body> }`.
  const channels = {
    ping: diagnosticsChannel.channel('undici:websocket:ping'),
    pong: diagnosticsChannel.channel('undici:websocket:pong')
  }
  /**
   * Incremental parser for incoming WebSocket frames (RFC 6455 section 5).
   *
   * Chunks written to the stream are buffered and parsed with a small state
   * machine (`parserStates`): INFO reads the 2-byte frame header,
   * PAYLOADLENGTH_16 / PAYLOADLENGTH_64 read an extended payload length and
   * READ_DATA reads the body of a data frame. Control frames (ping, pong,
   * close) are handled entirely while in the INFO state.
   */
  class ByteParser extends Writable {
    /** Chunks that have been received but not consumed yet. */
    #buffers = []
    /** Total number of unconsumed bytes held in `#buffers`. */
    #byteOffset = 0
    /** Current state of the frame state machine. */
    #state = parserStates.INFO
    /** Details of the frame/message currently being parsed. */
    #info = {}
    /** Bodies of the data frames that make up the current message. */
    #fragments = []

    /**
     * @param ws the WebSocket object this parser reads frames for; it is
     *   passed to the connection helpers and its symbol-keyed state is updated
     *   when a close frame arrives.
     */
    constructor (ws) {
      super()

      this.ws = ws
    }

    /**
     * Buffers the incoming chunk and resumes parsing.
     * @param {Buffer} chunk
     * @param {() => void} callback
     */
    _write (chunk, _, callback) {
      this.#buffers.push(chunk)
      this.#byteOffset += chunk.length

      this.run(callback)
    }

    /**
     * Runs whenever a new chunk is received and parses as many frames as the
     * buffered bytes allow.
     *
     * The callback is invoked once the parser needs more bytes to make
     * progress. It is NOT invoked when the connection is failed or when a
     * close frame has been processed.
     * @param {() => void} callback
     */
    run (callback) {
      // Every state returns through `callback()` as soon as it lacks the bytes
      // it needs, so the loop simply keeps going while progress is possible.
      // This also lets a zero-length data frame at the very end of a chunk be
      // delivered immediately instead of waiting for the next chunk.
      while (true) {
        if (this.#state === parserStates.INFO) {
          // If there aren't enough bytes to parse the payload length, etc.
          if (this.#byteOffset < 2) {
            return callback()
          }

          const header = this.consume(2)
          const opcode = header[0] & 0x0F
          const payloadLength = header[1] & 0x7F
          const isControlFrame =
            opcode === opcodes.PING ||
            opcode === opcodes.PONG ||
            opcode === opcodes.CLOSE

          this.#info.fin = (header[0] & 0x80) !== 0
          this.#info.opcode = opcode

          // If we receive a fragmented message, we use the type of the first
          // frame to parse the full message as binary/text, when it's
          // terminated. Control frames may arrive before or in between the
          // frames of a message and must not be mistaken for its type.
          if (!isControlFrame) {
            this.#info.originalOpcode ??= opcode
          }

          this.#info.fragmented = !this.#info.fin && opcode !== opcodes.CONTINUATION

          if (this.#info.fragmented && opcode !== opcodes.BINARY && opcode !== opcodes.TEXT) {
            // Only text and binary frames can be fragmented
            failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
            return
          }

          if (this.#info.fragmented && payloadLength > 125) {
            // NOTE(review): RFC 6455 does not appear to limit the size of the
            // first fragment of a message; this pre-existing check is kept
            // as is - confirm whether it should be removed.
            failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
            return
          }

          if (isControlFrame) {
            if (payloadLength > 125) {
              // Control frames can have a payload length of 125 bytes MAX
              failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
              return
            }

            if (opcode === opcodes.CLOSE && payloadLength === 1) {
              failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
              return
            }

            if (this.#byteOffset < payloadLength) {
              // The body of the control frame has not fully arrived yet. Put
              // the header back and parse the frame again once more bytes
              // are available, instead of consuming a truncated body.
              this.#buffers.unshift(header)
              this.#byteOffset += 2
              return callback()
            }

            const body = this.consume(payloadLength)

            if (opcode === opcodes.CLOSE) {
              const closeInfo = this.parseCloseBody(false, body)
              this.#info.closeInfo = closeInfo

              if (closeInfo === null) {
                // Invalid status code or a reason that is not valid UTF-8.
                failWebsocketConnection(this.ws, 'Received an invalid close frame.')
                return
              }

              if (!this.ws[kSentClose]) {
                // If an endpoint receives a Close frame and did not previously
                // send a Close frame, the endpoint MUST send a Close frame in
                // response. (When sending a Close frame in response, the
                // endpoint typically echos the status code it received.)
                let echoBody = emptyBuffer

                // A close frame without a status code is answered with an
                // empty body rather than a bogus status code of 0.
                if (closeInfo.code !== undefined) {
                  echoBody = Buffer.allocUnsafe(2)
                  echoBody.writeUInt16BE(closeInfo.code, 0)
                }

                const closeFrame = new WebsocketFrameSend(echoBody)

                this.ws[kResponse].socket.write(
                  closeFrame.createFrame(opcodes.CLOSE),
                  (err) => {
                    if (!err) {
                      this.ws[kSentClose] = true
                    }
                  }
                )
              }

              // Upon either sending or receiving a Close control frame, it is
              // said that _The WebSocket Closing Handshake is Started_ and
              // that the WebSocket connection is in the CLOSING state.
              this.ws[kReadyState] = states.CLOSING
              this.ws[kReceivedClose] = true

              this.end()

              return
            }

            if (opcode === opcodes.PING) {
              // Upon receipt of a Ping frame, an endpoint MUST send a Pong
              // frame in response, unless it already received a Close frame.
              // A Pong frame sent in response to a Ping frame must have
              // identical "Application data"
              if (!this.ws[kReceivedClose]) {
                const frame = new WebsocketFrameSend(body)

                this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

                if (channels.ping.hasSubscribers) {
                  channels.ping.publish({
                    payload: body
                  })
                }
              }
            } else if (channels.pong.hasSubscribers) {
              // A Pong frame MAY be sent unsolicited. This serves as a
              // unidirectional heartbeat. A response to an unsolicited Pong
              // frame is not expected.
              channels.pong.publish({
                payload: body
              })
            }

            // The control frame is fully handled: the parser is still in the
            // INFO state, so the next iteration reads the next frame header.
            continue
          }

          if (payloadLength <= 125) {
            this.#info.payloadLength = payloadLength
            this.#state = parserStates.READ_DATA
          } else if (payloadLength === 126) {
            this.#state = parserStates.PAYLOADLENGTH_16
          } else if (payloadLength === 127) {
            this.#state = parserStates.PAYLOADLENGTH_64
          }
        } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
          if (this.#byteOffset < 2) {
            return callback()
          }

          const buffer = this.consume(2)

          this.#info.payloadLength = buffer.readUInt16BE(0)
          this.#state = parserStates.READ_DATA
        } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
          if (this.#byteOffset < 8) {
            return callback()
          }

          const buffer = this.consume(8)
          const upper = buffer.readUInt32BE(0)

          // 2^31 is the maximum bytes an arraybuffer can contain
          // on 32-bit systems. Although, on 64-bit systems, this is
          // 2^53-1 bytes.
          // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
          // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
          // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
          if (upper > 2 ** 31 - 1) {
            failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.')
            return
          }

          const lower = buffer.readUInt32BE(4)

          // `upper` holds the high 32 bits of the length, so it is scaled by
          // 2^32. Multiplication is used because bitwise shifts truncate to 32
          // bits. Lengths above 2^53-1 lose precision, but a payload of that
          // size could never be buffered anyway.
          this.#info.payloadLength = upper * 0x100000000 + lower
          this.#state = parserStates.READ_DATA
        } else if (this.#state === parserStates.READ_DATA) {
          if (this.#byteOffset < this.#info.payloadLength) {
            // The body of this frame has not fully arrived yet
            return callback()
          }

          const body = this.consume(this.#info.payloadLength)

          this.#fragments.push(body)

          // A message is complete once a frame with the FIN bit set has been
          // read: either an unfragmented frame or the final continuation
          // frame. Intermediate continuation frames (FIN = 0) only add to the
          // fragments collected so far.
          if (this.#info.fin) {
            const fullMessage = Buffer.concat(this.#fragments)

            websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)

            this.#info = {}
            this.#fragments.length = 0
          }

          this.#state = parserStates.INFO
        }
      }
    }

    /**
     * Take n bytes from the buffered Buffers
     * @param {number} n
     * @returns {Buffer|null} the next n bytes, or null when fewer than n bytes
     *   are currently buffered
     */
    consume (n) {
      if (n > this.#byteOffset) {
        return null
      } else if (n === 0) {
        return emptyBuffer
      }

      // Fast path: the first chunk is exactly what was asked for.
      if (this.#buffers[0].length === n) {
        this.#byteOffset -= this.#buffers[0].length
        return this.#buffers.shift()
      }

      const buffer = Buffer.allocUnsafe(n)
      let offset = 0

      while (offset !== n) {
        const next = this.#buffers[0]
        const { length } = next

        if (length + offset === n) {
          buffer.set(this.#buffers.shift(), offset)
          break
        } else if (length + offset > n) {
          // Only part of this chunk is needed; keep the remainder queued.
          buffer.set(next.subarray(0, n - offset), offset)
          this.#buffers[0] = next.subarray(n - offset)
          break
        } else {
          buffer.set(this.#buffers.shift(), offset)
          offset += next.length
        }
      }

      this.#byteOffset -= n

      return buffer
    }

    /**
     * Parses the body of a close frame.
     * @param {boolean} onlyCode when true, only the status code is parsed and
     *   validated
     * @param {Buffer} data body of the close frame
     * @returns {{ code: number|undefined, reason?: string }|null} null when the
     *   status code is rejected by `isValidStatusCode` or the reason is not
     *   valid UTF-8
     */
    parseCloseBody (onlyCode, data) {
      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
      /** @type {number|undefined} */
      let code

      if (data.length >= 2) {
        // _The WebSocket Connection Close Code_ is
        // defined as the status code (Section 7.4) contained in the first Close
        // control frame received by the application
        code = data.readUInt16BE(0)
      }

      if (onlyCode) {
        if (!isValidStatusCode(code)) {
          return null
        }

        return { code }
      }

      // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
      /** @type {Buffer} */
      let reason = data.subarray(2)

      // Remove BOM
      if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
        reason = reason.subarray(3)
      }

      if (code !== undefined && !isValidStatusCode(code)) {
        return null
      }

      try {
        // TODO: optimize this
        reason = new TextDecoder('utf-8', { fatal: true }).decode(reason)
      } catch {
        return null
      }

      return { code, reason }
    }

    /**
     * Result of parsing the close frame received from the peer: undefined if
     * no close frame was parsed, null if the close frame was invalid.
     */
    get closingInfo () {
      return this.#info.closeInfo
    }
  }
  276. module.exports = {
  277. ByteParser
  278. }