client.js 48 KB


  1. // @ts-check
  2. 'use strict'
  3. /* global WebAssembly */
  4. const assert = require('assert')
  5. const net = require('net')
  6. const util = require('./core/util')
  7. const timers = require('./timers')
  8. const Request = require('./core/request')
  9. const DispatcherBase = require('./dispatcher-base')
  10. const {
  11. RequestContentLengthMismatchError,
  12. ResponseContentLengthMismatchError,
  13. InvalidArgumentError,
  14. RequestAbortedError,
  15. HeadersTimeoutError,
  16. HeadersOverflowError,
  17. SocketError,
  18. InformationalError,
  19. BodyTimeoutError,
  20. HTTPParserError,
  21. ResponseExceededMaxSizeError,
  22. ClientDestroyedError
  23. } = require('./core/errors')
  24. const buildConnector = require('./core/connect')
  25. const {
  26. kUrl,
  27. kReset,
  28. kServerName,
  29. kClient,
  30. kBusy,
  31. kParser,
  32. kConnect,
  33. kBlocking,
  34. kResuming,
  35. kRunning,
  36. kPending,
  37. kSize,
  38. kWriting,
  39. kQueue,
  40. kConnected,
  41. kConnecting,
  42. kNeedDrain,
  43. kNoRef,
  44. kKeepAliveDefaultTimeout,
  45. kHostHeader,
  46. kPendingIdx,
  47. kRunningIdx,
  48. kError,
  49. kPipelining,
  50. kSocket,
  51. kKeepAliveTimeoutValue,
  52. kMaxHeadersSize,
  53. kKeepAliveMaxTimeout,
  54. kKeepAliveTimeoutThreshold,
  55. kHeadersTimeout,
  56. kBodyTimeout,
  57. kStrictContentLength,
  58. kConnector,
  59. kMaxRedirections,
  60. kMaxRequests,
  61. kCounter,
  62. kClose,
  63. kDestroy,
  64. kDispatch,
  65. kInterceptors,
  66. kLocalAddress,
  67. kMaxResponseSize
  68. } = require('./core/symbols')
  69. const FastBuffer = Buffer[Symbol.species]
  70. const kClosedResolve = Symbol('kClosedResolve')
  // Diagnostics channels the client publishes to. If `diagnostics_channel`
  // cannot be loaded, each entry becomes a stub reporting no subscribers so
  // that `hasSubscribers` checks at the call sites simply skip publishing.
  const channels = {}
  try {
    const diagnosticsChannel = require('diagnostics_channel')
    Object.assign(channels, {
      sendHeaders: diagnosticsChannel.channel('undici:client:sendHeaders'),
      beforeConnect: diagnosticsChannel.channel('undici:client:beforeConnect'),
      connectError: diagnosticsChannel.channel('undici:client:connectError'),
      connected: diagnosticsChannel.channel('undici:client:connected')
    })
  } catch {
    Object.assign(channels, {
      sendHeaders: { hasSubscribers: false },
      beforeConnect: { hasSubscribers: false },
      connectError: { hasSubscribers: false },
      connected: { hasSubscribers: false }
    })
  }
  /**
   * Dispatcher bound to a single origin. Requests are kept in one queue and
   * sent over the socket produced by the configured connector.
   *
   * @type {import('../types/client').default}
   */
  class Client extends DispatcherBase {
    /**
     * Validates the options, builds the connector when none was supplied as a
     * function, and initialises timeouts and the request queue.
     *
     * @param {string|URL} url Origin to talk to; parsed with `util.parseOrigin`.
     * @param {import('../types/client').Client.Options} options
     * @throws {InvalidArgumentError} When an option is unsupported or invalid.
     */
    constructor (url, {
      interceptors,
      maxHeaderSize,
      headersTimeout,
      socketTimeout,
      requestTimeout,
      connectTimeout,
      bodyTimeout,
      idleTimeout,
      keepAlive,
      keepAliveTimeout,
      maxKeepAliveTimeout,
      keepAliveMaxTimeout,
      keepAliveTimeoutThreshold,
      socketPath,
      pipelining,
      tls,
      strictContentLength,
      maxCachedSessions,
      maxRedirections,
      connect,
      maxRequestsPerClient,
      localAddress,
      maxResponseSize,
      autoSelectFamily,
      autoSelectFamilyAttemptTimeout
    } = {}) {
      super()

      // Options that are no longer supported: fail loudly and name the replacement.
      if (keepAlive !== undefined) {
        throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
      }

      if (socketTimeout !== undefined) {
        throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
      }

      if (requestTimeout !== undefined) {
        throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
      }

      if (idleTimeout !== undefined) {
        throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
      }

      if (maxKeepAliveTimeout !== undefined) {
        throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
      }

      // A negative limit can never work: Parser asserts the stored limit is > 0
      // when a socket is attached, so reject it here with a proper argument
      // error. Zero is still accepted and falls back to the default below.
      if (maxHeaderSize != null && (!Number.isFinite(maxHeaderSize) || maxHeaderSize < 0)) {
        throw new InvalidArgumentError('invalid maxHeaderSize')
      }

      if (socketPath != null && typeof socketPath !== 'string') {
        throw new InvalidArgumentError('invalid socketPath')
      }

      if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
        throw new InvalidArgumentError('invalid connectTimeout')
      }

      if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
        throw new InvalidArgumentError('invalid keepAliveTimeout')
      }

      if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
        throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
      }

      if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
        throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
      }

      if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
        throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
      }

      if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
        throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
      }

      if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
        throw new InvalidArgumentError('connect must be a function or an object')
      }

      if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
        throw new InvalidArgumentError('maxRedirections must be a positive number')
      }

      if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
        throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
      }

      if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
        throw new InvalidArgumentError('localAddress must be valid string IP address')
      }

      // -1 is accepted here; it is the stored "no limit" value (see below).
      if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
        throw new InvalidArgumentError('maxResponseSize must be a positive number')
      }

      if (
        autoSelectFamilyAttemptTimeout != null &&
        (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
      ) {
        throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
      }

      // A function is used as the connector as-is; an object (or nothing) is
      // merged into the options of the default connector, where it wins over
      // the individual options listed before it.
      if (typeof connect !== 'function') {
        connect = buildConnector({
          ...tls,
          maxCachedSessions,
          socketPath,
          timeout: connectTimeout,
          ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
          ...connect
        })
      }

      this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
        ? interceptors.Client
        : [createRedirectInterceptor({ maxRedirections })]
      this[kUrl] = util.parseOrigin(url)
      this[kConnector] = connect
      this[kSocket] = null
      this[kPipelining] = pipelining != null ? pipelining : 1
      this[kMaxHeadersSize] = maxHeaderSize || 16384
      this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
      this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
      this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
      this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
      this[kServerName] = null
      this[kLocalAddress] = localAddress != null ? localAddress : null
      this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
      // Set to 2 by kDispatch when the client is busy; kDispatch reports
      // `kNeedDrain < 2` to its caller. Other transitions happen outside this
      // class — TODO confirm their meaning against `resume`.
      this[kNeedDrain] = 0
      this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
      this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
      this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
      this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
      this[kMaxRedirections] = maxRedirections
      this[kMaxRequests] = maxRequestsPerClient
      this[kClosedResolve] = null
      // Anything that is not greater than -1 (including undefined) is stored as -1.
      this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1

      // kQueue is built up of 3 sections separated by
      // the kRunningIdx and kPendingIdx indices.
      // |   complete   |   running   |   pending   |
      //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
      // kRunningIdx points to the first running element.
      // kPendingIdx points to the first pending element.
      // This implements a fast queue with an amortized
      // time of O(1).
      this[kQueue] = []
      this[kRunningIdx] = 0
      this[kPendingIdx] = 0
    }

    /** Current pipelining setting. */
    get pipelining () {
      return this[kPipelining]
    }

    /** Updates the pipelining setting and immediately tries to resume the queue. */
    set pipelining (value) {
      this[kPipelining] = value
      resume(this, true)
    }

    /** Number of queued requests in the pending section of the queue. */
    get [kPending] () {
      return this[kQueue].length - this[kPendingIdx]
    }

    /** Number of requests in the running section of the queue. */
    get [kRunning] () {
      return this[kPendingIdx] - this[kRunningIdx]
    }

    /** Number of running plus pending requests. */
    get [kSize] () {
      return this[kQueue].length - this[kRunningIdx]
    }

    /** True when a socket is attached, not destroyed, and no connect is in progress. */
    get [kConnected] () {
      return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
    }

    /**
     * Truthy when the socket is flagged as reset, writing or blocking, when the
     * queue already holds at least `pipelining` (minimum 1) requests, or when
     * requests are still pending.
     */
    get [kBusy] () {
      const socket = this[kSocket]
      return (
        (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
        (this[kSize] >= (this[kPipelining] || 1)) ||
        this[kPending] > 0
      )
    }

    /* istanbul ignore: only used for test */
    [kConnect] (cb) {
      connect(this)
      this.once('connect', cb)
    }

    /**
     * Wraps the options in a Request, queues it and kicks the queue.
     *
     * @param {object} opts Request options; `opts.origin` defaults to the client origin.
     * @param {object} handler Handler passed to the Request.
     * @returns {boolean} `false` once the client has flagged that it needs draining.
     */
    [kDispatch] (opts, handler) {
      const origin = opts.origin || this[kUrl].origin

      const request = new Request(origin, opts, handler)

      this[kQueue].push(request)
      if (this[kResuming]) {
        // Do nothing.
      } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
        // Wait a tick in case stream/iterator is ended in the same tick.
        this[kResuming] = 1
        process.nextTick(resume, this)
      } else {
        resume(this, true)
      }

      if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
        this[kNeedDrain] = 2
      }

      return this[kNeedDrain] < 2
    }

    /**
     * Resolves immediately when nothing is running or pending; otherwise stores
     * the resolver in kClosedResolve so it can be settled later.
     */
    async [kClose] () {
      return new Promise((resolve) => {
        if (!this[kSize]) {
          resolve(null)
        } else {
          this[kClosedResolve] = resolve
        }
      })
    }

    /**
     * Errors every pending request with `err` and destroys the socket, if any.
     * The returned promise resolves after the socket's 'close' event, or on a
     * microtask when there is no socket. A waiting kClose is resolved as well.
     *
     * @param {Error} err Error handed to pending requests and to the socket.
     */
    async [kDestroy] (err) {
      return new Promise((resolve) => {
        const requests = this[kQueue].splice(this[kPendingIdx])
        for (let i = 0; i < requests.length; i++) {
          const request = requests[i]
          errorRequest(this, request, err)
        }

        const callback = () => {
          if (this[kClosedResolve]) {
            // TODO (fix): Should we error here with ClientDestroyedError?
            this[kClosedResolve]()
            this[kClosedResolve] = null
          }
          resolve()
        }

        if (!this[kSocket]) {
          queueMicrotask(callback)
        } else {
          util.destroy(this[kSocket].on('close', callback), err)
        }

        resume(this)
      })
    }
  }
  310. const constants = require('./llhttp/constants')
  311. const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
  312. const EMPTY_BUF = Buffer.alloc(0)
  /**
   * Compiles the llhttp WebAssembly module and instantiates it with the
   * `wasm_on_*` callbacks, which forward each parser event to `currentParser`.
   * The SIMD build is tried first; if compiling it fails, the plain build is
   * compiled instead.
   *
   * @returns {Promise<object>} Result of `WebAssembly.instantiate`.
   */
  async function lazyllhttp () {
    // Under jest the non-SIMD module is required eagerly, before any await.
    const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined

    let mod
    try {
      mod = await WebAssembly.compile(Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64'))
    } catch (e) {
      /* istanbul ignore next */

      // We could check if the error was caused by the simd option not
      // being enabled, but the occurring of this other error
      // * https://github.com/emscripten-core/emscripten/issues/11495
      // got me to remove that check to avoid breaking Node 12.
      mod = await WebAssembly.compile(Buffer.from(llhttpWasmData || require('./llhttp/llhttp-wasm.js'), 'base64'))
    }

    // Maps a (pointer, length) pair reported by the wasm side back onto the
    // JS buffer currently being parsed, without copying.
    const viewOfCurrentBuffer = (at, len) => {
      const start = at - currentBufferPtr + currentBufferRef.byteOffset
      return new FastBuffer(currentBufferRef.buffer, start, len)
    }

    /* eslint-disable camelcase */
    const env = {
      wasm_on_url: (p, at, len) => {
        /* istanbul ignore next */
        return 0
      },
      wasm_on_status: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onStatus(viewOfCurrentBuffer(at, len)) || 0
      },
      wasm_on_message_begin: (p) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onMessageBegin() || 0
      },
      wasm_on_header_field: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeaderField(viewOfCurrentBuffer(at, len)) || 0
      },
      wasm_on_header_value: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeaderValue(viewOfCurrentBuffer(at, len)) || 0
      },
      wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
      },
      wasm_on_body: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onBody(viewOfCurrentBuffer(at, len)) || 0
      },
      wasm_on_message_complete: (p) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onMessageComplete() || 0
      }
    }
    /* eslint-enable camelcase */

    return await WebAssembly.instantiate(mod, { env })
  }
  // Shared llhttp wasm instance. It stays null until `connect` awaits
  // `llhttpPromise` for the first time, after which the promise is dropped.
  let llhttpInstance = null
  let llhttpPromise = lazyllhttp()
  // Attach a real no-op rejection handler. A bare `.catch()` registers no
  // handler: the promise it returns rejects with the same reason and is itself
  // unhandled. The failure is not lost — it is rethrown where `connect` awaits
  // `llhttpPromise`.
  llhttpPromise.catch(() => {})

  // State shared with the wasm callbacks while `Parser#execute` is running:
  // the active parser, the JS chunk being parsed, and the wasm-side scratch
  // buffer (pointer and capacity) the chunk was copied into.
  let currentParser = null
  let currentBufferRef = null
  let currentBufferSize = 0
  let currentBufferPtr = null

  // Which phase a parser's timer is currently guarding.
  const TIMEOUT_HEADERS = 1
  const TIMEOUT_BODY = 2
  const TIMEOUT_IDLE = 3
  /**
   * Response parser attached to one socket. It feeds socket data to an llhttp
   * parser allocated in wasm memory and turns the resulting events into calls
   * on the request at the head of the client's running queue.
   */
  class Parser {
    /**
     * @param client Owning client; supplies the header and response size limits.
     * @param socket Socket whose data is parsed.
     * @param llhttp Instantiated wasm module; only its `exports` are used.
     */
    constructor (client, socket, { exports }) {
      assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)

      this.llhttp = exports
      this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
      this.client = client
      this.socket = socket
      this.timeout = null
      this.timeoutValue = null
      this.timeoutType = null
      this.statusCode = null
      this.statusText = ''
      this.upgrade = false
      this.headers = []
      this.headersSize = 0
      this.headersMaxSize = client[kMaxHeadersSize]
      this.shouldKeepAlive = false
      this.paused = false
      // Bound once because it is handed to request handlers as a callback.
      this.resume = this.resume.bind(this)

      this.bytesRead = 0

      this.keepAlive = ''
      this.contentLength = ''
      this.connection = ''
      this.maxResponseSize = client[kMaxResponseSize]
    }

    /**
     * Arms the parser timer for `type`. A timer with an unchanged duration is
     * only refreshed; a different duration replaces the timer, and a falsy
     * duration leaves no timer armed.
     */
    setTimeout (value, type) {
      this.timeoutType = type

      if (value === this.timeoutValue) {
        // istanbul ignore else: only for jest
        if (this.timeout && this.timeout.refresh) {
          this.timeout.refresh()
        }
        return
      }

      timers.clearTimeout(this.timeout)
      if (value) {
        this.timeout = timers.setTimeout(onParserTimeout, value, this)
        // istanbul ignore else: only for jest
        if (this.timeout.unref) {
          this.timeout.unref()
        }
      } else {
        this.timeout = null
      }
      this.timeoutValue = value
    }

    /**
     * Continues a paused parser: resumes llhttp, refreshes the body timer and
     * drains whatever the socket has buffered. Does nothing when the socket is
     * destroyed or the parser is not paused.
     */
    resume () {
      const resumable = !this.socket.destroyed && this.paused
      if (!resumable) {
        return
      }

      assert(this.ptr != null)
      assert(currentParser == null)

      this.llhttp.llhttp_resume(this.ptr)

      assert(this.timeoutType === TIMEOUT_BODY)
      // istanbul ignore else: only for jest
      if (this.timeout && this.timeout.refresh) {
        this.timeout.refresh()
      }

      this.paused = false
      this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
      this.readMore()
    }

    /** Parses buffered socket data until it runs dry, the parser pauses, or it is destroyed. */
    readMore () {
      let chunk
      while (!this.paused && this.ptr && (chunk = this.socket.read()) !== null) {
        this.execute(chunk)
      }
    }

    /**
     * Copies `data` into the shared wasm buffer and runs llhttp over it. Any
     * error, including a protocol error reported by llhttp, destroys the socket.
     */
    execute (data) {
      assert(this.ptr != null)
      assert(currentParser == null)
      assert(!this.paused)

      const { socket, llhttp } = this

      // Reallocate the wasm-side buffer, rounded up to a multiple of 4096,
      // when the chunk does not fit in the current one.
      if (data.length > currentBufferSize) {
        if (currentBufferPtr) {
          llhttp.free(currentBufferPtr)
        }
        currentBufferSize = Math.ceil(data.length / 4096) * 4096
        currentBufferPtr = llhttp.malloc(currentBufferSize)
      }

      new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)

      // Call `execute` on the wasm parser.
      // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
      // and finally the length of bytes to parse.
      // The return value is an error code or `constants.ERROR.OK`.
      try {
        let result

        try {
          // The wasm callbacks read these module-level references.
          currentBufferRef = data
          currentParser = this
          result = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
        } finally {
          currentParser = null
          currentBufferRef = null
        }

        // Offset into `data` at which llhttp stopped.
        const stoppedAt = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr

        if (result === constants.ERROR.PAUSED_UPGRADE) {
          this.onUpgrade(data.slice(stoppedAt))
        } else if (result === constants.ERROR.PAUSED) {
          // Give the unparsed remainder back to the socket for later.
          this.paused = true
          socket.unshift(data.slice(stoppedAt))
        } else if (result !== constants.ERROR.OK) {
          const reasonPtr = llhttp.llhttp_get_error_reason(this.ptr)
          let message = ''
          /* istanbul ignore else: difficult to make a test case for */
          if (reasonPtr) {
            // The reason is a NUL-terminated string in wasm memory.
            const reasonLen = new Uint8Array(llhttp.memory.buffer, reasonPtr).indexOf(0)
            const reason = Buffer.from(llhttp.memory.buffer, reasonPtr, reasonLen).toString()
            message = `Response does not match the HTTP/1.1 protocol (${reason})`
          }
          throw new HTTPParserError(message, constants.ERROR[result], data.slice(stoppedAt))
        }
      } catch (err) {
        util.destroy(socket, err)
      }
    }

    /** Frees the wasm parser and clears timer state. Must not run while parsing. */
    destroy () {
      assert(this.ptr != null)
      assert(currentParser == null)

      this.llhttp.llhttp_free(this.ptr)
      this.ptr = null

      timers.clearTimeout(this.timeout)
      this.timeout = null
      this.timeoutValue = null
      this.timeoutType = null

      this.paused = false
    }

    /** Records the status text of the response. */
    onStatus (buf) {
      this.statusText = buf.toString()
    }

    /** Returns -1 when the socket is destroyed or no request is waiting for a response. */
    onMessageBegin () {
      const { socket, client } = this

      /* istanbul ignore next: difficult to make a test case for */
      if (socket.destroyed) {
        return -1
      }

      if (!client[kQueue][client[kRunningIdx]]) {
        return -1
      }
    }

    /**
     * Collects a header name. `headers` alternates name/value entries, so an
     * even length starts a new name and an odd length extends the last one.
     */
    onHeaderField (buf) {
      const { headers } = this
      const count = headers.length

      if (count % 2 === 0) {
        headers.push(buf)
      } else {
        headers[count - 1] = Buffer.concat([headers[count - 1], buf])
      }

      this.trackHeader(buf.length)
    }

    /**
     * Collects a header value (possibly delivered in pieces) and mirrors the
     * keep-alive, connection and content-length values into string fields.
     */
    onHeaderValue (buf) {
      const { headers } = this
      let count = headers.length

      if (count % 2 === 1) {
        headers.push(buf)
        count += 1
      } else {
        headers[count - 1] = Buffer.concat([headers[count - 1], buf])
      }

      const key = headers[count - 2]
      // Compare lengths first so the name is only stringified for candidates.
      if (key.length === 10) {
        const name = key.toString().toLowerCase()
        if (name === 'keep-alive') {
          this.keepAlive += buf.toString()
        } else if (name === 'connection') {
          this.connection += buf.toString()
        }
      } else if (key.length === 14 && key.toString().toLowerCase() === 'content-length') {
        this.contentLength += buf.toString()
      }

      this.trackHeader(buf.length)
    }

    /** Adds `len` to the header byte count and destroys the socket once the limit is reached. */
    trackHeader (len) {
      this.headersSize += len
      if (this.headersSize >= this.headersMaxSize) {
        util.destroy(this.socket, new HeadersOverflowError())
      }
    }

    /**
     * Detaches the socket from the client and hands it, together with any
     * bytes received after the headers, to the request's `onUpgrade`.
     */
    onUpgrade (head) {
      const { upgrade, client, socket, headers, statusCode } = this

      assert(upgrade)

      const request = client[kQueue][client[kRunningIdx]]
      assert(request)

      assert(!socket.destroyed)
      assert(socket === client[kSocket])
      assert(!this.paused)
      assert(request.upgrade || request.method === 'CONNECT')

      this.statusCode = null
      this.statusText = ''
      this.shouldKeepAlive = null

      assert(this.headers.length % 2 === 0)
      this.headers = []
      this.headersSize = 0

      // Put the leftover bytes back so the new owner of the socket reads them.
      socket.unshift(head)

      socket[kParser].destroy()
      socket[kParser] = null

      socket[kClient] = null
      socket[kError] = null
      socket
        .removeListener('error', onSocketError)
        .removeListener('readable', onSocketReadable)
        .removeListener('end', onSocketEnd)
        .removeListener('close', onSocketClose)

      client[kSocket] = null
      client[kQueue][client[kRunningIdx]++] = null
      client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

      try {
        request.onUpgrade(statusCode, headers, socket)
      } catch (err) {
        util.destroy(socket, err)
      }

      resume(client)
    }

    /**
     * Handles the end of the response headers: validates the response, arms
     * the body timeout, decides on connection reuse and calls the request's
     * `onHeaders`.
     *
     * @returns {number} -1 on failure; 2 for CONNECT/upgrade responses; 1 for
     *   HEAD requests and status codes below 200; otherwise
     *   `constants.ERROR.PAUSED` when the handler asked to pause, else 0.
     *   NOTE(review): the meaning llhttp gives to 1 and 2 is not visible here.
     */
    onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
      const { client, socket, headers, statusText } = this

      /* istanbul ignore next: difficult to make a test case for */
      if (socket.destroyed) {
        return -1
      }

      const request = client[kQueue][client[kRunningIdx]]

      /* istanbul ignore next: difficult to make a test case for */
      if (!request) {
        return -1
      }

      assert(!this.upgrade)
      assert(this.statusCode < 200)

      if (statusCode === 100) {
        util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
        return -1
      }

      /* this can only happen if server is misbehaving */
      if (upgrade && !request.upgrade) {
        util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
        return -1
      }

      assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

      this.statusCode = statusCode
      this.shouldKeepAlive = (
        shouldKeepAlive ||
        // Override llhttp value which does not allow keepAlive for HEAD.
        (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
      )

      if (this.statusCode >= 200) {
        // The per-request body timeout wins over the client-wide one.
        const bodyTimeout = request.bodyTimeout != null ? request.bodyTimeout : client[kBodyTimeout]
        this.setTimeout(bodyTimeout, TIMEOUT_BODY)
      } else if (this.timeout && this.timeout.refresh) {
        // istanbul ignore else: only for jest
        this.timeout.refresh()
      }

      // CONNECT responses and protocol upgrades both switch to upgrade mode.
      if (request.method === 'CONNECT' || upgrade) {
        assert(client[kRunning] === 1)
        this.upgrade = true
        return 2
      }

      assert(this.headers.length % 2 === 0)
      this.headers = []
      this.headersSize = 0

      if (this.shouldKeepAlive && client[kPipelining]) {
        const hintedTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null

        if (hintedTimeout == null) {
          client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
        } else {
          // Stay below the server's hint by the threshold, capped at the maximum.
          const timeout = Math.min(
            hintedTimeout - client[kKeepAliveTimeoutThreshold],
            client[kKeepAliveMaxTimeout]
          )
          if (timeout <= 0) {
            socket[kReset] = true
          } else {
            client[kKeepAliveTimeoutValue] = timeout
          }
        }
      } else {
        // Stop more requests from being dispatched.
        socket[kReset] = true
      }

      let pause
      try {
        pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false
      } catch (err) {
        util.destroy(socket, err)
        return -1
      }

      if (request.method === 'HEAD' || statusCode < 200) {
        return 1
      }

      if (socket[kBlocking]) {
        socket[kBlocking] = false
        resume(client)
      }

      return pause ? constants.ERROR.PAUSED : 0
    }

    /**
     * Forwards a body chunk to the request and enforces `maxResponseSize`.
     * Returns `constants.ERROR.PAUSED` when the handler asks to pause and -1
     * on failure.
     */
    onBody (buf) {
      const { client, socket, statusCode, maxResponseSize } = this

      if (socket.destroyed) {
        return -1
      }

      const request = client[kQueue][client[kRunningIdx]]
      assert(request)

      assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
      // istanbul ignore else: only for jest
      if (this.timeout && this.timeout.refresh) {
        this.timeout.refresh()
      }

      assert(statusCode >= 200)

      const exceedsLimit = maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize
      if (exceedsLimit) {
        util.destroy(socket, new ResponseExceededMaxSizeError())
        return -1
      }

      this.bytesRead += buf.length

      try {
        if (request.onData(buf) === false) {
          return constants.ERROR.PAUSED
        }
      } catch (err) {
        util.destroy(socket, err)
        return -1
      }
    }

    /**
     * Finishes the current response: resets per-message state, completes the
     * request, then either destroys the socket (it cannot be reused) or
     * resumes the client to dispatch further requests.
     */
    onMessageComplete () {
      const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this

      if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
        return -1
      }

      if (upgrade) {
        return
      }

      const request = client[kQueue][client[kRunningIdx]]
      assert(request)

      assert(statusCode >= 100)

      this.statusCode = null
      this.statusText = ''
      this.bytesRead = 0
      this.contentLength = ''
      this.keepAlive = ''
      this.connection = ''

      assert(this.headers.length % 2 === 0)
      this.headers = []
      this.headersSize = 0

      // Status codes below 200 do not complete the request.
      if (statusCode < 200) {
        return
      }

      /* istanbul ignore next: should be handled by llhttp? */
      if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
        util.destroy(socket, new ResponseContentLengthMismatchError())
        return -1
      }

      try {
        request.onComplete(headers)
      } catch (err) {
        errorRequest(client, request, err)
      }

      client[kQueue][client[kRunningIdx]++] = null

      if (socket[kWriting]) {
        assert.strictEqual(client[kRunning], 0)
        // Response completed before request.
        util.destroy(socket, new InformationalError('reset'))
        return constants.ERROR.PAUSED
      }

      // Without keep-alive the socket goes away right now. With a pending
      // reset it goes away once all requests have completed: the request at
      // the tail of the pipeline is the one that requested reset and no
      // further requests should have been queued since then.
      if (!shouldKeepAlive || (socket[kReset] && client[kRunning] === 0)) {
        util.destroy(socket, new InformationalError('reset'))
        return constants.ERROR.PAUSED
      }

      if (client[kPipelining] === 1) {
        // We must wait a full event loop cycle to reuse this socket to make sure
        // that non-spec compliant servers are not closing the connection even if they
        // said they won't.
        setImmediate(resume, client)
      } else {
        resume(client)
      }
    }
  }
  /**
   * Timer callback for a parser. Destroys the socket with the error that
   * matches the phase the timer was guarding.
   */
  function onParserTimeout (parser) {
    const { socket, timeoutType, client } = parser

    /* istanbul ignore else */
    switch (timeoutType) {
      case TIMEOUT_HEADERS: {
        // Skipped only while the socket is writing without needing a drain
        // and at most one request is running.
        const expired = !socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1
        if (expired) {
          assert(!parser.paused, 'cannot be paused while waiting for headers')
          util.destroy(socket, new HeadersTimeoutError())
        }
        break
      }
      case TIMEOUT_BODY:
        // A paused parser does not count as a body timeout.
        if (!parser.paused) {
          util.destroy(socket, new BodyTimeoutError())
        }
        break
      case TIMEOUT_IDLE:
        assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
        util.destroy(socket, new InformationalError('socket idle timeout'))
        break
    }
  }
  /** Socket 'readable' listener: lets the socket's parser consume buffered data. */
  function onSocketReadable () {
    this[kParser].readMore()
  }
  /**
   * Socket 'error' listener. Records the error on the socket and passes it to
   * `onError`, except for an ECONNRESET that arrives after a non-keep-alive
   * response has started, which is treated as the end of that response.
   */
  function onSocketError (err) {
    const parser = this[kParser]

    assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

    // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
    // to the user.
    const responseAlreadyReceived = err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive
    if (responseAlreadyReceived) {
      // We treat all incoming data so far as a valid response.
      parser.onMessageComplete()
      return
    }

    this[kError] = err

    onError(this[kClient], err)
  }
  /**
   * Fails every queued request with `err` when no request is running and the
   * error is neither informational (UND_ERR_INFO) nor a socket error
   * (UND_ERR_SOCKET). Otherwise does nothing.
   */
  function onError (client, err) {
    const failsQueue =
      client[kRunning] === 0 &&
      err.code !== 'UND_ERR_INFO' &&
      err.code !== 'UND_ERR_SOCKET'

    if (!failsQueue) {
      return
    }

    // Error is not caused by running request and not a recoverable
    // socket error.

    assert(client[kPendingIdx] === client[kRunningIdx])

    const failed = client[kQueue].splice(client[kRunningIdx])
    for (const request of failed) {
      errorRequest(client, request, err)
    }

    assert(client[kSize] === 0)
  }
  /**
   * Socket 'end' listener. When a non-keep-alive response is in progress, the
   * end of the stream completes it; otherwise the socket is destroyed with an
   * 'other side closed' error.
   */
  function onSocketEnd () {
    const parser = this[kParser]

    const endsResponse = parser.statusCode && !parser.shouldKeepAlive
    if (!endsResponse) {
      util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
      return
    }

    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
  }
  /**
   * Socket 'close' listener. Completes an in-progress non-keep-alive response
   * if there was no error, tears down the parser, fails the affected requests,
   * detaches the socket from the client and resumes the client.
   */
  function onSocketClose () {
    const client = this[kClient]

    const completesResponse = !this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive
    if (completesResponse) {
      // We treat all incoming data so far as a valid response.
      this[kParser].onMessageComplete()
    }

    this[kParser].destroy()
    this[kParser] = null

    // Use the recorded socket error, or a generic 'closed' error.
    const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

    client[kSocket] = null

    if (client.destroyed) {
      assert(client[kPending] === 0)

      // Fail entire queue.
      const failed = client[kQueue].splice(client[kRunningIdx])
      for (const request of failed) {
        errorRequest(client, request, err)
      }
    } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
      // Fail head of pipeline.
      const head = client[kQueue][client[kRunningIdx]]
      client[kQueue][client[kRunningIdx]++] = null

      errorRequest(client, head, err)
    }

    // Whatever was still marked as running goes back to the pending section.
    client[kPendingIdx] = client[kRunningIdx]

    assert(client[kRunning] === 0)

    client.emit('disconnect', client[kUrl], [client], err)

    resume(client)
  }
  /**
   * Builds the parameter object handed to the connector and published on the
   * diagnostics channels.
   *
   * A fresh object is returned on every call so that a channel subscriber
   * mutating its copy cannot influence what the connector receives.
   * `servername` and `localAddress` are read from the client at call time.
   *
   * @param {object} client
   * @param {{ host: string, hostname: string, protocol: string, port: string }} target
   * @returns {object}
   */
  function buildConnectParams (client, { host, hostname, protocol, port }) {
    return {
      host,
      hostname,
      protocol,
      port,
      servername: client[kServerName],
      localAddress: client[kLocalAddress]
    }
  }

  /**
   * Opens a socket for `client` through `client[kConnector]` and wires it up.
   *
   * On success the socket gets its bookkeeping fields and a Parser, the socket
   * listeners are attached, `client[kSocket]` is set and 'connect' is emitted.
   * On failure the affected requests are errored and 'connectionError' is
   * emitted. In both cases `resume(client)` runs afterwards, unless the client
   * was destroyed while connecting, in which case the function returns early
   * (destroying the freshly created socket if there is one).
   *
   * @param {object} client
   * @returns {Promise<void>}
   */
  async function connect (client) {
    assert(!client[kConnecting])
    assert(!client[kSocket])

    let { host, hostname, protocol, port } = client[kUrl]

    // Resolve ipv6: strip the surrounding brackets from `[addr]`.
    if (hostname[0] === '[') {
      const idx = hostname.indexOf(']')

      assert(idx !== -1)
      // `slice(1, idx)` selects the same characters the deprecated
      // `substr(1, idx - 1)` did.
      const ip = hostname.slice(1, idx)

      assert(net.isIP(ip))
      hostname = ip
    }

    const target = { host, hostname, protocol, port }

    client[kConnecting] = true

    if (channels.beforeConnect.hasSubscribers) {
      channels.beforeConnect.publish({
        connectParams: buildConnectParams(client, target),
        connector: client[kConnector]
      })
    }

    try {
      const socket = await new Promise((resolve, reject) => {
        client[kConnector](buildConnectParams(client, target), (err, socket) => {
          if (err) {
            reject(err)
          } else {
            resolve(socket)
          }
        })
      })

      if (client.destroyed) {
        // Swallow late socket errors; the client is already gone.
        util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
        return
      }

      if (!llhttpInstance) {
        llhttpInstance = await llhttpPromise
        llhttpPromise = null

        // The client may have been destroyed while the parser was loading.
        // Without this second check the socket would be attached to a
        // destroyed client and never be closed.
        if (client.destroyed) {
          util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
          return
        }
      }

      client[kConnecting] = false

      assert(socket)

      socket[kNoRef] = false
      socket[kWriting] = false
      socket[kReset] = false
      socket[kBlocking] = false
      socket[kError] = null
      socket[kParser] = new Parser(client, socket, llhttpInstance)
      socket[kClient] = client
      socket[kCounter] = 0
      socket[kMaxRequests] = client[kMaxRequests]
      socket
        .on('error', onSocketError)
        .on('readable', onSocketReadable)
        .on('end', onSocketEnd)
        .on('close', onSocketClose)

      client[kSocket] = socket

      if (channels.connected.hasSubscribers) {
        channels.connected.publish({
          connectParams: buildConnectParams(client, target),
          connector: client[kConnector],
          socket
        })
      }
      client.emit('connect', client[kUrl], [client])
    } catch (err) {
      if (client.destroyed) {
        return
      }

      client[kConnecting] = false

      if (channels.connectError.hasSubscribers) {
        channels.connectError.publish({
          connectParams: buildConnectParams(client, target),
          connector: client[kConnector],
          error: err
        })
      }

      if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') {
        assert(client[kRunning] === 0)
        // Only fail the pending requests that asked for the servername the
        // certificate was rejected for.
        while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
          const request = client[kQueue][client[kPendingIdx]++]
          errorRequest(client, request, err)
        }
      } else {
        onError(client, err)
      }

      client.emit('connectionError', client[kUrl], [client], err)
    }

    resume(client)
  }
  /**
   * Resets the client's need-drain flag to 0 and emits 'drain' with the
   * client's URL and a one-element array holding the client.
   *
   * @param {object} client
   */
  function emitDrain (client) {
    client[kNeedDrain] = 0

    const targets = [client]
    client.emit('drain', client[kUrl], targets)
  }
  /**
   * Runs the dispatch loop (`_resume`) for `client` unless one is already
   * running, then trims already-processed entries from the front of the queue
   * once more than 256 of them have accumulated.
   *
   * @param {object} client
   * @param {boolean} [sync] forwarded unchanged to `_resume`
   */
  function resume (client, sync) {
    // 2 marks "dispatch loop in progress"; nested calls are ignored.
    if (client[kResuming] === 2) {
      return
    }

    client[kResuming] = 2
    _resume(client, sync)
    client[kResuming] = 0

    const consumed = client[kRunningIdx]
    if (consumed > 256) {
      // Drop the processed prefix and shift both indices accordingly.
      client[kQueue].splice(0, consumed)
      client[kPendingIdx] -= consumed
      client[kRunningIdx] = 0
    }
  }
  /**
   * Dispatch loop behind `resume`.
   *
   * Each iteration updates the socket's ref/unref state and parser timeout,
   * handles drain signalling, and then tries to write the next pending
   * request. The loop returns as soon as nothing more can be dispatched right
   * now (client destroyed, nothing pending, pipeline full, connection not
   * ready, or the next request has to wait for in-flight ones).
   *
   * @param {object} client
   * @param {boolean} [sync] when truthy, 'drain' is emitted on the next tick
   *   instead of synchronously
   */
  function _resume (client, sync) {
    for (;;) {
      if (client.destroyed) {
        assert(client[kPending] === 0)
        return
      }

      // A close is waiting and the queue is empty: settle it.
      if (client[kClosedResolve] && !client[kSize]) {
        client[kClosedResolve]()
        client[kClosedResolve] = null
        return
      }

      const socket = client[kSocket]

      if (socket && !socket.destroyed) {
        // Unref the socket while the queue is empty, ref it again otherwise.
        if (client[kSize] !== 0) {
          if (socket[kNoRef] && socket.ref) {
            socket.ref()
            socket[kNoRef] = false
          }
        } else if (!socket[kNoRef] && socket.unref) {
          socket.unref()
          socket[kNoRef] = true
        }

        const parser = socket[kParser]

        if (client[kSize] === 0) {
          if (parser.timeoutType !== TIMEOUT_IDLE) {
            parser.setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
          }
        } else if (
          client[kRunning] > 0 &&
          parser.statusCode < 200 &&
          parser.timeoutType !== TIMEOUT_HEADERS
        ) {
          // The request's own headersTimeout wins over the client default.
          const head = client[kQueue][client[kRunningIdx]]
          const headersTimeout = head.headersTimeout != null
            ? head.headersTimeout
            : client[kHeadersTimeout]
          parser.setTimeout(headersTimeout, TIMEOUT_HEADERS)
        }
      }

      if (client[kBusy]) {
        client[kNeedDrain] = 2
      } else if (client[kNeedDrain] === 2) {
        if (sync) {
          client[kNeedDrain] = 1
          process.nextTick(emitDrain, client)
        } else {
          emitDrain(client)
        }
        continue
      }

      // Nothing to dispatch, or the pipeline is already full.
      if (client[kPending] === 0 || client[kRunning] >= (client[kPipelining] || 1)) {
        return
      }

      const request = client[kQueue][client[kPendingIdx]]

      if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
        if (client[kRunning] > 0) {
          return
        }

        client[kServerName] = request.servername

        if (socket && socket.servername !== request.servername) {
          util.destroy(socket, new InformationalError('servername changed'))
          return
        }
      }

      if (client[kConnecting]) {
        return
      }

      if (!socket) {
        connect(client)
        return
      }

      const socketUnavailable =
        socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]
      if (socketUnavailable) {
        return
      }

      const hasInflight = client[kRunning] > 0

      if (hasInflight && !request.idempotent) {
        // Non-idempotent request cannot be retried.
        // Ensure that no other requests are inflight and
        // could cause failure.
        return
      }

      if (hasInflight && (request.upgrade || request.method === 'CONNECT')) {
        // Don't dispatch an upgrade until all preceding requests have completed.
        // A misbehaving server might upgrade the connection before all pipelined
        // request has completed.
        return
      }

      if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
        // The stream is known to be empty: keep listeners on it so it is
        // cleaned up, but send the request without a body.
        request.body
          .on('data', /* istanbul ignore next */ function () {
            /* istanbul ignore next */
            assert(false)
          })
          .on('error', function (err) {
            errorRequest(client, request, err)
          })
          .on('end', function () {
            util.destroy(this)
          })

        request.body = null
      }

      if (client[kRunning] > 0 &&
        (util.isStream(request.body) || util.isAsyncIterable(request.body))) {
        // Request with stream or iterator body can error while other requests
        // are inflight and indirectly error those as well.
        // Ensure this doesn't happen by waiting for inflight
        // to complete before dispatching.

        // Request with stream or iterator body cannot be retried.
        // Ensure that no other requests are inflight and
        // could cause failure.
        return
      }

      const dispatched = !request.aborted && write(client, request)
      if (dispatched) {
        client[kPendingIdx]++
      } else {
        // Aborted or rejected by `write`: remove it from the queue.
        client[kQueue].splice(client[kPendingIdx], 1)
      }
    }
  }
  /**
   * Serializes `request` onto the client's current socket.
   *
   * Builds the HTTP/1.1 request head, updates the socket's reset/blocking
   * flags, and then either writes the whole message directly (no body or
   * Buffer body) or delegates to `writeBlob` / `writeStream` / `writeIterable`.
   *
   * @param {object} client
   * @param {object} request
   * @returns {boolean} `true` once the request has been handed to the socket
   *   or to a body writer; `false` when it was errored or aborted before
   *   anything was written.
   */
  function write (client, request) {
    const { body, method, path, host, upgrade, headers, blocking, reset } = request

    // https://tools.ietf.org/html/rfc7231#section-4.3.1
    // https://tools.ietf.org/html/rfc7231#section-4.3.2
    // https://tools.ietf.org/html/rfc7231#section-4.3.5

    // Sending a payload body on a request that does not
    // expect it can cause undefined behavior on some
    // servers and corrupt connection state. Do not
    // re-use the connection for further requests.

    const expectsPayload = (
      method === 'PUT' ||
      method === 'POST' ||
      method === 'PATCH'
    )

    if (body && typeof body.read === 'function') {
      // Try to read EOF in order to get length.
      body.read(0)
    }

    let contentLength = util.bodyLength(body)

    if (contentLength === null) {
      contentLength = request.contentLength
    }

    // Compare the declared length with the measured one *before* the header
    // is suppressed below. Doing it afterwards reported an explicit
    // `content-length: 0` on a body-less GET/HEAD/DELETE as a mismatch, even
    // though it matches the (empty) body exactly.
    if (request.contentLength !== null && request.contentLength !== contentLength) {
      if (client[kStrictContentLength]) {
        errorRequest(client, request, new RequestContentLengthMismatchError())
        return false
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    if (contentLength === 0 && !expectsPayload) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD NOT send a Content-Length header field when
      // the request message does not contain a payload body and the method
      // semantics do not anticipate such a body.

      contentLength = null
    }

    const socket = client[kSocket]

    try {
      // The callback handed to onConnect is the request's abort function.
      request.onConnect((err) => {
        if (request.aborted || request.completed) {
          return
        }

        errorRequest(client, request, err || new RequestAbortedError())

        util.destroy(socket, new InformationalError('aborted'))
      })
    } catch (err) {
      errorRequest(client, request, err)
    }

    if (request.aborted) {
      return false
    }

    if (method === 'HEAD') {
      // https://github.com/mcollina/undici/issues/258

      // Close after a HEAD request to interop with misbehaving servers
      // that may send a body in the response.

      socket[kReset] = true
    }

    if (upgrade || method === 'CONNECT') {
      // On CONNECT or upgrade, block pipeline from dispatching further
      // requests on this connection.

      socket[kReset] = true
    }

    // An explicit per-request `reset` overrides the defaults chosen above.
    if (reset != null) {
      socket[kReset] = reset
    }

    if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
      socket[kReset] = true
    }

    if (blocking) {
      socket[kBlocking] = true
    }

    let header = `${method} ${path} HTTP/1.1\r\n`

    if (typeof host === 'string') {
      header += `host: ${host}\r\n`
    } else {
      header += client[kHostHeader]
    }

    if (upgrade) {
      header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
    } else if (client[kPipelining] && !socket[kReset]) {
      header += 'connection: keep-alive\r\n'
    } else {
      header += 'connection: close\r\n'
    }

    if (headers) {
      header += headers
    }

    if (channels.sendHeaders.hasSubscribers) {
      channels.sendHeaders.publish({ request, headers: header, socket })
    }

    /* istanbul ignore else: assertion */
    if (!body) {
      if (contentLength === 0) {
        socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
      } else {
        assert(contentLength === null, 'no body must not have content length')
        socket.write(`${header}\r\n`, 'latin1')
      }
      request.onRequestSent()
    } else if (util.isBuffer(body)) {
      assert(contentLength === body.byteLength, 'buffer body must have content length')

      socket.cork()
      socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      socket.write(body)
      socket.uncork()
      request.onBodySent(body)
      request.onRequestSent()
      if (!expectsPayload) {
        // A body was sent with a method that does not expect one: do not
        // re-use the connection.
        socket[kReset] = true
      }
    } else if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
      } else {
        writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
      }
    } else if (util.isStream(body)) {
      writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
    } else if (util.isIterable(body)) {
      writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
    } else {
      assert(false)
    }

    return true
  }
  /**
   * Sends a stream request body through an AsyncWriter.
   *
   * Chunks from `body` are forwarded to the writer; the body is paused when
   * the writer reports back-pressure and resumed on the socket's 'drain'.
   * The transfer finishes on the body's 'end'/'error'/'close' or on a socket
   * 'error', whichever comes first; later events are ignored.
   */
  function writeStream ({ body, client, request, socket, contentLength, header, expectsPayload }) {
    assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

    const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
    let done = false

    // Listener on the body: `this` is the body stream.
    function onData (chunk) {
      if (done) {
        return
      }

      try {
        const accepted = writer.write(chunk)
        if (!accepted && this.pause) {
          this.pause()
        }
      } catch (err) {
        util.destroy(this, err)
      }
    }

    function onDrain () {
      if (done) {
        return
      }

      if (body.resume) {
        body.resume()
      }
    }

    // The body closed before 'end' or 'error' finished the transfer.
    function onAbort () {
      onFinished(new RequestAbortedError())
    }

    function onFinished (err) {
      if (done) {
        return
      }
      done = true

      assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))

      socket
        .off('drain', onDrain)
        .off('error', onFinished)

      body
        .removeListener('data', onData)
        .removeListener('end', onFinished)
        .removeListener('error', onFinished)
        .removeListener('close', onAbort)

      let failure = err
      if (!failure) {
        try {
          writer.end()
        } catch (er) {
          failure = er
        }
      }

      writer.destroy(failure)

      // An informational 'reset' is not propagated to the body as an error.
      const isReset = failure && failure.code === 'UND_ERR_INFO' && failure.message === 'reset'
      if (failure && !isReset) {
        util.destroy(body, failure)
      } else {
        util.destroy(body)
      }
    }

    body
      .on('data', onData)
      .on('end', onFinished)
      .on('error', onFinished)
      .on('close', onAbort)

    if (body.resume) {
      body.resume()
    }

    socket
      .on('drain', onDrain)
      .on('error', onFinished)
  }
  /**
   * Sends a Blob-like request body.
   *
   * Reads the whole blob via `arrayBuffer()`, then writes the request head
   * (with content-length) and the payload in one corked write, notifies the
   * request and resumes the client. Any error raised along the way destroys
   * the socket with that error.
   *
   * @returns {Promise<void>}
   */
  async function writeBlob ({ body, client, request, socket, contentLength, header, expectsPayload }) {
    assert(contentLength === body.size, 'blob body must have content length')

    try {
      const lengthMismatch = contentLength != null && contentLength !== body.size
      if (lengthMismatch) {
        throw new RequestContentLengthMismatchError()
      }

      const raw = await body.arrayBuffer()
      const payload = Buffer.from(raw)

      // Cork so head and payload are flushed together.
      socket.cork()
      socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      socket.write(payload)
      socket.uncork()

      request.onBodySent(payload)
      request.onRequestSent()

      if (!expectsPayload) {
        socket[kReset] = true
      }

      resume(client)
    } catch (err) {
      util.destroy(socket, err)
    }
  }
  /**
   * Sends an (async) iterable request body through an AsyncWriter.
   *
   * Iterates `body`, writing each chunk; when the writer reports
   * back-pressure the loop waits until the socket emits 'drain' or 'close'.
   * An error recorded on the socket, or any error thrown while iterating or
   * writing, is passed to `writer.destroy`.
   *
   * @returns {Promise<void>}
   */
  async function writeIterable ({ body, client, request, socket, contentLength, header, expectsPayload }) {
    assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

    // Resolver of the promise currently waiting for 'drain'/'close', if any.
    let wake = null

    const onDrain = () => {
      if (!wake) {
        return
      }
      const settle = wake
      wake = null
      settle()
    }

    function waitForDrain () {
      return new Promise((resolve, reject) => {
        assert(wake === null)

        if (socket[kError]) {
          reject(socket[kError])
        } else {
          wake = resolve
        }
      })
    }

    socket
      .on('close', onDrain)
      .on('drain', onDrain)

    const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const accepted = writer.write(chunk)
        if (!accepted) {
          await waitForDrain()
        }
      }

      writer.end()
    } catch (err) {
      writer.destroy(err)
    } finally {
      socket
        .off('close', onDrain)
        .off('drain', onDrain)
    }
  }
  /**
   * Incremental writer for request bodies whose chunks arrive over time.
   *
   * The request head is written lazily together with the first non-empty
   * chunk (or in `end()` when no chunk was ever written). With a known
   * `contentLength` a content-length header is sent; with `null` the body is
   * sent using chunked transfer-encoding. Constructing a writer marks the
   * socket as writing; `end()` and `destroy()` clear that mark.
   */
  class AsyncWriter {
    constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
      this.socket = socket
      this.request = request
      this.contentLength = contentLength
      this.client = client
      this.bytesWritten = 0
      this.expectsPayload = expectsPayload
      this.header = header

      socket[kWriting] = true
    }

    /**
     * Writes one chunk.
     *
     * @param {string|Buffer|Uint8Array} chunk
     * @returns {boolean} the result of the underlying `socket.write`, `true`
     *   for an empty chunk, `false` when the socket is already destroyed
     * @throws the error recorded on the socket, or
     *   RequestContentLengthMismatchError in strict mode
     */
    write (chunk) {
      const { socket, request, contentLength, client, expectsPayload, header } = this
      const alreadyWritten = this.bytesWritten

      if (socket[kError]) {
        throw socket[kError]
      }

      if (socket.destroyed) {
        return false
      }

      const len = Buffer.byteLength(chunk)
      if (!len) {
        return true
      }

      // We should defer writing chunks.
      const overflows = contentLength !== null && alreadyWritten + len > contentLength
      if (overflows) {
        if (client[kStrictContentLength]) {
          throw new RequestContentLengthMismatchError()
        }

        process.emitWarning(new RequestContentLengthMismatchError())
      }

      const chunked = contentLength === null

      socket.cork()

      if (alreadyWritten === 0) {
        if (!expectsPayload) {
          socket[kReset] = true
        }

        // First chunk: send the request head along with it.
        const head = chunked
          ? `${header}transfer-encoding: chunked\r\n`
          : `${header}content-length: ${contentLength}\r\n\r\n`
        socket.write(head, 'latin1')
      }

      if (chunked) {
        socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1')
      }

      this.bytesWritten += len

      const ret = socket.write(chunk)

      socket.uncork()

      request.onBodySent(chunk)

      if (!ret) {
        const parser = socket[kParser]
        if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
          // istanbul ignore else: only for jest
          if (parser.timeout.refresh) {
            parser.timeout.refresh()
          }
        }
      }

      return ret
    }

    /**
     * Finishes the request body and resumes the client.
     *
     * @throws the error recorded on the socket, or
     *   RequestContentLengthMismatchError in strict mode
     */
    end () {
      const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
      request.onRequestSent()

      socket[kWriting] = false

      if (socket[kError]) {
        throw socket[kError]
      }

      if (socket.destroyed) {
        return
      }

      if (bytesWritten === 0) {
        // Nothing was written yet, so the head has not been sent either.
        // https://tools.ietf.org/html/rfc7230#section-3.3.2
        // A user agent SHOULD send a Content-Length in a request message when
        // no Transfer-Encoding is sent and the request method defines a meaning
        // for an enclosed payload body.
        const head = expectsPayload
          ? `${header}content-length: 0\r\n\r\n`
          : `${header}\r\n`
        socket.write(head, 'latin1')
      } else if (contentLength === null) {
        // Terminating chunk of a chunked body.
        socket.write('\r\n0\r\n\r\n', 'latin1')
      }

      const lengthMismatch = contentLength !== null && bytesWritten !== contentLength
      if (lengthMismatch) {
        if (client[kStrictContentLength]) {
          throw new RequestContentLengthMismatchError()
        } else {
          process.emitWarning(new RequestContentLengthMismatchError())
        }
      }

      const parser = socket[kParser]
      if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
        // istanbul ignore else: only for jest
        if (parser.timeout.refresh) {
          parser.timeout.refresh()
        }
      }

      resume(client)
    }

    /**
     * Clears the socket's writing mark and, when `err` is given, destroys the
     * socket with it.
     *
     * @param {Error} [err]
     */
    destroy (err) {
      const { socket, client } = this

      socket[kWriting] = false

      if (!err) {
        return
      }

      assert(client[kRunning] <= 1, 'pipeline should only contain this request')
      util.destroy(socket, err)
    }
  }
  /**
   * Fails `request` with `err` and checks that the request ended up aborted.
   *
   * If the request's error handler throws, or the request is not marked as
   * aborted afterwards, the resulting error is emitted as 'error' on the
   * client instead of being thrown to the caller.
   *
   * @param {object} client
   * @param {object} request
   * @param {Error} err
   */
  function errorRequest (client, request, err) {
    try {
      request.onError(err)
      assert(request.aborted)
    } catch (failure) {
      client.emit('error', failure)
    }
  }
  1484. module.exports = Client