diff options
| author | Adam Mathes <adam@adammathes.com> | 2026-02-14 14:46:37 -0800 |
|---|---|---|
| committer | Adam Mathes <adam@adammathes.com> | 2026-02-14 14:46:37 -0800 |
| commit | afa87af01c79a9baa539f2992d32154d2a4739bd (patch) | |
| tree | 92c7416db734270a2fee1d72ee9cc119379ff8e1 /vanilla/node_modules/undici/lib/api | |
| parent | 3b927e84d200402281f68181cd4253bc77e5528d (diff) | |
| download | neko-afa87af01c79a9baa539f2992d32154d2a4739bd.tar.gz neko-afa87af01c79a9baa539f2992d32154d2a4739bd.tar.bz2 neko-afa87af01c79a9baa539f2992d32154d2a4739bd.zip | |
task: delete vanilla js prototype\n\n- Removed vanilla/ directory and web/dist/vanilla directory\n- Updated Makefile, Dockerfile, and CI workflow to remove vanilla references\n- Cleaned up web/web.go to remove vanilla embed and routes\n- Verified build and tests pass\n\nCloses NK-2tcnmq
Diffstat (limited to 'vanilla/node_modules/undici/lib/api')
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/abort-signal.js | 59 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/api-connect.js | 110 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/api-pipeline.js | 252 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/api-request.js | 214 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/api-stream.js | 209 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/api-upgrade.js | 111 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/index.js | 7 | ||||
| -rw-r--r-- | vanilla/node_modules/undici/lib/api/readable.js | 580 |
8 files changed, 0 insertions, 1542 deletions
diff --git a/vanilla/node_modules/undici/lib/api/abort-signal.js b/vanilla/node_modules/undici/lib/api/abort-signal.js deleted file mode 100644 index 608170b..0000000 --- a/vanilla/node_modules/undici/lib/api/abort-signal.js +++ /dev/null @@ -1,59 +0,0 @@ -'use strict' - -const { addAbortListener } = require('../core/util') -const { RequestAbortedError } = require('../core/errors') - -const kListener = Symbol('kListener') -const kSignal = Symbol('kSignal') - -function abort (self) { - if (self.abort) { - self.abort(self[kSignal]?.reason) - } else { - self.reason = self[kSignal]?.reason ?? new RequestAbortedError() - } - removeSignal(self) -} - -function addSignal (self, signal) { - self.reason = null - - self[kSignal] = null - self[kListener] = null - - if (!signal) { - return - } - - if (signal.aborted) { - abort(self) - return - } - - self[kSignal] = signal - self[kListener] = () => { - abort(self) - } - - addAbortListener(self[kSignal], self[kListener]) -} - -function removeSignal (self) { - if (!self[kSignal]) { - return - } - - if ('removeEventListener' in self[kSignal]) { - self[kSignal].removeEventListener('abort', self[kListener]) - } else { - self[kSignal].removeListener('abort', self[kListener]) - } - - self[kSignal] = null - self[kListener] = null -} - -module.exports = { - addSignal, - removeSignal -} diff --git a/vanilla/node_modules/undici/lib/api/api-connect.js b/vanilla/node_modules/undici/lib/api/api-connect.js deleted file mode 100644 index c8b86dd..0000000 --- a/vanilla/node_modules/undici/lib/api/api-connect.js +++ /dev/null @@ -1,110 +0,0 @@ -'use strict' - -const assert = require('node:assert') -const { AsyncResource } = require('node:async_hooks') -const { InvalidArgumentError, SocketError } = require('../core/errors') -const util = require('../core/util') -const { addSignal, removeSignal } = require('./abort-signal') - -class ConnectHandler extends AsyncResource { - constructor (opts, callback) { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('invalid opts') - } - - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') - } - - const { signal, opaque, responseHeaders } = opts - - if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { - throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') - } - - super('UNDICI_CONNECT') - - this.opaque = opaque || null - this.responseHeaders = responseHeaders || null - this.callback = callback - this.abort = null - - addSignal(this, signal) - } - - onConnect (abort, context) { - if (this.reason) { - abort(this.reason) - return - } - - assert(this.callback) - - this.abort = abort - this.context = context - } - - onHeaders () { - throw new SocketError('bad connect', null) - } - - onUpgrade (statusCode, rawHeaders, socket) { - const { callback, opaque, context } = this - - removeSignal(this) - - this.callback = null - - let headers = rawHeaders - // Indicates is an HTTP2Session - if (headers != null) { - headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - } - - this.runInAsyncScope(callback, null, null, { - statusCode, - headers, - socket, - opaque, - context - }) - } - - onError (err) { - const { callback, opaque } = this - - removeSignal(this) - - if (callback) { - this.callback = null - queueMicrotask(() => { - this.runInAsyncScope(callback, null, err, { opaque }) - }) - } - } -} - -function connect (opts, callback) { - if (callback === undefined) { - return new Promise((resolve, reject) => { - connect.call(this, opts, (err, data) => { - return err ? reject(err) : resolve(data) - }) - }) - } - - try { - const connectHandler = new ConnectHandler(opts, callback) - const connectOptions = { ...opts, method: 'CONNECT' } - - this.dispatch(connectOptions, connectHandler) - } catch (err) { - if (typeof callback !== 'function') { - throw err - } - const opaque = opts?.opaque - queueMicrotask(() => callback(err, { opaque })) - } -} - -module.exports = connect diff --git a/vanilla/node_modules/undici/lib/api/api-pipeline.js b/vanilla/node_modules/undici/lib/api/api-pipeline.js deleted file mode 100644 index 77f3520..0000000 --- a/vanilla/node_modules/undici/lib/api/api-pipeline.js +++ /dev/null @@ -1,252 +0,0 @@ -'use strict' - -const { - Readable, - Duplex, - PassThrough -} = require('node:stream') -const assert = require('node:assert') -const { AsyncResource } = require('node:async_hooks') -const { - InvalidArgumentError, - InvalidReturnValueError, - RequestAbortedError -} = require('../core/errors') -const util = require('../core/util') -const { addSignal, removeSignal } = require('./abort-signal') - -function noop () {} - -const kResume = Symbol('resume') - -class PipelineRequest extends Readable { - constructor () { - super({ autoDestroy: true }) - - this[kResume] = null - } - - _read () { - const { [kResume]: resume } = this - - if (resume) { - this[kResume] = null - resume() - } - } - - _destroy (err, callback) { - this._read() - - callback(err) - } -} - -class PipelineResponse extends Readable { - constructor (resume) { - super({ autoDestroy: true }) - this[kResume] = resume - } - - _read () { - this[kResume]() - } - - _destroy (err, callback) { - if (!err && !this._readableState.endEmitted) { - err = new RequestAbortedError() - } - - callback(err) - } -} - -class PipelineHandler extends AsyncResource { - constructor (opts, handler) { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('invalid opts') - } - - if (typeof handler !== 'function') { - throw new InvalidArgumentError('invalid handler') - } - - const { signal, method, opaque, onInfo, responseHeaders } = opts - - if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { - throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') - } - - if (method === 'CONNECT') { - throw new InvalidArgumentError('invalid method') - } - - if (onInfo && typeof onInfo !== 'function') { - throw new InvalidArgumentError('invalid onInfo callback') - } - - super('UNDICI_PIPELINE') - - this.opaque = opaque || null - this.responseHeaders = responseHeaders || null - this.handler = handler - this.abort = null - this.context = null - this.onInfo = onInfo || null - - this.req = new PipelineRequest().on('error', noop) - - this.ret = new Duplex({ - readableObjectMode: opts.objectMode, - autoDestroy: true, - read: () => { - const { body } = this - - if (body?.resume) { - body.resume() - } - }, - write: (chunk, encoding, callback) => { - const { req } = this - - if (req.push(chunk, encoding) || req._readableState.destroyed) { - callback() - } else { - req[kResume] = callback - } - }, - destroy: (err, callback) => { - const { body, req, res, ret, abort } = this - - if (!err && !ret._readableState.endEmitted) { - err = new RequestAbortedError() - } - - if (abort && err) { - abort() - } - - util.destroy(body, err) - util.destroy(req, err) - util.destroy(res, err) - - removeSignal(this) - - callback(err) - } - }).on('prefinish', () => { - const { req } = this - - // Node < 15 does not call _final in same tick. - req.push(null) - }) - - this.res = null - - addSignal(this, signal) - } - - onConnect (abort, context) { - const { res } = this - - if (this.reason) { - abort(this.reason) - return - } - - assert(!res, 'pipeline cannot be retried') - - this.abort = abort - this.context = context - } - - onHeaders (statusCode, rawHeaders, resume) { - const { opaque, handler, context } = this - - if (statusCode < 200) { - if (this.onInfo) { - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - this.onInfo({ statusCode, headers }) - } - return - } - - this.res = new PipelineResponse(resume) - - let body - try { - this.handler = null - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - body = this.runInAsyncScope(handler, null, { - statusCode, - headers, - opaque, - body: this.res, - context - }) - } catch (err) { - this.res.on('error', noop) - throw err - } - - if (!body || typeof body.on !== 'function') { - throw new InvalidReturnValueError('expected Readable') - } - - body - .on('data', (chunk) => { - const { ret, body } = this - - if (!ret.push(chunk) && body.pause) { - body.pause() - } - }) - .on('error', (err) => { - const { ret } = this - - util.destroy(ret, err) - }) - .on('end', () => { - const { ret } = this - - ret.push(null) - }) - .on('close', () => { - const { ret } = this - - if (!ret._readableState.ended) { - util.destroy(ret, new RequestAbortedError()) - } - }) - - this.body = body - } - - onData (chunk) { - const { res } = this - return res.push(chunk) - } - - onComplete (trailers) { - const { res } = this - res.push(null) - } - - onError (err) { - const { ret } = this - this.handler = null - util.destroy(ret, err) - } -} - -function pipeline (opts, handler) { - try { - const pipelineHandler = new PipelineHandler(opts, handler) - this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler) - return pipelineHandler.ret - } catch (err) { - return new PassThrough().destroy(err) - } -} - -module.exports = pipeline diff --git a/vanilla/node_modules/undici/lib/api/api-request.js b/vanilla/node_modules/undici/lib/api/api-request.js deleted file mode 100644 index f6d15f7..0000000 --- a/vanilla/node_modules/undici/lib/api/api-request.js +++ /dev/null @@ -1,214 +0,0 @@ -'use strict' - -const assert = require('node:assert') -const { AsyncResource } = require('node:async_hooks') -const { Readable } = require('./readable') -const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') -const util = require('../core/util') - -function noop () {} - -class RequestHandler extends AsyncResource { - constructor (opts, callback) { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('invalid opts') - } - - const { signal, method, opaque, body, onInfo, responseHeaders, highWaterMark } = opts - - try { - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') - } - - if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) { - throw new InvalidArgumentError('invalid highWaterMark') - } - - if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { - throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') - } - - if (method === 'CONNECT') { - throw new InvalidArgumentError('invalid method') - } - - if (onInfo && typeof onInfo !== 'function') { - throw new InvalidArgumentError('invalid onInfo callback') - } - - super('UNDICI_REQUEST') - } catch (err) { - if (util.isStream(body)) { - util.destroy(body.on('error', noop), err) - } - throw err - } - - this.method = method - this.responseHeaders = responseHeaders || null - this.opaque = opaque || null - this.callback = callback - this.res = null - this.abort = null - this.body = body - this.trailers = {} - this.context = null - this.onInfo = onInfo || null - this.highWaterMark = highWaterMark - this.reason = null - this.removeAbortListener = null - - if (signal?.aborted) { - this.reason = signal.reason ?? new RequestAbortedError() - } else if (signal) { - this.removeAbortListener = util.addAbortListener(signal, () => { - this.reason = signal.reason ?? new RequestAbortedError() - if (this.res) { - util.destroy(this.res.on('error', noop), this.reason) - } else if (this.abort) { - this.abort(this.reason) - } - }) - } - } - - onConnect (abort, context) { - if (this.reason) { - abort(this.reason) - return - } - - assert(this.callback) - - this.abort = abort - this.context = context - } - - onHeaders (statusCode, rawHeaders, resume, statusMessage) { - const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this - - const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - - if (statusCode < 200) { - if (this.onInfo) { - this.onInfo({ statusCode, headers }) - } - return - } - - const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers - const contentType = parsedHeaders['content-type'] - const contentLength = parsedHeaders['content-length'] - const res = new Readable({ - resume, - abort, - contentType, - contentLength: this.method !== 'HEAD' && contentLength - ? Number(contentLength) - : null, - highWaterMark - }) - - if (this.removeAbortListener) { - res.on('close', this.removeAbortListener) - this.removeAbortListener = null - } - - this.callback = null - this.res = res - if (callback !== null) { - try { - this.runInAsyncScope(callback, null, null, { - statusCode, - statusText: statusMessage, - headers, - trailers: this.trailers, - opaque, - body: res, - context - }) - } catch (err) { - // If the callback throws synchronously, we need to handle it - // Remove reference to res to allow res being garbage collected - this.res = null - - // Destroy the response stream - util.destroy(res.on('error', noop), err) - - // Use queueMicrotask to re-throw the error so it reaches uncaughtException - queueMicrotask(() => { - throw err - }) - } - } - } - - onData (chunk) { - return this.res.push(chunk) - } - - onComplete (trailers) { - util.parseHeaders(trailers, this.trailers) - this.res.push(null) - } - - onError (err) { - const { res, callback, body, opaque } = this - - if (callback) { - // TODO: Does this need queueMicrotask? - this.callback = null - queueMicrotask(() => { - this.runInAsyncScope(callback, null, err, { opaque }) - }) - } - - if (res) { - this.res = null - // Ensure all queued handlers are invoked before destroying res. - queueMicrotask(() => { - util.destroy(res.on('error', noop), err) - }) - } - - if (body) { - this.body = null - - if (util.isStream(body)) { - body.on('error', noop) - util.destroy(body, err) - } - } - - if (this.removeAbortListener) { - this.removeAbortListener() - this.removeAbortListener = null - } - } -} - -function request (opts, callback) { - if (callback === undefined) { - return new Promise((resolve, reject) => { - request.call(this, opts, (err, data) => { - return err ? reject(err) : resolve(data) - }) - }) - } - - try { - const handler = new RequestHandler(opts, callback) - - this.dispatch(opts, handler) - } catch (err) { - if (typeof callback !== 'function') { - throw err - } - const opaque = opts?.opaque - queueMicrotask(() => callback(err, { opaque })) - } -} - -module.exports = request -module.exports.RequestHandler = RequestHandler diff --git a/vanilla/node_modules/undici/lib/api/api-stream.js b/vanilla/node_modules/undici/lib/api/api-stream.js deleted file mode 100644 index 5d0b3fb..0000000 --- a/vanilla/node_modules/undici/lib/api/api-stream.js +++ /dev/null @@ -1,209 +0,0 @@ -'use strict' - -const assert = require('node:assert') -const { finished } = require('node:stream') -const { AsyncResource } = require('node:async_hooks') -const { InvalidArgumentError, InvalidReturnValueError } = require('../core/errors') -const util = require('../core/util') -const { addSignal, removeSignal } = require('./abort-signal') - -function noop () {} - -class StreamHandler extends AsyncResource { - constructor (opts, factory, callback) { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('invalid opts') - } - - const { signal, method, opaque, body, onInfo, responseHeaders } = opts - - try { - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') - } - - if (typeof factory !== 'function') { - throw new InvalidArgumentError('invalid factory') - } - - if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { - throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') - } - - if (method === 'CONNECT') { - throw new InvalidArgumentError('invalid method') - } - - if (onInfo && typeof onInfo !== 'function') { - throw new InvalidArgumentError('invalid onInfo callback') - } - - super('UNDICI_STREAM') - } catch (err) { - if (util.isStream(body)) { - util.destroy(body.on('error', noop), err) - } - throw err - } - - this.responseHeaders = responseHeaders || null - this.opaque = opaque || null - this.factory = factory - this.callback = callback - this.res = null - this.abort = null - this.context = null - this.trailers = null - this.body = body - this.onInfo = onInfo || null - - if (util.isStream(body)) { - body.on('error', (err) => { - this.onError(err) - }) - } - - addSignal(this, signal) - } - - onConnect (abort, context) { - if (this.reason) { - abort(this.reason) - return - } - - assert(this.callback) - - this.abort = abort - this.context = context - } - - onHeaders (statusCode, rawHeaders, resume, statusMessage) { - const { factory, opaque, context, responseHeaders } = this - - const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - - if (statusCode < 200) { - if (this.onInfo) { - this.onInfo({ statusCode, headers }) - } - return - } - - this.factory = null - - if (factory === null) { - return - } - - const res = this.runInAsyncScope(factory, null, { - statusCode, - headers, - opaque, - context - }) - - if ( - !res || - typeof res.write !== 'function' || - typeof res.end !== 'function' || - typeof res.on !== 'function' - ) { - throw new InvalidReturnValueError('expected Writable') - } - - // TODO: Avoid finished. It registers an unnecessary amount of listeners. - finished(res, { readable: false }, (err) => { - const { callback, res, opaque, trailers, abort } = this - - this.res = null - if (err || !res?.readable) { - util.destroy(res, err) - } - - this.callback = null - this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) - - if (err) { - abort() - } - }) - - res.on('drain', resume) - - this.res = res - - const needDrain = res.writableNeedDrain !== undefined - ? res.writableNeedDrain - : res._writableState?.needDrain - - return needDrain !== true - } - - onData (chunk) { - const { res } = this - - return res ? res.write(chunk) : true - } - - onComplete (trailers) { - const { res } = this - - removeSignal(this) - - if (!res) { - return - } - - this.trailers = util.parseHeaders(trailers) - - res.end() - } - - onError (err) { - const { res, callback, opaque, body } = this - - removeSignal(this) - - this.factory = null - - if (res) { - this.res = null - util.destroy(res, err) - } else if (callback) { - this.callback = null - queueMicrotask(() => { - this.runInAsyncScope(callback, null, err, { opaque }) - }) - } - - if (body) { - this.body = null - util.destroy(body, err) - } - } -} - -function stream (opts, factory, callback) { - if (callback === undefined) { - return new Promise((resolve, reject) => { - stream.call(this, opts, factory, (err, data) => { - return err ? reject(err) : resolve(data) - }) - }) - } - - try { - const handler = new StreamHandler(opts, factory, callback) - - this.dispatch(opts, handler) - } catch (err) { - if (typeof callback !== 'function') { - throw err - } - const opaque = opts?.opaque - queueMicrotask(() => callback(err, { opaque })) - } -} - -module.exports = stream diff --git a/vanilla/node_modules/undici/lib/api/api-upgrade.js b/vanilla/node_modules/undici/lib/api/api-upgrade.js deleted file mode 100644 index 2b03f20..0000000 --- a/vanilla/node_modules/undici/lib/api/api-upgrade.js +++ /dev/null @@ -1,111 +0,0 @@ -'use strict' - -const { InvalidArgumentError, SocketError } = require('../core/errors') -const { AsyncResource } = require('node:async_hooks') -const assert = require('node:assert') -const util = require('../core/util') -const { kHTTP2Stream } = require('../core/symbols') -const { addSignal, removeSignal } = require('./abort-signal') - -class UpgradeHandler extends AsyncResource { - constructor (opts, callback) { - if (!opts || typeof opts !== 'object') { - throw new InvalidArgumentError('invalid opts') - } - - if (typeof callback !== 'function') { - throw new InvalidArgumentError('invalid callback') - } - - const { signal, opaque, responseHeaders } = opts - - if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { - throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') - } - - super('UNDICI_UPGRADE') - - this.responseHeaders = responseHeaders || null - this.opaque = opaque || null - this.callback = callback - this.abort = null - this.context = null - - addSignal(this, signal) - } - - onConnect (abort, context) { - if (this.reason) { - abort(this.reason) - return - } - - assert(this.callback) - - this.abort = abort - this.context = null - } - - onHeaders () { - throw new SocketError('bad upgrade', null) - } - - onUpgrade (statusCode, rawHeaders, socket) { - assert(socket[kHTTP2Stream] === true ? statusCode === 200 : statusCode === 101) - - const { callback, opaque, context } = this - - removeSignal(this) - - this.callback = null - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) - this.runInAsyncScope(callback, null, null, { - headers, - socket, - opaque, - context - }) - } - - onError (err) { - const { callback, opaque } = this - - removeSignal(this) - - if (callback) { - this.callback = null - queueMicrotask(() => { - this.runInAsyncScope(callback, null, err, { opaque }) - }) - } - } -} - -function upgrade (opts, callback) { - if (callback === undefined) { - return new Promise((resolve, reject) => { - upgrade.call(this, opts, (err, data) => { - return err ? reject(err) : resolve(data) - }) - }) - } - - try { - const upgradeHandler = new UpgradeHandler(opts, callback) - const upgradeOpts = { - ...opts, - method: opts.method || 'GET', - upgrade: opts.protocol || 'Websocket' - } - - this.dispatch(upgradeOpts, upgradeHandler) - } catch (err) { - if (typeof callback !== 'function') { - throw err - } - const opaque = opts?.opaque - queueMicrotask(() => callback(err, { opaque })) - } -} - -module.exports = upgrade diff --git a/vanilla/node_modules/undici/lib/api/index.js b/vanilla/node_modules/undici/lib/api/index.js deleted file mode 100644 index 8983a5e..0000000 --- a/vanilla/node_modules/undici/lib/api/index.js +++ /dev/null @@ -1,7 +0,0 @@ -'use strict' - -module.exports.request = require('./api-request') -module.exports.stream = require('./api-stream') -module.exports.pipeline = require('./api-pipeline') -module.exports.upgrade = require('./api-upgrade') -module.exports.connect = require('./api-connect') diff --git a/vanilla/node_modules/undici/lib/api/readable.js b/vanilla/node_modules/undici/lib/api/readable.js deleted file mode 100644 index cdede95..0000000 --- a/vanilla/node_modules/undici/lib/api/readable.js +++ /dev/null @@ -1,580 +0,0 @@ -'use strict' - -const assert = require('node:assert') -const { Readable } = require('node:stream') -const { RequestAbortedError, NotSupportedError, InvalidArgumentError, AbortError } = require('../core/errors') -const util = require('../core/util') -const { ReadableStreamFrom } = require('../core/util') - -const kConsume = Symbol('kConsume') -const kReading = Symbol('kReading') -const kBody = Symbol('kBody') -const kAbort = Symbol('kAbort') -const kContentType = Symbol('kContentType') -const kContentLength = Symbol('kContentLength') -const kUsed = Symbol('kUsed') -const kBytesRead = Symbol('kBytesRead') - -const noop = () => {} - -/** - * @class - * @extends {Readable} - * @see https://fetch.spec.whatwg.org/#body - */ -class BodyReadable extends Readable { - /** - * @param {object} opts - * @param {(this: Readable, size: number) => void} opts.resume - * @param {() => (void | null)} opts.abort - * @param {string} [opts.contentType = ''] - * @param {number} [opts.contentLength] - * @param {number} [opts.highWaterMark = 64 * 1024] - */ - constructor ({ - resume, - abort, - contentType = '', - contentLength, - highWaterMark = 64 * 1024 // Same as nodejs fs streams. - }) { - super({ - autoDestroy: true, - read: resume, - highWaterMark - }) - - this._readableState.dataEmitted = false - - this[kAbort] = abort - - /** @type {Consume | null} */ - this[kConsume] = null - - /** @type {number} */ - this[kBytesRead] = 0 - - /** @type {ReadableStream|null} */ - this[kBody] = null - - /** @type {boolean} */ - this[kUsed] = false - - /** @type {string} */ - this[kContentType] = contentType - - /** @type {number|null} */ - this[kContentLength] = Number.isFinite(contentLength) ? contentLength : null - - /** - * Is stream being consumed through Readable API? - * This is an optimization so that we avoid checking - * for 'data' and 'readable' listeners in the hot path - * inside push(). - * - * @type {boolean} - */ - this[kReading] = false - } - - /** - * @param {Error|null} err - * @param {(error:(Error|null)) => void} callback - * @returns {void} - */ - _destroy (err, callback) { - if (!err && !this._readableState.endEmitted) { - err = new RequestAbortedError() - } - - if (err) { - this[kAbort]() - } - - // Workaround for Node "bug". If the stream is destroyed in same - // tick as it is created, then a user who is waiting for a - // promise (i.e micro tick) for installing an 'error' listener will - // never get a chance and will always encounter an unhandled exception. - if (!this[kUsed]) { - setImmediate(callback, err) - } else { - callback(err) - } - } - - /** - * @param {string|symbol} event - * @param {(...args: any[]) => void} listener - * @returns {this} - */ - on (event, listener) { - if (event === 'data' || event === 'readable') { - this[kReading] = true - this[kUsed] = true - } - return super.on(event, listener) - } - - /** - * @param {string|symbol} event - * @param {(...args: any[]) => void} listener - * @returns {this} - */ - addListener (event, listener) { - return this.on(event, listener) - } - - /** - * @param {string|symbol} event - * @param {(...args: any[]) => void} listener - * @returns {this} - */ - off (event, listener) { - const ret = super.off(event, listener) - if (event === 'data' || event === 'readable') { - this[kReading] = ( - this.listenerCount('data') > 0 || - this.listenerCount('readable') > 0 - ) - } - return ret - } - - /** - * @param {string|symbol} event - * @param {(...args: any[]) => void} listener - * @returns {this} - */ - removeListener (event, listener) { - return this.off(event, listener) - } - - /** - * @param {Buffer|null} chunk - * @returns {boolean} - */ - push (chunk) { - if (chunk) { - this[kBytesRead] += chunk.length - if (this[kConsume]) { - consumePush(this[kConsume], chunk) - return this[kReading] ? super.push(chunk) : true - } - } - - return super.push(chunk) - } - - /** - * Consumes and returns the body as a string. - * - * @see https://fetch.spec.whatwg.org/#dom-body-text - * @returns {Promise<string>} - */ - text () { - return consume(this, 'text') - } - - /** - * Consumes and returns the body as a JavaScript Object. - * - * @see https://fetch.spec.whatwg.org/#dom-body-json - * @returns {Promise<unknown>} - */ - json () { - return consume(this, 'json') - } - - /** - * Consumes and returns the body as a Blob - * - * @see https://fetch.spec.whatwg.org/#dom-body-blob - * @returns {Promise<Blob>} - */ - blob () { - return consume(this, 'blob') - } - - /** - * Consumes and returns the body as an Uint8Array. - * - * @see https://fetch.spec.whatwg.org/#dom-body-bytes - * @returns {Promise<Uint8Array>} - */ - bytes () { - return consume(this, 'bytes') - } - - /** - * Consumes and returns the body as an ArrayBuffer. - * - * @see https://fetch.spec.whatwg.org/#dom-body-arraybuffer - * @returns {Promise<ArrayBuffer>} - */ - arrayBuffer () { - return consume(this, 'arrayBuffer') - } - - /** - * Not implemented - * - * @see https://fetch.spec.whatwg.org/#dom-body-formdata - * @throws {NotSupportedError} - */ - async formData () { - // TODO: Implement. - throw new NotSupportedError() - } - - /** - * Returns true if the body is not null and the body has been consumed. - * Otherwise, returns false. - * - * @see https://fetch.spec.whatwg.org/#dom-body-bodyused - * @readonly - * @returns {boolean} - */ - get bodyUsed () { - return util.isDisturbed(this) - } - - /** - * @see https://fetch.spec.whatwg.org/#dom-body-body - * @readonly - * @returns {ReadableStream} - */ - get body () { - if (!this[kBody]) { - this[kBody] = ReadableStreamFrom(this) - if (this[kConsume]) { - // TODO: Is this the best way to force a lock? - this[kBody].getReader() // Ensure stream is locked. - assert(this[kBody].locked) - } - } - return this[kBody] - } - - /** - * Dumps the response body by reading `limit` number of bytes. - * @param {object} opts - * @param {number} [opts.limit = 131072] Number of bytes to read. - * @param {AbortSignal} [opts.signal] An AbortSignal to cancel the dump. - * @returns {Promise<null>} - */ - dump (opts) { - const signal = opts?.signal - - if (signal != null && (typeof signal !== 'object' || !('aborted' in signal))) { - return Promise.reject(new InvalidArgumentError('signal must be an AbortSignal')) - } - - const limit = opts?.limit && Number.isFinite(opts.limit) - ? opts.limit - : 128 * 1024 - - if (signal?.aborted) { - return Promise.reject(signal.reason ?? new AbortError()) - } - - if (this._readableState.closeEmitted) { - return Promise.resolve(null) - } - - return new Promise((resolve, reject) => { - if ( - (this[kContentLength] && (this[kContentLength] > limit)) || - this[kBytesRead] > limit - ) { - this.destroy(new AbortError()) - } - - if (signal) { - const onAbort = () => { - this.destroy(signal.reason ?? new AbortError()) - } - signal.addEventListener('abort', onAbort) - this - .on('close', function () { - signal.removeEventListener('abort', onAbort) - if (signal.aborted) { - reject(signal.reason ?? new AbortError()) - } else { - resolve(null) - } - }) - } else { - this.on('close', resolve) - } - - this - .on('error', noop) - .on('data', () => { - if (this[kBytesRead] > limit) { - this.destroy() - } - }) - .resume() - }) - } - - /** - * @param {BufferEncoding} encoding - * @returns {this} - */ - setEncoding (encoding) { - if (Buffer.isEncoding(encoding)) { - this._readableState.encoding = encoding - } - return this - } -} - -/** - * @see https://streams.spec.whatwg.org/#readablestream-locked - * @param {BodyReadable} bodyReadable - * @returns {boolean} - */ -function isLocked (bodyReadable) { - // Consume is an implicit lock. - return bodyReadable[kBody]?.locked === true || bodyReadable[kConsume] !== null -} - -/** - * @see https://fetch.spec.whatwg.org/#body-unusable - * @param {BodyReadable} bodyReadable - * @returns {boolean} - */ -function isUnusable (bodyReadable) { - return util.isDisturbed(bodyReadable) || isLocked(bodyReadable) -} - -/** - * @typedef {'text' | 'json' | 'blob' | 'bytes' | 'arrayBuffer'} ConsumeType - */ - -/** - * @template {ConsumeType} T - * @typedef {T extends 'text' ? string : - * T extends 'json' ? unknown : - * T extends 'blob' ? Blob : - * T extends 'arrayBuffer' ? ArrayBuffer : - * T extends 'bytes' ? Uint8Array : - * never - * } ConsumeReturnType - */ -/** - * @typedef {object} Consume - * @property {ConsumeType} type - * @property {BodyReadable} stream - * @property {((value?: any) => void)} resolve - * @property {((err: Error) => void)} reject - * @property {number} length - * @property {Buffer[]} body - */ - -/** - * @template {ConsumeType} T - * @param {BodyReadable} stream - * @param {T} type - * @returns {Promise<ConsumeReturnType<T>>} - */ -function consume (stream, type) { - assert(!stream[kConsume]) - - return new Promise((resolve, reject) => { - if (isUnusable(stream)) { - const rState = stream._readableState - if (rState.destroyed && rState.closeEmitted === false) { - stream - .on('error', reject) - .on('close', () => { - reject(new TypeError('unusable')) - }) - } else { - reject(rState.errored ?? new TypeError('unusable')) - } - } else { - queueMicrotask(() => { - stream[kConsume] = { - type, - stream, - resolve, - reject, - length: 0, - body: [] - } - - stream - .on('error', function (err) { - consumeFinish(this[kConsume], err) - }) - .on('close', function () { - if (this[kConsume].body !== null) { - consumeFinish(this[kConsume], new RequestAbortedError()) - } - }) - - consumeStart(stream[kConsume]) - }) - } - }) -} - -/** - * @param {Consume} consume - * @returns {void} - */ -function consumeStart (consume) { - if (consume.body === null) { - return - } - - const { _readableState: state } = consume.stream - - if (state.bufferIndex) { - const start = state.bufferIndex - const end = state.buffer.length - for (let n = start; n < end; n++) { - consumePush(consume, state.buffer[n]) - } - } else { - for (const chunk of state.buffer) { - consumePush(consume, chunk) - } - } - - if (state.endEmitted) { - consumeEnd(this[kConsume], this._readableState.encoding) - } else { - consume.stream.on('end', function () { - consumeEnd(this[kConsume], this._readableState.encoding) - }) - } - - consume.stream.resume() - - while (consume.stream.read() != null) { - // Loop - } -} - -/** - * @param {Buffer[]} chunks - * @param {number} length - * @param {BufferEncoding} [encoding='utf8'] - * @returns {string} - */ -function chunksDecode (chunks, length, encoding) { - if (chunks.length === 0 || length === 0) { - return '' - } - const buffer = chunks.length === 1 ? chunks[0] : Buffer.concat(chunks, length) - const bufferLength = buffer.length - - // Skip BOM. - const start = - bufferLength > 2 && - buffer[0] === 0xef && - buffer[1] === 0xbb && - buffer[2] === 0xbf - ? 3 - : 0 - if (!encoding || encoding === 'utf8' || encoding === 'utf-8') { - return buffer.utf8Slice(start, bufferLength) - } else { - return buffer.subarray(start, bufferLength).toString(encoding) - } -} - -/** - * @param {Buffer[]} chunks - * @param {number} length - * @returns {Uint8Array} - */ -function chunksConcat (chunks, length) { - if (chunks.length === 0 || length === 0) { - return new Uint8Array(0) - } - if (chunks.length === 1) { - // fast-path - return new Uint8Array(chunks[0]) - } - const buffer = new Uint8Array(Buffer.allocUnsafeSlow(length).buffer) - - let offset = 0 - for (let i = 0; i < chunks.length; ++i) { - const chunk = chunks[i] - buffer.set(chunk, offset) - offset += chunk.length - } - - return buffer -} - -/** - * @param {Consume} consume - * @param {BufferEncoding} encoding - * @returns {void} - */ -function consumeEnd (consume, encoding) { - const { type, body, resolve, stream, length } = consume - - try { - if (type === 'text') { - resolve(chunksDecode(body, length, encoding)) - } else if (type === 'json') { - resolve(JSON.parse(chunksDecode(body, length, encoding))) - } else if (type === 'arrayBuffer') { - resolve(chunksConcat(body, length).buffer) - } else if (type === 'blob') { - resolve(new Blob(body, { type: stream[kContentType] })) - } else if (type === 'bytes') { - resolve(chunksConcat(body, length)) - } - - consumeFinish(consume) - } catch (err) { - stream.destroy(err) - } -} - -/** - * @param {Consume} consume - * @param {Buffer} chunk - * @returns {void} - */ -function consumePush (consume, chunk) { - consume.length += chunk.length - consume.body.push(chunk) -} - -/** - * @param {Consume} consume - * @param {Error} [err] - * @returns {void} - */ -function consumeFinish (consume, err) { - if (consume.body === null) { - return - } - - if (err) { - consume.reject(err) - } else { - consume.resolve() - } - - // Reset the consume object to allow for garbage collection. - consume.type = null - consume.stream = null - consume.resolve = null - consume.reject = null - consume.length = 0 - consume.body = null -} - -module.exports = { - Readable: BodyReadable, - chunksDecode -} |
