Skip to content

Commit 585cdf6

Browse files
authored
feat(websocket): handle ping/pong frames & fix fragmented frames (nodejs#1809)
* feat(websocket): handle ping/pong frames & fix bugs * fix: don't quit parsing on ping/pong frame * fix: parse fragmented frames * fix: remove hack in tests
1 parent 7e12397 commit 585cdf6

File tree

8 files changed

+247
-27
lines changed

8 files changed

+247
-27
lines changed

docs/api/DiagnosticsChannel.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,29 @@ diagnosticsChannel.channel('undici:websocket:socket_error').subscribe((error) =>
176176
console.log(error)
177177
})
178178
```
179+
180+
## `undici:websocket:ping`
181+
182+
This message is published after the client receives a ping frame, if the connection is not closing.
183+
184+
```js
185+
import diagnosticsChannel from 'diagnostics_channel'
186+
187+
diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload }) => {
188+
// a Buffer or undefined, containing the optional application data of the frame
189+
console.log(payload)
190+
})
191+
```
192+
193+
## `undici:websocket:pong`
194+
195+
This message is published after the client receives a pong frame.
196+
197+
```js
198+
import diagnosticsChannel from 'diagnostics_channel'
199+
200+
diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
201+
// a Buffer or undefined, containing the optional application data of the frame
202+
console.log(payload)
203+
})
204+
```

lib/websocket/connection.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ const { fetching } = require('../fetch/index')
2121
const { getGlobalDispatcher } = require('../..')
2222

2323
const channels = {}
24-
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
25-
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
2624
channels.open = diagnosticsChannel.channel('undici:websocket:open')
2725
channels.close = diagnosticsChannel.channel('undici:websocket:close')
2826
channels.socketError = diagnosticsChannel.channel('undici:websocket:socket_error')

lib/websocket/receiver.js

Lines changed: 85 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
'use strict'
22

33
const { Writable } = require('stream')
4+
const diagnosticsChannel = require('diagnostics_channel')
45
const { parserStates, opcodes, states } = require('./constants')
56
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
67
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
78
const { WebsocketFrameSend } = require('./frame')
89

10+
const channels = {}
11+
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
12+
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
13+
914
class ByteParser extends Writable {
1015
#buffers = []
1116
#byteOffset = 0
1217

1318
#state = parserStates.INFO
1419

1520
#info = {}
21+
#fragments = []
1622

1723
constructor (ws) {
1824
super()
@@ -48,9 +54,13 @@ class ByteParser extends Writable {
4854
this.#info.fin = (buffer[0] & 0x80) !== 0
4955
this.#info.opcode = buffer[0] & 0x0F
5056

51-
const fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
57+
// If we receive a fragmented message, we use the type of the first
58+
// frame to parse the full message as binary/text, when it's terminated
59+
this.#info.originalOpcode ??= this.#info.opcode
60+
61+
this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
5262

53-
if (fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
63+
if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
5464
// Only text and binary frames can be fragmented
5565
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
5666
return
@@ -67,7 +77,11 @@ class ByteParser extends Writable {
6777
this.#state = parserStates.PAYLOADLENGTH_64
6878
}
6979

70-
if (
80+
if (this.#info.fragmented && payloadLength > 125) {
81+
// A fragmented frame can't be fragmented itself
82+
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
83+
return
84+
} else if (
7185
(this.#info.opcode === opcodes.PING ||
7286
this.#info.opcode === opcodes.PONG ||
7387
this.#info.opcode === opcodes.CLOSE) &&
@@ -109,10 +123,63 @@ class ByteParser extends Writable {
109123
// that _The WebSocket Closing Handshake is Started_ and that the
110124
// WebSocket connection is in the CLOSING state.
111125
this.ws[kReadyState] = states.CLOSING
112-
113126
this.ws[kReceivedClose] = true
114127

128+
this.#buffers = [buffer.subarray(2 + payloadLength)]
129+
this.#byteOffset -= 2 + payloadLength
130+
131+
this.end()
132+
115133
return
134+
} else if (this.#info.opcode === opcodes.PING) {
135+
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
136+
// response, unless it already received a Close frame.
137+
// A Pong frame sent in response to a Ping frame must have identical
138+
// "Application data"
139+
140+
if (!this.ws[kReceivedClose]) {
141+
const body = payloadLength === 0 ? undefined : buffer.subarray(2, payloadLength + 2)
142+
const frame = new WebsocketFrameSend(body)
143+
144+
this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
145+
146+
if (channels.ping.hasSubscribers) {
147+
channels.ping.publish({
148+
payload: body
149+
})
150+
}
151+
}
152+
153+
this.#buffers = [buffer.subarray(2 + payloadLength)]
154+
this.#byteOffset -= 2 + payloadLength
155+
this.#state = parserStates.INFO
156+
157+
if (this.#byteOffset > 0) {
158+
return this.run(callback)
159+
} else {
160+
callback()
161+
return
162+
}
163+
} else if (this.#info.opcode === opcodes.PONG) {
164+
// A Pong frame MAY be sent unsolicited. This serves as a
165+
// unidirectional heartbeat. A response to an unsolicited Pong frame is
166+
// not expected.
167+
168+
if (channels.pong.hasSubscribers) {
169+
channels.pong.publish({
170+
payload: buffer.subarray(2, payloadLength + 2)
171+
})
172+
}
173+
174+
this.#buffers = [buffer.subarray(2 + payloadLength)]
175+
this.#byteOffset -= 2 + payloadLength
176+
177+
if (this.#byteOffset > 0) {
178+
return this.run(callback)
179+
} else {
180+
callback()
181+
return
182+
}
116183
}
117184

118185
// TODO: handle control frames here. Since they are unfragmented, and can
@@ -154,19 +221,28 @@ class ByteParser extends Writable {
154221
// If the server sent multiple frames in a single chunk
155222
const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
156223

157-
this.#info.data = buffer.subarray(0, this.#info.payloadLength)
224+
const body = buffer.subarray(0, this.#info.payloadLength)
225+
this.#fragments.push(body)
158226

159227
if (this.#byteOffset > this.#info.payloadLength) {
160-
this.#buffers = [buffer.subarray(this.#info.data.length)]
161-
this.#byteOffset -= this.#info.data.length
228+
this.#buffers = [buffer.subarray(body.length)]
229+
this.#byteOffset -= body.length
162230
} else {
163231
this.#buffers.length = 0
164232
this.#byteOffset = 0
165233
}
166234

167-
websocketMessageReceived(this.ws, this.#info.opcode, this.#info.data)
235+
// If the frame is unfragmented, or a fragmented frame was terminated,
236+
// a message was received
237+
if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
238+
const fullMessage = Buffer.concat(this.#fragments)
239+
240+
websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)
241+
242+
this.#info = {}
243+
this.#fragments.length = 0
244+
}
168245

169-
this.#info = {}
170246
this.#state = parserStates.INFO
171247
}
172248
}

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@
4646
"build:wasm": "node build/wasm.js --docker",
4747
"lint": "standard | snazzy",
4848
"lint:fix": "standard --fix | snazzy",
49-
"test": "npm run test:tap && npm run test:node-fetch && npm run test:fetch && npm run test:wpt && npm run test:jest && tsd",
49+
"test": "npm run test:tap && npm run test:node-fetch && npm run test:fetch && npm run test:wpt && npm run test:websocket && npm run test:jest && tsd",
5050
"test:node-fetch": "node scripts/verifyVersion.js 16 || mocha test/node-fetch",
5151
"test:fetch": "node scripts/verifyVersion.js 16 || (npm run build:node && tap test/fetch/*.js && tap test/webidl/*.js)",
5252
"test:jest": "node scripts/verifyVersion.js 14 || jest",
5353
"test:tap": "tap test/*.js test/diagnostics-channel/*.js",
5454
"test:tdd": "tap test/*.js test/diagnostics-channel/*.js -w",
5555
"test:typescript": "tsd && tsc test/imports/undici-import.ts",
56+
"test:websocket": "node scripts/verifyVersion.js 18 || tap test/websocket/*.js",
5657
"test:wpt": "node scripts/verifyVersion 18 || (node test/wpt/start-fetch.mjs && node test/wpt/start-FileAPI.mjs && node test/wpt/start-mimesniff.mjs && node test/wpt/start-xhr.mjs && node test/wpt/start-websockets.mjs)",
5758
"coverage": "nyc --reporter=text --reporter=html npm run test",
5859
"coverage:ci": "nyc --reporter=lcov npm run test",

test/websocket/fragments.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
'use strict'
2+
3+
const { test } = require('tap')
4+
const { WebSocketServer } = require('ws')
5+
const { WebSocket } = require('../..')
6+
const diagnosticsChannel = require('diagnostics_channel')
7+
8+
test('Fragmented frame with a ping frame in the middle of it', (t) => {
9+
t.plan(2)
10+
11+
const server = new WebSocketServer({ port: 0 })
12+
13+
server.on('connection', (ws) => {
14+
const socket = ws._socket
15+
16+
socket.write(Buffer.from([0x01, 0x03, 0x48, 0x65, 0x6c])) // Text frame "Hel"
17+
socket.write(Buffer.from([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) // ping "Hello"
18+
socket.write(Buffer.from([0x80, 0x02, 0x6c, 0x6f])) // Text frame "lo"
19+
})
20+
21+
t.teardown(() => {
22+
for (const client of server.clients) {
23+
client.close()
24+
}
25+
26+
server.close()
27+
})
28+
29+
const ws = new WebSocket(`ws://localhost:${server.address().port}`)
30+
31+
ws.addEventListener('message', ({ data }) => {
32+
t.same(data, 'Hello')
33+
34+
ws.close()
35+
})
36+
37+
diagnosticsChannel.channel('undici:websocket:ping').subscribe(
38+
({ payload }) => t.same(payload, Buffer.from('Hello'))
39+
)
40+
})

test/websocket/opening-handshake.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
'use strict'
2+
3+
const { test } = require('tap')
4+
const { createServer } = require('http')
5+
const { WebSocketServer } = require('ws')
6+
const { WebSocket } = require('../..')
7+
8+
test('WebSocket connecting to server that isn\'t a Websocket server', (t) => {
9+
t.plan(5)
10+
11+
const server = createServer((req, res) => {
12+
t.equal(req.headers.connection, 'upgrade')
13+
t.equal(req.headers.upgrade, 'websocket')
14+
t.ok(req.headers['sec-websocket-key'])
15+
t.equal(req.headers['sec-websocket-version'], '13')
16+
17+
res.end()
18+
server.unref()
19+
}).listen(0, () => {
20+
const ws = new WebSocket(`ws://localhost:${server.address().port}`)
21+
22+
// Server isn't a websocket server
23+
ws.onmessage = ws.onopen = t.fail
24+
25+
ws.addEventListener('error', t.pass)
26+
})
27+
28+
t.teardown(server.close.bind(server))
29+
})
30+
31+
test('Open event is emitted', (t) => {
32+
t.plan(1)
33+
34+
const server = new WebSocketServer({ port: 0 })
35+
36+
server.on('connection', (ws) => {
37+
ws.close(1000)
38+
})
39+
40+
t.teardown(server.close.bind(server))
41+
42+
const ws = new WebSocket(`ws://localhost:${server.address().port}`)
43+
44+
ws.onmessage = ws.onerror = t.fail
45+
ws.addEventListener('open', t.pass)
46+
})

test/websocket/ping-pong.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
'use strict'
2+
3+
const { test } = require('tap')
4+
const { WebSocketServer } = require('ws')
5+
const diagnosticsChannel = require('diagnostics_channel')
6+
const { WebSocket } = require('../..')
7+
8+
test('Receives ping and parses body', (t) => {
9+
t.plan(1)
10+
11+
const server = new WebSocketServer({ port: 0 })
12+
13+
server.on('connection', (ws) => {
14+
ws.ping('Hello, world')
15+
})
16+
17+
t.teardown(server.close.bind(server))
18+
19+
const ws = new WebSocket(`ws://localhost:${server.address().port}`)
20+
ws.onerror = ws.onmessage = t.fail
21+
22+
diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload }) => {
23+
t.same(payload, Buffer.from('Hello, world'))
24+
ws.close()
25+
})
26+
})
27+
28+
test('Receives pong and parses body', (t) => {
29+
t.plan(1)
30+
31+
const server = new WebSocketServer({ port: 0 })
32+
33+
server.on('connection', (ws) => {
34+
ws.pong('Pong')
35+
})
36+
37+
t.teardown(server.close.bind(server))
38+
39+
const ws = new WebSocket(`ws://localhost:${server.address().port}`)
40+
ws.onerror = ws.onmessage = t.fail
41+
42+
diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
43+
t.same(payload, Buffer.from('Pong'))
44+
ws.close()
45+
})
46+
})

test/wpt/server/websocket.mjs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,6 @@
11
import { WebSocketServer } from 'ws'
22
import { server } from './server.mjs'
33

4-
// When sending a buffer to ws' send method, it auto
5-
// sets the type to binary. This breaks some tests.
6-
const textData = [
7-
'¥¥¥¥¥¥',
8-
'Message to send',
9-
'𐐇',
10-
'\ufffd',
11-
'',
12-
'null',
13-
'c'.repeat(65000)
14-
]
15-
164
// The file router server handles sending the url, closing,
175
// and sending messages back to the main process for us.
186
// The types for WebSocketServer don't include a `request`
@@ -24,7 +12,7 @@ const wss = new WebSocketServer({
2412
})
2513

2614
wss.on('connection', (ws) => {
27-
ws.on('message', (data) => {
15+
ws.on('message', (data, isBinary) => {
2816
const str = data.toString('utf-8')
2917

3018
if (str === 'Goodbye') {
@@ -34,8 +22,7 @@ wss.on('connection', (ws) => {
3422
return
3523
}
3624

37-
const binary = !textData.includes(str)
38-
ws.send(data, { binary })
25+
ws.send(data, { binary: isBinary })
3926
})
4027

4128
// Some tests, such as `Create-blocked-port.any.js` do NOT

0 commit comments

Comments
 (0)