balanced-pool.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. 'use strict'
  2. const {
  3. BalancedPoolMissingUpstreamError,
  4. InvalidArgumentError
  5. } = require('./core/errors')
  6. const {
  7. PoolBase,
  8. kClients,
  9. kNeedDrain,
  10. kAddClient,
  11. kRemoveClient,
  12. kGetDispatcher
  13. } = require('./pool-base')
  14. const Pool = require('./pool')
  15. const { kUrl, kInterceptors } = require('./core/symbols')
  16. const { parseOrigin } = require('./core/util')
  // Private property keys for BalancedPool instances.

  // Construction-time inputs: the pool factory and the options handed to it.
  const kFactory = Symbol('factory')
  const kOptions = Symbol('options')

  // Weighted round-robin cursor: the cached GCD of all pool weights, the
  // weight threshold currently being served, and the last visited pool index.
  const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
  const kCurrentWeight = Symbol('kCurrentWeight')
  const kIndex = Symbol('kIndex')

  // Weight bookkeeping: a pool's own weight, the ceiling a weight can reach,
  // and the amount a weight moves by on a connect or an error.
  const kWeight = Symbol('kWeight')
  const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
  const kErrorPenalty = Symbol('kErrorPenalty')
  /**
   * Euclid's algorithm: greatest common divisor of two numbers.
   *
   * Recurses on `(b, a % b)` until the second argument is exactly 0 and then
   * yields the first argument, so `gcd(x, 0) === x` and `gcd(0, 0) === 0`.
   * That makes 0 a neutral seed when folding a list of weights with `reduce`.
   *
   * @param {number} a
   * @param {number} b
   * @returns {number}
   */
  function getGreatestCommonDivisor (a, b) {
    return b === 0 ? a : getGreatestCommonDivisor(b, a % b)
  }
  /**
   * Default upstream factory: builds a plain `Pool` for the given origin.
   * Used by BalancedPool when the caller does not supply `factory`.
   *
   * @param {*} origin - forwarded unchanged as the first `Pool` argument
   * @param {object} opts - forwarded unchanged as the second `Pool` argument
   * @returns {Pool}
   */
  function defaultFactory (origin, opts) {
    const upstreamPool = new Pool(origin, opts)
    return upstreamPool
  }
  /**
   * Dispatcher that distributes work over a set of upstream pools using a
   * weighted round-robin selection.
   *
   * Every upstream pool carries a weight (`kWeight`). Weights start at
   * `maxWeightPerServer`, are lowered by `errorPenalty` on connection errors
   * and socket-error disconnects (never below 1), and are raised by
   * `errorPenalty` on successful connects (never above `maxWeightPerServer`).
   * The greatest common divisor of all weights is cached and used as the step
   * by which the selection threshold (`kCurrentWeight`) is lowered.
   */
  class BalancedPool extends PoolBase {
    /**
     * @param {*|Array<*>} [upstreams=[]] - one upstream or an array of
     *   upstreams; each entry is handed to `parseOrigin`.
     * @param {object} [options]
     * @param {Function} [options.factory=defaultFactory] - called as
     *   `factory(origin, opts)` to create the pool for an upstream.
     * @param {number} [options.maxWeightPerServer=100] - upper bound and
     *   initial value of a pool's weight. Falsy values fall back to 100.
     * @param {number} [options.errorPenalty=15] - amount a weight moves by per
     *   connect / error. Falsy values (including 0) fall back to 15.
     * @param {object} [options.interceptors] - `interceptors.BalancedPool` is
     *   used when it is an array; otherwise no interceptors are installed.
     *   All options except `factory` are also passed on to the factory.
     * @throws {InvalidArgumentError} when `factory` is not a function.
     */
    constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) {
      super()

      this[kOptions] = opts
      this[kIndex] = -1
      this[kCurrentWeight] = 0

      this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
      this[kErrorPenalty] = this[kOptions].errorPenalty || 15

      if (!Array.isArray(upstreams)) {
        upstreams = [upstreams]
      }

      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('factory must be a function.')
      }

      // Array.isArray() already rejects a missing/falsy BalancedPool entry.
      this[kInterceptors] = opts.interceptors && Array.isArray(opts.interceptors.BalancedPool)
        ? opts.interceptors.BalancedPool
        : []
      this[kFactory] = factory

      for (const upstream of upstreams) {
        this.addUpstream(upstream)
      }
      this._updateBalancedPoolStats()
    }

    /**
     * Registers a new upstream, unless a pool for the same origin that is
     * neither closed nor destroyed is already registered.
     *
     * Adding an upstream resets the weight of every registered pool to
     * `maxWeightPerServer`.
     *
     * @param {*} upstream - handed to `parseOrigin`; its `.origin` identifies
     *   the upstream.
     * @returns {this}
     */
    addUpstream (upstream) {
      const upstreamOrigin = parseOrigin(upstream).origin

      const alreadyRegistered = this[kClients].find((pool) => (
        pool[kUrl].origin === upstreamOrigin &&
        pool.closed !== true &&
        pool.destroyed !== true
      ))
      if (alreadyRegistered) {
        return this
      }

      const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))

      this[kAddClient](pool)

      pool.on('connect', () => {
        // Reward a successful connect, capped at the configured maximum.
        pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
        // Keep the cached GCD in sync with the new weight, exactly as the
        // error handlers below do; otherwise the threshold step used by the
        // dispatcher selection stays at the value computed while this pool
        // was penalized.
        this._updateBalancedPoolStats()
      })

      pool.on('connectionError', () => {
        // Penalize a failed connection attempt, but never drop below 1.
        pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
        this._updateBalancedPoolStats()
      })

      pool.on('disconnect', (...args) => {
        // The error, when present, is the third listener argument.
        const err = args[2]
        if (err && err.code === 'UND_ERR_SOCKET') {
          // Only socket errors decrease the weight of the pool.
          pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
          this._updateBalancedPoolStats()
        }
      })

      // A topology change puts every pool back at full weight.
      for (const client of this[kClients]) {
        client[kWeight] = this[kMaxWeightPerServer]
      }

      this._updateBalancedPoolStats()

      return this
    }

    /**
     * Recomputes and caches the greatest common divisor of all pool weights.
     * The result is 0 when no pools are registered.
     */
    _updateBalancedPoolStats () {
      this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0)
    }

    /**
     * Removes the first registered pool for the given upstream origin that is
     * neither closed nor destroyed. Does nothing when no such pool exists.
     *
     * @param {*} upstream - handed to `parseOrigin`; its `.origin` identifies
     *   the upstream.
     * @returns {this}
     */
    removeUpstream (upstream) {
      const upstreamOrigin = parseOrigin(upstream).origin

      const pool = this[kClients].find((pool) => (
        pool[kUrl].origin === upstreamOrigin &&
        pool.closed !== true &&
        pool.destroyed !== true
      ))

      if (pool) {
        this[kRemoveClient](pool)
      }

      return this
    }

    /**
     * Origins of all registered pools that are neither closed nor destroyed.
     *
     * @returns {Array} a new array on every access
     */
    get upstreams () {
      return this[kClients]
        .filter(dispatcher => dispatcher.closed !== true && dispatcher.destroyed !== true)
        .map((p) => p[kUrl].origin)
    }

    /**
     * Picks the pool that should handle the next request.
     *
     * @returns {object|undefined} the selected pool, or `undefined` when no
     *   pool is currently usable (all need drain, or are closed/destroyed).
     * @throws {BalancedPoolMissingUpstreamError} when no pool is registered.
     */
    [kGetDispatcher] () {
      const pools = this[kClients]

      // We validate that pools is greater than 0,
      // otherwise we would have to wait until an upstream
      // is added, which might never happen.
      if (pools.length === 0) {
        throw new BalancedPoolMissingUpstreamError()
      }

      const hasUsablePool = pools.some(dispatcher => (
        !dispatcher[kNeedDrain] &&
        dispatcher.closed !== true &&
        dispatcher.destroyed !== true
      ))

      if (!hasUsablePool) {
        return
      }

      // At this point at least one pool does not need drain, so a separate
      // "all pools busy" check would never trigger and findIndex below
      // always yields a valid index.
      let maxWeightIndex = pools.findIndex(pool => !pool[kNeedDrain])

      // Visit every pool at most once, continuing after the last position.
      for (let visited = 0; visited < pools.length; visited++) {
        this[kIndex] = (this[kIndex] + 1) % pools.length
        const pool = pools[this[kIndex]]

        // Track the non-busy pool with the largest weight as a fallback.
        if (pool[kWeight] > pools[maxWeightIndex][kWeight] && !pool[kNeedDrain]) {
          maxWeightIndex = this[kIndex]
        }

        // Each time the cursor wraps to the first pool, lower the threshold
        // by the GCD of all weights; restart from the maximum once it is
        // exhausted.
        if (this[kIndex] === 0) {
          this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

          if (this[kCurrentWeight] <= 0) {
            this[kCurrentWeight] = this[kMaxWeightPerServer]
          }
        }

        if (pool[kWeight] >= this[kCurrentWeight] && !pool[kNeedDrain]) {
          return pool
        }
      }

      // No pool met the threshold in a full rotation: fall back to the
      // heaviest non-busy pool and continue the rotation from there.
      this[kCurrentWeight] = pools[maxWeightIndex][kWeight]
      this[kIndex] = maxWeightIndex
      return pools[maxWeightIndex]
    }
  }
  152. module.exports = BalancedPool