aboutsummaryrefslogtreecommitdiffstats
path: root/vanilla/node_modules/undici/lib/dispatcher/client-h2.js
diff options
context:
space:
mode:
Diffstat (limited to 'vanilla/node_modules/undici/lib/dispatcher/client-h2.js')
-rw-r--r--vanilla/node_modules/undici/lib/dispatcher/client-h2.js990
1 files changed, 0 insertions, 990 deletions
diff --git a/vanilla/node_modules/undici/lib/dispatcher/client-h2.js b/vanilla/node_modules/undici/lib/dispatcher/client-h2.js
deleted file mode 100644
index 0969108..0000000
--- a/vanilla/node_modules/undici/lib/dispatcher/client-h2.js
+++ /dev/null
@@ -1,990 +0,0 @@
-'use strict'
-
-const assert = require('node:assert')
-const { pipeline } = require('node:stream')
-const util = require('../core/util.js')
-const {
- RequestContentLengthMismatchError,
- RequestAbortedError,
- SocketError,
- InformationalError,
- InvalidArgumentError
-} = require('../core/errors.js')
-const {
- kUrl,
- kReset,
- kClient,
- kRunning,
- kPending,
- kQueue,
- kPendingIdx,
- kRunningIdx,
- kError,
- kSocket,
- kStrictContentLength,
- kOnError,
- kMaxConcurrentStreams,
- kPingInterval,
- kHTTP2Session,
- kHTTP2InitialWindowSize,
- kHTTP2ConnectionWindowSize,
- kResume,
- kSize,
- kHTTPContext,
- kClosed,
- kBodyTimeout,
- kEnableConnectProtocol,
- kRemoteSettings,
- kHTTP2Stream,
- kHTTP2SessionState
-} = require('../core/symbols.js')
-const { channels } = require('../core/diagnostics.js')
-
-const kOpenStreams = Symbol('open streams')
-
-let extractBody
-
-/** @type {import('http2')} */
-let http2
-try {
- http2 = require('node:http2')
-} catch {
- // @ts-ignore
- http2 = { constants: {} }
-}
-
-const {
- constants: {
- HTTP2_HEADER_AUTHORITY,
- HTTP2_HEADER_METHOD,
- HTTP2_HEADER_PATH,
- HTTP2_HEADER_SCHEME,
- HTTP2_HEADER_CONTENT_LENGTH,
- HTTP2_HEADER_EXPECT,
- HTTP2_HEADER_STATUS,
- HTTP2_HEADER_PROTOCOL,
- NGHTTP2_REFUSED_STREAM,
- NGHTTP2_CANCEL
- }
-} = http2
-
-function parseH2Headers (headers) {
- const result = []
-
- for (const [name, value] of Object.entries(headers)) {
- // h2 may concat the header value by array
- // e.g. Set-Cookie
- if (Array.isArray(value)) {
- for (const subvalue of value) {
- // we need to provide each header value of header name
- // because the headers handler expect name-value pair
- result.push(Buffer.from(name), Buffer.from(subvalue))
- }
- } else {
- result.push(Buffer.from(name), Buffer.from(value))
- }
- }
-
- return result
-}
-
-function connectH2 (client, socket) {
- client[kSocket] = socket
-
- const http2InitialWindowSize = client[kHTTP2InitialWindowSize]
- const http2ConnectionWindowSize = client[kHTTP2ConnectionWindowSize]
-
- const session = http2.connect(client[kUrl], {
- createConnection: () => socket,
- peerMaxConcurrentStreams: client[kMaxConcurrentStreams],
- settings: {
- // TODO(metcoder95): add support for PUSH
- enablePush: false,
- ...(http2InitialWindowSize != null ? { initialWindowSize: http2InitialWindowSize } : null)
- }
- })
-
- client[kSocket] = socket
- session[kOpenStreams] = 0
- session[kClient] = client
- session[kSocket] = socket
- session[kHTTP2SessionState] = {
- ping: {
- interval: client[kPingInterval] === 0 ? null : setInterval(onHttp2SendPing, client[kPingInterval], session).unref()
- }
- }
- // We set it to true by default in a best-effort; however once connected to an H2 server
- // we will check if extended CONNECT protocol is supported or not
- // and set this value accordingly.
- session[kEnableConnectProtocol] = false
- // States whether or not we have received the remote settings from the server
- session[kRemoteSettings] = false
-
- // Apply connection-level flow control once connected (if supported).
- if (http2ConnectionWindowSize) {
- util.addListener(session, 'connect', applyConnectionWindowSize.bind(session, http2ConnectionWindowSize))
- }
-
- util.addListener(session, 'error', onHttp2SessionError)
- util.addListener(session, 'frameError', onHttp2FrameError)
- util.addListener(session, 'end', onHttp2SessionEnd)
- util.addListener(session, 'goaway', onHttp2SessionGoAway)
- util.addListener(session, 'close', onHttp2SessionClose)
- util.addListener(session, 'remoteSettings', onHttp2RemoteSettings)
- // TODO (@metcoder95): implement SETTINGS support
- // util.addListener(session, 'localSettings', onHttp2RemoteSettings)
-
- session.unref()
-
- client[kHTTP2Session] = session
- socket[kHTTP2Session] = session
-
- util.addListener(socket, 'error', onHttp2SocketError)
- util.addListener(socket, 'end', onHttp2SocketEnd)
- util.addListener(socket, 'close', onHttp2SocketClose)
-
- socket[kClosed] = false
- socket.on('close', onSocketClose)
-
- return {
- version: 'h2',
- defaultPipelining: Infinity,
- /**
- * @param {import('../core/request.js')} request
- * @returns {boolean}
- */
- write (request) {
- return writeH2(client, request)
- },
- /**
- * @returns {void}
- */
- resume () {
- resumeH2(client)
- },
- /**
- * @param {Error | null} err
- * @param {() => void} callback
- */
- destroy (err, callback) {
- if (socket[kClosed]) {
- queueMicrotask(callback)
- } else {
- socket.destroy(err).on('close', callback)
- }
- },
- /**
- * @type {boolean}
- */
- get destroyed () {
- return socket.destroyed
- },
- /**
- * @param {import('../core/request.js')} request
- * @returns {boolean}
- */
- busy (request) {
- if (request != null) {
- if (client[kRunning] > 0) {
- // We are already processing requests
-
- // Non-idempotent request cannot be retried.
- // Ensure that no other requests are inflight and
- // could cause failure.
- if (request.idempotent === false) return true
- // Don't dispatch an upgrade until all preceding requests have completed.
- // Possibly, we do not have remote settings confirmed yet.
- if ((request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false) return true
- // Request with stream or iterator body can error while other requests
- // are inflight and indirectly error those as well.
- // Ensure this doesn't happen by waiting for inflight
- // to complete before dispatching.
-
- // Request with stream or iterator body cannot be retried.
- // Ensure that no other requests are inflight and
- // could cause failure.
- if (util.bodyLength(request.body) !== 0 &&
- (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) return true
- } else {
- return (request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false
- }
- }
-
- return false
- }
- }
-}
-
-function resumeH2 (client) {
- const socket = client[kSocket]
-
- if (socket?.destroyed === false) {
- if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) {
- socket.unref()
- client[kHTTP2Session].unref()
- } else {
- socket.ref()
- client[kHTTP2Session].ref()
- }
- }
-}
-
-function applyConnectionWindowSize (connectionWindowSize) {
- try {
- if (typeof this.setLocalWindowSize === 'function') {
- this.setLocalWindowSize(connectionWindowSize)
- }
- } catch {
- // Best-effort only.
- }
-}
-
-function onHttp2RemoteSettings (settings) {
- // Fallbacks are a safe bet, remote setting will always override
- this[kClient][kMaxConcurrentStreams] = settings.maxConcurrentStreams ?? this[kClient][kMaxConcurrentStreams]
- /**
- * From RFC-8441
- * A sender MUST NOT send a SETTINGS_ENABLE_CONNECT_PROTOCOL parameter
- * with the value of 0 after previously sending a value of 1.
- */
- // Note: Cannot be tested in Node, it does not supports disabling the extended CONNECT protocol once enabled
- if (this[kRemoteSettings] === true && this[kEnableConnectProtocol] === true && settings.enableConnectProtocol === false) {
- const err = new InformationalError('HTTP/2: Server disabled extended CONNECT protocol against RFC-8441')
- this[kSocket][kError] = err
- this[kClient][kOnError](err)
- return
- }
-
- this[kEnableConnectProtocol] = settings.enableConnectProtocol ?? this[kEnableConnectProtocol]
- this[kRemoteSettings] = true
- this[kClient][kResume]()
-}
-
-function onHttp2SendPing (session) {
- const state = session[kHTTP2SessionState]
- if ((session.closed || session.destroyed) && state.ping.interval != null) {
- clearInterval(state.ping.interval)
- state.ping.interval = null
- return
- }
-
- // If no ping sent, do nothing
- session.ping(onPing.bind(session))
-
- function onPing (err, duration) {
- const client = this[kClient]
- const socket = this[kClient]
-
- if (err != null) {
- const error = new InformationalError(`HTTP/2: "PING" errored - type ${err.message}`)
- socket[kError] = error
- client[kOnError](error)
- } else {
- client.emit('ping', duration)
- }
- }
-}
-
-function onHttp2SessionError (err) {
- assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
-
- this[kSocket][kError] = err
- this[kClient][kOnError](err)
-}
-
-function onHttp2FrameError (type, code, id) {
- if (id === 0) {
- const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
- this[kSocket][kError] = err
- this[kClient][kOnError](err)
- }
-}
-
-function onHttp2SessionEnd () {
- const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket]))
- this.destroy(err)
- util.destroy(this[kSocket], err)
-}
-
-/**
- * This is the root cause of #3011
- * We need to handle GOAWAY frames properly, and trigger the session close
- * along with the socket right away
- *
- * @this {import('http2').ClientHttp2Session}
- * @param {number} errorCode
- */
-function onHttp2SessionGoAway (errorCode) {
- // TODO(mcollina): Verify if GOAWAY implements the spec correctly:
- // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8
- // Specifically, we do not verify the "valid" stream id.
-
- const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
- const client = this[kClient]
-
- client[kSocket] = null
- client[kHTTPContext] = null
-
- // this is an HTTP2 session
- this.close()
- this[kHTTP2Session] = null
-
- util.destroy(this[kSocket], err)
-
- // Fail head of pipeline.
- if (client[kRunningIdx] < client[kQueue].length) {
- const request = client[kQueue][client[kRunningIdx]]
- client[kQueue][client[kRunningIdx]++] = null
- util.errorRequest(client, request, err)
- client[kPendingIdx] = client[kRunningIdx]
- }
-
- assert(client[kRunning] === 0)
-
- client.emit('disconnect', client[kUrl], [client], err)
- client.emit('connectionError', client[kUrl], [client], err)
-
- client[kResume]()
-}
-
-function onHttp2SessionClose () {
- const { [kClient]: client, [kHTTP2SessionState]: state } = this
- const { [kSocket]: socket } = client
-
- const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))
-
- client[kSocket] = null
- client[kHTTPContext] = null
-
- if (state.ping.interval != null) {
- clearInterval(state.ping.interval)
- state.ping.interval = null
- }
-
- if (client.destroyed) {
- assert(client[kPending] === 0)
-
- // Fail entire queue.
- const requests = client[kQueue].splice(client[kRunningIdx])
- for (let i = 0; i < requests.length; i++) {
- const request = requests[i]
- util.errorRequest(client, request, err)
- }
- }
-}
-
-function onHttp2SocketClose () {
- const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
-
- const client = this[kHTTP2Session][kClient]
-
- client[kSocket] = null
- client[kHTTPContext] = null
-
- if (this[kHTTP2Session] !== null) {
- this[kHTTP2Session].destroy(err)
- }
-
- client[kPendingIdx] = client[kRunningIdx]
-
- assert(client[kRunning] === 0)
-
- client.emit('disconnect', client[kUrl], [client], err)
-
- client[kResume]()
-}
-
-function onHttp2SocketError (err) {
- assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
-
- this[kError] = err
-
- this[kClient][kOnError](err)
-}
-
-function onHttp2SocketEnd () {
- util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
-}
-
-function onSocketClose () {
- this[kClosed] = true
-}
-
-// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
-function shouldSendContentLength (method) {
- return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
-}
-
-function writeH2 (client, request) {
- const requestTimeout = request.bodyTimeout ?? client[kBodyTimeout]
- const session = client[kHTTP2Session]
- const { method, path, host, upgrade, expectContinue, signal, protocol, headers: reqHeaders } = request
- let { body } = request
-
- if (upgrade != null && upgrade !== 'websocket') {
- util.errorRequest(client, request, new InvalidArgumentError(`Custom upgrade "${upgrade}" not supported over HTTP/2`))
- return false
- }
-
- const headers = {}
- for (let n = 0; n < reqHeaders.length; n += 2) {
- const key = reqHeaders[n + 0]
- const val = reqHeaders[n + 1]
-
- if (key === 'cookie') {
- if (headers[key] != null) {
- headers[key] = Array.isArray(headers[key]) ? (headers[key].push(val), headers[key]) : [headers[key], val]
- } else {
- headers[key] = val
- }
-
- continue
- }
-
- if (Array.isArray(val)) {
- for (let i = 0; i < val.length; i++) {
- if (headers[key]) {
- headers[key] += `, ${val[i]}`
- } else {
- headers[key] = val[i]
- }
- }
- } else if (headers[key]) {
- headers[key] += `, ${val}`
- } else {
- headers[key] = val
- }
- }
-
- /** @type {import('node:http2').ClientHttp2Stream} */
- let stream = null
-
- const { hostname, port } = client[kUrl]
-
- headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
- headers[HTTP2_HEADER_METHOD] = method
-
- const abort = (err) => {
- if (request.aborted || request.completed) {
- return
- }
-
- err = err || new RequestAbortedError()
-
- util.errorRequest(client, request, err)
-
- if (stream != null) {
- // Some chunks might still come after abort,
- // let's ignore them
- stream.removeAllListeners('data')
-
- // On Abort, we close the stream to send RST_STREAM frame
- stream.close()
-
- // We move the running index to the next request
- client[kOnError](err)
- client[kResume]()
- }
-
- // We do not destroy the socket as we can continue using the session
- // the stream gets destroyed and the session remains to create new streams
- util.destroy(body, err)
- }
-
- try {
- // We are already connected, streams are pending.
- // We can call on connect, and wait for abort
- request.onConnect(abort)
- } catch (err) {
- util.errorRequest(client, request, err)
- }
-
- if (request.aborted) {
- return false
- }
-
- if (upgrade || method === 'CONNECT') {
- session.ref()
-
- if (upgrade === 'websocket') {
- // We cannot upgrade to websocket if extended CONNECT protocol is not supported
- if (session[kEnableConnectProtocol] === false) {
- util.errorRequest(client, request, new InformationalError('HTTP/2: Extended CONNECT protocol not supported by server'))
- session.unref()
- return false
- }
-
- // We force the method to CONNECT
- // as per RFC-8441
- // https://datatracker.ietf.org/doc/html/rfc8441#section-4
- headers[HTTP2_HEADER_METHOD] = 'CONNECT'
- headers[HTTP2_HEADER_PROTOCOL] = 'websocket'
- // :path and :scheme headers must be omitted when sending CONNECT but set if extended-CONNECT
- headers[HTTP2_HEADER_PATH] = path
-
- if (protocol === 'ws:' || protocol === 'wss:') {
- headers[HTTP2_HEADER_SCHEME] = protocol === 'ws:' ? 'http' : 'https'
- } else {
- headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
- }
-
- stream = session.request(headers, { endStream: false, signal })
- stream[kHTTP2Stream] = true
-
- stream.once('response', (headers, _flags) => {
- const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
-
- request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
-
- ++session[kOpenStreams]
- client[kQueue][client[kRunningIdx]++] = null
- })
-
- stream.on('error', () => {
- if (stream.rstCode === NGHTTP2_REFUSED_STREAM || stream.rstCode === NGHTTP2_CANCEL) {
- // NGHTTP2_REFUSED_STREAM (7) or NGHTTP2_CANCEL (8)
- // We do not treat those as errors as the server might
- // not support websockets and refuse the stream
- abort(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`))
- }
- })
-
- stream.once('close', () => {
- session[kOpenStreams] -= 1
- if (session[kOpenStreams] === 0) session.unref()
- })
-
- stream.setTimeout(requestTimeout)
- return true
- }
-
- // TODO: consolidate once we support CONNECT properly
- // NOTE: We are already connected, streams are pending, first request
- // will create a new stream. We trigger a request to create the stream and wait until
- // `ready` event is triggered
- // We disabled endStream to allow the user to write to the stream
- stream = session.request(headers, { endStream: false, signal })
- stream[kHTTP2Stream] = true
- stream.on('response', headers => {
- const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
-
- request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
- ++session[kOpenStreams]
- client[kQueue][client[kRunningIdx]++] = null
- })
- stream.once('close', () => {
- session[kOpenStreams] -= 1
- if (session[kOpenStreams] === 0) session.unref()
- })
- stream.setTimeout(requestTimeout)
-
- return true
- }
-
- // https://tools.ietf.org/html/rfc7540#section-8.3
- // :path and :scheme headers must be omitted when sending CONNECT
- headers[HTTP2_HEADER_PATH] = path
- headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
-
- // https://tools.ietf.org/html/rfc7231#section-4.3.1
- // https://tools.ietf.org/html/rfc7231#section-4.3.2
- // https://tools.ietf.org/html/rfc7231#section-4.3.5
-
- // Sending a payload body on a request that does not
- // expect it can cause undefined behavior on some
- // servers and corrupt connection state. Do not
- // re-use the connection for further requests.
-
- const expectsPayload = (
- method === 'PUT' ||
- method === 'POST' ||
- method === 'PATCH'
- )
-
- if (body && typeof body.read === 'function') {
- // Try to read EOF in order to get length.
- body.read(0)
- }
-
- let contentLength = util.bodyLength(body)
-
- if (util.isFormDataLike(body)) {
- extractBody ??= require('../web/fetch/body.js').extractBody
-
- const [bodyStream, contentType] = extractBody(body)
- headers['content-type'] = contentType
-
- body = bodyStream.stream
- contentLength = bodyStream.length
- }
-
- if (contentLength == null) {
- contentLength = request.contentLength
- }
-
- if (!expectsPayload) {
- // https://tools.ietf.org/html/rfc7230#section-3.3.2
- // A user agent SHOULD NOT send a Content-Length header field when
- // the request message does not contain a payload body and the method
- // semantics do not anticipate such a body.
- // And for methods that don't expect a payload, omit Content-Length.
- contentLength = null
- }
-
- // https://github.com/nodejs/undici/issues/2046
- // A user agent may send a Content-Length header with 0 value, this should be allowed.
- if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
- if (client[kStrictContentLength]) {
- util.errorRequest(client, request, new RequestContentLengthMismatchError())
- return false
- }
-
- process.emitWarning(new RequestContentLengthMismatchError())
- }
-
- if (contentLength != null) {
- assert(body || contentLength === 0, 'no body must not have content length')
- headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
- }
-
- session.ref()
-
- if (channels.sendHeaders.hasSubscribers) {
- let header = ''
- for (const key in headers) {
- header += `${key}: ${headers[key]}\r\n`
- }
- channels.sendHeaders.publish({ request, headers: header, socket: session[kSocket] })
- }
-
- // TODO(metcoder95): add support for sending trailers
- const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null
- if (expectContinue) {
- headers[HTTP2_HEADER_EXPECT] = '100-continue'
- stream = session.request(headers, { endStream: shouldEndStream, signal })
- stream[kHTTP2Stream] = true
-
- stream.once('continue', writeBodyH2)
- } else {
- stream = session.request(headers, {
- endStream: shouldEndStream,
- signal
- })
- stream[kHTTP2Stream] = true
-
- writeBodyH2()
- }
-
- // Increment counter as we have new streams open
- ++session[kOpenStreams]
- stream.setTimeout(requestTimeout)
-
- // Track whether we received a response (headers)
- let responseReceived = false
-
- stream.once('response', headers => {
- const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
- request.onResponseStarted()
- responseReceived = true
-
- // Due to the stream nature, it is possible we face a race condition
- // where the stream has been assigned, but the request has been aborted
- // the request remains in-flight and headers hasn't been received yet
- // for those scenarios, best effort is to destroy the stream immediately
- // as there's no value to keep it open.
- if (request.aborted) {
- stream.removeAllListeners('data')
- return
- }
-
- if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
- stream.pause()
- }
- })
-
- stream.on('data', (chunk) => {
- if (request.onData(chunk) === false) {
- stream.pause()
- }
- })
-
- stream.once('end', () => {
- stream.removeAllListeners('data')
- // If we received a response, this is a normal completion
- if (responseReceived) {
- if (!request.aborted && !request.completed) {
- request.onComplete({})
- }
-
- client[kQueue][client[kRunningIdx]++] = null
- client[kResume]()
- } else {
- // Stream ended without receiving a response - this is an error
- // (e.g., server destroyed the stream before sending headers)
- abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
- client[kQueue][client[kRunningIdx]++] = null
- client[kPendingIdx] = client[kRunningIdx]
- client[kResume]()
- }
- })
-
- stream.once('close', () => {
- stream.removeAllListeners('data')
- session[kOpenStreams] -= 1
- if (session[kOpenStreams] === 0) {
- session.unref()
- }
- })
-
- stream.once('error', function (err) {
- stream.removeAllListeners('data')
- abort(err)
- })
-
- stream.once('frameError', (type, code) => {
- stream.removeAllListeners('data')
- abort(new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`))
- })
-
- stream.on('aborted', () => {
- stream.removeAllListeners('data')
- })
-
- stream.on('timeout', () => {
- const err = new InformationalError(`HTTP/2: "stream timeout after ${requestTimeout}"`)
- stream.removeAllListeners('data')
- session[kOpenStreams] -= 1
-
- if (session[kOpenStreams] === 0) {
- session.unref()
- }
-
- abort(err)
- })
-
- stream.once('trailers', trailers => {
- if (request.aborted || request.completed) {
- return
- }
-
- request.onComplete(trailers)
- })
-
- return true
-
- function writeBodyH2 () {
- if (!body || contentLength === 0) {
- writeBuffer(
- abort,
- stream,
- null,
- client,
- request,
- client[kSocket],
- contentLength,
- expectsPayload
- )
- } else if (util.isBuffer(body)) {
- writeBuffer(
- abort,
- stream,
- body,
- client,
- request,
- client[kSocket],
- contentLength,
- expectsPayload
- )
- } else if (util.isBlobLike(body)) {
- if (typeof body.stream === 'function') {
- writeIterable(
- abort,
- stream,
- body.stream(),
- client,
- request,
- client[kSocket],
- contentLength,
- expectsPayload
- )
- } else {
- writeBlob(
- abort,
- stream,
- body,
- client,
- request,
- client[kSocket],
- contentLength,
- expectsPayload
- )
- }
- } else if (util.isStream(body)) {
- writeStream(
- abort,
- client[kSocket],
- expectsPayload,
- stream,
- body,
- client,
- request,
- contentLength
- )
- } else if (util.isIterable(body)) {
- writeIterable(
- abort,
- stream,
- body,
- client,
- request,
- client[kSocket],
- contentLength,
- expectsPayload
- )
- } else {
- assert(false)
- }
- }
-}
-
-function writeBuffer (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
- try {
- if (body != null && util.isBuffer(body)) {
- assert(contentLength === body.byteLength, 'buffer body must have content length')
- h2stream.cork()
- h2stream.write(body)
- h2stream.uncork()
- h2stream.end()
-
- request.onBodySent(body)
- }
-
- if (!expectsPayload) {
- socket[kReset] = true
- }
-
- request.onRequestSent()
- client[kResume]()
- } catch (error) {
- abort(error)
- }
-}
-
-function writeStream (abort, socket, expectsPayload, h2stream, body, client, request, contentLength) {
- assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')
-
- // For HTTP/2, is enough to pipe the stream
- const pipe = pipeline(
- body,
- h2stream,
- (err) => {
- if (err) {
- util.destroy(pipe, err)
- abort(err)
- } else {
- util.removeAllListeners(pipe)
- request.onRequestSent()
-
- if (!expectsPayload) {
- socket[kReset] = true
- }
-
- client[kResume]()
- }
- }
- )
-
- util.addListener(pipe, 'data', onPipeData)
-
- function onPipeData (chunk) {
- request.onBodySent(chunk)
- }
-}
-
-async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
- assert(contentLength === body.size, 'blob body must have content length')
-
- try {
- if (contentLength != null && contentLength !== body.size) {
- throw new RequestContentLengthMismatchError()
- }
-
- const buffer = Buffer.from(await body.arrayBuffer())
-
- h2stream.cork()
- h2stream.write(buffer)
- h2stream.uncork()
- h2stream.end()
-
- request.onBodySent(buffer)
- request.onRequestSent()
-
- if (!expectsPayload) {
- socket[kReset] = true
- }
-
- client[kResume]()
- } catch (err) {
- abort(err)
- }
-}
-
-async function writeIterable (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
- assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')
-
- let callback = null
- function onDrain () {
- if (callback) {
- const cb = callback
- callback = null
- cb()
- }
- }
-
- const waitForDrain = () => new Promise((resolve, reject) => {
- assert(callback === null)
-
- if (socket[kError]) {
- reject(socket[kError])
- } else {
- callback = resolve
- }
- })
-
- h2stream
- .on('close', onDrain)
- .on('drain', onDrain)
-
- try {
- // It's up to the user to somehow abort the async iterable.
- for await (const chunk of body) {
- if (socket[kError]) {
- throw socket[kError]
- }
-
- const res = h2stream.write(chunk)
- request.onBodySent(chunk)
- if (!res) {
- await waitForDrain()
- }
- }
-
- h2stream.end()
-
- request.onRequestSent()
-
- if (!expectsPayload) {
- socket[kReset] = true
- }
-
- client[kResume]()
- } catch (err) {
- abort(err)
- } finally {
- h2stream
- .off('close', onDrain)
- .off('drain', onDrain)
- }
-}
-
-module.exports = connectH2