123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- 'use strict'
- const { InvalidArgumentError } = require('./core/errors')
- const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('./core/symbols')
- const DispatcherBase = require('./dispatcher-base')
- const Pool = require('./pool')
- const Client = require('./client')
- const util = require('./core/util')
- const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
- const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')()
- const kOnConnect = Symbol('onConnect')
- const kOnDisconnect = Symbol('onDisconnect')
- const kOnConnectionError = Symbol('onConnectionError')
- const kMaxRedirections = Symbol('maxRedirections')
- const kOnDrain = Symbol('onDrain')
- const kFactory = Symbol('factory')
- const kFinalizer = Symbol('finalizer')
- const kOptions = Symbol('options')
- function defaultFactory (origin, opts) {
- return opts && opts.connections === 1
- ? new Client(origin, opts)
- : new Pool(origin, opts)
- }
- class Agent extends DispatcherBase {
- constructor ({ factory = defaultFactory, maxRedirections = 0, connect, ...options } = {}) {
- super()
- if (typeof factory !== 'function') {
- throw new InvalidArgumentError('factory must be a function.')
- }
- if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
- throw new InvalidArgumentError('connect must be a function or an object')
- }
- if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
- throw new InvalidArgumentError('maxRedirections must be a positive number')
- }
- if (connect && typeof connect !== 'function') {
- connect = { ...connect }
- }
- this[kInterceptors] = options.interceptors && options.interceptors.Agent && Array.isArray(options.interceptors.Agent)
- ? options.interceptors.Agent
- : [createRedirectInterceptor({ maxRedirections })]
- this[kOptions] = { ...util.deepClone(options), connect }
- this[kOptions].interceptors = options.interceptors
- ? { ...options.interceptors }
- : undefined
- this[kMaxRedirections] = maxRedirections
- this[kFactory] = factory
- this[kClients] = new Map()
- this[kFinalizer] = new FinalizationRegistry(/* istanbul ignore next: gc is undeterministic */ key => {
- const ref = this[kClients].get(key)
- if (ref !== undefined && ref.deref() === undefined) {
- this[kClients].delete(key)
- }
- })
- const agent = this
- this[kOnDrain] = (origin, targets) => {
- agent.emit('drain', origin, [agent, ...targets])
- }
- this[kOnConnect] = (origin, targets) => {
- agent.emit('connect', origin, [agent, ...targets])
- }
- this[kOnDisconnect] = (origin, targets, err) => {
- agent.emit('disconnect', origin, [agent, ...targets], err)
- }
- this[kOnConnectionError] = (origin, targets, err) => {
- agent.emit('connectionError', origin, [agent, ...targets], err)
- }
- }
- get [kRunning] () {
- let ret = 0
- for (const ref of this[kClients].values()) {
- const client = ref.deref()
- /* istanbul ignore next: gc is undeterministic */
- if (client) {
- ret += client[kRunning]
- }
- }
- return ret
- }
- [kDispatch] (opts, handler) {
- let key
- if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
- key = String(opts.origin)
- } else {
- throw new InvalidArgumentError('opts.origin must be a non-empty string or URL.')
- }
- const ref = this[kClients].get(key)
- let dispatcher = ref ? ref.deref() : null
- if (!dispatcher) {
- dispatcher = this[kFactory](opts.origin, this[kOptions])
- .on('drain', this[kOnDrain])
- .on('connect', this[kOnConnect])
- .on('disconnect', this[kOnDisconnect])
- .on('connectionError', this[kOnConnectionError])
- this[kClients].set(key, new WeakRef(dispatcher))
- this[kFinalizer].register(dispatcher, key)
- }
- return dispatcher.dispatch(opts, handler)
- }
- async [kClose] () {
- const closePromises = []
- for (const ref of this[kClients].values()) {
- const client = ref.deref()
- /* istanbul ignore else: gc is undeterministic */
- if (client) {
- closePromises.push(client.close())
- }
- }
- await Promise.all(closePromises)
- }
- async [kDestroy] (err) {
- const destroyPromises = []
- for (const ref of this[kClients].values()) {
- const client = ref.deref()
- /* istanbul ignore else: gc is undeterministic */
- if (client) {
- destroyPromises.push(client.destroy(err))
- }
- }
- await Promise.all(destroyPromises)
- }
- }
- module.exports = Agent
|