aboutsummaryrefslogtreecommitdiffstats
path: root/vanilla/node_modules/undici/lib/web/websocket/sender.js
blob: c647bf629d7c1f923dfce2d696faedc140d9d225 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
'use strict'

const { WebsocketFrameSend } = require('./frame')
const { opcodes, sendHints } = require('./constants')
const FixedQueue = require('../../dispatcher/fixed-queue')

/**
 * @typedef {object} SendQueueNode
 * @property {Promise<void> | null} promise pending Blob read; null once it has settled
 * @property {((...args: any[]) => any) | null} callback passed to `socket.write`, or
 *   invoked directly with the failure reason when the frame could not be built
 * @property {Buffer | null} frame frame ready to be written; null while pending or on failure
 * @property {unknown} error reason the frame could not be built, otherwise null
 */

/**
 * Serialises outgoing WebSocket messages onto a socket.
 *
 * Non-Blob payloads are written straight to the socket while the queue is
 * idle. Blob payloads have to be read asynchronously first, so they are
 * queued; while the queue is being drained every other payload is queued
 * behind them as well, so frames are written in the order `add` was called.
 */
class SendQueue {
  /**
   * Nodes waiting to be written, in insertion order.
   * @type {FixedQueue}
   */
  #queue = new FixedQueue()

  /**
   * True while `#run` is draining the queue.
   * @type {boolean}
   */
  #running = false

  /** @type {import('node:net').Socket} */
  #socket

  /**
   * @param {import('node:net').Socket} socket socket the frames are written to
   */
  constructor (socket) {
    this.#socket = socket
  }

  /**
   * Sends `item`, or queues it when it cannot be written immediately.
   *
   * @param {*} item payload; must expose `arrayBuffer()` when `hint` is
   *   `sendHints.blob`, otherwise it is handed to `createFrame` /
   *   `WebsocketFrameSend.createFastTextFrame`
   * @param {(...args: any[]) => any} cb forwarded to `socket.write`; if a Blob
   *   cannot be read it is called with the rejection reason instead
   * @param {number} hint one of `sendHints`, describing the type of `item`
   */
  add (item, cb, hint) {
    if (hint !== sendHints.blob) {
      if (!this.#running) {
        // TODO(@tsctx): support fast-path for string on running
        if (hint === sendHints.text) {
          // special fast-path for string: head and body are written
          // separately, corked so they are flushed together on uncork
          const { 0: head, 1: body } = WebsocketFrameSend.createFastTextFrame(item)
          this.#socket.cork()
          this.#socket.write(head)
          this.#socket.write(body, cb)
          this.#socket.uncork()
        } else {
          // direct writing
          this.#socket.write(createFrame(item, hint), cb)
        }
      } else {
        // The queue is draining: enqueue so this frame is written after the
        // frames that were added before it.
        /** @type {SendQueueNode} */
        const node = {
          promise: null,
          callback: cb,
          frame: createFrame(item, hint),
          error: null
        }
        this.#queue.push(node)
      }
      return
    }

    /** @type {SendQueueNode} */
    const node = {
      // The rejection is handled right here so the stored promise never
      // rejects: a failed read must neither surface as an unhandled rejection
      // nor abort `#run` and leave the queue stuck in the running state.
      promise: item.arrayBuffer().then((ab) => {
        node.frame = createFrame(ab, hint)
        // cleared last, so a throwing createFrame is still awaited by #run
        node.promise = null
      }).catch((err) => {
        node.error = err
        node.promise = null
      }),
      callback: cb,
      frame: null,
      error: null
    }

    this.#queue.push(node)

    if (!this.#running) {
      this.#run()
    }
  }

  /**
   * Drains the queue in order, waiting for each pending Blob read before
   * writing its frame. Nodes whose frame could not be built are skipped and
   * their callback receives the failure reason.
   */
  async #run () {
    this.#running = true
    const queue = this.#queue
    while (!queue.isEmpty()) {
      const node = queue.shift()
      // wait pending promise (never rejects, see add())
      if (node.promise !== null) {
        await node.promise
      }
      if (node.frame !== null) {
        // write
        this.#socket.write(node.frame, node.callback)
      } else if (typeof node.callback === 'function') {
        // no frame was produced: report the failure instead of writing
        node.callback(node.error)
      }
      // cleanup
      node.callback = node.frame = node.error = null
    }
    this.#running = false
  }
}

/**
 * Wraps `data` in a WebSocket frame.
 *
 * The payload is first normalised with `toBuffer`; the frame uses the TEXT
 * opcode when `hint` is `sendHints.text` and the BINARY opcode for every
 * other hint.
 *
 * @param {*} data payload in the representation described by `hint`
 * @param {number} hint one of `sendHints`
 * @returns whatever `WebsocketFrameSend#createFrame` produces for the payload
 */
function createFrame (data, hint) {
  const payload = toBuffer(data, hint)
  const sender = new WebsocketFrameSend(payload)
  const isText = hint === sendHints.text
  const opcode = isText ? opcodes.TEXT : opcodes.BINARY
  return sender.createFrame(opcode)
}

/**
 * Produces a `Uint8Array` over the bytes of `data` without copying them.
 *
 * - `sendHints.text` / `sendHints.typedArray`: `data` is treated as a view and
 *   the result covers the same `buffer`, `byteOffset` and `byteLength`.
 * - `sendHints.arrayBuffer` / `sendHints.blob`: `data` is passed directly to
 *   the `Uint8Array` constructor.
 *
 * @param {*} data payload in the representation described by `hint`
 * @param {number} hint one of `sendHints`
 * @returns {Uint8Array | undefined} undefined when `hint` matches none of the above
 */
function toBuffer (data, hint) {
  if (hint === sendHints.text || hint === sendHints.typedArray) {
    const { buffer, byteOffset, byteLength } = data
    return new Uint8Array(buffer, byteOffset, byteLength)
  }

  if (hint === sendHints.arrayBuffer || hint === sendHints.blob) {
    return new Uint8Array(data)
  }

  return undefined
}

// Only SendQueue is exported; createFrame and toBuffer stay private to this module.
module.exports = { SendQueue }