123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- 'use strict'
- const {
- Readable,
- Duplex,
- PassThrough
- } = require('stream')
- const {
- InvalidArgumentError,
- InvalidReturnValueError,
- RequestAbortedError
- } = require('../core/errors')
- const util = require('../core/util')
- const { AsyncResource } = require('async_hooks')
- const { addSignal, removeSignal } = require('./abort-signal')
- const assert = require('assert')
- const kResume = Symbol('resume')
- class PipelineRequest extends Readable {
- constructor () {
- super({ autoDestroy: true })
- this[kResume] = null
- }
- _read () {
- const { [kResume]: resume } = this
- if (resume) {
- this[kResume] = null
- resume()
- }
- }
- _destroy (err, callback) {
- this._read()
- callback(err)
- }
- }
- class PipelineResponse extends Readable {
- constructor (resume) {
- super({ autoDestroy: true })
- this[kResume] = resume
- }
- _read () {
- this[kResume]()
- }
- _destroy (err, callback) {
- if (!err && !this._readableState.endEmitted) {
- err = new RequestAbortedError()
- }
- callback(err)
- }
- }
- class PipelineHandler extends AsyncResource {
- constructor (opts, handler) {
- if (!opts || typeof opts !== 'object') {
- throw new InvalidArgumentError('invalid opts')
- }
- if (typeof handler !== 'function') {
- throw new InvalidArgumentError('invalid handler')
- }
- const { signal, method, opaque, onInfo, responseHeaders } = opts
- if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
- throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
- }
- if (method === 'CONNECT') {
- throw new InvalidArgumentError('invalid method')
- }
- if (onInfo && typeof onInfo !== 'function') {
- throw new InvalidArgumentError('invalid onInfo callback')
- }
- super('UNDICI_PIPELINE')
- this.opaque = opaque || null
- this.responseHeaders = responseHeaders || null
- this.handler = handler
- this.abort = null
- this.context = null
- this.onInfo = onInfo || null
- this.req = new PipelineRequest().on('error', util.nop)
- this.ret = new Duplex({
- readableObjectMode: opts.objectMode,
- autoDestroy: true,
- read: () => {
- const { body } = this
- if (body && body.resume) {
- body.resume()
- }
- },
- write: (chunk, encoding, callback) => {
- const { req } = this
- if (req.push(chunk, encoding) || req._readableState.destroyed) {
- callback()
- } else {
- req[kResume] = callback
- }
- },
- destroy: (err, callback) => {
- const { body, req, res, ret, abort } = this
- if (!err && !ret._readableState.endEmitted) {
- err = new RequestAbortedError()
- }
- if (abort && err) {
- abort()
- }
- util.destroy(body, err)
- util.destroy(req, err)
- util.destroy(res, err)
- removeSignal(this)
- callback(err)
- }
- }).on('prefinish', () => {
- const { req } = this
- // Node < 15 does not call _final in same tick.
- req.push(null)
- })
- this.res = null
- addSignal(this, signal)
- }
- onConnect (abort, context) {
- const { ret, res } = this
- assert(!res, 'pipeline cannot be retried')
- if (ret.destroyed) {
- throw new RequestAbortedError()
- }
- this.abort = abort
- this.context = context
- }
- onHeaders (statusCode, rawHeaders, resume) {
- const { opaque, handler, context } = this
- if (statusCode < 200) {
- if (this.onInfo) {
- const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
- this.onInfo({ statusCode, headers })
- }
- return
- }
- this.res = new PipelineResponse(resume)
- let body
- try {
- this.handler = null
- const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
- body = this.runInAsyncScope(handler, null, {
- statusCode,
- headers,
- opaque,
- body: this.res,
- context
- })
- } catch (err) {
- this.res.on('error', util.nop)
- throw err
- }
- if (!body || typeof body.on !== 'function') {
- throw new InvalidReturnValueError('expected Readable')
- }
- body
- .on('data', (chunk) => {
- const { ret, body } = this
- if (!ret.push(chunk) && body.pause) {
- body.pause()
- }
- })
- .on('error', (err) => {
- const { ret } = this
- util.destroy(ret, err)
- })
- .on('end', () => {
- const { ret } = this
- ret.push(null)
- })
- .on('close', () => {
- const { ret } = this
- if (!ret._readableState.ended) {
- util.destroy(ret, new RequestAbortedError())
- }
- })
- this.body = body
- }
- onData (chunk) {
- const { res } = this
- return res.push(chunk)
- }
- onComplete (trailers) {
- const { res } = this
- res.push(null)
- }
- onError (err) {
- const { ret } = this
- this.handler = null
- util.destroy(ret, err)
- }
- }
- function pipeline (opts, handler) {
- try {
- const pipelineHandler = new PipelineHandler(opts, handler)
- this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
- return pipelineHandler.ret
- } catch (err) {
- return new PassThrough().destroy(err)
- }
- }
- module.exports = pipeline
|