2
2
3
3
const { Writable } = require ( 'stream' )
4
4
const diagnosticsChannel = require ( 'diagnostics_channel' )
5
- const { parserStates, opcodes, states } = require ( './constants' )
5
+ const { parserStates, opcodes, states, emptyBuffer } = require ( './constants' )
6
6
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require ( './symbols' )
7
7
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require ( './util' )
8
8
const { WebsocketFrameSend } = require ( './frame' )
9
9
10
+ // This code was influenced by ws released under the MIT license.
11
+ // Copyright (c) 2011 Einar Otto Stangvik <[email protected] >
12
+ // Copyright (c) 2013 Arnout Kazemier and contributors
13
+ // Copyright (c) 2016 Luigi Pinca and contributors
14
+
10
15
const channels = { }
11
16
channels . ping = diagnosticsChannel . channel ( 'undici:websocket:ping' )
12
17
channels . pong = diagnosticsChannel . channel ( 'undici:websocket:pong' )
@@ -49,7 +54,7 @@ class ByteParser extends Writable {
49
54
return callback ( )
50
55
}
51
56
52
- const buffer = Buffer . concat ( this . #buffers , this . #byteOffset )
57
+ const buffer = this . consume ( 2 )
53
58
54
59
this . #info. fin = ( buffer [ 0 ] & 0x80 ) !== 0
55
60
this . #info. opcode = buffer [ 0 ] & 0x0F
@@ -96,7 +101,7 @@ class ByteParser extends Writable {
96
101
return
97
102
}
98
103
99
- const body = buffer . subarray ( 2 , payloadLength + 2 )
104
+ const body = this . consume ( payloadLength )
100
105
101
106
this . #info. closeInfo = this . parseCloseBody ( false , body )
102
107
@@ -125,9 +130,6 @@ class ByteParser extends Writable {
125
130
this . ws [ kReadyState ] = states . CLOSING
126
131
this . ws [ kReceivedClose ] = true
127
132
128
- this . #buffers = [ buffer . subarray ( 2 + payloadLength ) ]
129
- this . #byteOffset -= 2 + payloadLength
130
-
131
133
this . end ( )
132
134
133
135
return
@@ -137,8 +139,9 @@ class ByteParser extends Writable {
137
139
// A Pong frame sent in response to a Ping frame must have identical
138
140
// "Application data"
139
141
142
+ const body = this . consume ( payloadLength )
143
+
140
144
if ( ! this . ws [ kReceivedClose ] ) {
141
- const body = payloadLength === 0 ? undefined : buffer . subarray ( 2 , payloadLength + 2 )
142
145
const frame = new WebsocketFrameSend ( body )
143
146
144
147
this . ws [ kResponse ] . socket . write ( frame . createFrame ( opcodes . PONG ) )
@@ -150,8 +153,6 @@ class ByteParser extends Writable {
150
153
}
151
154
}
152
155
153
- this . #buffers = [ buffer . subarray ( 2 + payloadLength ) ]
154
- this . #byteOffset -= 2 + payloadLength
155
156
this . #state = parserStates . INFO
156
157
157
158
if ( this . #byteOffset > 0 ) {
@@ -165,72 +166,50 @@ class ByteParser extends Writable {
165
166
// unidirectional heartbeat. A response to an unsolicited Pong frame is
166
167
// not expected.
167
168
169
+ const body = this . consume ( payloadLength )
170
+
168
171
if ( channels . pong . hasSubscribers ) {
169
172
channels . pong . publish ( {
170
- payload : buffer . subarray ( 2 , payloadLength + 2 )
173
+ payload : body
171
174
} )
172
175
}
173
176
174
- this . #buffers = [ buffer . subarray ( 2 + payloadLength ) ]
175
- this . #byteOffset -= 2 + payloadLength
176
-
177
177
if ( this . #byteOffset > 0 ) {
178
178
return this . run ( callback )
179
179
} else {
180
180
callback ( )
181
181
return
182
182
}
183
183
}
184
-
185
- // TODO: handle control frames here. Since they are unfragmented, and can
186
- // be sent in the middle of other frames, we shouldn't parse them as normal.
187
-
188
- this . #buffers = [ buffer . subarray ( 2 ) ]
189
- this . #byteOffset -= 2
190
184
} else if ( this . #state === parserStates . PAYLOADLENGTH_16 ) {
191
185
if ( this . #byteOffset < 2 ) {
192
186
return callback ( )
193
187
}
194
188
195
- const buffer = Buffer . concat ( this . #buffers , this . #byteOffset )
189
+ const buffer = this . consume ( 2 )
196
190
197
- // TODO: optimize this
198
- this . #info. payloadLength = buffer . subarray ( 0 , 2 ) . readUInt16BE ( 0 )
191
+ this . #info. payloadLength = buffer . readUInt16BE ( 0 )
199
192
this . #state = parserStates . READ_DATA
200
-
201
- this . #buffers = [ buffer . subarray ( 2 ) ]
202
- this . #byteOffset -= 2
203
193
} else if ( this . #state === parserStates . PAYLOADLENGTH_64 ) {
204
194
if ( this . #byteOffset < 8 ) {
205
195
return callback ( )
206
196
}
207
197
208
- const buffer = Buffer . concat ( this . #buffers , this . #byteOffset )
198
+ const buffer = this . consume ( 8 )
209
199
210
200
// TODO: optimize this
211
- this . #info. payloadLength = buffer . subarray ( 0 , 8 ) . readBigUint64BE ( 0 )
201
+ this . #info. payloadLength = buffer . readBigUint64BE ( 0 )
212
202
this . #state = parserStates . READ_DATA
213
-
214
- this . #buffers = [ buffer . subarray ( 8 ) ]
215
- this . #byteOffset -= 8
216
203
} else if ( this . #state === parserStates . READ_DATA ) {
217
204
if ( this . #byteOffset < this . #info. payloadLength ) {
218
205
// If there is still more data in this chunk that needs to be read
219
206
return callback ( )
220
207
} else if ( this . #byteOffset >= this . #info. payloadLength ) {
221
208
// If the server sent multiple frames in a single chunk
222
- const buffer = Buffer . concat ( this . #buffers, this . #byteOffset)
223
209
224
- const body = buffer . subarray ( 0 , this . #info. payloadLength )
225
- this . #fragments. push ( body )
210
+ const body = this . consume ( this . #info. payloadLength )
226
211
227
- if ( this . #byteOffset > this . #info. payloadLength ) {
228
- this . #buffers = [ buffer . subarray ( body . length ) ]
229
- this . #byteOffset -= body . length
230
- } else {
231
- this . #buffers. length = 0
232
- this . #byteOffset = 0
233
- }
212
+ this . #fragments. push ( body )
234
213
235
214
// If the frame is unfragmented, or a fragmented frame was terminated,
236
215
// a message was received
@@ -254,6 +233,48 @@ class ByteParser extends Writable {
254
233
}
255
234
}
256
235
236
+ /**
237
+ * Take n bytes from the buffered Buffers
238
+ * @param {number } n
239
+ * @returns {Buffer|null }
240
+ */
241
+ consume ( n ) {
242
+ if ( n > this . #byteOffset) {
243
+ return null
244
+ } else if ( n === 0 ) {
245
+ return emptyBuffer
246
+ }
247
+
248
+ if ( this . #buffers[ 0 ] . length === n ) {
249
+ this . #byteOffset -= this . #buffers[ 0 ] . length
250
+ return this . #buffers. shift ( )
251
+ }
252
+
253
+ const buffer = Buffer . allocUnsafe ( n )
254
+ let offset = 0
255
+
256
+ while ( offset !== n ) {
257
+ const next = this . #buffers[ 0 ]
258
+ const { length } = next
259
+
260
+ if ( length + offset === n ) {
261
+ buffer . set ( this . #buffers. shift ( ) , offset )
262
+ break
263
+ } else if ( length + offset > n ) {
264
+ buffer . set ( next . subarray ( 0 , n - offset ) , offset )
265
+ this . #buffers[ 0 ] = next . subarray ( n - offset )
266
+ break
267
+ } else {
268
+ buffer . set ( this . #buffers. shift ( ) , offset )
269
+ offset += next . length
270
+ }
271
+ }
272
+
273
+ this . #byteOffset -= n
274
+
275
+ return buffer
276
+ }
277
+
257
278
parseCloseBody ( onlyCode , data ) {
258
279
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
259
280
/** @type {number|undefined } */
0 commit comments