agent.js 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. 'use strict'
  2. const { InvalidArgumentError } = require('./core/errors')
  3. const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('./core/symbols')
  4. const DispatcherBase = require('./dispatcher-base')
  5. const Pool = require('./pool')
  6. const Client = require('./client')
  7. const util = require('./core/util')
  8. const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
  9. const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')()
  10. const kOnConnect = Symbol('onConnect')
  11. const kOnDisconnect = Symbol('onDisconnect')
  12. const kOnConnectionError = Symbol('onConnectionError')
  13. const kMaxRedirections = Symbol('maxRedirections')
  14. const kOnDrain = Symbol('onDrain')
  15. const kFactory = Symbol('factory')
  16. const kFinalizer = Symbol('finalizer')
  17. const kOptions = Symbol('options')
  18. function defaultFactory (origin, opts) {
  19. return opts && opts.connections === 1
  20. ? new Client(origin, opts)
  21. : new Pool(origin, opts)
  22. }
  23. class Agent extends DispatcherBase {
  24. constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
  25. super()
  26. if (typeof factory !== 'function') {
  27. throw new InvalidArgumentError('factory must be a function.')
  28. }
  29. if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
  30. throw new InvalidArgumentError('connect must be a function or an object')
  31. }
  32. if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
  33. throw new InvalidArgumentError('maxRedirections must be a positive number')
  34. }
  35. if (connect && typeof connect !== 'function') {
  36. connect = { ...connect }
  37. }
  38. this[kInterceptors] = options.interceptors && options.interceptors.Agent && Array.isArray(options.interceptors.Agent)
  39. ? options.interceptors.Agent
  40. : [createRedirectInterceptor({ maxRedirections })]
  41. this[kOptions] = { ...util.deepClone(options), connect }
  42. this[kOptions].interceptors = options.interceptors
  43. ? { ...options.interceptors }
  44. : undefined
  45. this[kMaxRedirections] = maxRedirections
  46. this[kFactory] = factory
  47. this[kClients] = new Map()
  48. this[kFinalizer] = new FinalizationRegistry(/* istanbul ignore next: gc is undeterministic */ key => {
  49. const ref = this[kClients].get(key)
  50. if (ref !== undefined && ref.deref() === undefined) {
  51. this[kClients].delete(key)
  52. }
  53. })
  54. const agent = this
  55. this[kOnDrain] = (origin, targets) => {
  56. agent.emit('drain', origin, [agent, ...targets])
  57. }
  58. this[kOnConnect] = (origin, targets) => {
  59. agent.emit('connect', origin, [agent, ...targets])
  60. }
  61. this[kOnDisconnect] = (origin, targets, err) => {
  62. agent.emit('disconnect', origin, [agent, ...targets], err)
  63. }
  64. this[kOnConnectionError] = (origin, targets, err) => {
  65. agent.emit('connectionError', origin, [agent, ...targets], err)
  66. }
  67. }
  68. get [kRunning] () {
  69. let ret = 0
  70. for (const ref of this[kClients].values()) {
  71. const client = ref.deref()
  72. /* istanbul ignore next: gc is undeterministic */
  73. if (client) {
  74. ret += client[kRunning]
  75. }
  76. }
  77. return ret
  78. }
  79. [kDispatch] (opts, handler) {
  80. let key
  81. if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
  82. key = String(opts.origin)
  83. } else {
  84. throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
  85. }
  86. const ref = this[kClients].get(key)
  87. let dispatcher = ref ? ref.deref() : null
  88. if (!dispatcher) {
  89. dispatcher = this[kFactory](opts.origin, this[kOptions])
  90. .on('drain', this[kOnDrain])
  91. .on('connect', this[kOnConnect])
  92. .on('disconnect', this[kOnDisconnect])
  93. .on('connectionError', this[kOnConnectionError])
  94. this[kClients].set(key, new WeakRef(dispatcher))
  95. this[kFinalizer].register(dispatcher, key)
  96. }
  97. return dispatcher.dispatch(opts, handler)
  98. }
  99. async [kClose] () {
  100. const closePromises = []
  101. for (const ref of this[kClients].values()) {
  102. const client = ref.deref()
  103. /* istanbul ignore else: gc is undeterministic */
  104. if (client) {
  105. closePromises.push(client.close())
  106. }
  107. }
  108. await Promise.all(closePromises)
  109. }
  110. async [kDestroy] (err) {
  111. const destroyPromises = []
  112. for (const ref of this[kClients].values()) {
  113. const client = ref.deref()
  114. /* istanbul ignore else: gc is undeterministic */
  115. if (client) {
  116. destroyPromises.push(client.destroy(err))
  117. }
  118. }
  119. await Promise.all(destroyPromises)
  120. }
  121. }
  122. module.exports = Agent