Skip to content

Commit 9271564

Browse files
feat(#2458): WebSocket through HTTP/2 (#4540)
* feat(#2458): WebSocket through HTTP/2 * refactor: adjust upgrade testing * test: add testing * fix: fixup * fix: fixup! * feat: implement busy for h2 * fix: fixup! * fix: fixup * feat: adjust upgrade for H2 * feat: extend Websocket to handle h2 * feat: implement experimental warning * refactor: Update lib/web/websocket/connection.js Co-authored-by: Khafra <[email protected]> * refactor: Update lib/web/websocket/connection.js Co-authored-by: Khafra <[email protected]> * refactor: code review * docs: add documentation warning for it * fix: compatibility with backpressure * fix: bad merge * test: fix order of close * docs: add documentation * docs: Update docs/docs/api/WebSocket.md Co-authored-by: Khafra <[email protected]> * test: flakiness? --------- Co-authored-by: Khafra <[email protected]>
1 parent 2fa4db2 commit 9271564

File tree

10 files changed

+396
-55
lines changed

10 files changed

+396
-55
lines changed

docs/docs/api/WebSocket.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,27 @@ import { WebSocket } from 'undici'
3434
const ws = new WebSocket('wss://echo.websocket.events', ['echo', 'chat'])
3535
```
3636

37+
### Example with HTTP/2:
38+
39+
> ⚠️ Warning: WebSocket over HTTP/2 is experimental, it is likely to change in the future.
40+
41+
> 🗒️ Note: WebSocket over HTTP/2 may be enabled by default in a future version,
42+
> this will happen by enabling HTTP/2 connections as the default behavior of Undici's Agent as well the global dispatcher.
43+
> Stay tuned to the changelog for more information.
44+
45+
This example will not work in browsers or other platforms that don't allow passing an object.
46+
47+
```mjs
48+
import { Agent } from 'undici'
49+
50+
const agent = new Agent({ allowH2: true })
51+
52+
const ws = new WebSocket('wss://echo.websocket.events', {
53+
dispatcher: agent,
54+
protocols: ['echo', 'chat']
55+
})
56+
```
57+
3758
# Class: WebSocketStream
3859

3960
> ⚠️ Warning: the WebSocketStream API has not been finalized and is likely to change.

lib/api/api-upgrade.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const { InvalidArgumentError, SocketError } = require('../core/errors')
44
const { AsyncResource } = require('node:async_hooks')
55
const assert = require('node:assert')
66
const util = require('../core/util')
7+
const { kHTTP2Stream } = require('../core/symbols')
78
const { addSignal, removeSignal } = require('./abort-signal')
89

910
class UpgradeHandler extends AsyncResource {
@@ -50,7 +51,7 @@ class UpgradeHandler extends AsyncResource {
5051
}
5152

5253
onUpgrade (statusCode, rawHeaders, socket) {
53-
assert(statusCode === 101)
54+
assert(socket[kHTTP2Stream] === true ? statusCode === 200 : statusCode === 101)
5455

5556
const { callback, opaque, context } = this
5657

lib/core/symbols.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ module.exports = {
6262
kListeners: Symbol('listeners'),
6363
kHTTPContext: Symbol('http context'),
6464
kMaxConcurrentStreams: Symbol('max concurrent streams'),
65+
kEnableConnectProtocol: Symbol('http2session connect protocol'),
66+
kRemoteSettings: Symbol('http2session remote settings'),
67+
kHTTP2Stream: Symbol('http2session client stream'),
6568
kNoProxyAgent: Symbol('no proxy agent'),
6669
kHttpProxyAgent: Symbol('http proxy agent'),
6770
kHttpsProxyAgent: Symbol('https proxy agent')

lib/dispatcher/client-h2.js

Lines changed: 146 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ const {
77
RequestContentLengthMismatchError,
88
RequestAbortedError,
99
SocketError,
10-
InformationalError
10+
InformationalError,
11+
InvalidArgumentError
1112
} = require('../core/errors.js')
1213
const {
1314
kUrl,
@@ -28,7 +29,10 @@ const {
2829
kSize,
2930
kHTTPContext,
3031
kClosed,
31-
kBodyTimeout
32+
kBodyTimeout,
33+
kEnableConnectProtocol,
34+
kRemoteSettings,
35+
kHTTP2Stream
3236
} = require('../core/symbols.js')
3337
const { channels } = require('../core/diagnostics.js')
3438

@@ -53,7 +57,10 @@ const {
5357
HTTP2_HEADER_SCHEME,
5458
HTTP2_HEADER_CONTENT_LENGTH,
5559
HTTP2_HEADER_EXPECT,
56-
HTTP2_HEADER_STATUS
60+
HTTP2_HEADER_STATUS,
61+
HTTP2_HEADER_PROTOCOL,
62+
NGHTTP2_REFUSED_STREAM,
63+
NGHTTP2_CANCEL
5764
}
5865
} = http2
5966

@@ -93,12 +100,21 @@ function connectH2 (client, socket) {
93100
session[kClient] = client
94101
session[kSocket] = socket
95102
session[kHTTP2Session] = null
103+
// We set it to true by default in a best-effort; however once connected to an H2 server
104+
// we will check if extended CONNECT protocol is supported or not
105+
// and set this value accordingly.
106+
session[kEnableConnectProtocol] = false
107+
// States whether or not we have received the remote settings from the server
108+
session[kRemoteSettings] = false
96109

97110
util.addListener(session, 'error', onHttp2SessionError)
98111
util.addListener(session, 'frameError', onHttp2FrameError)
99112
util.addListener(session, 'end', onHttp2SessionEnd)
100113
util.addListener(session, 'goaway', onHttp2SessionGoAway)
101114
util.addListener(session, 'close', onHttp2SessionClose)
115+
util.addListener(session, 'remoteSettings', onHttp2RemoteSettings)
116+
// TODO (@metcoder95): implement SETTINGS support
117+
// util.addListener(session, 'localSettings', onHttp2RemoteSettings)
102118

103119
session.unref()
104120

@@ -115,23 +131,67 @@ function connectH2 (client, socket) {
115131
return {
116132
version: 'h2',
117133
defaultPipelining: Infinity,
134+
/**
135+
* @param {import('../core/request.js')} request
136+
* @returns {boolean}
137+
*/
118138
write (request) {
119139
return writeH2(client, request)
120140
},
141+
/**
142+
* @returns {void}
143+
*/
121144
resume () {
122145
resumeH2(client)
123146
},
147+
/**
148+
* @param {Error | null} err
149+
* @param {() => void} callback
150+
*/
124151
destroy (err, callback) {
125152
if (socket[kClosed]) {
126153
queueMicrotask(callback)
127154
} else {
128155
socket.destroy(err).on('close', callback)
129156
}
130157
},
158+
/**
159+
* @type {boolean}
160+
*/
131161
get destroyed () {
132162
return socket.destroyed
133163
},
134-
busy () {
164+
/**
165+
* @param {import('../core/request.js')} request
166+
* @returns {boolean}
167+
*/
168+
busy (request) {
169+
if (request != null) {
170+
if (client[kRunning] > 0) {
171+
// We are already processing requests
172+
173+
// Non-idempotent request cannot be retried.
174+
// Ensure that no other requests are inflight and
175+
// could cause failure.
176+
if (request.idempotent === false) return true
177+
// Don't dispatch an upgrade until all preceding requests have completed.
178+
// Possibly, we do not have remote settings confirmed yet.
179+
if ((request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false) return true
180+
// Request with stream or iterator body can error while other requests
181+
// are inflight and indirectly error those as well.
182+
// Ensure this doesn't happen by waiting for inflight
183+
// to complete before dispatching.
184+
185+
// Request with stream or iterator body cannot be retried.
186+
// Ensure that no other requests are inflight and
187+
// could cause failure.
188+
if (util.bodyLength(request.body) !== 0 &&
189+
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) return true
190+
} else {
191+
return (request.upgrade === 'websocket' || request.method === 'CONNECT') && session[kRemoteSettings] === false
192+
}
193+
}
194+
135195
return false
136196
}
137197
}
@@ -151,6 +211,27 @@ function resumeH2 (client) {
151211
}
152212
}
153213

214+
function onHttp2RemoteSettings (settings) {
215+
// Fallbacks are a safe bet, remote setting will always override
216+
this[kClient][kMaxConcurrentStreams] = settings.maxConcurrentStreams ?? this[kClient][kMaxConcurrentStreams]
217+
/**
218+
* From RFC-8441
219+
* A sender MUST NOT send a SETTINGS_ENABLE_CONNECT_PROTOCOL parameter
220+
* with the value of 0 after previously sending a value of 1.
221+
*/
222+
// Note: Cannot be tested in Node, it does not supports disabling the extended CONNECT protocol once enabled
223+
if (this[kRemoteSettings] === true && this[kEnableConnectProtocol] === true && settings.enableConnectProtocol === false) {
224+
const err = new InformationalError('HTTP/2: Server disabled extended CONNECT protocol against RFC-8441')
225+
this[kSocket][kError] = err
226+
this[kClient][kOnError](err)
227+
return
228+
}
229+
230+
this[kEnableConnectProtocol] = settings.enableConnectProtocol ?? this[kEnableConnectProtocol]
231+
this[kRemoteSettings] = true
232+
this[kClient][kResume]()
233+
}
234+
154235
function onHttp2SessionError (err) {
155236
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
156237

@@ -282,8 +363,8 @@ function writeH2 (client, request) {
282363
const { method, path, host, upgrade, expectContinue, signal, protocol, headers: reqHeaders } = request
283364
let { body } = request
284365

285-
if (upgrade) {
286-
util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
366+
if (upgrade != null && upgrade !== 'websocket') {
367+
util.errorRequest(client, request, new InvalidArgumentError(`Custom upgrade "${upgrade}" not supported over HTTP/2`))
287368
return false
288369
}
289370

@@ -364,13 +445,68 @@ function writeH2 (client, request) {
364445
return false
365446
}
366447

367-
if (method === 'CONNECT') {
448+
if (upgrade || method === 'CONNECT') {
368449
session.ref()
369-
// We are already connected, streams are pending, first request
450+
451+
if (upgrade === 'websocket') {
452+
// We cannot upgrade to websocket if extended CONNECT protocol is not supported
453+
if (session[kEnableConnectProtocol] === false) {
454+
util.errorRequest(client, request, new InformationalError('HTTP/2: Extended CONNECT protocol not supported by server'))
455+
session.unref()
456+
return false
457+
}
458+
459+
// We force the method to CONNECT
460+
// as per RFC-8441
461+
// https://datatracker.ietf.org/doc/html/rfc8441#section-4
462+
headers[HTTP2_HEADER_METHOD] = 'CONNECT'
463+
headers[HTTP2_HEADER_PROTOCOL] = 'websocket'
464+
// :path and :scheme headers must be omitted when sending CONNECT but set if extended-CONNECT
465+
headers[HTTP2_HEADER_PATH] = path
466+
467+
if (protocol === 'ws:' || protocol === 'wss:') {
468+
headers[HTTP2_HEADER_SCHEME] = protocol === 'ws:' ? 'http' : 'https'
469+
} else {
470+
headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
471+
}
472+
473+
stream = session.request(headers, { endStream: false, signal })
474+
stream[kHTTP2Stream] = true
475+
476+
stream.once('response', (headers, _flags) => {
477+
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
478+
479+
request.onUpgrade(statusCode, parseH2Headers(realHeaders), stream)
480+
481+
++session[kOpenStreams]
482+
client[kQueue][client[kRunningIdx]++] = null
483+
})
484+
485+
stream.on('error', () => {
486+
if (stream.rstCode === NGHTTP2_REFUSED_STREAM || stream.rstCode === NGHTTP2_CANCEL) {
487+
// NGHTTP2_REFUSED_STREAM (7) or NGHTTP2_CANCEL (8)
488+
// We do not treat those as errors as the server might
489+
// not support websockets and refuse the stream
490+
abort(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`))
491+
}
492+
})
493+
494+
stream.once('close', () => {
495+
session[kOpenStreams] -= 1
496+
if (session[kOpenStreams] === 0) session.unref()
497+
})
498+
499+
stream.setTimeout(requestTimeout)
500+
return true
501+
}
502+
503+
// TODO: consolidate once we support CONNECT properly
504+
// NOTE: We are already connected, streams are pending, first request
370505
// will create a new stream. We trigger a request to create the stream and wait until
371506
// `ready` event is triggered
372507
// We disabled endStream to allow the user to write to the stream
373508
stream = session.request(headers, { endStream: false, signal })
509+
stream[kHTTP2Stream] = true
374510
stream.on('response', headers => {
375511
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
376512

@@ -389,7 +525,6 @@ function writeH2 (client, request) {
389525

390526
// https://tools.ietf.org/html/rfc7540#section-8.3
391527
// :path and :scheme headers must be omitted when sending CONNECT
392-
393528
headers[HTTP2_HEADER_PATH] = path
394529
headers[HTTP2_HEADER_SCHEME] = protocol === 'http:' ? 'http' : 'https'
395530

@@ -469,13 +604,15 @@ function writeH2 (client, request) {
469604
if (expectContinue) {
470605
headers[HTTP2_HEADER_EXPECT] = '100-continue'
471606
stream = session.request(headers, { endStream: shouldEndStream, signal })
607+
stream[kHTTP2Stream] = true
472608

473609
stream.once('continue', writeBodyH2)
474610
} else {
475611
stream = session.request(headers, {
476612
endStream: shouldEndStream,
477613
signal
478614
})
615+
stream[kHTTP2Stream] = true
479616

480617
writeBodyH2()
481618
}

lib/dispatcher/h2c-client.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ class H2CClient extends DispatcherBase {
8585
.setNoDelay(true)
8686
.once('connect', function () {
8787
queueMicrotask(clearConnectTimeout)
88-
8988
if (callback) {
9089
const cb = callback
9190
callback = null
@@ -94,7 +93,6 @@ class H2CClient extends DispatcherBase {
9493
})
9594
.on('error', function (err) {
9695
queueMicrotask(clearConnectTimeout)
97-
9896
if (callback) {
9997
const cb = callback
10098
callback = null

lib/web/fetch/index.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2228,8 +2228,10 @@ async function httpNetworkFetch (
22282228
},
22292229

22302230
onUpgrade (status, rawHeaders, socket) {
2231-
if (status !== 101) {
2232-
return
2231+
// We need to support 200 for websocket over h2 as per RFC-8441
2232+
// Absence of session means H1
2233+
if ((socket.session != null && status !== 200) || (socket.session == null && status !== 101)) {
2234+
return false
22332235
}
22342236

22352237
const headersList = new HeadersList()

lib/web/websocket/connection.js

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const crypto = runtimeFeatures.has('crypto')
1414
? require('node:crypto')
1515
: null
1616

17+
let warningEmitted = false
18+
1719
/**
1820
* @see https://websockets.spec.whatwg.org/#concept-websocket-establish
1921
* @param {URL} url
@@ -93,9 +95,25 @@ function establishWebSocketConnection (url, protocols, client, handler, options)
9395
processResponse (response) {
9496
// 1. If response is a network error or its status is not 101,
9597
// fail the WebSocket connection.
98+
// if (response.type === 'error' || ((response.socket?.session != null && response.status !== 200) && response.status !== 101)) {
9699
if (response.type === 'error' || response.status !== 101) {
97-
failWebsocketConnection(handler, 1002, 'Received network error or non-101 status code.', response.error)
98-
return
100+
// The presence of a session property on the socket indicates HTTP2
101+
// HTTP1
102+
if (response.socket?.session == null) {
103+
failWebsocketConnection(handler, 1002, 'Received network error or non-101 status code.', response.error)
104+
return
105+
}
106+
107+
// HTTP2
108+
if (response.status !== 200) {
109+
failWebsocketConnection(handler, 1002, 'Received network error or non-200 status code.', response.error)
110+
return
111+
}
112+
}
113+
114+
if (warningEmitted === false && response.socket?.session != null) {
115+
process.emitWarning('WebSocket over HTTP2 is experimental, and subject to change.', 'ExperimentalWarning')
116+
warningEmitted = true
99117
}
100118

101119
// 2. If protocols is not the empty list and extracting header
@@ -117,7 +135,8 @@ function establishWebSocketConnection (url, protocols, client, handler, options)
117135
// header field contains a value that is not an ASCII case-
118136
// insensitive match for the value "websocket", the client MUST
119137
// _Fail the WebSocket Connection_.
120-
if (response.headersList.get('Upgrade')?.toLowerCase() !== 'websocket') {
138+
// For H2, no upgrade header is expected.
139+
if (response.socket.session == null && response.headersList.get('Upgrade')?.toLowerCase() !== 'websocket') {
121140
failWebsocketConnection(handler, 1002, 'Server did not set Upgrade header to "websocket".')
122141
return
123142
}
@@ -126,7 +145,8 @@ function establishWebSocketConnection (url, protocols, client, handler, options)
126145
// |Connection| header field doesn't contain a token that is an
127146
// ASCII case-insensitive match for the value "Upgrade", the client
128147
// MUST _Fail the WebSocket Connection_.
129-
if (response.headersList.get('Connection')?.toLowerCase() !== 'upgrade') {
148+
// For H2, no connection header is expected.
149+
if (response.socket.session == null && response.headersList.get('Connection')?.toLowerCase() !== 'upgrade') {
130150
failWebsocketConnection(handler, 1002, 'Server did not set Connection header to "upgrade".')
131151
return
132152
}

0 commit comments

Comments
 (0)