aboutsummaryrefslogtreecommitdiffstats
path: root/vanilla/node_modules/undici/lib/dispatcher/client-h2.js
diff options
context:
space:
mode:
authorAdam Mathes <adam@adammathes.com>2026-02-13 21:34:48 -0800
committerAdam Mathes <adam@adammathes.com>2026-02-13 21:34:48 -0800
commit76cb9c2a39d477a64824a985ade40507e3bbade1 (patch)
tree41e997aa9c6f538d3a136af61dae9424db2005a9 /vanilla/node_modules/undici/lib/dispatcher/client-h2.js
parent819a39a21ac992b1393244a4c283bbb125208c69 (diff)
downloadneko-76cb9c2a39d477a64824a985ade40507e3bbade1.tar.gz
neko-76cb9c2a39d477a64824a985ade40507e3bbade1.tar.bz2
neko-76cb9c2a39d477a64824a985ade40507e3bbade1.zip
feat(vanilla): add testing infrastructure and tests (NK-wjnczv)
Diffstat (limited to 'vanilla/node_modules/undici/lib/dispatcher/client-h2.js')
-rw-r--r--vanilla/node_modules/undici/lib/dispatcher/client-h2.js990
1 files changed, 990 insertions, 0 deletions
diff --git a/vanilla/node_modules/undici/lib/dispatcher/client-h2.js b/vanilla/node_modules/undici/lib/dispatcher/client-h2.js
new file mode 100644
index 0000000..0969108
--- /dev/null
+++ b/vanilla/node_modules/undici/lib/dispatcher/client-h2.js
@@ -0,0 +1,990 @@
+'use strict'
+
+const assert = require('node:assert')
+const { pipeline } = require('node:stream')
+const util = require('../core/util.js')
+const {
+ RequestContentLengthMismatchError,
+ RequestAbortedError,
+ SocketError,
+ InformationalError,
+ InvalidArgumentError
+} = require('../core/errors.js')
+const {
+ kUrl,
+ kReset,
+ kClient,
+ kRunning,
+ kPending,
+ kQueue,
+ kPendingIdx,
+ kRunningIdx,
+ kError,
+ kSocket,
+ kStrictContentLength,
+ kOnError,
+ kMaxConcurrentStreams,
+ kPingInterval,
+ kHTTP2Session,
+ kHTTP2InitialWindowSize,
+ kHTTP2ConnectionWindowSize,
+ kResume,
+ kSize,
+ kHTTPContext,
+ kClosed,
+ kBodyTimeout,
+ kEnableConnectProtocol,
+ kRemoteSettings,
+ kHTTP2Stream,
+ kHTTP2SessionState
+} = require('../core/symbols.js')
+const { channels } = require('../core/diagnostics.js')
+
+const kOpenStreams = Symbol('open streams')
+
+let extractBody
+
+/** @type {import('http2')} */
+let http2
+try {
+ http2 = require('node:http2')
+} catch {
+ // @ts-ignore
+ http2 = { constants: {} }
+}
+
+const {
+ constants: {
+ HTTP2_HEADER_AUTHORITY,
+ HTTP2_HEADER_METHOD,
+ HTTP2_HEADER_PATH,
+ HTTP2_HEADER_SCHEME,
+ HTTP2_HEADER_CONTENT_LENGTH,
+ HTTP2_HEADER_EXPECT,
+ HTTP2_HEADER_STATUS,
+ HTTP2_HEADER_PROTOCOL,
+ NGHTTP2_REFUSED_STREAM,
+ NGHTTP2_CANCEL
+ }
+} = http2
+
+function parseH2Headers (headers) {
+ const result = []
+
+ for (const [name, value] of Object.entries(headers)) {
+ // h2 may concat the header value by array
+ // e.g. Set-Cookie
+ if (Array.isArray(value)) {
+ for (const subvalue of value) {
+ // we need to provide each header value of header name
+ // because the headers handler expect name-value pair
+ result.push(Buffer.from(name), Buffer.from(subvalue))
+ }
+ } else {
+ result.push(Buffer.from(name), Buffer.from(value))
+ }
+ }
+
+ return result
+}
+
+function connectH2 (client, socket) {
+ client[kSocket] = socket
+
+ const http2InitialWindowSize = client[kHTTP2InitialWindowSize]
+ const http2ConnectionWindowSize = client[kHTTP2ConnectionWindowSize]
+
+ const session = http2.connect(client[kUrl], {
+ createConnection: () => socket,
+ peerMaxConcurrentStreams: client[kMaxConcurrentStreams],
+ settings: {
+ // TODO(metcoder95): add support for PUSH
+ enablePush: false,
+ ...(http2InitialWindowSize != null ? { initialWindowSize: http2InitialWindowSize } : null)
+ }
+ })
+
+ client[kSocket] = socket
+ session[kOpenStreams] = 0
+ session[kClient] = client
+ session[kSocket] = socket
+ session[kHTTP2SessionState] = {
+ ping: {
+ interval: client[kPingInterval] === 0 ? null : setInterval(onHttp2SendPing, client[kPingInterval], session).unref()
+ }
+ }
+ // We set it to true by default in a best-effort; however once connected to an H2 server
+ // we will check if extended CONNECT protocol is supported or not
+ // and set this value accordingly.
+ session[kEnableConnectProtocol] = false
+ // States whether or not we have received the remote settings from the server
+ session[kRemoteSettings] = false
+
+ // Apply connection-level flow control once connected (if supported).
+ if (http2ConnectionWindowSize) {
+ util.addListener(session, 'connect', applyConnectionWindowSize.bind(session, http2ConnectionWindowSize))
+ }
+
+ util.addListener(session, 'error', onHttp2SessionError)
+ util.addListener(session, 'frameError', onHttp2FrameError)
+ util.addListener(session, 'end', onHttp2SessionEnd)
+ util.addListener(session, 'goaway', onHttp2SessionGoAway)
+ util.addListener(session, 'close', onHttp2SessionClose)
+ util.addListener(session, 'remoteSettings', onHttp2RemoteSettings)
+ // TODO (@metcoder95): implement SETTINGS support
+ // util.addListener(session, 'localSettings', onHttp2RemoteSettings)
+
+ session.unref()
+
+ client[kHTTP2Session] = session
+ socket[kHTTP2Session] = session
+
+ util.addListener(socket, 'error', onHttp2SocketError)
+ util.addListener(socket, 'end', onHttp2SocketEnd)
+ util.addListener(socket, 'close', onHttp2SocketClose)
+
+ socket[kClosed] = false
+ socket.on('close', onSocketClose)
+
+ return {
+ version: 'h2',
+ defaultPipelining: Infinity,
+ /**
+ * @param {import('../core/request.js')} request
+ * @returns {boolean}
+ */
+ write (request) {
+ return writeH2(client, request)
+ },
+ /**
+ * @returns {void}
+ */
+ resume () {
+ resumeH2(client)
+ },
+ /**
+ * @param {Error | null} err
+ * @param {() => void} callback
+ */
+ destroy (err, callback) {
+ if (socket[kClosed]) {
+ queueMicrotask(callback)
+ } else {
+ socket.destroy(err).on('close', callback)
+ }
+ },
+ /**
+ * @type {boolean}
+ */
+ get destroyed () {
+ return socket.destroyed
+ },
+ /**
+ * @param {import('../core/request.js')} request
+ * @returns {boolean}
+ */
+ busy (request) {
+ if (request != null) {
+ if (client[kRunning] > 0) {
+ // We are already processing requests
+
+ // Non-idempotent request cannot be retried.
+ // Ensure that no other requests are inflight and
+ // could cause failure.
+ if (request.idempotent === false) return true
+ // Don't dispatch an upgrade until all preceding requests have completed.
+ // Possibly, we do not have remote settings confirmed yet.
+ if ((request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false) return true
+ // Request with stream or iterator body can error while other requests
+ // are inflight and indirectly error those as well.
+ // Ensure this doesn't happen by waiting for inflight
+ // to complete before dispatching.
+
+ // Request with stream or iterator body cannot be retried.
+ // Ensure that no other requests are inflight and
+ // could cause failure.
+ if (util.bodyLength(request.body) !== 0 &&
+ (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) return true
+ } else {
+ return (request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false
+ }
+ }
+
+ return false
+ }
+ }
+}
+
+function resumeH2 (client) {
+ const socket = client[kSocket]
+
+ if (socket?.destroyed === false) {
+ if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) {
+ socket.unref()
+ client[kHTTP2Session].unref()
+ } else {
+ socket.ref()
+ client[kHTTP2Session].ref()
+ }
+ }
+}
+
+function applyConnectionWindowSize (connectionWindowSize) {
+ try {
+ if (typeof this.setLocalWindowSize === 'function') {
+ this.setLocalWindowSize(connectionWindowSize)
+ }
+ } catch {
+ // Best-effort only.
+ }
+}
+
+function onHttp2RemoteSettings (settings) {
+ // Fallbacks are a safe bet, remote setting will always override
+ this[kClient][kMaxConcurrentStreams] = settings.maxConcurrentStreams ?? this[kClient][kMaxConcurrentStreams]
+ /**
+ * From RFC-8441
+ * A sender MUST NOT send a SETTINGS_ENABLE_CONNECT_PROTOCOL parameter
+ * with the value of 0 after previously sending a value of 1.
+ */
+ // Note: Cannot be tested in Node, it does not supports disabling the extended CONNECT protocol once enabled
+ if (this[kRemoteSettings] === true && this[kEnableConnectProtocol] === true && settings.enableConnectProtocol === false) {
+ const err = new InformationalError('HTTP/2: Server disabled extended CONNECT protocol against RFC-8441')
+ this[kSocket][kError] = err
+ this[kClient][kOnError](err)
+ return
+ }
+
+ this[kEnableConnectProtocol] = settings.enableConnectProtocol ?? this[kEnableConnectProtocol]
+ this[kRemoteSettings] = true
+ this[kClient][kResume]()
+}
+
+function onHttp2SendPing (session) {
+ const state = session[kHTTP2SessionState]
+ if ((session.closed || session.destroyed) && state.ping.interval != null) {
+ clearInterval(state.ping.interval)
+ state.ping.interval = null
+ return
+ }
+
+ // If no ping sent, do nothing
+ session.ping(onPing.bind(session))
+
+ function onPing (err, duration) {
+ const client = this[kClient]
+ const socket = this[kClient]
+
+ if (err != null) {
+ const error = new InformationalError(`HTTP/2: "PING" errored - type ${err.message}`)
+ socket[kError] = error
+ client[kOnError](error)
+ } else {
+ client.emit('ping', duration)
+ }
+ }
+}
+
+function onHttp2SessionError (err) {
+ assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
+
+ this[kSocket][kError] = err
+ this[kClient][kOnError](err)
+}
+
+function onHttp2FrameError (type, code, id) {
+ if (id === 0) {
+ const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
+ this[kSocket][kError] = err
+ this[kClient][kOnError](err)
+ }
+}
+
+function onHttp2SessionEnd () {
+ const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket]))
+ this.destroy(err)
+ util.destroy(this[kSocket], err)
+}
+
+/**
+ * This is the root cause of #3011
+ * We need to handle GOAWAY frames properly, and trigger the session close
+ * along with the socket right away
+ *
+ * @this {import('http2').ClientHttp2Session}
+ * @param {number} errorCode
+ */
+function onHttp2SessionGoAway (errorCode) {
+ // TODO(mcollina): Verify if GOAWAY implements the spec correctly:
+ // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8
+ // Specifically, we do not verify the "valid" stream id.
+
+ const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
+ const client = this[kClient]
+
+ client[kSocket] = null
+ client[kHTTPContext] = null
+
+ // this is an HTTP2 session
+ this.close()
+ this[kHTTP2Session] = null
+
+ util.destroy(this[kSocket], err)
+
+ // Fail head of pipeline.
+ if (client[kRunningIdx] < client[kQueue].length) {
+ const request = client[kQueue][client[kRunningIdx]]
+ client[kQueue][client[kRunningIdx]++] = null
+ util.errorRequest(client, request, err)
+ client[kPendingIdx] = client[kRunningIdx]
+ }
+
+ assert(client[kRunning] === 0)
+
+ client.emit('disconnect', client[kUrl], [client], err)
+ client.emit('connectionError', client[kUrl], [client], err)
+
+ client[kResume]()
+}
+
+function onHttp2SessionClose () {
+ const { [kClient]: client, [kHTTP2SessionState]: state } = this
+ const { [kSocket]: socket } = client
+
+ const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))
+
+ client[kSocket] = null
+ client[kHTTPContext] = null
+
+ if (state.ping.interval != null) {
+ clearInterval(state.ping.interval)
+ state.ping.interval = null
+ }
+
+ if (client.destroyed) {
+ assert(client[kPending] === 0)
+
+ // Fail entire queue.
+ const requests = client[kQueue].splice(client[kRunningIdx])
+ for (let i = 0; i < requests.length; i++) {
+ const request = requests[i]
+ util.errorRequest(client, request, err)
+ }
+ }
+}
+
+function onHttp2SocketClose () {
+ const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
+
+ const client = this[kHTTP2Session][kClient]
+
+ client[kSocket] = null
+ client[kHTTPContext] = null
+
+ if (this[kHTTP2Session] !== null) {
+ this[kHTTP2Session].destroy(err)
+ }
+
+ client[kPendingIdx] = client[kRunningIdx]
+
+ assert(client[kRunning] === 0)
+
+ client.emit('disconnect', client[kUrl], [client], err)
+
+ client[kResume]()
+}
+
+function onHttp2SocketError (err) {
+ assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
+
+ this[kError] = err
+
+ this[kClient][kOnError](err)
+}
+
+function onHttp2SocketEnd () {
+ util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
+}
+
+function onSocketClose () {
+ this[kClosed] = true
+}
+
+// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
+function shouldSendContentLength (method) {
+ return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
+}
+
+function writeH2 (client, request) {
+ const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout]
+ const session = client[kHTTP2Session]
+ const { method, path, host, upgrade, expectContinue, signal, protocol, headers: reqHeaders } = request
+ let { body } = request
+
+ if (upgrade != null && upgrade !== 'websocket') {
+ util.errorRequest(client, request, new InvalidArgumentError(`Custom upgrade "${upgrade}" not supported over HTTP/2`))
+ return false
+ }
+
+ const headers = {}
+ for (let n = 0; n < reqHeaders.length; n += 2) {
+ const key = reqHeaders[n + 0]
+ const val = reqHeaders[n + 1]
+
+ if (key === 'cookie') {
+ if (headers[key] != null) {
+ headers[key] = Array.isArray(headers[key]) ? (headers[key].push(val), headers[key]) : [headers[key], val]
+ } else {
+ headers[key] = val
+ }
+
+ continue
+ }
+
+ if (Array.isArray(val)) {
+ for (let i = 0; i < val.length; i++) {
+ if (headers[key]) {
+ headers[key] += `, ${val[i]}`
+ } else {
+ headers[key] = val[i]
+ }
+ }
+ } else if (headers[key]) {
+ headers[key] += `, ${val}`
+ } else {
+ headers[key] = val
+ }
+ }
+
+ /** @type {import('node:http2').ClientHttp2Stream} */
+ let stream = null
+
+ const { hostname, port } = client[kUrl]
+
+ headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
+ headers[HTTP2_HEADER_METHOD] = method
+
+ const abort = (err) => {
+ if (request.aborted || request.completed) {
+ return
+ }
+
+ err = err || new RequestAbortedError()
+
+ util.errorRequest(client, request, err)
+
+ if (stream != null) {
+ // Some chunks might still come after abort,
+ // let's ignore them
+ stream.removeAllListeners('data')
+
+ // On Abort, we close the stream to send RST_STREAM frame
+ stream.close()
+
+ // We move the running index to the next request
+ client[kOnError](err)
+ client[kResume]()
+ }
+
+ // We do not destroy the socket as we can continue using the session
+ // the stream gets destroyed and the session remains to create new streams
+ util.destroy(body, err)
+ }
+
+ try {
+ // We are already connected, streams are pending.
+ // We can call on connect, and wait for abort
+ request.onConnect(abort)
+ } catch (err) {
+ util.errorRequest(client, request, err)
+ }
+
+ if (request.aborted) {
+ return false
+ }
+
+ if (upgrade || method === 'CONNECT') {
+ session.ref()
+
+ if (upgrade === 'websocket') {
+ // We cannot upgrade to websocket if extended CONNECT protocol is not supported
+ if (session[kEnableConnectProtocol] === false) {
+ util.errorRequest(client, request, new InformationalError('HTTP/2: Extended CONNECT protocol not supported by server'))
+ session.unref()
+ return false
+ }
+
+ // We force the method to CONNECT
+ // as per RFC-8441
+ // https://datatracker.ietf.org/doc/html/rfc8441#section-4
+ headers[HTTP2_HEADER_METHOD] = 'CONNECT'
+ headers[HTTP2_HEADER_PROTOCOL] = 'websocket'
+ // :path and :scheme headers must be omitted when sending CONNECT but set if extended-CONNECT
+ headers[HTTP2_HEADER_PATH] = path
+
+ if (protocol === 'ws:' || protocol === 'wss:') {
+ headers[HTTP2_HEADER_SCHEME] = protocol === 'ws:' ? 'http' : 'https'
+ } else {
+ headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
+ }
+
+ stream = session.request(headers, { endStream: false, signal })
+ stream[kHTTP2Stream] = true
+
+ stream.once('response', (headers, _flags) => {
+ const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
+
+ request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
+
+ ++session[kOpenStreams]
+ client[kQueue][client[kRunningIdx]++] = null
+ })
+
+ stream.on('error', () => {
+ if (stream.rstCode === NGHTTP2_REFUSED_STREAM || stream.rstCode === NGHTTP2_CANCEL) {
+ // NGHTTP2_REFUSED_STREAM (7) or NGHTTP2_CANCEL (8)
+ // We do not treat those as errors as the server might
+ // not support websockets and refuse the stream
+ abort(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`))
+ }
+ })
+
+ stream.once('close', () => {
+ session[kOpenStreams] -= 1
+ if (session[kOpenStreams] === 0) session.unref()
+ })
+
+ stream.setTimeout(requestTimeout)
+ return true
+ }
+
+ // TODO: consolidate once we support CONNECT properly
+ // NOTE: We are already connected, streams are pending, first request
+ // will create a new stream. We trigger a request to create the stream and wait until
+ // `ready` event is triggered
+ // We disabled endStream to allow the user to write to the stream
+ stream = session.request(headers, { endStream: false, signal })
+ stream[kHTTP2Stream] = true
+ stream.on('response', headers => {
+ const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
+
+ request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
+ ++session[kOpenStreams]
+ client[kQueue][client[kRunningIdx]++] = null
+ })
+ stream.once('close', () => {
+ session[kOpenStreams] -= 1
+ if (session[kOpenStreams] === 0) session.unref()
+ })
+ stream.setTimeout(requestTimeout)
+
+ return true
+ }
+
+ // https://tools.ietf.org/html/rfc7540#section-8.3
+ // :path and :scheme headers must be omitted when sending CONNECT
+ headers[HTTP2_HEADER_PATH] = path
+ headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
+
+ // https://tools.ietf.org/html/rfc7231#section-4.3.1
+ // https://tools.ietf.org/html/rfc7231#section-4.3.2
+ // https://tools.ietf.org/html/rfc7231#section-4.3.5
+
+ // Sending a payload body on a request that does not
+ // expect it can cause undefined behavior on some
+ // servers and corrupt connection state. Do not
+ // re-use the connection for further requests.
+
+ const expectsPayload = (
+ method === 'PUT' ||
+ method === 'POST' ||
+ method === 'PATCH'
+ )
+
+ if (body && typeof body.read === 'function') {
+ // Try to read EOF in order to get length.
+ body.read(0)
+ }
+
+ let contentLength = util.bodyLength(body)
+
+ if (util.isFormDataLike(body)) {
+ extractBody ??= require('../web/fetch/body.js').extractBody
+
+ const [bodyStream, contentType] = extractBody(body)
+ headers['content-type'] = contentType
+
+ body = bodyStream.stream
+ contentLength = bodyStream.length
+ }
+
+ if (contentLength == null) {
+ contentLength = request.contentLength
+ }
+
+ if (!expectsPayload) {
+ // https://tools.ietf.org/html/rfc7230#section-3.3.2
+ // A user agent SHOULD NOT send a Content-Length header field when
+ // the request message does not contain a payload body and the method
+ // semantics do not anticipate such a body.
+ // And for methods that don't expect a payload, omit Content-Length.
+ contentLength = null
+ }
+
+ // https://github.com/nodejs/undici/issues/2046
+ // A user agent may send a Content-Length header with 0 value, this should be allowed.
+ if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
+ if (client[kStrictContentLength]) {
+ util.errorRequest(client, request, new RequestContentLengthMismatchError())
+ return false
+ }
+
+ process.emitWarning(new RequestContentLengthMismatchError())
+ }
+
+ if (contentLength != null) {
+ assert(body || contentLength === 0, 'no body must not have content length')
+ headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
+ }
+
+ session.ref()
+
+ if (channels.sendHeaders.hasSubscribers) {
+ let header = ''
+ for (const key in headers) {
+ header += `${key}: ${headers[key]}\r\n`
+ }
+ channels.sendHeaders.publish({ request, headers: header, socket: session[kSocket] })
+ }
+
+ // TODO(metcoder95): add support for sending trailers
+ const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
+ if (expectContinue) {
+ headers[HTTP2_HEADER_EXPECT] = '100-continue'
+ stream = session.request(headers, { endStream: shouldEndStream, signal })
+ stream[kHTTP2Stream] = true
+
+ stream.once('continue', writeBodyH2)
+ } else {
+ stream = session.request(headers, {
+ endStream: shouldEndStream,
+ signal
+ })
+ stream[kHTTP2Stream] = true
+
+ writeBodyH2()
+ }
+
+ // Increment counter as we have new streams open
+ ++session[kOpenStreams]
+ stream.setTimeout(requestTimeout)
+
+ // Track whether we received a response (headers)
+ let responseReceived = false
+
+ stream.once('response', headers => {
+ const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
+ request.onResponseStarted()
+ responseReceived = true
+
+ // Due to the stream nature, it is possible we face a race condition
+ // where the stream has been assigned, but the request has been aborted
+ // the request remains in-flight and headers hasn't been received yet
+ // for those scenarios, best effort is to destroy the stream immediately
+ // as there's no value to keep it open.
+ if (request.aborted) {
+ stream.removeAllListeners('data')
+ return
+ }
+
+ if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
+ stream.pause()
+ }
+ })
+
+ stream.on('data', (chunk) => {
+ if (request.onData(chunk) === false) {
+ stream.pause()
+ }
+ })
+
+ stream.once('end', () => {
+ stream.removeAllListeners('data')
+ // If we received a response, this is a normal completion
+ if (responseReceived) {
+ if (!request.aborted && !request.completed) {
+ request.onComplete({})
+ }
+
+ client[kQueue][client[kRunningIdx]++] = null
+ client[kResume]()
+ } else {
+ // Stream ended without receiving a response - this is an error
+ // (e.g., server destroyed the stream before sending headers)
+ abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
+ client[kQueue][client[kRunningIdx]++] = null
+ client[kPendingIdx] = client[kRunningIdx]
+ client[kResume]()
+ }
+ })
+
+ stream.once('close', () => {
+ stream.removeAllListeners('data')
+ session[kOpenStreams] -= 1
+ if (session[kOpenStreams] === 0) {
+ session.unref()
+ }
+ })
+
+ stream.once('error', function (err) {
+ stream.removeAllListeners('data')
+ abort(err)
+ })
+
+ stream.once('frameError', (type, code) => {
+ stream.removeAllListeners('data')
+ abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
+ })
+
+ stream.on('aborted', () => {
+ stream.removeAllListeners('data')
+ })
+
+ stream.on('timeout', () => {
+ const err = new InformationalError(`HTTP/2: "stream timeout after ${requestTimeout}"`)
+ stream.removeAllListeners('data')
+ session[kOpenStreams] -= 1
+
+ if (session[kOpenStreams] === 0) {
+ session.unref()
+ }
+
+ abort(err)
+ })
+
+ stream.once('trailers', trailers => {
+ if (request.aborted || request.completed) {
+ return
+ }
+
+ request.onComplete(trailers)
+ })
+
+ return true
+
+ function writeBodyH2 () {
+ if (!body || contentLength === 0) {
+ writeBuffer(
+ abort,
+ stream,
+ null,
+ client,
+ request,
+ client[kSocket],
+ contentLength,
+ expectsPayload
+ )
+ } else if (util.isBuffer(body)) {
+ writeBuffer(
+ abort,
+ stream,
+ body,
+ client,
+ request,
+ client[kSocket],
+ contentLength,
+ expectsPayload
+ )
+ } else if (util.isBlobLike(body)) {
+ if (typeof body.stream === 'function') {
+ writeIterable(
+ abort,
+ stream,
+ body.stream(),
+ client,
+ request,
+ client[kSocket],
+ contentLength,
+ expectsPayload
+ )
+ } else {
+ writeBlob(
+ abort,
+ stream,
+ body,
+ client,
+ request,
+ client[kSocket],
+ contentLength,
+ expectsPayload
+ )
+ }
+ } else if (util.isStream(body)) {
+ writeStream(
+ abort,
+ client[kSocket],
+ expectsPayload,
+ stream,
+ body,
+ client,
+ request,
+ contentLength
+ )
+ } else if (util.isIterable(body)) {
+ writeIterable(
+ abort,
+ stream,
+ body,
+ client,
+ request,
+ client[kSocket],
+ contentLength,
+ expectsPayload
+ )
+ } else {
+ assert(false)
+ }
+ }
+}
+
+function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
+ try {
+ if (body != null && util.isBuffer(body)) {
+ assert(contentLength === body.byteLength, 'buffer body must have content length')
+ h2stream.cork()
+ h2stream.write(body)
+ h2stream.uncork()
+ h2stream.end()
+
+ request.onBodySent(body)
+ }
+
+ if (!expectsPayload) {
+ socket[kReset] = true
+ }
+
+ request.onRequestSent()
+ client[kResume]()
+ } catch (error) {
+ abort(error)
+ }
+}
+
+function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
+ assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
+
+ // For HTTP/2, is enough to pipe the stream
+ const pipe = pipeline(
+ body,
+ h2stream,
+ (err) => {
+ if (err) {
+ util.destroy(pipe, err)
+ abort(err)
+ } else {
+ util.removeAllListeners(pipe)
+ request.onRequestSent()
+
+ if (!expectsPayload) {
+ socket[kReset] = true
+ }
+
+ client[kResume]()
+ }
+ }
+ )
+
+ util.addListener(pipe, 'data', onPipeData)
+
+ function onPipeData (chunk) {
+ request.onBodySent(chunk)
+ }
+}
+
+async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
+ assert(contentLength === body.size, 'blob body must have content length')
+
+ try {
+ if (contentLength != null && contentLength !== body.size) {
+ throw new RequestContentLengthMismatchError()
+ }
+
+ const buffer = Buffer.from(await body.arrayBuffer())
+
+ h2stream.cork()
+ h2stream.write(buffer)
+ h2stream.uncork()
+ h2stream.end()
+
+ request.onBodySent(buffer)
+ request.onRequestSent()
+
+ if (!expectsPayload) {
+ socket[kReset] = true
+ }
+
+ client[kResume]()
+ } catch (err) {
+ abort(err)
+ }
+}
+
+async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
+ assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
+
+ let callback = null
+ function onDrain () {
+ if (callback) {
+ const cb = callback
+ callback = null
+ cb()
+ }
+ }
+
+ const waitForDrain = () => new Promise((resolve, reject) => {
+ assert(callback === null)
+
+ if (socket[kError]) {
+ reject(socket[kError])
+ } else {
+ callback = resolve
+ }
+ })
+
+ h2stream
+ .on('close', onDrain)
+ .on('drain', onDrain)
+
+ try {
+ // It's up to the user to somehow abort the async iterable.
+ for await (const chunk of body) {
+ if (socket[kError]) {
+ throw socket[kError]
+ }
+
+ const res = h2stream.write(chunk)
+ request.onBodySent(chunk)
+ if (!res) {
+ await waitForDrain()
+ }
+ }
+
+ h2stream.end()
+
+ request.onRequestSent()
+
+ if (!expectsPayload) {
+ socket[kReset] = true
+ }
+
+ client[kResume]()
+ } catch (err) {
+ abort(err)
+ } finally {
+ h2stream
+ .off('close', onDrain)
+ .off('drain', onDrain)
+ }
+}
+
+module.exports = connectH2