api-stream.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. 'use strict'
  2. const { finished, PassThrough } = require('stream')
  3. const {
  4. InvalidArgumentError,
  5. InvalidReturnValueError,
  6. RequestAbortedError
  7. } = require('../core/errors')
  8. const util = require('../core/util')
  9. const { getResolveErrorBodyCallback } = require('./util')
  10. const { AsyncResource } = require('async_hooks')
  11. const { addSignal, removeSignal } = require('./abort-signal')
  /**
   * Dispatch handler that writes a response body into a Writable obtained from
   * a user-supplied factory and reports the outcome through a node-style
   * callback.
   *
   * Lifecycle as driven by the methods below:
   * onConnect -> onHeaders -> onData* -> onComplete, with onError possible at
   * any point.
   */
  class StreamHandler extends AsyncResource {
    /**
     * @param {object} opts Request options. The fields read here are `signal`,
     *   `method`, `opaque`, `body`, `onInfo`, `responseHeaders` and
     *   `throwOnError`.
     * @param {Function} factory Called with `{ statusCode, headers, opaque, context }`
     *   once final headers arrive; must return a Writable-like object.
     * @param {Function} callback Called as `(err, { opaque, trailers })` when
     *   the Writable finishes or fails, or as `(err, { opaque })` on early error.
     * @throws {InvalidArgumentError} When any argument fails validation. If
     *   `opts.body` is a stream it is destroyed with the same error first.
     */
    constructor (opts, factory, callback) {
      if (!opts || typeof opts !== 'object') {
        throw new InvalidArgumentError('invalid opts')
      }

      const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts

      try {
        if (typeof callback !== 'function') {
          throw new InvalidArgumentError('invalid callback')
        }

        if (typeof factory !== 'function') {
          throw new InvalidArgumentError('invalid factory')
        }

        if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
          throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
        }

        if (method === 'CONNECT') {
          throw new InvalidArgumentError('invalid method')
        }

        if (onInfo && typeof onInfo !== 'function') {
          throw new InvalidArgumentError('invalid onInfo callback')
        }

        super('UNDICI_STREAM')
      } catch (err) {
        // Do not leave a caller-provided body stream dangling when validation
        // fails; the nop listener keeps the destroy error from being unhandled.
        if (util.isStream(body)) {
          util.destroy(body.on('error', util.nop), err)
        }
        throw err
      }

      this.responseHeaders = responseHeaders || null
      this.opaque = opaque || null
      this.factory = factory
      this.callback = callback
      this.res = null
      this.abort = null
      this.context = null
      this.trailers = null
      this.body = body
      this.onInfo = onInfo || null
      this.throwOnError = throwOnError || false

      // Route request-body stream failures through the regular error path.
      if (util.isStream(body)) {
        body.on('error', (err) => {
          this.onError(err)
        })
      }

      addSignal(this, signal)
    }

    /**
     * Stores the abort function and context supplied for this request.
     *
     * @param {Function} abort
     * @param {*} context
     * @throws {RequestAbortedError} When the callback has already been cleared.
     */
    onConnect (abort, context) {
      if (!this.callback) {
        throw new RequestAbortedError()
      }

      this.abort = abort
      this.context = context
    }

    /**
     * Handles a response head. Informational (< 200) responses are forwarded to
     * `onInfo` only. For a final response the destination Writable is obtained
     * and `resume` is attached to its 'drain' event.
     *
     * @param {number} statusCode
     * @param {*} rawHeaders Raw headers as accepted by `util.parseHeaders`.
     * @param {Function} resume
     * @param {string} statusMessage
     * @returns {boolean|undefined} `undefined` for informational responses;
     *   otherwise `false` when the Writable reports that it needs a drain and
     *   `true` when it does not.
     * @throws {InvalidReturnValueError} When the factory result is not
     *   Writable-like (missing `write`, `end` or `on`).
     */
    onHeaders (statusCode, rawHeaders, resume, statusMessage) {
      const { factory, opaque, context, callback, responseHeaders } = this

      const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

      if (statusCode < 200) {
        if (this.onInfo) {
          this.onInfo({ statusCode, headers })
        }
        return
      }

      this.factory = null

      let res

      if (this.throwOnError && statusCode >= 400) {
        // content-type must be looked up on the parsed (object) form even when
        // the caller asked for raw headers.
        const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
        const contentType = parsedHeaders['content-type']
        res = new PassThrough()

        // The callback is handed over to getResolveErrorBodyCallback together
        // with the PassThrough body; the user factory is not invoked here.
        this.callback = null
        this.runInAsyncScope(getResolveErrorBodyCallback, null,
          { callback, body: res, contentType, statusCode, statusMessage, headers }
        )
      } else {
        res = this.runInAsyncScope(factory, null, {
          statusCode,
          headers,
          opaque,
          context
        })

        if (
          !res ||
          typeof res.write !== 'function' ||
          typeof res.end !== 'function' ||
          typeof res.on !== 'function'
        ) {
          throw new InvalidReturnValueError('expected Writable')
        }

        // TODO: Avoid finished. It registers an unnecessary amount of listeners.
        finished(res, { readable: false }, (err) => {
          const { callback, res, opaque, trailers, abort } = this

          this.res = null
          if (err || !res.readable) {
            util.destroy(res, err)
          }

          this.callback = null
          this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

          // The destination failed, so stop the request as well.
          if (err) {
            abort()
          }
        })
      }

      res.on('drain', resume)

      this.res = res

      // Older stream implementations may lack `writableNeedDrain`; fall back to
      // the internal writable state when it is undefined.
      const needDrain = res.writableNeedDrain !== undefined
        ? res.writableNeedDrain
        : res._writableState && res._writableState.needDrain

      return needDrain !== true
    }

    /**
     * Writes a body chunk to the destination Writable.
     *
     * @param {*} chunk
     * @returns {boolean} The result of `res.write(chunk)`, or `true` when no
     *   Writable is attached.
     */
    onData (chunk) {
      const { res } = this

      // `this.res` is reset to null by onError and by the finished() callback,
      // so a chunk may arrive after the Writable has been detached. There is
      // nothing to write to in that case, so report "no drain needed" instead
      // of throwing on a null dereference.
      return res ? res.write(chunk) : true
    }

    /**
     * Records the parsed trailers and ends the destination Writable.
     *
     * @param {*} trailers Raw trailers as accepted by `util.parseHeaders`.
     */
    onComplete (trailers) {
      const { res } = this

      removeSignal(this)

      // The Writable may already have been detached (see onData); completion
      // then has nothing left to end.
      if (!res) {
        return
      }

      this.trailers = util.parseHeaders(trailers)

      res.end()
    }

    /**
     * Tears the request down on failure. When a Writable is attached it is
     * destroyed with `err`; otherwise the callback, if still pending, is
     * invoked on a microtask with `(err, { opaque })`. A request body, if any,
     * is destroyed as well.
     *
     * @param {Error} err
     */
    onError (err) {
      const { res, callback, opaque, body } = this

      removeSignal(this)

      this.factory = null

      if (res) {
        this.res = null
        util.destroy(res, err)
      } else if (callback) {
        this.callback = null
        queueMicrotask(() => {
          this.runInAsyncScope(callback, null, err, { opaque })
        })
      }

      if (body) {
        this.body = null
        util.destroy(body, err)
      }
    }
  }
  /**
   * Dispatches a request through `this.dispatch` using a StreamHandler built
   * from the given arguments.
   *
   * With no `callback` (strictly `undefined`) a Promise is returned that
   * settles with the handler's callback result. Otherwise the function returns
   * nothing and errors raised while setting up the dispatch are delivered to
   * `callback` on a microtask as `(err, { opaque })`.
   *
   * @param {object} opts
   * @param {Function} factory
   * @param {Function} [callback]
   * @returns {Promise<*>|undefined}
   * @throws Rethrows the setup error synchronously when `callback` is supplied
   *   but is not a function.
   */
  function stream (opts, factory, callback) {
    if (callback === undefined) {
      // Promise form: re-enter with a node-style callback that settles it.
      return new Promise((resolve, reject) => {
        const settle = (error, result) => {
          if (error) {
            reject(error)
          } else {
            resolve(result)
          }
        }
        stream.call(this, opts, factory, settle)
      })
    }

    try {
      this.dispatch(opts, new StreamHandler(opts, factory, callback))
    } catch (error) {
      if (typeof callback === 'function') {
        // Keep the failure asynchronous so callers always observe the callback
        // after this function has returned.
        const payload = { opaque: opts && opts.opaque }
        queueMicrotask(() => callback(error, payload))
        return
      }
      throw error
    }
  }
  168. module.exports = stream