123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- 'use strict'
- const DispatcherBase = require('./dispatcher-base')
- const FixedQueue = require('./node/fixed-queue')
- const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('./core/symbols')
- const PoolStats = require('./pool-stats')
- const kClients = Symbol('clients')
- const kNeedDrain = Symbol('needDrain')
- const kQueue = Symbol('queue')
- const kClosedResolve = Symbol('closed resolve')
- const kOnDrain = Symbol('onDrain')
- const kOnConnect = Symbol('onConnect')
- const kOnDisconnect = Symbol('onDisconnect')
- const kOnConnectionError = Symbol('onConnectionError')
- const kGetDispatcher = Symbol('get dispatcher')
- const kAddClient = Symbol('add client')
- const kRemoveClient = Symbol('remove client')
- const kStats = Symbol('stats')
- class PoolBase extends DispatcherBase {
- constructor () {
- super()
- this[kQueue] = new FixedQueue()
- this[kClients] = []
- this[kQueued] = 0
- const pool = this
- this[kOnDrain] = function onDrain (origin, targets) {
- const queue = pool[kQueue]
- let needDrain = false
- while (!needDrain) {
- const item = queue.shift()
- if (!item) {
- break
- }
- pool[kQueued]--
- needDrain = !this.dispatch(item.opts, item.handler)
- }
- this[kNeedDrain] = needDrain
- if (!this[kNeedDrain] && pool[kNeedDrain]) {
- pool[kNeedDrain] = false
- pool.emit('drain', origin, [pool, ...targets])
- }
- if (pool[kClosedResolve] && queue.isEmpty()) {
- Promise
- .all(pool[kClients].map(c => c.close()))
- .then(pool[kClosedResolve])
- }
- }
- this[kOnConnect] = (origin, targets) => {
- pool.emit('connect', origin, [pool, ...targets])
- }
- this[kOnDisconnect] = (origin, targets, err) => {
- pool.emit('disconnect', origin, [pool, ...targets], err)
- }
- this[kOnConnectionError] = (origin, targets, err) => {
- pool.emit('connectionError', origin, [pool, ...targets], err)
- }
- this[kStats] = new PoolStats(this)
- }
- get [kBusy] () {
- return this[kNeedDrain]
- }
- get [kConnected] () {
- return this[kClients].filter(client => client[kConnected]).length
- }
- get [kFree] () {
- return this[kClients].filter(client => client[kConnected] && !client[kNeedDrain]).length
- }
- get [kPending] () {
- let ret = this[kQueued]
- for (const { [kPending]: pending } of this[kClients]) {
- ret += pending
- }
- return ret
- }
- get [kRunning] () {
- let ret = 0
- for (const { [kRunning]: running } of this[kClients]) {
- ret += running
- }
- return ret
- }
- get [kSize] () {
- let ret = this[kQueued]
- for (const { [kSize]: size } of this[kClients]) {
- ret += size
- }
- return ret
- }
- get stats () {
- return this[kStats]
- }
- async [kClose] () {
- if (this[kQueue].isEmpty()) {
- return Promise.all(this[kClients].map(c => c.close()))
- } else {
- return new Promise((resolve) => {
- this[kClosedResolve] = resolve
- })
- }
- }
- async [kDestroy] (err) {
- while (true) {
- const item = this[kQueue].shift()
- if (!item) {
- break
- }
- item.handler.onError(err)
- }
- return Promise.all(this[kClients].map(c => c.destroy(err)))
- }
- [kDispatch] (opts, handler) {
- const dispatcher = this[kGetDispatcher]()
- if (!dispatcher) {
- this[kNeedDrain] = true
- this[kQueue].push({ opts, handler })
- this[kQueued]++
- } else if (!dispatcher.dispatch(opts, handler)) {
- dispatcher[kNeedDrain] = true
- this[kNeedDrain] = !this[kGetDispatcher]()
- }
- return !this[kNeedDrain]
- }
- [kAddClient] (client) {
- client
- .on('drain', this[kOnDrain])
- .on('connect', this[kOnConnect])
- .on('disconnect', this[kOnDisconnect])
- .on('connectionError', this[kOnConnectionError])
- this[kClients].push(client)
- if (this[kNeedDrain]) {
- process.nextTick(() => {
- if (this[kNeedDrain]) {
- this[kOnDrain](client[kUrl], [this, client])
- }
- })
- }
- return this
- }
- [kRemoveClient] (client) {
- client.close(() => {
- const idx = this[kClients].indexOf(client)
- if (idx !== -1) {
- this[kClients].splice(idx, 1)
- }
- })
- this[kNeedDrain] = this[kClients].some(dispatcher => (
- !dispatcher[kNeedDrain] &&
- dispatcher.closed !== true &&
- dispatcher.destroyed !== true
- ))
- }
- }
- module.exports = {
- PoolBase,
- kClients,
- kNeedDrain,
- kAddClient,
- kRemoveClient,
- kGetDispatcher
- }
|