12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796 |
- // @ts-check
- 'use strict'
- /* global WebAssembly */
- const assert = require('assert')
- const net = require('net')
- const util = require('./core/util')
- const timers = require('./timers')
- const Request = require('./core/request')
- const DispatcherBase = require('./dispatcher-base')
- const {
- RequestContentLengthMismatchError,
- ResponseContentLengthMismatchError,
- InvalidArgumentError,
- RequestAbortedError,
- HeadersTimeoutError,
- HeadersOverflowError,
- SocketError,
- InformationalError,
- BodyTimeoutError,
- HTTPParserError,
- ResponseExceededMaxSizeError,
- ClientDestroyedError
- } = require('./core/errors')
- const buildConnector = require('./core/connect')
- const {
- kUrl,
- kReset,
- kServerName,
- kClient,
- kBusy,
- kParser,
- kConnect,
- kBlocking,
- kResuming,
- kRunning,
- kPending,
- kSize,
- kWriting,
- kQueue,
- kConnected,
- kConnecting,
- kNeedDrain,
- kNoRef,
- kKeepAliveDefaultTimeout,
- kHostHeader,
- kPendingIdx,
- kRunningIdx,
- kError,
- kPipelining,
- kSocket,
- kKeepAliveTimeoutValue,
- kMaxHeadersSize,
- kKeepAliveMaxTimeout,
- kKeepAliveTimeoutThreshold,
- kHeadersTimeout,
- kBodyTimeout,
- kStrictContentLength,
- kConnector,
- kMaxRedirections,
- kMaxRequests,
- kCounter,
- kClose,
- kDestroy,
- kDispatch,
- kInterceptors,
- kLocalAddress,
- kMaxResponseSize
- } = require('./core/symbols')
// Constructor used to build Buffer views over an existing ArrayBuffer.
const FastBuffer = Buffer[Symbol.species]

// Private key under which a pending close() resolver is stored on the client.
const kClosedResolve = Symbol('kClosedResolve')

// Diagnostics channels published by the client. When the
// `diagnostics_channel` module cannot be loaded, each entry is replaced by a
// stub that reports no subscribers so call sites can check `hasSubscribers`
// unconditionally.
const channels = {}
try {
  const diagnosticsChannel = require('diagnostics_channel')
  Object.assign(channels, {
    sendHeaders: diagnosticsChannel.channel('undici:client:sendHeaders'),
    beforeConnect: diagnosticsChannel.channel('undici:client:beforeConnect'),
    connectError: diagnosticsChannel.channel('undici:client:connectError'),
    connected: diagnosticsChannel.channel('undici:client:connected')
  })
} catch {
  for (const name of ['sendHeaders', 'beforeConnect', 'connectError', 'connected']) {
    channels[name] = { hasSubscribers: false }
  }
}
/**
 * Dispatcher bound to a single origin. Requests are kept in one queue that is
 * partitioned into completed, running and pending sections (see the
 * constructor for the layout).
 *
 * @type {import('../types/client').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds the connector when `connect` is not already
   * a function, and initialises the queue and timeout state.
   *
   * @param {string|URL} url
   * @param {import('../types/client').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or invalid
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout
  } = {}) {
    super()

    // Options that are no longer supported: fail loudly and name the
    // replacement instead of silently ignoring them.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
    }

    // A negative size used to pass this check and only fail later, when the
    // parser asserts that the stored limit is > 0. Reject it up front.
    // Zero is still accepted and selects the default below.
    if (maxHeaderSize != null && (!Number.isFinite(maxHeaderSize) || maxHeaderSize < 0)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    // -1 is accepted and means "no limit" (see kMaxResponseSize below).
    if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // An object (or missing) `connect` is treated as connector options and
    // merged last, so it overrides the individual options above.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        socketPath,
        timeout: connectTimeout,
        ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })
    }

    // Caller-supplied interceptors replace the default redirect interceptor.
    this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
      ? interceptors.Client
      : [createRedirectInterceptor({ maxRedirections })]
    this[kUrl] = util.parseOrigin(url)
    this[kConnector] = connect
    this[kSocket] = null
    this[kPipelining] = pipelining != null ? pipelining : 1
    this[kMaxHeadersSize] = maxHeaderSize || 16384
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress != null ? localAddress : null
    this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
    // Within this class kNeedDrain is only ever raised to 2, by kDispatch when
    // the client is busy; other transitions happen outside this class.
    this[kNeedDrain] = 0
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
    this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
    this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
    this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    // Normalises undefined/null to -1, which the parser treats as "no limit".
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).
    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0
  }

  /** Current pipelining factor. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Updates the pipelining factor and immediately re-evaluates the queue. */
  set pipelining (value) {
    this[kPipelining] = value
    resume(this, true)
  }

  /** Number of queued requests at or after kPendingIdx. */
  get [kPending] () {
    return this[kQueue].length - this[kPendingIdx]
  }

  /** Number of requests between kRunningIdx and kPendingIdx. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Number of requests that are running or pending. */
  get [kSize] () {
    return this[kQueue].length - this[kRunningIdx]
  }

  /** True when a socket exists, is not destroyed and no connect is in progress. */
  get [kConnected] () {
    return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
  }

  /**
   * Truthy when the socket is flagged reset/writing/blocking, when the number
   * of running plus pending requests has reached the pipelining factor
   * (treated as at least 1), or when requests are pending.
   */
  get [kBusy] () {
    const socket = this[kSocket]
    return (
      (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
      (this[kSize] >= (this[kPipelining] || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Wraps `opts`/`handler` in a Request, appends it to the queue and kicks the
   * resume loop.
   *
   * @returns {boolean} `true` while kNeedDrain is below 2, `false` once this
   *   call (or an earlier one) has marked the client as needing drain.
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin

    const request = new Request(origin, opts, handler)

    this[kQueue].push(request)
    if (this[kResuming]) {
      // Do nothing.
    } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1
      process.nextTick(resume, this)
    } else {
      resume(this, true)
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves with `null` right away when nothing is running or pending;
   * otherwise parks the resolver in kClosedResolve for later completion.
   */
  async [kClose] () {
    return new Promise((resolve) => {
      if (!this[kSize]) {
        resolve(null)
      } else {
        this[kClosedResolve] = resolve
      }
    })
  }

  /**
   * Fails every pending request with `err`, destroys the socket if there is
   * one, and resolves once the socket has closed (or on the next microtask
   * when there is no socket). A parked close() resolver is settled as well.
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      // Only the pending section is removed here; running requests stay queued.
      const requests = this[kQueue].splice(this[kPendingIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        errorRequest(this, request, err)
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]()
          this[kClosedResolve] = null
        }
        resolve()
      }

      if (!this[kSocket]) {
        queueMicrotask(callback)
      } else {
        util.destroy(this[kSocket].on('close', callback), err)
      }

      resume(this)
    })
  }
}
- const constants = require('./llhttp/constants')
- const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
- const EMPTY_BUF = Buffer.alloc(0)
/**
 * Compiles and instantiates the llhttp WebAssembly module. The SIMD build is
 * tried first; if compiling it throws for any reason, the plain build is used.
 * Parser callbacks imported by the module are forwarded to whichever Parser
 * is currently executing (`currentParser`).
 *
 * @returns {Promise<WebAssembly.Instance>}
 */
async function lazyllhttp () {
  // Under jest the plain build is required eagerly, before the SIMD attempt.
  const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined

  // Builds a Buffer view over the chunk being parsed. `at` is an address
  // inside the wasm-side copy; it is rebased onto the original chunk.
  const viewOfCurrentChunk = (at, len) => {
    const start = at - currentBufferPtr + currentBufferRef.byteOffset
    return new FastBuffer(currentBufferRef.buffer, start, len)
  }

  let mod
  try {
    mod = await WebAssembly.compile(Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64'))
  } catch {
    /* istanbul ignore next */

    // We could check if the error was caused by the simd option not
    // being enabled, but the occurring of this other error
    // * https://github.com/emscripten-core/emscripten/issues/11495
    // got me to remove that check to avoid breaking Node 12.
    mod = await WebAssembly.compile(Buffer.from(llhttpWasmData || require('./llhttp/llhttp-wasm.js'), 'base64'))
  }

  /* eslint-disable camelcase */
  const env = {
    wasm_on_url: (p, at, len) => {
      /* istanbul ignore next */
      return 0
    },
    wasm_on_status: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onStatus(viewOfCurrentChunk(at, len)) || 0
    },
    wasm_on_message_begin: (p) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onMessageBegin() || 0
    },
    wasm_on_header_field: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onHeaderField(viewOfCurrentChunk(at, len)) || 0
    },
    wasm_on_header_value: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onHeaderValue(viewOfCurrentChunk(at, len)) || 0
    },
    wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
    },
    wasm_on_body: (p, at, len) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onBody(viewOfCurrentChunk(at, len)) || 0
    },
    wasm_on_message_complete: (p) => {
      assert.strictEqual(currentParser.ptr, p)
      return currentParser.onMessageComplete() || 0
    }
  }
  /* eslint-enable camelcase */

  const instance = await WebAssembly.instantiate(mod, { env })
  return instance
}
// Module-wide llhttp state. The wasm module starts loading as soon as this
// file is evaluated.
let llhttpInstance = null
let llhttpPromise = lazyllhttp()
// `promise.catch()` without a handler does not observe the rejection: the
// promise it returns rejects with the same reason and is itself unhandled.
// Attach a real no-op handler so a failed load does not raise an
// unhandledRejection at import time. `llhttpPromise` itself is untouched and
// still rejects for whoever awaits it (presumably the connect path).
llhttpPromise.catch(() => {})

// Parser that is currently inside `execute`, and the chunk it is parsing.
// Both are null outside of `execute`.
let currentParser = null
let currentBufferRef = null
// Size and address of the scratch buffer allocated on the wasm side; it is
// reused across calls and only reallocated when a larger chunk arrives.
let currentBufferSize = 0
let currentBufferPtr = null

// Values stored in Parser#timeoutType to tell the timeout handler what the
// armed timer is guarding.
const TIMEOUT_HEADERS = 1
const TIMEOUT_BODY = 2
const TIMEOUT_IDLE = 3
/**
 * Drives one llhttp parser instance for one socket. Chunks read from the
 * socket are copied into wasm memory and parsed; llhttp calls back into the
 * `on*` methods, which forward the response to the request at the head of the
 * client's running section.
 */
class Parser {
  /**
   * @param {object} client owning client; read for limits, timeouts and queue
   * @param {object} socket socket this parser reads from
   * @param {{ exports: object }} instance instantiated llhttp wasm module
   */
  constructor (client, socket, { exports }) {
    const maxHeadersSize = client[kMaxHeadersSize]
    assert(Number.isFinite(maxHeadersSize) && maxHeadersSize > 0)

    this.llhttp = exports
    this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
    this.client = client
    this.socket = socket
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null
    this.statusCode = null
    this.statusText = ''
    this.upgrade = false
    this.headers = []
    this.headersSize = 0
    this.headersMaxSize = maxHeadersSize
    this.shouldKeepAlive = false
    this.paused = false
    // Bound once so it can be handed to request handlers as a callback.
    this.resume = this.resume.bind(this)

    this.bytesRead = 0

    this.keepAlive = ''
    this.contentLength = ''
    this.connection = ''
    this.maxResponseSize = client[kMaxResponseSize]
  }

  /**
   * Records what the timer guards and (re)arms it. An unchanged duration only
   * refreshes the existing timer; a falsy duration disarms it.
   */
  setTimeout (value, type) {
    this.timeoutType = type

    if (value === this.timeoutValue) {
      if (this.timeout) {
        // istanbul ignore else: only for jest
        if (this.timeout.refresh) {
          this.timeout.refresh()
        }
      }
      return
    }

    timers.clearTimeout(this.timeout)
    if (value) {
      this.timeout = timers.setTimeout(onParserTimeout, value, this)
      // istanbul ignore else: only for jest
      if (this.timeout.unref) {
        this.timeout.unref()
      }
    } else {
      this.timeout = null
    }
    this.timeoutValue = value
  }

  /** Un-pauses a paused parser and drains whatever the socket has buffered. */
  resume () {
    const isIdle = this.socket.destroyed || !this.paused
    if (isIdle) {
      return
    }

    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_resume(this.ptr)

    assert(this.timeoutType === TIMEOUT_BODY)
    if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    this.paused = false
    this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
    this.readMore()
  }

  /** Feeds socket data to the parser until paused, freed, or out of data. */
  readMore () {
    while (!this.paused && this.ptr) {
      const chunk = this.socket.read()
      if (chunk === null) {
        return
      }
      this.execute(chunk)
    }
  }

  /**
   * Copies `data` into the shared wasm scratch buffer and runs llhttp over it.
   * Any error raised while parsing destroys the socket with that error.
   */
  execute (data) {
    assert(this.ptr != null)
    assert(currentParser == null)
    assert(!this.paused)

    const { socket, llhttp } = this

    // Grow the scratch buffer in 4 KiB steps when the chunk does not fit.
    if (data.length > currentBufferSize) {
      if (currentBufferPtr) {
        llhttp.free(currentBufferPtr)
      }
      currentBufferSize = Math.ceil(data.length / 4096) * 4096
      currentBufferPtr = llhttp.malloc(currentBufferSize)
    }

    new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)

    // Call `execute` on the wasm parser.
    // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
    // and finally the length of bytes to parse.
    // The return value is an error code or `constants.ERROR.OK`.
    try {
      let ret

      // The module-level "current" references are only valid while llhttp is
      // running; always clear them, even if a callback throws.
      try {
        currentBufferRef = data
        currentParser = this
        ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
      } finally {
        currentParser = null
        currentBufferRef = null
      }

      // Number of bytes of `data` that were consumed before llhttp stopped.
      const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr

      if (ret === constants.ERROR.PAUSED_UPGRADE) {
        this.onUpgrade(data.slice(offset))
      } else if (ret === constants.ERROR.PAUSED) {
        this.paused = true
        socket.unshift(data.slice(offset))
      } else if (ret !== constants.ERROR.OK) {
        const reasonPtr = llhttp.llhttp_get_error_reason(this.ptr)
        let message = ''
        /* istanbul ignore else: difficult to make a test case for */
        if (reasonPtr) {
          // The reason is a NUL-terminated string inside wasm memory.
          const len = new Uint8Array(llhttp.memory.buffer, reasonPtr).indexOf(0)
          const reason = Buffer.from(llhttp.memory.buffer, reasonPtr, len).toString()
          message = `Response does not match the HTTP/1.1 protocol (${reason})`
        }
        throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset))
      }
    } catch (err) {
      util.destroy(socket, err)
    }
  }

  /** Frees the wasm parser and clears timer state. */
  destroy () {
    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_free(this.ptr)
    this.ptr = null

    timers.clearTimeout(this.timeout)
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null

    this.paused = false
  }

  /** llhttp callback: status text. */
  onStatus (buf) {
    this.statusText = buf.toString()
  }

  /** llhttp callback: returns -1 when there is nothing to deliver a response to. */
  onMessageBegin () {
    const { socket, client } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    if (!client[kQueue][client[kRunningIdx]]) {
      return -1
    }
  }

  /**
   * llhttp callback: a (possibly partial) header name. `headers` alternates
   * name, value, name, value...; an even length means a new name starts.
   */
  onHeaderField (buf) {
    const count = this.headers.length
    const startsNewName = (count & 1) === 0

    if (startsNewName) {
      this.headers.push(buf)
    } else {
      const last = count - 1
      this.headers[last] = Buffer.concat([this.headers[last], buf])
    }

    this.trackHeader(buf.length)
  }

  /**
   * llhttp callback: a (possibly partial) header value. Also accumulates the
   * keep-alive, connection and content-length values as strings.
   */
  onHeaderValue (buf) {
    let count = this.headers.length

    if ((count & 1) === 1) {
      this.headers.push(buf)
      count += 1
    } else {
      this.headers[count - 1] = Buffer.concat([this.headers[count - 1], buf])
    }

    const key = this.headers[count - 2]
    // Only names of the two interesting lengths are worth decoding.
    const name = key.length === 10 || key.length === 14
      ? key.toString().toLowerCase()
      : ''
    if (name === 'keep-alive') {
      this.keepAlive += buf.toString()
    } else if (name === 'connection') {
      this.connection += buf.toString()
    } else if (name === 'content-length') {
      this.contentLength += buf.toString()
    }

    this.trackHeader(buf.length)
  }

  /** Adds `len` to the running header size and enforces the limit. */
  trackHeader (len) {
    this.headersSize += len
    if (this.headersSize >= this.headersMaxSize) {
      util.destroy(this.socket, new HeadersOverflowError())
    }
  }

  /**
   * Detaches the socket from the client and hands it, with any bytes that
   * followed the headers, to the request's upgrade handler.
   */
  onUpgrade (head) {
    const { upgrade, client, socket, headers, statusCode } = this

    assert(upgrade)

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(!socket.destroyed)
    assert(socket === client[kSocket])
    assert(!this.paused)
    assert(request.upgrade || request.method === 'CONNECT')

    this.statusCode = null
    this.statusText = ''
    this.shouldKeepAlive = null

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    // Put the unparsed remainder back so the new owner can read it.
    socket.unshift(head)

    socket[kParser].destroy()
    socket[kParser] = null

    socket[kClient] = null
    socket[kError] = null
    socket
      .removeListener('error', onSocketError)
      .removeListener('readable', onSocketReadable)
      .removeListener('end', onSocketEnd)
      .removeListener('close', onSocketClose)

    client[kSocket] = null
    client[kQueue][client[kRunningIdx]++] = null
    client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

    try {
      request.onUpgrade(statusCode, headers, socket)
    } catch (err) {
      util.destroy(socket, err)
    }

    resume(client)
  }

  /**
   * llhttp callback: all headers received.
   *
   * @returns {number} -1 on error, 2 for an upgrade/CONNECT response, 1 for
   *   HEAD and 1xx responses, PAUSED when the handler asked to pause, else 0.
   */
  onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
    const { client, socket, headers, statusText } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]

    /* istanbul ignore next: difficult to make a test case for */
    if (!request) {
      return -1
    }

    assert(!this.upgrade)
    assert(this.statusCode < 200)

    if (statusCode === 100) {
      util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
      return -1
    }

    /* this can only happen if server is misbehaving */
    if (upgrade && !request.upgrade) {
      util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
      return -1
    }

    assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

    this.statusCode = statusCode
    // Override llhttp value which does not allow keepAlive for HEAD.
    const headKeepAlive =
      request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive'
    this.shouldKeepAlive = shouldKeepAlive || headKeepAlive

    if (this.statusCode >= 200) {
      // Final response: switch from the headers timer to the body timer.
      const bodyTimeout = request.bodyTimeout != null
        ? request.bodyTimeout
        : client[kBodyTimeout]
      this.setTimeout(bodyTimeout, TIMEOUT_BODY)
    } else if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    if (request.method === 'CONNECT' || upgrade) {
      assert(client[kRunning] === 1)
      this.upgrade = true
      return 2
    }

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    if (this.shouldKeepAlive && client[kPipelining]) {
      const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null

      if (keepAliveTimeout == null) {
        client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
      } else {
        const timeout = Math.min(
          keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
          client[kKeepAliveMaxTimeout]
        )
        if (timeout <= 0) {
          socket[kReset] = true
        } else {
          client[kKeepAliveTimeoutValue] = timeout
        }
      }
    } else {
      // Stop more requests from being dispatched.
      socket[kReset] = true
    }

    let pause
    try {
      pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
    } catch (err) {
      util.destroy(socket, err)
      return -1
    }

    if (request.method === 'HEAD' || statusCode < 200) {
      return 1
    }

    if (socket[kBlocking]) {
      socket[kBlocking] = false
      resume(client)
    }

    return pause ? constants.ERROR.PAUSED : 0
  }

  /**
   * llhttp callback: a chunk of response body.
   *
   * @returns {number|undefined} -1 on error, PAUSED when the handler asked to
   *   pause, otherwise undefined.
   */
  onBody (buf) {
    const { client, socket, statusCode, maxResponseSize } = this

    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
    if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    assert(statusCode >= 200)

    const exceedsLimit = maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize
    if (exceedsLimit) {
      util.destroy(socket, new ResponseExceededMaxSizeError())
      return -1
    }

    this.bytesRead += buf.length

    try {
      if (request.onData(buf) === false) {
        return constants.ERROR.PAUSED
      }
    } catch (err) {
      util.destroy(socket, err)
      return -1
    }
  }

  /**
   * llhttp callback: the response is complete. Resets per-message state,
   * completes the request and decides whether the socket can be reused.
   */
  onMessageComplete () {
    const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this

    if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
      return -1
    }

    if (upgrade) {
      return
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(statusCode >= 100)

    this.statusCode = null
    this.statusText = ''
    this.bytesRead = 0
    this.contentLength = ''
    this.keepAlive = ''
    this.connection = ''

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    // Informational responses do not complete the request.
    if (statusCode < 200) {
      return
    }

    /* istanbul ignore next: should be handled by llhttp? */
    if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
      util.destroy(socket, new ResponseContentLengthMismatchError())
      return -1
    }

    try {
      request.onComplete(headers)
    } catch (err) {
      errorRequest(client, request, err)
    }

    client[kQueue][client[kRunningIdx]++] = null

    if (socket[kWriting]) {
      assert.strictEqual(client[kRunning], 0)
      // Response completed before request.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    }

    // Destroy socket when the response forbids reuse, or once all requests
    // have completed on a socket flagged for reset. In the latter case the
    // request at the tail of the pipeline is the one that requested reset and
    // no further requests should have been queued since then.
    if (!shouldKeepAlive || (socket[kReset] && client[kRunning] === 0)) {
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    }

    if (client[kPipelining] === 1) {
      // We must wait a full event loop cycle to reuse this socket to make sure
      // that non-spec compliant servers are not closing the connection even if they
      // said they won't.
      setImmediate(resume, client)
    } else {
      resume(client)
    }
  }
}
/**
 * Timer callback for a Parser. What happens depends on what the armed timer
 * was guarding (`parser.timeoutType`).
 *
 * @param {Parser} parser
 */
function onParserTimeout (parser) {
  const { socket, timeoutType, client } = parser

  if (timeoutType === TIMEOUT_HEADERS) {
    // Skipped only when the socket is writing, does not need drain and at
    // most one request is running.
    const shouldFail = !socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1
    if (shouldFail) {
      assert(!parser.paused, 'cannot be paused while waiting for headers')
      util.destroy(socket, new HeadersTimeoutError())
    }
    return
  }

  if (timeoutType === TIMEOUT_BODY) {
    // A paused parser is not failed by the body timer.
    if (!parser.paused) {
      util.destroy(socket, new BodyTimeoutError())
    }
    return
  }

  /* istanbul ignore else */
  if (timeoutType === TIMEOUT_IDLE) {
    assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
    util.destroy(socket, new InformationalError('socket idle timeout'))
  }
}
/**
 * Socket 'readable' listener (`this` is the socket): lets the socket's
 * parser pull the newly available data.
 */
function onSocketReadable () {
  this[kParser].readMore()
}
/**
 * Socket 'error' listener (`this` is the socket). Records the error on the
 * socket and forwards it to the client, unless it is a connection reset that
 * can be read as the end of a response that is not keep-alive.
 *
 * @param {Error} err
 */
function onSocketError (err) {
  const parser = this[kParser]

  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  // On Mac OS, we get an ECONNRESET even if there is a full body to be
  // forwarded to the user.
  const resetEndsResponse =
    err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive
  if (resetEndsResponse) {
    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
    return
  }

  this[kError] = err
  onError(this[kClient], err)
}
/**
 * Fails every queued request with `err` when nothing is running and the error
 * code is neither 'UND_ERR_INFO' nor 'UND_ERR_SOCKET'. Otherwise does nothing.
 *
 * @param {Client} client
 * @param {Error} err
 */
function onError (client, err) {
  const causedByRunningRequest = client[kRunning] !== 0
  if (causedByRunningRequest || err.code === 'UND_ERR_INFO' || err.code === 'UND_ERR_SOCKET') {
    return
  }

  // Error is not caused by running request and not a recoverable
  // socket error.

  assert(client[kPendingIdx] === client[kRunningIdx])

  const failed = client[kQueue].splice(client[kRunningIdx])
  for (const request of failed) {
    errorRequest(client, request, err)
  }
  assert(client[kSize] === 0)
}
/**
 * Socket 'end' listener (`this` is the socket).
 *
 * If a status code was parsed and the connection is not keep-alive, the data
 * received so far is completed as a response. Otherwise the socket is
 * destroyed with a SocketError('other side closed').
 */
function onSocketEnd () {
  const parser = this[kParser]

  if (!parser.statusCode || parser.shouldKeepAlive) {
    util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
    return
  }

  // We treat all incoming data so far as a valid response.
  parser.onMessageComplete()
}
/**
 * Socket 'close' listener (`this` is the socket).
 *
 * Completes a pending non-keep-alive response if no error was recorded,
 * tears down the parser, detaches the socket from the client, fails the
 * affected request(s), emits 'disconnect' and resumes the client.
 */
function onSocketClose () {
  const client = this[kClient]

  const hasCleanResponse =
    !this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive
  if (hasCleanResponse) {
    // We treat all incoming data so far as a valid response.
    this[kParser].onMessageComplete()
  }

  this[kParser].destroy()
  this[kParser] = null

  const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

  client[kSocket] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // Fail entire queue.
    const failed = client[kQueue].splice(client[kRunningIdx])
    for (const request of failed) {
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
    // Fail head of pipeline.
    const head = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, head, err)
  }

  // Whatever was pending past the failed request is dispatched again later.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect', client[kUrl], [client], err)

  resume(client)
}
/**
 * Opens a socket through `client[kConnector]` and attaches it to the client.
 *
 * On success the socket is initialised with its per-connection state and a
 * Parser, the socket listeners are registered, and 'connect' is emitted.
 * On failure the affected requests are errored and 'connectionError' is
 * emitted. Unless the client was destroyed in the meantime, `resume(client)`
 * is called at the end in both cases.
 *
 * Diagnostics messages are published on `channels.beforeConnect`,
 * `channels.connected` and `channels.connectError` when they have
 * subscribers.
 *
 * @param {object} client Must not be connecting and must not have a socket.
 * @returns {Promise<void>}
 */
async function connect (client) {
  assert(!client[kConnecting])
  assert(!client[kSocket])

  let { host, hostname, protocol, port } = client[kUrl]

  // Resolve ipv6: strip the brackets from a literal such as `[::1]`.
  if (hostname[0] === '[') {
    const idx = hostname.indexOf(']')

    assert(idx !== -1)
    // `slice(1, idx)` selects the same characters as the deprecated
    // `substr(1, idx - 1)`.
    const ip = hostname.slice(1, idx)

    assert(net.isIP(ip))
    hostname = ip
  }

  client[kConnecting] = true

  // Builds a fresh parameter object on every call so that `servername` and
  // `localAddress` always reflect the client's state at that moment.
  const connectParams = () => ({
    host,
    hostname,
    protocol,
    port,
    servername: client[kServerName],
    localAddress: client[kLocalAddress]
  })

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: connectParams(),
      connector: client[kConnector]
    })
  }

  try {
    const socket = await new Promise((resolve, reject) => {
      client[kConnector](connectParams(), (err, socket) => {
        if (err) {
          reject(err)
        } else {
          resolve(socket)
        }
      })
    })

    if (client.destroyed) {
      // The no-op listener keeps the destroy error from being unhandled.
      util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
      return
    }

    if (!llhttpInstance) {
      llhttpInstance = await llhttpPromise
      llhttpPromise = null
    }

    client[kConnecting] = false

    assert(socket)

    socket[kNoRef] = false
    socket[kWriting] = false
    socket[kReset] = false
    socket[kBlocking] = false
    socket[kError] = null
    socket[kParser] = new Parser(client, socket, llhttpInstance)
    socket[kClient] = client
    socket[kCounter] = 0
    socket[kMaxRequests] = client[kMaxRequests]
    socket
      .on('error', onSocketError)
      .on('readable', onSocketReadable)
      .on('end', onSocketEnd)
      .on('close', onSocketClose)

    client[kSocket] = socket

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: connectParams(),
        connector: client[kConnector],
        socket
      })
    }
    client.emit('connect', client[kUrl], [client])
  } catch (err) {
    if (client.destroyed) {
      return
    }

    client[kConnecting] = false

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: connectParams(),
        connector: client[kConnector],
        error: err
      })
    }

    if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
      assert(client[kRunning] === 0)
      // Only fail the leading pending requests that asked for this servername.
      while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
        const request = client[kQueue][client[kPendingIdx]++]
        errorRequest(client, request, err)
      }
    } else {
      onError(client, err)
    }

    client.emit('connectionError', client[kUrl], [client], err)
  }

  resume(client)
}
/**
 * Clears the client's need-drain state and emits 'drain' with the client's
 * url and a one-element array holding the client.
 *
 * @param {object} client
 */
function emitDrain (client) {
  client[kNeedDrain] = 0

  const origin = client[kUrl]
  const targets = [client]
  client.emit('drain', origin, targets)
}
/**
 * Re-entrancy-guarded entry point for `_resume`.
 *
 * While a call is in progress `client[kResuming]` is 2 and nested calls
 * return immediately. The flag is reset in a `finally` block so that an
 * exception escaping `_resume` (for example from a throwing 'drain'
 * listener) cannot leave the client permanently unable to resume.
 *
 * After a successful pass, once more than 256 completed slots have
 * accumulated at the front of the queue they are dropped and both indices
 * are rebased.
 *
 * @param {object} client
 * @param {boolean} [sync] Forwarded to `_resume`.
 */
function resume (client, sync) {
  if (client[kResuming] === 2) {
    return
  }

  client[kResuming] = 2

  try {
    _resume(client, sync)
  } finally {
    client[kResuming] = 0
  }

  if (client[kRunningIdx] > 256) {
    client[kQueue].splice(0, client[kRunningIdx])
    client[kPendingIdx] -= client[kRunningIdx]
    client[kRunningIdx] = 0
  }
}
/**
 * Dispatch loop of the client. Each iteration either writes the next pending
 * request to the socket (and loops again) or returns because nothing more
 * can be done right now.
 *
 * @param {object} client
 * @param {boolean} [sync] When truthy, 'drain' is emitted on the next tick
 *   instead of synchronously.
 */
function _resume (client, sync) {
  for (;;) {
    if (client.destroyed) {
      assert(client[kPending] === 0)
      return
    }

    // A pending close resolves as soon as the queue is empty.
    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]()
      client[kClosedResolve] = null
      return
    }

    const socket = client[kSocket]

    if (socket && !socket.destroyed) {
      // Unref the socket while there is no work, ref it again otherwise.
      if (client[kSize] === 0) {
        if (!socket[kNoRef] && socket.unref) {
          socket.unref()
          socket[kNoRef] = true
        }
      } else if (socket[kNoRef] && socket.ref) {
        socket.ref()
        socket[kNoRef] = false
      }

      // Arm the idle timeout when there is no work, or the headers timeout
      // while a running request has not yet produced a final status code.
      if (client[kSize] === 0) {
        if (socket[kParser].timeoutType !== TIMEOUT_IDLE) {
          socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
        }
      } else if (
        client[kRunning] > 0 &&
        socket[kParser].statusCode < 200 &&
        socket[kParser].timeoutType !== TIMEOUT_HEADERS
      ) {
        const running = client[kQueue][client[kRunningIdx]]
        let headersTimeout
        if (running.headersTimeout != null) {
          headersTimeout = running.headersTimeout
        } else {
          headersTimeout = client[kHeadersTimeout]
        }
        socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS)
      }
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        client[kNeedDrain] = 1
        process.nextTick(emitDrain, client)
      } else {
        emitDrain(client)
      }
      continue
    }

    // Nothing to dispatch, or the pipeline is already full.
    if (client[kPending] === 0 || client[kRunning] >= (client[kPipelining] || 1)) {
      return
    }

    const request = client[kQueue][client[kPendingIdx]]

    if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
      // The servername can only change once the pipeline is empty.
      if (client[kRunning] > 0) {
        return
      }

      client[kServerName] = request.servername

      if (socket && socket.servername !== request.servername) {
        util.destroy(socket, new InformationalError('servername changed'))
        return
      }
    }

    if (client[kConnecting]) {
      return
    }

    if (!socket) {
      connect(client)
      return
    }

    if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
      return
    }

    const hasInflight = client[kRunning] > 0

    if (hasInflight && !request.idempotent) {
      // Non-idempotent request cannot be retried.
      // Ensure that no other requests are inflight and
      // could cause failure.
      return
    }

    if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
      // Don't dispatch an upgrade until all preceding requests have completed.
      // A misbehaving server might upgrade the connection before all pipelined
      // request has completed.
      return
    }

    // A stream body known to be empty is detached from the request: it is
    // only watched for errors and destroyed once it ends.
    if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
      request.body
        .on('data', /* istanbul ignore next */ function () {
          /* istanbul ignore next */
          assert(false)
        })
        .on('error', function (err) {
          errorRequest(client, request, err)
        })
        .on('end', function () {
          util.destroy(this)
        })

      request.body = null
    }

    if (
      client[kRunning] > 0 &&
      (util.isStream(request.body) || util.isAsyncIterable(request.body))
    ) {
      // Request with stream or iterator body can error while other requests
      // are inflight and indirectly error those as well.
      // Ensure this doesn't happen by waiting for inflight
      // to complete before dispatching.

      // Request with stream or iterator body cannot be retried.
      // Ensure that no other requests are inflight and
      // could cause failure.
      return
    }

    const dispatched = !request.aborted && write(client, request)
    if (dispatched) {
      client[kPendingIdx]++
    } else {
      // Aborted or rejected by write(): drop it from the queue.
      client[kQueue].splice(client[kPendingIdx], 1)
    }
  }
}
/**
 * Serialises `request` onto the client's current socket.
 *
 * @param {object} client
 * @param {object} request
 * @returns {boolean} `false` when the request was errored or aborted before
 *   anything was written, `true` once the request was handed to the socket
 *   (possibly asynchronously for blob, stream and iterable bodies).
 */
function write (client, request) {
  const { body, method, path, host, upgrade, headers, blocking, reset } = request

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.
  const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH'

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  let contentLength = util.bodyLength(body)

  if (contentLength === null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 && !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.
    contentLength = null
  }

  if (request.contentLength !== null && request.contentLength !== contentLength) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  const socket = client[kSocket]

  try {
    // The callback handed to the request aborts it and tears down the socket.
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())

      util.destroy(socket, new InformationalError('aborted'))
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  if (method === 'HEAD' || upgrade || method === 'CONNECT') {
    // HEAD: https://github.com/mcollina/undici/issues/258
    // Close after a HEAD request to interop with misbehaving servers
    // that may send a body in the response.
    //
    // CONNECT or upgrade: block pipeline from dispatching further
    // requests on this connection.
    socket[kReset] = true
  }

  // An explicit per-request `reset` overrides the defaults above.
  if (reset != null) {
    socket[kReset] = reset
  }

  if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
    socket[kReset] = true
  }

  if (blocking) {
    socket[kBlocking] = true
  }

  let header = `${method} ${path} HTTP/1.1\r\n`

  header += typeof host === 'string' ? `host: ${host}\r\n` : client[kHostHeader]

  if (upgrade) {
    header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  } else if (client[kPipelining] && !socket[kReset]) {
    header += 'connection: keep-alive\r\n'
  } else {
    header += 'connection: close\r\n'
  }

  if (headers) {
    header += headers
  }

  if (channels.sendHeaders.hasSubscribers) {
    channels.sendHeaders.publish({ request, headers: header, socket })
  }

  if (!body) {
    if (contentLength === 0) {
      socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
    } else {
      assert(contentLength === null, 'no body must not have content length')
      socket.write(`${header}\r\n`, 'latin1')
    }
    request.onRequestSent()
    return true
  }

  if (util.isBuffer(body)) {
    assert(contentLength === body.byteLength, 'buffer body must have content length')

    socket.cork()
    socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
    socket.write(body)
    socket.uncork()
    request.onBodySent(body)
    request.onRequestSent()
    if (!expectsPayload) {
      socket[kReset] = true
    }
    return true
  }

  if (util.isBlobLike(body)) {
    if (typeof body.stream === 'function') {
      writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
    } else {
      writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
    }
    return true
  }

  if (util.isStream(body)) {
    writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
    return true
  }

  /* istanbul ignore else: assertion */
  if (util.isIterable(body)) {
    writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
  } else {
    assert(false)
  }

  return true
}
/**
 * Pumps a stream body into the socket through an AsyncWriter, pausing the
 * body when a write reports backpressure and resuming it on socket 'drain'.
 * When the body ends, errors or closes (or the socket errors) all listeners
 * are removed, the writer is finished and the body is destroyed.
 */
function writeStream ({ body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

  let done = false

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

  // 'data' listener; `this` is the body stream.
  function onData (chunk) {
    if (done) {
      return
    }

    try {
      const accepted = writer.write(chunk)
      if (!accepted && this.pause) {
        this.pause()
      }
    } catch (err) {
      util.destroy(this, err)
    }
  }

  function onDrain () {
    if (done) {
      return
    }

    if (body.resume) {
      body.resume()
    }
  }

  // A 'close' that arrives before 'end'/'error' is treated as an abort.
  function onAbort () {
    onFinished(new RequestAbortedError())
  }

  function onFinished (err) {
    if (done) {
      return
    }

    done = true

    assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))

    socket
      .off('drain', onDrain)
      .off('error', onFinished)

    body
      .removeListener('data', onData)
      .removeListener('end', onFinished)
      .removeListener('error', onFinished)
      .removeListener('close', onAbort)

    if (!err) {
      try {
        writer.end()
      } catch (er) {
        err = er
      }
    }

    writer.destroy(err)

    // An informational 'reset' is not propagated to the body as an error.
    const propagate = err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')
    if (propagate) {
      util.destroy(body, err)
    } else {
      util.destroy(body)
    }
  }

  body
    .on('data', onData)
    .on('end', onFinished)
    .on('error', onFinished)
    .on('close', onAbort)

  if (body.resume) {
    body.resume()
  }

  socket
    .on('drain', onDrain)
    .on('error', onFinished)
}
/**
 * Reads a blob body fully into memory and writes it to the socket together
 * with the request header and a content-length. Any failure inside the
 * `try` destroys the socket with that error.
 *
 * @returns {Promise<void>}
 */
async function writeBlob ({ body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength === body.size, 'blob body must have content length')

  try {
    const lengthMismatch = contentLength != null && contentLength !== body.size
    if (lengthMismatch) {
      throw new RequestContentLengthMismatchError()
    }

    const payload = Buffer.from(await body.arrayBuffer())

    // Header and payload are flushed together.
    socket.cork()
    socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
    socket.write(payload)
    socket.uncork()

    request.onBodySent(payload)
    request.onRequestSent()

    if (!expectsPayload) {
      socket[kReset] = true
    }

    resume(client)
  } catch (err) {
    util.destroy(socket, err)
  }
}
/**
 * Writes an (async) iterable body to the socket through an AsyncWriter,
 * waiting for socket 'drain' (or 'close') whenever a write reports
 * backpressure. Errors are handed to `writer.destroy`.
 *
 * @returns {Promise<void>}
 */
async function writeIterable ({ body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

  // Resolver of the promise currently awaited in the loop, if any.
  let wake = null

  function onDrain () {
    if (!wake) {
      return
    }
    const resolve = wake
    wake = null
    resolve()
  }

  function waitForDrain () {
    return new Promise((resolve, reject) => {
      assert(wake === null)

      if (socket[kError]) {
        reject(socket[kError])
      } else {
        wake = resolve
      }
    })
  }

  socket
    .on('close', onDrain)
    .on('drain', onDrain)

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

  try {
    // It's up to the user to somehow abort the async iterable.
    for await (const chunk of body) {
      if (socket[kError]) {
        throw socket[kError]
      }

      const accepted = writer.write(chunk)
      if (!accepted) {
        await waitForDrain()
      }
    }

    writer.end()
  } catch (err) {
    writer.destroy(err)
  } finally {
    socket
      .off('close', onDrain)
      .off('drain', onDrain)
  }
}
/**
 * Incrementally writes a request (header plus body chunks) to a socket.
 * The header is emitted together with the first non-empty chunk, or by
 * `end()` when no chunk was ever written. Without a known content length
 * the body is sent with chunked transfer encoding.
 *
 * `socket[kWriting]` is set on construction and cleared by `end()` and
 * `destroy()`.
 */
class AsyncWriter {
  constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
    Object.assign(this, {
      socket,
      request,
      contentLength,
      client,
      bytesWritten: 0,
      expectsPayload,
      header
    })

    socket[kWriting] = true
  }

  /**
   * Writes one chunk.
   *
   * @param {string|Buffer|Uint8Array} chunk
   * @returns {boolean} `true` for an empty chunk, `false` when the socket is
   *   destroyed, otherwise the result of `socket.write(chunk)`.
   * @throws The socket's recorded error, or RequestContentLengthMismatchError
   *   when the chunk overflows the content length in strict mode.
   */
  write (chunk) {
    const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this

    if (socket[kError]) {
      throw socket[kError]
    }

    if (socket.destroyed) {
      return false
    }

    const len = Buffer.byteLength(chunk)
    if (!len) {
      return true
    }

    const chunked = contentLength === null

    // We should defer writing chunks.
    if (!chunked && bytesWritten + len > contentLength) {
      if (client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    socket.cork()

    if (bytesWritten === 0) {
      if (!expectsPayload) {
        socket[kReset] = true
      }

      // The chunked header is left open; the chunk-size line written below
      // starts with the terminating CRLF.
      const head = chunked
        ? `${header}transfer-encoding: chunked\r\n`
        : `${header}content-length: ${contentLength}\r\n\r\n`
      socket.write(head, 'latin1')
    }

    if (chunked) {
      socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
    }

    this.bytesWritten += len

    const ret = socket.write(chunk)

    socket.uncork()

    request.onBodySent(chunk)

    // Under backpressure, restart a running headers timeout.
    if (!ret) {
      const parser = socket[kParser]
      if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
        // istanbul ignore else: only for jest
        if (parser.timeout.refresh) {
          parser.timeout.refresh()
        }
      }
    }

    return ret
  }

  /**
   * Finishes the request: notifies the request, writes the header (if no
   * chunk was written) or the chunked terminator, validates the content
   * length, refreshes a running headers timeout and resumes the client.
   *
   * @throws The socket's recorded error, or RequestContentLengthMismatchError
   *   in strict mode when fewer/more bytes than announced were written.
   */
  end () {
    const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
    request.onRequestSent()

    socket[kWriting] = false

    if (socket[kError]) {
      throw socket[kError]
    }

    if (socket.destroyed) {
      return
    }

    if (bytesWritten === 0) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD send a Content-Length in a request message when
      // no Transfer-Encoding is sent and the request method defines a meaning
      // for an enclosed payload body.
      const head = expectsPayload
        ? `${header}content-length: 0\r\n\r\n`
        : `${header}\r\n`
      socket.write(head, 'latin1')
    } else if (contentLength === null) {
      socket.write('\r\n0\r\n\r\n', 'latin1')
    }

    if (contentLength !== null && bytesWritten !== contentLength) {
      if (client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }
      process.emitWarning(new RequestContentLengthMismatchError())
    }

    const parser = socket[kParser]
    if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
      // istanbul ignore else: only for jest
      if (parser.timeout.refresh) {
        parser.timeout.refresh()
      }
    }

    resume(client)
  }

  /**
   * Marks the socket as no longer writing and, when `err` is given,
   * destroys the socket with it.
   *
   * @param {Error} [err]
   */
  destroy (err) {
    const { socket, client } = this

    socket[kWriting] = false

    if (!err) {
      return
    }

    assert(client[kRunning] <= 1, 'pipeline should only contain this request')
    util.destroy(socket, err)
  }
}
/**
 * Reports `err` to the request. If the request's error handler throws, or
 * the request is not marked aborted afterwards, the resulting exception is
 * emitted as an 'error' event on the client.
 *
 * @param {object} client
 * @param {object} request
 * @param {Error} err
 */
function errorRequest (client, request, err) {
  try {
    request.onError(err)
    assert(request.aborted)
  } catch (failure) {
    client.emit('error', failure)
  }
}
- module.exports = Client
|