diff options
Diffstat (limited to 'vanilla/node_modules/undici/lib/interceptor')
8 files changed, 1582 insertions, 0 deletions
diff --git a/vanilla/node_modules/undici/lib/interceptor/cache.js b/vanilla/node_modules/undici/lib/interceptor/cache.js new file mode 100644 index 0000000..81d7cb1 --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/cache.js @@ -0,0 +1,495 @@ +'use strict' + +const assert = require('node:assert') +const { Readable } = require('node:stream') +const util = require('../core/util') +const CacheHandler = require('../handler/cache-handler') +const MemoryCacheStore = require('../cache/memory-cache-store') +const CacheRevalidationHandler = require('../handler/cache-revalidation-handler') +const { assertCacheStore, assertCacheMethods, makeCacheKey, normalizeHeaders, parseCacheControlHeader } = require('../util/cache.js') +const { AbortError } = require('../core/errors.js') + +/** + * @param {(string | RegExp)[] | undefined} origins + * @param {string} name + */ +function assertCacheOrigins (origins, name) { + if (origins === undefined) return + if (!Array.isArray(origins)) { + throw new TypeError(`expected ${name} to be an array or undefined, got ${typeof origins}`) + } + for (let i = 0; i < origins.length; i++) { + const origin = origins[i] + if (typeof origin !== 'string' && !(origin instanceof RegExp)) { + throw new TypeError(`expected ${name}[${i}] to be a string or RegExp, got ${typeof origin}`) + } + } +} + +const nop = () => {} + +/** + * @typedef {(options: import('../../types/dispatcher.d.ts').default.DispatchOptions, handler: import('../../types/dispatcher.d.ts').default.DispatchHandler) => void} DispatchFn + */ + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result + * @param {import('../../types/cache-interceptor.d.ts').default.CacheControlDirectives | undefined} cacheControlDirectives + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} opts + * @returns {boolean} + */ +function needsRevalidation (result, cacheControlDirectives, { headers = {} }) { + // Always revalidate requests with the no-cache request directive. + if (cacheControlDirectives?.['no-cache']) { + return true + } + + // Always revalidate requests with unqualified no-cache response directive. + if (result.cacheControlDirectives?.['no-cache'] && !Array.isArray(result.cacheControlDirectives['no-cache'])) { + return true + } + + // Always revalidate requests with conditional headers. + if (headers['if-modified-since'] || headers['if-none-match']) { + return true + } + + return false +} + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result + * @param {import('../../types/cache-interceptor.d.ts').default.CacheControlDirectives | undefined} cacheControlDirectives + * @returns {boolean} + */ +function isStale (result, cacheControlDirectives) { + const now = Date.now() + if (now > result.staleAt) { + // Response is stale + if (cacheControlDirectives?.['max-stale']) { + // There's a threshold where we can serve stale responses, let's see if + // we're in it + // https://www.rfc-editor.org/rfc/rfc9111.html#name-max-stale + const gracePeriod = result.staleAt + (cacheControlDirectives['max-stale'] * 1000) + return now > gracePeriod + } + + return true + } + + if (cacheControlDirectives?.['min-fresh']) { + // https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.3 + + // At this point, staleAt is always > now + const timeLeftTillStale = result.staleAt - now + const threshold = cacheControlDirectives['min-fresh'] * 1000 + + return timeLeftTillStale <= threshold + } + + return false +} + +/** + * Check if we're within the stale-while-revalidate window for a stale response + * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result + * @returns {boolean} + */ +function withinStaleWhileRevalidateWindow (result) { + const staleWhileRevalidate = result.cacheControlDirectives?.['stale-while-revalidate'] + if (!staleWhileRevalidate) { + return false + } + + const now = Date.now() + const staleWhileRevalidateExpiry = result.staleAt + (staleWhileRevalidate * 1000) + return now <= staleWhileRevalidateExpiry +} + +/** + * @param {DispatchFn} dispatch + * @param {import('../../types/cache-interceptor.d.ts').default.CacheHandlerOptions} globalOpts + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} cacheKey + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} opts + * @param {import('../../types/cache-interceptor.d.ts').default.CacheControlDirectives | undefined} reqCacheControl + */ +function handleUncachedResponse ( + dispatch, + globalOpts, + cacheKey, + handler, + opts, + reqCacheControl +) { + if (reqCacheControl?.['only-if-cached']) { + let aborted = false + try { + if (typeof handler.onConnect === 'function') { + handler.onConnect(() => { + aborted = true + }) + + if (aborted) { + return + } + } + + if (typeof handler.onHeaders === 'function') { + handler.onHeaders(504, [], nop, 'Gateway Timeout') + if (aborted) { + return + } + } + + if (typeof handler.onComplete === 'function') { + handler.onComplete([]) + } + } catch (err) { + if (typeof handler.onError === 'function') { + handler.onError(err) + } + } + + return true + } + + return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) +} + +/** + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} opts + * @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result + * @param {number} age + * @param {any} context + * @param {boolean} isStale + */ +function sendCachedValue (handler, opts, result, age, context, isStale) { + // TODO (perf): Readable.from path can be optimized... + const stream = util.isStream(result.body) + ? result.body + : Readable.from(result.body ?? []) + + assert(!stream.destroyed, 'stream should not be destroyed') + assert(!stream.readableDidRead, 'stream should not be readableDidRead') + + const controller = { + resume () { + stream.resume() + }, + pause () { + stream.pause() + }, + get paused () { + return stream.isPaused() + }, + get aborted () { + return stream.destroyed + }, + get reason () { + return stream.errored + }, + abort (reason) { + stream.destroy(reason ?? new AbortError()) + } + } + + stream + .on('error', function (err) { + if (!this.readableEnded) { + if (typeof handler.onResponseError === 'function') { + handler.onResponseError(controller, err) + } else { + throw err + } + } + }) + .on('close', function () { + if (!this.errored) { + handler.onResponseEnd?.(controller, {}) + } + }) + + handler.onRequestStart?.(controller, context) + + if (stream.destroyed) { + return + } + + // Add the age header + // https://www.rfc-editor.org/rfc/rfc9111.html#name-age + const headers = { ...result.headers, age: String(age) } + + if (isStale) { + // Add warning header + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Warning + headers.warning = '110 - "response is stale"' + } + + handler.onResponseStart?.(controller, result.statusCode, headers, result.statusMessage) + + if (opts.method === 'HEAD') { + stream.destroy() + } else { + stream.on('data', function (chunk) { + handler.onResponseData?.(controller, chunk) + }) + } +} + +/** + * @param {DispatchFn} dispatch + * @param {import('../../types/cache-interceptor.d.ts').default.CacheHandlerOptions} globalOpts + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} cacheKey + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} opts + * @param {import('../../types/cache-interceptor.d.ts').default.CacheControlDirectives | undefined} reqCacheControl + * @param {import('../../types/cache-interceptor.d.ts').default.GetResult | undefined} result + */ +function handleResult ( + dispatch, + globalOpts, + cacheKey, + handler, + opts, + reqCacheControl, + result +) { + if (!result) { + return handleUncachedResponse(dispatch, globalOpts, cacheKey, handler, opts, reqCacheControl) + } + + const now = Date.now() + if (now > result.deleteAt) { + // Response is expired, cache store shouldn't have given this to us + return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) + } + + const age = Math.round((now - result.cachedAt) / 1000) + if (reqCacheControl?.['max-age'] && age >= reqCacheControl['max-age']) { + // Response is considered expired for this specific request + // https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.1 + return dispatch(opts, handler) + } + + const stale = isStale(result, reqCacheControl) + const revalidate = needsRevalidation(result, reqCacheControl, opts) + + // Check if the response is stale + if (stale || revalidate) { + if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) { + // If body is a stream we can't revalidate... + // TODO (fix): This could be less strict... + return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler)) + } + + // RFC 5861: If we're within stale-while-revalidate window, serve stale immediately + // and revalidate in background, unless immediate revalidation is necessary + if (!revalidate && withinStaleWhileRevalidateWindow(result)) { + // Serve stale response immediately + sendCachedValue(handler, opts, result, age, null, true) + + // Start background revalidation (fire-and-forget) + queueMicrotask(() => { + const headers = { + ...opts.headers, + 'if-modified-since': new Date(result.cachedAt).toUTCString() + } + + if (result.etag) { + headers['if-none-match'] = result.etag + } + + if (result.vary) { + for (const key in result.vary) { + if (result.vary[key] != null) { + headers[key] = result.vary[key] + } + } + } + + // Background revalidation - update cache if we get new data + dispatch( + { + ...opts, + headers + }, + new CacheHandler(globalOpts, cacheKey, { + // Silent handler that just updates the cache + onRequestStart () {}, + onRequestUpgrade () {}, + onResponseStart () {}, + onResponseData () {}, + onResponseEnd () {}, + onResponseError () {} + }) + ) + }) + + return true + } + + let withinStaleIfErrorThreshold = false + const staleIfErrorExpiry = result.cacheControlDirectives['stale-if-error'] ?? reqCacheControl?.['stale-if-error'] + if (staleIfErrorExpiry) { + withinStaleIfErrorThreshold = now < (result.staleAt + (staleIfErrorExpiry * 1000)) + } + + const headers = { + ...opts.headers, + 'if-modified-since': new Date(result.cachedAt).toUTCString() + } + + if (result.etag) { + headers['if-none-match'] = result.etag + } + + if (result.vary) { + for (const key in result.vary) { + if (result.vary[key] != null) { + headers[key] = result.vary[key] + } + } + } + + // We need to revalidate the response + return dispatch( + { + ...opts, + headers + }, + new CacheRevalidationHandler( + (success, context) => { + if (success) { + // TODO: successful revalidation should be considered fresh (not give stale warning). + sendCachedValue(handler, opts, result, age, context, stale) + } else if (util.isStream(result.body)) { + result.body.on('error', nop).destroy() + } + }, + new CacheHandler(globalOpts, cacheKey, handler), + withinStaleIfErrorThreshold + ) + ) + } + + // Dump request body. + if (util.isStream(opts.body)) { + opts.body.on('error', nop).destroy() + } + + sendCachedValue(handler, opts, result, age, null, false) +} + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts] + * @returns {import('../../types/dispatcher.d.ts').default.DispatcherComposeInterceptor} + */ +module.exports = (opts = {}) => { + const { + store = new MemoryCacheStore(), + methods = ['GET'], + cacheByDefault = undefined, + type = 'shared', + origins = undefined + } = opts + + if (typeof opts !== 'object' || opts === null) { + throw new TypeError(`expected type of opts to be an Object, got ${opts === null ? 'null' : typeof opts}`) + } + + assertCacheStore(store, 'opts.store') + assertCacheMethods(methods, 'opts.methods') + assertCacheOrigins(origins, 'opts.origins') + + if (typeof cacheByDefault !== 'undefined' && typeof cacheByDefault !== 'number') { + throw new TypeError(`expected opts.cacheByDefault to be number or undefined, got ${typeof cacheByDefault}`) + } + + if (typeof type !== 'undefined' && type !== 'shared' && type !== 'private') { + throw new TypeError(`expected opts.type to be shared, private, or undefined, got ${typeof type}`) + } + + const globalOpts = { + store, + methods, + cacheByDefault, + type + } + + const safeMethodsToNotCache = util.safeHTTPMethods.filter(method => methods.includes(method) === false) + + return dispatch => { + return (opts, handler) => { + if (!opts.origin || safeMethodsToNotCache.includes(opts.method)) { + // Not a method we want to cache or we don't have the origin, skip + return dispatch(opts, handler) + } + + // Check if origin is in whitelist + if (origins !== undefined) { + const requestOrigin = opts.origin.toString().toLowerCase() + let isAllowed = false + + for (let i = 0; i < origins.length; i++) { + const allowed = origins[i] + if (typeof allowed === 'string') { + if (allowed.toLowerCase() === requestOrigin) { + isAllowed = true + break + } + } else if (allowed.test(requestOrigin)) { + isAllowed = true + break + } + } + + if (!isAllowed) { + return dispatch(opts, handler) + } + } + + opts = { + ...opts, + headers: normalizeHeaders(opts) + } + + const reqCacheControl = opts.headers?.['cache-control'] + ? parseCacheControlHeader(opts.headers['cache-control']) + : undefined + + if (reqCacheControl?.['no-store']) { + return dispatch(opts, handler) + } + + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheKey} + */ + const cacheKey = makeCacheKey(opts) + const result = store.get(cacheKey) + + if (result && typeof result.then === 'function') { + return result + .then(result => handleResult(dispatch, + globalOpts, + cacheKey, + handler, + opts, + reqCacheControl, + result + )) + } else { + return handleResult( + dispatch, + globalOpts, + cacheKey, + handler, + opts, + reqCacheControl, + result + ) + } + } + } +} diff --git a/vanilla/node_modules/undici/lib/interceptor/decompress.js b/vanilla/node_modules/undici/lib/interceptor/decompress.js new file mode 100644 index 0000000..ee4202a --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/decompress.js @@ -0,0 +1,259 @@ +'use strict' + +const { createInflate, createGunzip, createBrotliDecompress, createZstdDecompress } = require('node:zlib') +const { pipeline } = require('node:stream') +const DecoratorHandler = require('../handler/decorator-handler') +const { runtimeFeatures } = require('../util/runtime-features') + +/** @typedef {import('node:stream').Transform} Transform */ +/** @typedef {import('node:stream').Transform} Controller */ +/** @typedef {Transform&import('node:zlib').Zlib} DecompressorStream */ + +/** @type {Record<string, () => DecompressorStream>} */ +const supportedEncodings = { + gzip: createGunzip, + 'x-gzip': createGunzip, + br: createBrotliDecompress, + deflate: createInflate, + compress: createInflate, + 'x-compress': createInflate, + ...(runtimeFeatures.has('zstd') ? { zstd: createZstdDecompress } : {}) +} + +const defaultSkipStatusCodes = /** @type {const} */ ([204, 304]) + +let warningEmitted = /** @type {boolean} */ (false) + +/** + * @typedef {Object} DecompressHandlerOptions + * @property {number[]|Readonly<number[]>} [skipStatusCodes=[204, 304]] - List of status codes to skip decompression for + * @property {boolean} [skipErrorResponses] - Whether to skip decompression for error responses (status codes >= 400) + */ + +class DecompressHandler extends DecoratorHandler { + /** @type {Transform[]} */ + #decompressors = [] + /** @type {Readonly<number[]>} */ + #skipStatusCodes + /** @type {boolean} */ + #skipErrorResponses + + constructor (handler, { skipStatusCodes = defaultSkipStatusCodes, skipErrorResponses = true } = {}) { + super(handler) + this.#skipStatusCodes = skipStatusCodes + this.#skipErrorResponses = skipErrorResponses + } + + /** + * Determines if decompression should be skipped based on encoding and status code + * @param {string} contentEncoding - Content-Encoding header value + * @param {number} statusCode - HTTP status code of the response + * @returns {boolean} - True if decompression should be skipped + */ + #shouldSkipDecompression (contentEncoding, statusCode) { + if (!contentEncoding || statusCode < 200) return true + if (this.#skipStatusCodes.includes(statusCode)) return true + if (this.#skipErrorResponses && statusCode >= 400) return true + return false + } + + /** + * Creates a chain of decompressors for multiple content encodings + * + * @param {string} encodings - Comma-separated list of content encodings + * @returns {Array<DecompressorStream>} - Array of decompressor streams + * @throws {Error} - If the number of content-encodings exceeds the maximum allowed + */ + #createDecompressionChain (encodings) { + const parts = encodings.split(',') + + // Limit the number of content-encodings to prevent resource exhaustion. + // CVE fix similar to urllib3 (GHSA-gm62-xv2j-4w53) and curl (CVE-2022-32206). + const maxContentEncodings = 5 + if (parts.length > maxContentEncodings) { + throw new Error(`too many content-encodings in response: ${parts.length}, maximum allowed is ${maxContentEncodings}`) + } + + /** @type {DecompressorStream[]} */ + const decompressors = [] + + for (let i = parts.length - 1; i >= 0; i--) { + const encoding = parts[i].trim() + if (!encoding) continue + + if (!supportedEncodings[encoding]) { + decompressors.length = 0 // Clear if unsupported encoding + return decompressors // Unsupported encoding + } + + decompressors.push(supportedEncodings[encoding]()) + } + + return decompressors + } + + /** + * Sets up event handlers for a decompressor stream using readable events + * @param {DecompressorStream} decompressor - The decompressor stream + * @param {Controller} controller - The controller to coordinate with + * @returns {void} + */ + #setupDecompressorEvents (decompressor, controller) { + decompressor.on('readable', () => { + let chunk + while ((chunk = decompressor.read()) !== null) { + const result = super.onResponseData(controller, chunk) + if (result === false) { + break + } + } + }) + + decompressor.on('error', (error) => { + super.onResponseError(controller, error) + }) + } + + /** + * Sets up event handling for a single decompressor + * @param {Controller} controller - The controller to handle events + * @returns {void} + */ + #setupSingleDecompressor (controller) { + const decompressor = this.#decompressors[0] + this.#setupDecompressorEvents(decompressor, controller) + + decompressor.on('end', () => { + super.onResponseEnd(controller, {}) + }) + } + + /** + * Sets up event handling for multiple chained decompressors using pipeline + * @param {Controller} controller - The controller to handle events + * @returns {void} + */ + #setupMultipleDecompressors (controller) { + const lastDecompressor = this.#decompressors[this.#decompressors.length - 1] + this.#setupDecompressorEvents(lastDecompressor, controller) + + pipeline(this.#decompressors, (err) => { + if (err) { + super.onResponseError(controller, err) + return + } + super.onResponseEnd(controller, {}) + }) + } + + /** + * Cleans up decompressor references to prevent memory leaks + * @returns {void} + */ + #cleanupDecompressors () { + this.#decompressors.length = 0 + } + + /** + * @param {Controller} controller + * @param {number} statusCode + * @param {Record<string, string | string[] | undefined>} headers + * @param {string} statusMessage + * @returns {void} + */ + onResponseStart (controller, statusCode, headers, statusMessage) { + const contentEncoding = headers['content-encoding'] + + // If content encoding is not supported or status code is in skip list + if (this.#shouldSkipDecompression(contentEncoding, statusCode)) { + return super.onResponseStart(controller, statusCode, headers, statusMessage) + } + + const decompressors = this.#createDecompressionChain(contentEncoding.toLowerCase()) + + if (decompressors.length === 0) { + this.#cleanupDecompressors() + return super.onResponseStart(controller, statusCode, headers, statusMessage) + } + + this.#decompressors = decompressors + + // Remove compression headers since we're decompressing + const { 'content-encoding': _, 'content-length': __, ...newHeaders } = headers + + if (this.#decompressors.length === 1) { + this.#setupSingleDecompressor(controller) + } else { + this.#setupMultipleDecompressors(controller) + } + + return super.onResponseStart(controller, statusCode, newHeaders, statusMessage) + } + + /** + * @param {Controller} controller + * @param {Buffer} chunk + * @returns {void} + */ + onResponseData (controller, chunk) { + if (this.#decompressors.length > 0) { + this.#decompressors[0].write(chunk) + return + } + super.onResponseData(controller, chunk) + } + + /** + * @param {Controller} controller + * @param {Record<string, string | string[]> | undefined} trailers + * @returns {void} + */ + onResponseEnd (controller, trailers) { + if (this.#decompressors.length > 0) { + this.#decompressors[0].end() + this.#cleanupDecompressors() + return + } + super.onResponseEnd(controller, trailers) + } + + /** + * @param {Controller} controller + * @param {Error} err + * @returns {void} + */ + onResponseError (controller, err) { + if (this.#decompressors.length > 0) { + for (const decompressor of this.#decompressors) { + decompressor.destroy(err) + } + this.#cleanupDecompressors() + } + super.onResponseError(controller, err) + } +} + +/** + * Creates a decompression interceptor for HTTP responses + * @param {DecompressHandlerOptions} [options] - Options for the interceptor + * @returns {Function} - Interceptor function + */ +function createDecompressInterceptor (options = {}) { + // Emit experimental warning only once + if (!warningEmitted) { + process.emitWarning( + 'DecompressInterceptor is experimental and subject to change', + 'ExperimentalWarning' + ) + warningEmitted = true + } + + return (dispatch) => { + return (opts, handler) => { + const decompressHandler = new DecompressHandler(handler, options) + return dispatch(opts, decompressHandler) + } + } +} + +module.exports = createDecompressInterceptor diff --git a/vanilla/node_modules/undici/lib/interceptor/deduplicate.js b/vanilla/node_modules/undici/lib/interceptor/deduplicate.js new file mode 100644 index 0000000..11c4f37 --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/deduplicate.js @@ -0,0 +1,107 @@ +'use strict' + +const diagnosticsChannel = require('node:diagnostics_channel') +const util = require('../core/util') +const DeduplicationHandler = require('../handler/deduplication-handler') +const { normalizeHeaders, makeCacheKey, makeDeduplicationKey } = require('../util/cache.js') + +const pendingRequestsChannel = diagnosticsChannel.channel('undici:request:pending-requests') + +/** + * @param {import('../../types/interceptors.d.ts').default.DeduplicateInterceptorOpts} [opts] + * @returns {import('../../types/dispatcher.d.ts').default.DispatcherComposeInterceptor} + */ +module.exports = (opts = {}) => { + const { + methods = ['GET'], + skipHeaderNames = [], + excludeHeaderNames = [] + } = opts + + if (typeof opts !== 'object' || opts === null) { + throw new TypeError(`expected type of opts to be an Object, got ${opts === null ? 'null' : typeof opts}`) + } + + if (!Array.isArray(methods)) { + throw new TypeError(`expected opts.methods to be an array, got ${typeof methods}`) + } + + for (const method of methods) { + if (!util.safeHTTPMethods.includes(method)) { + throw new TypeError(`expected opts.methods to only contain safe HTTP methods, got ${method}`) + } + } + + if (!Array.isArray(skipHeaderNames)) { + throw new TypeError(`expected opts.skipHeaderNames to be an array, got ${typeof skipHeaderNames}`) + } + + if (!Array.isArray(excludeHeaderNames)) { + throw new TypeError(`expected opts.excludeHeaderNames to be an array, got ${typeof excludeHeaderNames}`) + } + + // Convert to lowercase Set for case-insensitive header matching + const skipHeaderNamesSet = new Set(skipHeaderNames.map(name => name.toLowerCase())) + + // Convert to lowercase Set for case-insensitive header exclusion from deduplication key + const excludeHeaderNamesSet = new Set(excludeHeaderNames.map(name => name.toLowerCase())) + + /** + * Map of pending requests for deduplication + * @type {Map<string, DeduplicationHandler>} + */ + const pendingRequests = new Map() + + return dispatch => { + return (opts, handler) => { + if (!opts.origin || methods.includes(opts.method) === false) { + return dispatch(opts, handler) + } + + opts = { + ...opts, + headers: normalizeHeaders(opts) + } + + // Skip deduplication if request contains any of the specified headers + if (skipHeaderNamesSet.size > 0) { + for (const headerName of Object.keys(opts.headers)) { + if (skipHeaderNamesSet.has(headerName.toLowerCase())) { + return dispatch(opts, handler) + } + } + } + + const cacheKey = makeCacheKey(opts) + const dedupeKey = makeDeduplicationKey(cacheKey, excludeHeaderNamesSet) + + // Check if there's already a pending request for this key + const pendingHandler = pendingRequests.get(dedupeKey) + if (pendingHandler) { + // Add this handler to the waiting list + pendingHandler.addWaitingHandler(handler) + return true + } + + // Create a new deduplication handler + const deduplicationHandler = new DeduplicationHandler( + handler, + () => { + // Clean up when request completes + pendingRequests.delete(dedupeKey) + if (pendingRequestsChannel.hasSubscribers) { + pendingRequestsChannel.publish({ size: pendingRequests.size, key: dedupeKey, type: 'removed' }) + } + } + ) + + // Register the pending request + pendingRequests.set(dedupeKey, deduplicationHandler) + if (pendingRequestsChannel.hasSubscribers) { + pendingRequestsChannel.publish({ size: pendingRequests.size, key: dedupeKey, type: 'added' }) + } + + return dispatch(opts, deduplicationHandler) + } + } +} diff --git a/vanilla/node_modules/undici/lib/interceptor/dns.js b/vanilla/node_modules/undici/lib/interceptor/dns.js new file mode 100644 index 0000000..9dba957 --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/dns.js @@ -0,0 +1,474 @@ +'use strict' +const { isIP } = require('node:net') +const { lookup } = require('node:dns') +const DecoratorHandler = require('../handler/decorator-handler') +const { InvalidArgumentError, InformationalError } = require('../core/errors') +const maxInt = Math.pow(2, 31) - 1 + +class DNSStorage { + #maxItems = 0 + #records = new Map() + + constructor (opts) { + this.#maxItems = opts.maxItems + } + + get size () { + return this.#records.size + } + + get (hostname) { + return this.#records.get(hostname) ?? null + } + + set (hostname, records) { + this.#records.set(hostname, records) + } + + delete (hostname) { + this.#records.delete(hostname) + } + + // Delegate to storage decide can we do more lookups or not + full () { + return this.size >= this.#maxItems + } +} + +class DNSInstance { + #maxTTL = 0 + #maxItems = 0 + dualStack = true + affinity = null + lookup = null + pick = null + storage = null + + constructor (opts) { + this.#maxTTL = opts.maxTTL + this.#maxItems = opts.maxItems + this.dualStack = opts.dualStack + this.affinity = opts.affinity + this.lookup = opts.lookup ?? this.#defaultLookup + this.pick = opts.pick ?? this.#defaultPick + this.storage = opts.storage ?? new DNSStorage(opts) + } + + runLookup (origin, opts, cb) { + const ips = this.storage.get(origin.hostname) + + // If full, we just return the origin + if (ips == null && this.storage.full()) { + cb(null, origin) + return + } + + const newOpts = { + affinity: this.affinity, + dualStack: this.dualStack, + lookup: this.lookup, + pick: this.pick, + ...opts.dns, + maxTTL: this.#maxTTL, + maxItems: this.#maxItems + } + + // If no IPs we lookup + if (ips == null) { + this.lookup(origin, newOpts, (err, addresses) => { + if (err || addresses == null || addresses.length === 0) { + cb(err ?? new InformationalError('No DNS entries found')) + return + } + + this.setRecords(origin, addresses) + const records = this.storage.get(origin.hostname) + + const ip = this.pick( + origin, + records, + newOpts.affinity + ) + + let port + if (typeof ip.port === 'number') { + port = `:${ip.port}` + } else if (origin.port !== '') { + port = `:${origin.port}` + } else { + port = '' + } + + cb( + null, + new URL(`${origin.protocol}//${ + ip.family === 6 ? `[${ip.address}]` : ip.address + }${port}`) + ) + }) + } else { + // If there's IPs we pick + const ip = this.pick( + origin, + ips, + newOpts.affinity + ) + + // If no IPs we lookup - deleting old records + if (ip == null) { + this.storage.delete(origin.hostname) + this.runLookup(origin, opts, cb) + return + } + + let port + if (typeof ip.port === 'number') { + port = `:${ip.port}` + } else if (origin.port !== '') { + port = `:${origin.port}` + } else { + port = '' + } + + cb( + null, + new URL(`${origin.protocol}//${ + ip.family === 6 ? `[${ip.address}]` : ip.address + }${port}`) + ) + } + } + + #defaultLookup (origin, opts, cb) { + lookup( + origin.hostname, + { + all: true, + family: this.dualStack === false ? this.affinity : 0, + order: 'ipv4first' + }, + (err, addresses) => { + if (err) { + return cb(err) + } + + const results = new Map() + + for (const addr of addresses) { + // On linux we found duplicates, we attempt to remove them with + // the latest record + results.set(`${addr.address}:${addr.family}`, addr) + } + + cb(null, results.values()) + } + ) + } + + #defaultPick (origin, hostnameRecords, affinity) { + let ip = null + const { records, offset } = hostnameRecords + + let family + if (this.dualStack) { + if (affinity == null) { + // Balance between ip families + if (offset == null || offset === maxInt) { + hostnameRecords.offset = 0 + affinity = 4 + } else { + hostnameRecords.offset++ + affinity = (hostnameRecords.offset & 1) === 1 ? 6 : 4 + } + } + + if (records[affinity] != null && records[affinity].ips.length > 0) { + family = records[affinity] + } else { + family = records[affinity === 4 ? 6 : 4] + } + } else { + family = records[affinity] + } + + // If no IPs we return null + if (family == null || family.ips.length === 0) { + return ip + } + + if (family.offset == null || family.offset === maxInt) { + family.offset = 0 + } else { + family.offset++ + } + + const position = family.offset % family.ips.length + ip = family.ips[position] ?? null + + if (ip == null) { + return ip + } + + if (Date.now() - ip.timestamp > ip.ttl) { // record TTL is already in ms + // We delete expired records + // It is possible that they have different TTL, so we manage them individually + family.ips.splice(position, 1) + return this.pick(origin, hostnameRecords, affinity) + } + + return ip + } + + pickFamily (origin, ipFamily) { + const records = this.storage.get(origin.hostname)?.records + if (!records) { + return null + } + + const family = records[ipFamily] + if (!family) { + return null + } + + if (family.offset == null || family.offset === maxInt) { + family.offset = 0 + } else { + family.offset++ + } + + const position = family.offset % family.ips.length + const ip = family.ips[position] ?? null + if (ip == null) { + return ip + } + + if (Date.now() - ip.timestamp > ip.ttl) { // record TTL is already in ms + // We delete expired records + // It is possible that they have different TTL, so we manage them individually + family.ips.splice(position, 1) + } + + return ip + } + + setRecords (origin, addresses) { + const timestamp = Date.now() + const records = { records: { 4: null, 6: null } } + let minTTL = this.#maxTTL + for (const record of addresses) { + record.timestamp = timestamp + if (typeof record.ttl === 'number') { + // The record TTL is expected to be in ms + record.ttl = Math.min(record.ttl, this.#maxTTL) + minTTL = Math.min(minTTL, record.ttl) + } else { + record.ttl = this.#maxTTL + } + + const familyRecords = records.records[record.family] ?? { ips: [] } + + familyRecords.ips.push(record) + records.records[record.family] = familyRecords + } + + // We provide a default TTL if external storage will be used without TTL per record-level support + this.storage.set(origin.hostname, records, { ttl: minTTL }) + } + + deleteRecords (origin) { + this.storage.delete(origin.hostname) + } + + getHandler (meta, opts) { + return new DNSDispatchHandler(this, meta, opts) + } +} + +class DNSDispatchHandler extends DecoratorHandler { + #state = null + #opts = null + #dispatch = null + #origin = null + #controller = null + #newOrigin = null + #firstTry = true + + constructor (state, { origin, handler, dispatch, newOrigin }, opts) { + super(handler) + this.#origin = origin + this.#newOrigin = newOrigin + this.#opts = { ...opts } + this.#state = state + this.#dispatch = dispatch + } + + onResponseError (controller, err) { + switch (err.code) { + case 'ETIMEDOUT': + case 'ECONNREFUSED': { + if (this.#state.dualStack) { + if (!this.#firstTry) { + super.onResponseError(controller, err) + return + } + this.#firstTry = false + + // Pick an ip address from the other family + const otherFamily = this.#newOrigin.hostname[0] === '[' ? 4 : 6 + const ip = this.#state.pickFamily(this.#origin, otherFamily) + if (ip == null) { + super.onResponseError(controller, err) + return + } + + let port + if (typeof ip.port === 'number') { + port = `:${ip.port}` + } else if (this.#origin.port !== '') { + port = `:${this.#origin.port}` + } else { + port = '' + } + + const dispatchOpts = { + ...this.#opts, + origin: `${this.#origin.protocol}//${ + ip.family === 6 ? `[${ip.address}]` : ip.address + }${port}` + } + this.#dispatch(dispatchOpts, this) + return + } + + // if dual-stack disabled, we error out + super.onResponseError(controller, err) + break + } + case 'ENOTFOUND': + this.#state.deleteRecords(this.#origin) + super.onResponseError(controller, err) + break + default: + super.onResponseError(controller, err) + break + } + } +} + +module.exports = interceptorOpts => { + if ( + interceptorOpts?.maxTTL != null && + (typeof interceptorOpts?.maxTTL !== 'number' || interceptorOpts?.maxTTL < 0) + ) { + throw new InvalidArgumentError('Invalid maxTTL. Must be a positive number') + } + + if ( + interceptorOpts?.maxItems != null && + (typeof interceptorOpts?.maxItems !== 'number' || + interceptorOpts?.maxItems < 1) + ) { + throw new InvalidArgumentError( + 'Invalid maxItems. Must be a positive number and greater than zero' + ) + } + + if ( + interceptorOpts?.affinity != null && + interceptorOpts?.affinity !== 4 && + interceptorOpts?.affinity !== 6 + ) { + throw new InvalidArgumentError('Invalid affinity. Must be either 4 or 6') + } + + if ( + interceptorOpts?.dualStack != null && + typeof interceptorOpts?.dualStack !== 'boolean' + ) { + throw new InvalidArgumentError('Invalid dualStack. Must be a boolean') + } + + if ( + interceptorOpts?.lookup != null && + typeof interceptorOpts?.lookup !== 'function' + ) { + throw new InvalidArgumentError('Invalid lookup. Must be a function') + } + + if ( + interceptorOpts?.pick != null && + typeof interceptorOpts?.pick !== 'function' + ) { + throw new InvalidArgumentError('Invalid pick. Must be a function') + } + + if ( + interceptorOpts?.storage != null && + (typeof interceptorOpts?.storage?.get !== 'function' || + typeof interceptorOpts?.storage?.set !== 'function' || + typeof interceptorOpts?.storage?.full !== 'function' || + typeof interceptorOpts?.storage?.delete !== 'function' + ) + ) { + throw new InvalidArgumentError('Invalid storage. Must be a object with methods: { get, set, full, delete }') + } + + const dualStack = interceptorOpts?.dualStack ?? true + let affinity + if (dualStack) { + affinity = interceptorOpts?.affinity ?? null + } else { + affinity = interceptorOpts?.affinity ?? 4 + } + + const opts = { + maxTTL: interceptorOpts?.maxTTL ?? 10e3, // Expressed in ms + lookup: interceptorOpts?.lookup ?? null, + pick: interceptorOpts?.pick ?? null, + dualStack, + affinity, + maxItems: interceptorOpts?.maxItems ?? Infinity, + storage: interceptorOpts?.storage + } + + const instance = new DNSInstance(opts) + + return dispatch => { + return function dnsInterceptor (origDispatchOpts, handler) { + const origin = + origDispatchOpts.origin.constructor === URL + ? origDispatchOpts.origin + : new URL(origDispatchOpts.origin) + + if (isIP(origin.hostname) !== 0) { + return dispatch(origDispatchOpts, handler) + } + + instance.runLookup(origin, origDispatchOpts, (err, newOrigin) => { + if (err) { + return handler.onResponseError(null, err) + } + + const dispatchOpts = { + ...origDispatchOpts, + servername: origin.hostname, // For SNI on TLS + origin: newOrigin.origin, + headers: { + host: origin.host, + ...origDispatchOpts.headers + } + } + + dispatch( + dispatchOpts, + instance.getHandler( + { origin, dispatch, handler, newOrigin }, + origDispatchOpts + ) + ) + }) + + return true + } + } +} diff --git a/vanilla/node_modules/undici/lib/interceptor/dump.js b/vanilla/node_modules/undici/lib/interceptor/dump.js new file mode 100644 index 0000000..4810a09 --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/dump.js @@ -0,0 +1,112 @@ +'use strict' + +const { InvalidArgumentError, RequestAbortedError } = require('../core/errors') +const DecoratorHandler = require('../handler/decorator-handler') + +class DumpHandler extends DecoratorHandler { + #maxSize = 1024 * 1024 + #dumped = false + #size = 0 + #controller = null + aborted = false + reason = false + + constructor ({ maxSize, signal }, handler) { + if (maxSize != null && (!Number.isFinite(maxSize) || maxSize < 1)) { + throw new InvalidArgumentError('maxSize must be a number greater than 0') + } + + super(handler) + + this.#maxSize = maxSize ?? this.#maxSize + // this.#handler = handler + } + + #abort (reason) { + this.aborted = true + this.reason = reason + } + + onRequestStart (controller, context) { + controller.abort = this.#abort.bind(this) + this.#controller = controller + + return super.onRequestStart(controller, context) + } + + onResponseStart (controller, statusCode, headers, statusMessage) { + const contentLength = headers['content-length'] + + if (contentLength != null && contentLength > this.#maxSize) { + throw new RequestAbortedError( + `Response size (${contentLength}) larger than maxSize (${ + this.#maxSize + })` + ) + } + + if (this.aborted === true) { + return true + } + + return super.onResponseStart(controller, statusCode, headers, statusMessage) + } + + onResponseError (controller, err) { + if (this.#dumped) { + return + } + + // On network errors before connect, controller will be null + err = this.#controller?.reason ?? err + + super.onResponseError(controller, err) + } + + onResponseData (controller, chunk) { + this.#size = this.#size + chunk.length + + if (this.#size >= this.#maxSize) { + this.#dumped = true + + if (this.aborted === true) { + super.onResponseError(controller, this.reason) + } else { + super.onResponseEnd(controller, {}) + } + } + + return true + } + + onResponseEnd (controller, trailers) { + if (this.#dumped) { + return + } + + if (this.#controller.aborted === true) { + super.onResponseError(controller, this.reason) + return + } + + super.onResponseEnd(controller, trailers) + } +} + +function createDumpInterceptor ( + { maxSize: defaultMaxSize } = { + maxSize: 1024 * 1024 + } +) { + return dispatch => { + return function Intercept (opts, handler) { + const { dumpMaxSize = defaultMaxSize } = opts + + const dumpHandler = new DumpHandler({ maxSize: dumpMaxSize, signal: opts.signal }, handler) + + return dispatch(opts, dumpHandler) + } + } +} + +module.exports = createDumpInterceptor diff --git a/vanilla/node_modules/undici/lib/interceptor/redirect.js b/vanilla/node_modules/undici/lib/interceptor/redirect.js new file mode 100644 index 0000000..b7df180 --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/redirect.js @@ -0,0 +1,21 @@ +'use strict' + +const RedirectHandler = require('../handler/redirect-handler') + +function createRedirectInterceptor ({ maxRedirections: defaultMaxRedirections } = {}) { + return (dispatch) => { + return function Intercept (opts, handler) { + const { maxRedirections = defaultMaxRedirections, ...rest } = opts + + if (maxRedirections == null || maxRedirections === 0) { + return dispatch(opts, handler) + } + + const dispatchOpts = { ...rest } // Stop sub dispatcher from also redirecting. + const redirectHandler = new RedirectHandler(dispatch, maxRedirections, dispatchOpts, handler) + return dispatch(dispatchOpts, redirectHandler) + } + } +} + +module.exports = createRedirectInterceptor diff --git a/vanilla/node_modules/undici/lib/interceptor/response-error.js b/vanilla/node_modules/undici/lib/interceptor/response-error.js new file mode 100644 index 0000000..a8105aa --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/response-error.js @@ -0,0 +1,95 @@ +'use strict' + +// const { parseHeaders } = require('../core/util') +const DecoratorHandler = require('../handler/decorator-handler') +const { ResponseError } = require('../core/errors') + +class ResponseErrorHandler extends DecoratorHandler { + #statusCode + #contentType + #decoder + #headers + #body + + constructor (_opts, { handler }) { + super(handler) + } + + #checkContentType (contentType) { + return (this.#contentType ?? '').indexOf(contentType) === 0 + } + + onRequestStart (controller, context) { + this.#statusCode = 0 + this.#contentType = null + this.#decoder = null + this.#headers = null + this.#body = '' + + return super.onRequestStart(controller, context) + } + + onResponseStart (controller, statusCode, headers, statusMessage) { + this.#statusCode = statusCode + this.#headers = headers + this.#contentType = headers['content-type'] + + if (this.#statusCode < 400) { + return super.onResponseStart(controller, statusCode, headers, statusMessage) + } + + if (this.#checkContentType('application/json') || this.#checkContentType('text/plain')) { + this.#decoder = new TextDecoder('utf-8') + } + } + + onResponseData (controller, chunk) { + if (this.#statusCode < 400) { + return super.onResponseData(controller, chunk) + } + + this.#body += this.#decoder?.decode(chunk, { stream: true }) ?? '' + } + + onResponseEnd (controller, trailers) { + if (this.#statusCode >= 400) { + this.#body += this.#decoder?.decode(undefined, { stream: false }) ?? '' + + if (this.#checkContentType('application/json')) { + try { + this.#body = JSON.parse(this.#body) + } catch { + // Do nothing... + } + } + + let err + const stackTraceLimit = Error.stackTraceLimit + Error.stackTraceLimit = 0 + try { + err = new ResponseError('Response Error', this.#statusCode, { + body: this.#body, + headers: this.#headers + }) + } finally { + Error.stackTraceLimit = stackTraceLimit + } + + super.onResponseError(controller, err) + } else { + super.onResponseEnd(controller, trailers) + } + } + + onResponseError (controller, err) { + super.onResponseError(controller, err) + } +} + +module.exports = () => { + return (dispatch) => { + return function Intercept (opts, handler) { + return dispatch(opts, new ResponseErrorHandler(opts, { handler })) + } + } +} diff --git a/vanilla/node_modules/undici/lib/interceptor/retry.js b/vanilla/node_modules/undici/lib/interceptor/retry.js new file mode 100644 index 0000000..1c16fd8 --- /dev/null +++ b/vanilla/node_modules/undici/lib/interceptor/retry.js @@ -0,0 +1,19 @@ +'use strict' +const RetryHandler = require('../handler/retry-handler') + +module.exports = globalOpts => { + return dispatch => { + return function retryInterceptor (opts, handler) { + return dispatch( + opts, + new RetryHandler( + { ...opts, retryOptions: { ...globalOpts, ...opts.retryOptions } }, + { + handler, + dispatch + } + ) + ) + } + } +} |
