readable.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. // Ported from https://github.com/nodejs/undici/pull/907
  2. 'use strict'
  3. const assert = require('assert')
  4. const { Readable } = require('stream')
  5. const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors')
  6. const util = require('../core/util')
  7. const { ReadableStreamFrom, toUSVString } = require('../core/util')
  8. let Blob
  9. const kConsume = Symbol('kConsume')
  10. const kReading = Symbol('kReading')
  11. const kBody = Symbol('kBody')
  12. const kAbort = Symbol('abort')
  13. const kContentType = Symbol('kContentType')
  14. module.exports = class BodyReadable extends Readable {
  15. constructor ({
  16. resume,
  17. abort,
  18. contentType = '',
  19. highWaterMark = 64 * 1024 // Same as nodejs fs streams.
  20. }) {
  21. super({
  22. autoDestroy: true,
  23. read: resume,
  24. highWaterMark
  25. })
  26. this._readableState.dataEmitted = false
  27. this[kAbort] = abort
  28. this[kConsume] = null
  29. this[kBody] = null
  30. this[kContentType] = contentType
  31. // Is stream being consumed through Readable API?
  32. // This is an optimization so that we avoid checking
  33. // for 'data' and 'readable' listeners in the hot path
  34. // inside push().
  35. this[kReading] = false
  36. }
  37. destroy (err) {
  38. if (this.destroyed) {
  39. // Node < 16
  40. return this
  41. }
  42. if (!err && !this._readableState.endEmitted) {
  43. err = new RequestAbortedError()
  44. }
  45. if (err) {
  46. this[kAbort]()
  47. }
  48. return super.destroy(err)
  49. }
  50. emit (ev, ...args) {
  51. if (ev === 'data') {
  52. // Node < 16.7
  53. this._readableState.dataEmitted = true
  54. } else if (ev === 'error') {
  55. // Node < 16
  56. this._readableState.errorEmitted = true
  57. }
  58. return super.emit(ev, ...args)
  59. }
  60. on (ev, ...args) {
  61. if (ev === 'data' || ev === 'readable') {
  62. this[kReading] = true
  63. }
  64. return super.on(ev, ...args)
  65. }
  66. addListener (ev, ...args) {
  67. return this.on(ev, ...args)
  68. }
  69. off (ev, ...args) {
  70. const ret = super.off(ev, ...args)
  71. if (ev === 'data' || ev === 'readable') {
  72. this[kReading] = (
  73. this.listenerCount('data') > 0 ||
  74. this.listenerCount('readable') > 0
  75. )
  76. }
  77. return ret
  78. }
  79. removeListener (ev, ...args) {
  80. return this.off(ev, ...args)
  81. }
  82. push (chunk) {
  83. if (this[kConsume] && chunk !== null && this.readableLength === 0) {
  84. consumePush(this[kConsume], chunk)
  85. return this[kReading] ? super.push(chunk) : true
  86. }
  87. return super.push(chunk)
  88. }
  89. // https://fetch.spec.whatwg.org/#dom-body-text
  90. async text () {
  91. return consume(this, 'text')
  92. }
  93. // https://fetch.spec.whatwg.org/#dom-body-json
  94. async json () {
  95. return consume(this, 'json')
  96. }
  97. // https://fetch.spec.whatwg.org/#dom-body-blob
  98. async blob () {
  99. return consume(this, 'blob')
  100. }
  101. // https://fetch.spec.whatwg.org/#dom-body-arraybuffer
  102. async arrayBuffer () {
  103. return consume(this, 'arrayBuffer')
  104. }
  105. // https://fetch.spec.whatwg.org/#dom-body-formdata
  106. async formData () {
  107. // TODO: Implement.
  108. throw new NotSupportedError()
  109. }
  110. // https://fetch.spec.whatwg.org/#dom-body-bodyused
  111. get bodyUsed () {
  112. return util.isDisturbed(this)
  113. }
  114. // https://fetch.spec.whatwg.org/#dom-body-body
  115. get body () {
  116. if (!this[kBody]) {
  117. this[kBody] = ReadableStreamFrom(this)
  118. if (this[kConsume]) {
  119. // TODO: Is this the best way to force a lock?
  120. this[kBody].getReader() // Ensure stream is locked.
  121. assert(this[kBody].locked)
  122. }
  123. }
  124. return this[kBody]
  125. }
  126. async dump (opts) {
  127. let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144
  128. const signal = opts && opts.signal
  129. const abortFn = () => {
  130. this.destroy()
  131. }
  132. let signalListenerCleanup
  133. if (signal) {
  134. if (typeof signal !== 'object' || !('aborted' in signal)) {
  135. throw new InvalidArgumentError('signal must be an AbortSignal')
  136. }
  137. util.throwIfAborted(signal)
  138. signalListenerCleanup = util.addAbortListener(signal, abortFn)
  139. }
  140. try {
  141. for await (const chunk of this) {
  142. util.throwIfAborted(signal)
  143. limit -= Buffer.byteLength(chunk)
  144. if (limit < 0) {
  145. return
  146. }
  147. }
  148. } catch {
  149. util.throwIfAborted(signal)
  150. } finally {
  151. if (typeof signalListenerCleanup === 'function') {
  152. signalListenerCleanup()
  153. } else if (signalListenerCleanup) {
  154. signalListenerCleanup[Symbol.dispose]()
  155. }
  156. }
  157. }
  158. }
  159. // https://streams.spec.whatwg.org/#readablestream-locked
  160. function isLocked (self) {
  161. // Consume is an implicit lock.
  162. return (self[kBody] && self[kBody].locked === true) || self[kConsume]
  163. }
  164. // https://fetch.spec.whatwg.org/#body-unusable
  165. function isUnusable (self) {
  166. return util.isDisturbed(self) || isLocked(self)
  167. }
  168. async function consume (stream, type) {
  169. if (isUnusable(stream)) {
  170. throw new TypeError('unusable')
  171. }
  172. assert(!stream[kConsume])
  173. return new Promise((resolve, reject) => {
  174. stream[kConsume] = {
  175. type,
  176. stream,
  177. resolve,
  178. reject,
  179. length: 0,
  180. body: []
  181. }
  182. stream
  183. .on('error', function (err) {
  184. consumeFinish(this[kConsume], err)
  185. })
  186. .on('close', function () {
  187. if (this[kConsume].body !== null) {
  188. consumeFinish(this[kConsume], new RequestAbortedError())
  189. }
  190. })
  191. process.nextTick(consumeStart, stream[kConsume])
  192. })
  193. }
  194. function consumeStart (consume) {
  195. if (consume.body === null) {
  196. return
  197. }
  198. const { _readableState: state } = consume.stream
  199. for (const chunk of state.buffer) {
  200. consumePush(consume, chunk)
  201. }
  202. if (state.endEmitted) {
  203. consumeEnd(this[kConsume])
  204. } else {
  205. consume.stream.on('end', function () {
  206. consumeEnd(this[kConsume])
  207. })
  208. }
  209. consume.stream.resume()
  210. while (consume.stream.read() != null) {
  211. // Loop
  212. }
  213. }
  214. function consumeEnd (consume) {
  215. const { type, body, resolve, stream, length } = consume
  216. try {
  217. if (type === 'text') {
  218. resolve(toUSVString(Buffer.concat(body)))
  219. } else if (type === 'json') {
  220. resolve(JSON.parse(Buffer.concat(body)))
  221. } else if (type === 'arrayBuffer') {
  222. const dst = new Uint8Array(length)
  223. let pos = 0
  224. for (const buf of body) {
  225. dst.set(buf, pos)
  226. pos += buf.byteLength
  227. }
  228. resolve(dst)
  229. } else if (type === 'blob') {
  230. if (!Blob) {
  231. Blob = require('buffer').Blob
  232. }
  233. resolve(new Blob(body, { type: stream[kContentType] }))
  234. }
  235. consumeFinish(consume)
  236. } catch (err) {
  237. stream.destroy(err)
  238. }
  239. }
  240. function consumePush (consume, chunk) {
  241. consume.length += chunk.length
  242. consume.body.push(chunk)
  243. }
  244. function consumeFinish (consume, err) {
  245. if (consume.body === null) {
  246. return
  247. }
  248. if (err) {
  249. consume.reject(err)
  250. } else {
  251. consume.resolve()
  252. }
  253. consume.type = null
  254. consume.stream = null
  255. consume.resolve = null
  256. consume.reject = null
  257. consume.length = 0
  258. consume.body = null
  259. }