aboutsummaryrefslogtreecommitdiffstats
path: root/vanilla/node_modules/undici/lib/dispatcher/pool-base.js
diff options
context:
space:
mode:
authorAdam Mathes <adam@adammathes.com>2026-02-13 21:34:48 -0800
committerAdam Mathes <adam@adammathes.com>2026-02-13 21:34:48 -0800
commit76cb9c2a39d477a64824a985ade40507e3bbade1 (patch)
tree41e997aa9c6f538d3a136af61dae9424db2005a9 /vanilla/node_modules/undici/lib/dispatcher/pool-base.js
parent819a39a21ac992b1393244a4c283bbb125208c69 (diff)
downloadneko-76cb9c2a39d477a64824a985ade40507e3bbade1.tar.gz
neko-76cb9c2a39d477a64824a985ade40507e3bbade1.tar.bz2
neko-76cb9c2a39d477a64824a985ade40507e3bbade1.zip
feat(vanilla): add testing infrastructure and tests (NK-wjnczv)
Diffstat (limited to 'vanilla/node_modules/undici/lib/dispatcher/pool-base.js')
-rw-r--r--vanilla/node_modules/undici/lib/dispatcher/pool-base.js214
1 files changed, 214 insertions, 0 deletions
diff --git a/vanilla/node_modules/undici/lib/dispatcher/pool-base.js b/vanilla/node_modules/undici/lib/dispatcher/pool-base.js
new file mode 100644
index 0000000..6c1f238
--- /dev/null
+++ b/vanilla/node_modules/undici/lib/dispatcher/pool-base.js
@@ -0,0 +1,214 @@
+'use strict'
+
+const { PoolStats } = require('../util/stats.js')
+const DispatcherBase = require('./dispatcher-base')
+const FixedQueue = require('./fixed-queue')
+const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('../core/symbols')
+
+const kClients = Symbol('clients')
+const kNeedDrain = Symbol('needDrain')
+const kQueue = Symbol('queue')
+const kClosedResolve = Symbol('closed resolve')
+const kOnDrain = Symbol('onDrain')
+const kOnConnect = Symbol('onConnect')
+const kOnDisconnect = Symbol('onDisconnect')
+const kOnConnectionError = Symbol('onConnectionError')
+const kGetDispatcher = Symbol('get dispatcher')
+const kAddClient = Symbol('add client')
+const kRemoveClient = Symbol('remove client')
+
+class PoolBase extends DispatcherBase {
+ [kQueue] = new FixedQueue();
+
+ [kQueued] = 0;
+
+ [kClients] = [];
+
+ [kNeedDrain] = false;
+
+ [kOnDrain] (client, origin, targets) {
+ const queue = this[kQueue]
+
+ let needDrain = false
+
+ while (!needDrain) {
+ const item = queue.shift()
+ if (!item) {
+ break
+ }
+ this[kQueued]--
+ needDrain = !client.dispatch(item.opts, item.handler)
+ }
+
+ client[kNeedDrain] = needDrain
+
+ if (!needDrain && this[kNeedDrain]) {
+ this[kNeedDrain] = false
+ this.emit('drain', origin, [this, ...targets])
+ }
+
+ if (this[kClosedResolve] && queue.isEmpty()) {
+ const closeAll = []
+ for (let i = 0; i < this[kClients].length; i++) {
+ const client = this[kClients][i]
+ if (!client.destroyed) {
+ closeAll.push(client.close())
+ }
+ }
+ return Promise.all(closeAll)
+ .then(this[kClosedResolve])
+ }
+ }
+
+ [kOnConnect] = (origin, targets) => {
+ this.emit('connect', origin, [this, ...targets])
+ };
+
+ [kOnDisconnect] = (origin, targets, err) => {
+ this.emit('disconnect', origin, [this, ...targets], err)
+ };
+
+ [kOnConnectionError] = (origin, targets, err) => {
+ this.emit('connectionError', origin, [this, ...targets], err)
+ }
+
+ get [kBusy] () {
+ return this[kNeedDrain]
+ }
+
+ get [kConnected] () {
+ let ret = 0
+ for (const { [kConnected]: connected } of this[kClients]) {
+ ret += connected
+ }
+ return ret
+ }
+
+ get [kFree] () {
+ let ret = 0
+ for (const { [kConnected]: connected, [kNeedDrain]: needDrain } of this[kClients]) {
+ ret += connected && !needDrain
+ }
+ return ret
+ }
+
+ get [kPending] () {
+ let ret = this[kQueued]
+ for (const { [kPending]: pending } of this[kClients]) {
+ ret += pending
+ }
+ return ret
+ }
+
+ get [kRunning] () {
+ let ret = 0
+ for (const { [kRunning]: running } of this[kClients]) {
+ ret += running
+ }
+ return ret
+ }
+
+ get [kSize] () {
+ let ret = this[kQueued]
+ for (const { [kSize]: size } of this[kClients]) {
+ ret += size
+ }
+ return ret
+ }
+
+ get stats () {
+ return new PoolStats(this)
+ }
+
+ [kClose] () {
+ if (this[kQueue].isEmpty()) {
+ const closeAll = []
+ for (let i = 0; i < this[kClients].length; i++) {
+ const client = this[kClients][i]
+ if (!client.destroyed) {
+ closeAll.push(client.close())
+ }
+ }
+ return Promise.all(closeAll)
+ } else {
+ return new Promise((resolve) => {
+ this[kClosedResolve] = resolve
+ })
+ }
+ }
+
+ [kDestroy] (err) {
+ while (true) {
+ const item = this[kQueue].shift()
+ if (!item) {
+ break
+ }
+ item.handler.onError(err)
+ }
+
+ const destroyAll = new Array(this[kClients].length)
+ for (let i = 0; i < this[kClients].length; i++) {
+ destroyAll[i] = this[kClients][i].destroy(err)
+ }
+ return Promise.all(destroyAll)
+ }
+
+ [kDispatch] (opts, handler) {
+ const dispatcher = this[kGetDispatcher]()
+
+ if (!dispatcher) {
+ this[kNeedDrain] = true
+ this[kQueue].push({ opts, handler })
+ this[kQueued]++
+ } else if (!dispatcher.dispatch(opts, handler)) {
+ dispatcher[kNeedDrain] = true
+ this[kNeedDrain] = !this[kGetDispatcher]()
+ }
+
+ return !this[kNeedDrain]
+ }
+
+ [kAddClient] (client) {
+ client
+ .on('drain', this[kOnDrain].bind(this, client))
+ .on('connect', this[kOnConnect])
+ .on('disconnect', this[kOnDisconnect])
+ .on('connectionError', this[kOnConnectionError])
+
+ this[kClients].push(client)
+
+ if (this[kNeedDrain]) {
+ queueMicrotask(() => {
+ if (this[kNeedDrain]) {
+ this[kOnDrain](client, client[kUrl], [client, this])
+ }
+ })
+ }
+
+ return this
+ }
+
+ [kRemoveClient] (client) {
+ client.close(() => {
+ const idx = this[kClients].indexOf(client)
+ if (idx !== -1) {
+ this[kClients].splice(idx, 1)
+ }
+ })
+
+ this[kNeedDrain] = this[kClients].some(dispatcher => (
+ !dispatcher[kNeedDrain] &&
+ dispatcher.closed !== true &&
+ dispatcher.destroyed !== true
+ ))
+ }
+}
+
+module.exports = {
+ PoolBase,
+ kClients,
+ kNeedDrain,
+ kAddClient,
+ kRemoveClient,
+ kGetDispatcher
+}