api-pipeline.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. 'use strict'
  2. const {
  3. Readable,
  4. Duplex,
  5. PassThrough
  6. } = require('stream')
  7. const {
  8. InvalidArgumentError,
  9. InvalidReturnValueError,
  10. RequestAbortedError
  11. } = require('../core/errors')
  12. const util = require('../core/util')
  13. const { AsyncResource } = require('async_hooks')
  14. const { addSignal, removeSignal } = require('./abort-signal')
  15. const assert = require('assert')
  16. const kResume = Symbol('resume')
  17. class PipelineRequest extends Readable {
  18. constructor () {
  19. super({ autoDestroy: true })
  20. this[kResume] = null
  21. }
  22. _read () {
  23. const { [kResume]: resume } = this
  24. if (resume) {
  25. this[kResume] = null
  26. resume()
  27. }
  28. }
  29. _destroy (err, callback) {
  30. this._read()
  31. callback(err)
  32. }
  33. }
  34. class PipelineResponse extends Readable {
  35. constructor (resume) {
  36. super({ autoDestroy: true })
  37. this[kResume] = resume
  38. }
  39. _read () {
  40. this[kResume]()
  41. }
  42. _destroy (err, callback) {
  43. if (!err && !this._readableState.endEmitted) {
  44. err = new RequestAbortedError()
  45. }
  46. callback(err)
  47. }
  48. }
  49. class PipelineHandler extends AsyncResource {
  50. constructor (opts, handler) {
  51. if (!opts || typeof opts !== 'object') {
  52. throw new InvalidArgumentError('invalid opts')
  53. }
  54. if (typeof handler !== 'function') {
  55. throw new InvalidArgumentError('invalid handler')
  56. }
  57. const { signal, method, opaque, onInfo, responseHeaders } = opts
  58. if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
  59. throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
  60. }
  61. if (method === 'CONNECT') {
  62. throw new InvalidArgumentError('invalid method')
  63. }
  64. if (onInfo && typeof onInfo !== 'function') {
  65. throw new InvalidArgumentError('invalid onInfo callback')
  66. }
  67. super('UNDICI_PIPELINE')
  68. this.opaque = opaque || null
  69. this.responseHeaders = responseHeaders || null
  70. this.handler = handler
  71. this.abort = null
  72. this.context = null
  73. this.onInfo = onInfo || null
  74. this.req = new PipelineRequest().on('error', util.nop)
  75. this.ret = new Duplex({
  76. readableObjectMode: opts.objectMode,
  77. autoDestroy: true,
  78. read: () => {
  79. const { body } = this
  80. if (body && body.resume) {
  81. body.resume()
  82. }
  83. },
  84. write: (chunk, encoding, callback) => {
  85. const { req } = this
  86. if (req.push(chunk, encoding) || req._readableState.destroyed) {
  87. callback()
  88. } else {
  89. req[kResume] = callback
  90. }
  91. },
  92. destroy: (err, callback) => {
  93. const { body, req, res, ret, abort } = this
  94. if (!err && !ret._readableState.endEmitted) {
  95. err = new RequestAbortedError()
  96. }
  97. if (abort && err) {
  98. abort()
  99. }
  100. util.destroy(body, err)
  101. util.destroy(req, err)
  102. util.destroy(res, err)
  103. removeSignal(this)
  104. callback(err)
  105. }
  106. }).on('prefinish', () => {
  107. const { req } = this
  108. // Node < 15 does not call _final in same tick.
  109. req.push(null)
  110. })
  111. this.res = null
  112. addSignal(this, signal)
  113. }
  114. onConnect (abort, context) {
  115. const { ret, res } = this
  116. assert(!res, 'pipeline cannot be retried')
  117. if (ret.destroyed) {
  118. throw new RequestAbortedError()
  119. }
  120. this.abort = abort
  121. this.context = context
  122. }
  123. onHeaders (statusCode, rawHeaders, resume) {
  124. const { opaque, handler, context } = this
  125. if (statusCode < 200) {
  126. if (this.onInfo) {
  127. const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  128. this.onInfo({ statusCode, headers })
  129. }
  130. return
  131. }
  132. this.res = new PipelineResponse(resume)
  133. let body
  134. try {
  135. this.handler = null
  136. const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  137. body = this.runInAsyncScope(handler, null, {
  138. statusCode,
  139. headers,
  140. opaque,
  141. body: this.res,
  142. context
  143. })
  144. } catch (err) {
  145. this.res.on('error', util.nop)
  146. throw err
  147. }
  148. if (!body || typeof body.on !== 'function') {
  149. throw new InvalidReturnValueError('expected Readable')
  150. }
  151. body
  152. .on('data', (chunk) => {
  153. const { ret, body } = this
  154. if (!ret.push(chunk) && body.pause) {
  155. body.pause()
  156. }
  157. })
  158. .on('error', (err) => {
  159. const { ret } = this
  160. util.destroy(ret, err)
  161. })
  162. .on('end', () => {
  163. const { ret } = this
  164. ret.push(null)
  165. })
  166. .on('close', () => {
  167. const { ret } = this
  168. if (!ret._readableState.ended) {
  169. util.destroy(ret, new RequestAbortedError())
  170. }
  171. })
  172. this.body = body
  173. }
  174. onData (chunk) {
  175. const { res } = this
  176. return res.push(chunk)
  177. }
  178. onComplete (trailers) {
  179. const { res } = this
  180. res.push(null)
  181. }
  182. onError (err) {
  183. const { ret } = this
  184. this.handler = null
  185. util.destroy(ret, err)
  186. }
  187. }
  188. function pipeline (opts, handler) {
  189. try {
  190. const pipelineHandler = new PipelineHandler(opts, handler)
  191. this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
  192. return pipelineHandler.ret
  193. } catch (err) {
  194. return new PassThrough().destroy(err)
  195. }
  196. }
  197. module.exports = pipeline