api-request.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. 'use strict'
  2. const Readable = require('./readable')
  3. const {
  4. InvalidArgumentError,
  5. RequestAbortedError
  6. } = require('../core/errors')
  7. const util = require('../core/util')
  8. const { getResolveErrorBodyCallback } = require('./util')
  9. const { AsyncResource } = require('async_hooks')
  10. const { addSignal, removeSignal } = require('./abort-signal')
  11. class RequestHandler extends AsyncResource {
  12. constructor (opts, callback) {
  13. if (!opts || typeof opts !== 'object') {
  14. throw new InvalidArgumentError('invalid opts')
  15. }
  16. const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts
  17. try {
  18. if (typeof callback !== 'function') {
  19. throw new InvalidArgumentError('invalid callback')
  20. }
  21. if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
  22. throw new InvalidArgumentError('invalid highWaterMark')
  23. }
  24. if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
  25. throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
  26. }
  27. if (method === 'CONNECT') {
  28. throw new InvalidArgumentError('invalid method')
  29. }
  30. if (onInfo && typeof onInfo !== 'function') {
  31. throw new InvalidArgumentError('invalid onInfo callback')
  32. }
  33. super('UNDICI_REQUEST')
  34. } catch (err) {
  35. if (util.isStream(body)) {
  36. util.destroy(body.on('error', util.nop), err)
  37. }
  38. throw err
  39. }
  40. this.responseHeaders = responseHeaders || null
  41. this.opaque = opaque || null
  42. this.callback = callback
  43. this.res = null
  44. this.abort = null
  45. this.body = body
  46. this.trailers = {}
  47. this.context = null
  48. this.onInfo = onInfo || null
  49. this.throwOnError = throwOnError
  50. this.highWaterMark = highWaterMark
  51. if (util.isStream(body)) {
  52. body.on('error', (err) => {
  53. this.onError(err)
  54. })
  55. }
  56. addSignal(this, signal)
  57. }
  58. onConnect (abort, context) {
  59. if (!this.callback) {
  60. throw new RequestAbortedError()
  61. }
  62. this.abort = abort
  63. this.context = context
  64. }
  65. onHeaders (statusCode, rawHeaders, resume, statusMessage) {
  66. const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this
  67. const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
  68. if (statusCode < 200) {
  69. if (this.onInfo) {
  70. this.onInfo({ statusCode, headers })
  71. }
  72. return
  73. }
  74. const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
  75. const contentType = parsedHeaders['content-type']
  76. const body = new Readable({ resume, abort, contentType, highWaterMark })
  77. this.callback = null
  78. this.res = body
  79. if (callback !== null) {
  80. if (this.throwOnError && statusCode >= 400) {
  81. this.runInAsyncScope(getResolveErrorBodyCallback, null,
  82. { callback, body, contentType, statusCode, statusMessage, headers }
  83. )
  84. } else {
  85. this.runInAsyncScope(callback, null, null, {
  86. statusCode,
  87. headers,
  88. trailers: this.trailers,
  89. opaque,
  90. body,
  91. context
  92. })
  93. }
  94. }
  95. }
  96. onData (chunk) {
  97. const { res } = this
  98. return res.push(chunk)
  99. }
  100. onComplete (trailers) {
  101. const { res } = this
  102. removeSignal(this)
  103. util.parseHeaders(trailers, this.trailers)
  104. res.push(null)
  105. }
  106. onError (err) {
  107. const { res, callback, body, opaque } = this
  108. removeSignal(this)
  109. if (callback) {
  110. // TODO: Does this need queueMicrotask?
  111. this.callback = null
  112. queueMicrotask(() => {
  113. this.runInAsyncScope(callback, null, err, { opaque })
  114. })
  115. }
  116. if (res) {
  117. this.res = null
  118. // Ensure all queued handlers are invoked before destroying res.
  119. queueMicrotask(() => {
  120. util.destroy(res, err)
  121. })
  122. }
  123. if (body) {
  124. this.body = null
  125. util.destroy(body, err)
  126. }
  127. }
  128. }
  129. function request (opts, callback) {
  130. if (callback === undefined) {
  131. return new Promise((resolve, reject) => {
  132. request.call(this, opts, (err, data) => {
  133. return err ? reject(err) : resolve(data)
  134. })
  135. })
  136. }
  137. try {
  138. this.dispatch(opts, new RequestHandler(opts, callback))
  139. } catch (err) {
  140. if (typeof callback !== 'function') {
  141. throw err
  142. }
  143. const opaque = opts && opts.opaque
  144. queueMicrotask(() => callback(err, { opaque }))
  145. }
  146. }
  147. module.exports = request