Skip to content

Commit a60a309

Browse files
committed
fix: added clearer error for when stream limit is reached
Also added two tests demonstrating stream limit behaviour.
1 parent adf7870 commit a60a309

File tree

4 files changed

+211
-1
lines changed

4 files changed

+211
-1
lines changed

package-lock.json

Lines changed: 61 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/QUICStream.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,18 @@ class QUICStream implements ReadableWritablePair<Uint8Array, Uint8Array> {
295295
// We would get the `StreamStopped` exception here. If so, we can
296296
// ignore.
297297
if (utils.isStreamStopped(e) === false) {
298+
if (e.message === 'StreamLimit') {
299+
const limit =
300+
this.type === 'bidi'
301+
? config.initialMaxStreamsBidi
302+
: config.initialMaxStreamsUni;
303+
throw new errors.ErrorQUICStreamLimit(
304+
`Stream limit of ${limit} has been reached`,
305+
{ cause: e },
306+
);
307+
}
298308
throw new errors.ErrorQUICStreamInternal(
299-
'Failed to prime local stream state with a 0-length message',
309+
`Failed to prime local stream state with a 0-length message: ${e.message}`,
300310
{ cause: e },
301311
);
302312
}

src/errors.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ class ErrorQUICStreamInternal<T> extends ErrorQUICStream<T> {
287287
static description = 'QUIC Stream internal error';
288288
}
289289

290+
class ErrorQUICStreamLimit<T> extends ErrorQUICStream<T> {
291+
static description = 'QUIC Stream limit has been reached';
292+
}
293+
290294
export {
291295
ErrorQUIC,
292296
ErrorQUICHostInvalid,
@@ -331,4 +335,5 @@ export {
331335
ErrorQUICStreamPeerRead,
332336
ErrorQUICStreamPeerWrite,
333337
ErrorQUICStreamInternal,
338+
ErrorQUICStreamLimit,
334339
};

tests/QUICStream.test.ts

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,4 +2112,138 @@ describe(QUICStream.name, () => {
21122112
await client.destroy({ force: true });
21132113
await server.stop({ force: true });
21142114
});
2115+
test('should throw stream limit error when limit is reached', async () => {
2116+
const streamsNum = 10;
2117+
const connectionEventProm =
2118+
utils.promise<events.EventQUICServerConnection>();
2119+
const tlsConfig = await generateTLSConfig(defaultType);
2120+
const server = new QUICServer({
2121+
crypto: {
2122+
key,
2123+
ops: serverCrypto,
2124+
},
2125+
logger: logger.getChild(QUICServer.name),
2126+
config: {
2127+
initialMaxStreamsBidi: 5,
2128+
key: tlsConfig.leafKeyPairPEM.privateKey,
2129+
cert: tlsConfig.leafCertPEM,
2130+
verifyPeer: false,
2131+
},
2132+
});
2133+
socketCleanMethods.extractSocket(server);
2134+
server.addEventListener(
2135+
events.EventQUICServerConnection.name,
2136+
(e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e),
2137+
);
2138+
await server.start({
2139+
host: localhost,
2140+
});
2141+
const client = await QUICClient.createQUICClient({
2142+
host: localhost,
2143+
port: server.port,
2144+
localHost: localhost,
2145+
crypto: {
2146+
ops: clientCrypto,
2147+
},
2148+
logger: logger.getChild(QUICClient.name),
2149+
config: {
2150+
initialMaxStreamsBidi: 5,
2151+
verifyPeer: false,
2152+
},
2153+
});
2154+
socketCleanMethods.extractSocket(client);
2155+
const conn = (await connectionEventProm.p).detail;
2156+
// Do the test
2157+
let streamCount = 0;
2158+
const streamCreationProm = utils.promise();
2159+
conn.addEventListener(events.EventQUICConnectionStream.name, () => {
2160+
streamCount += 1;
2161+
if (streamCount >= streamsNum) streamCreationProm.resolveP();
2162+
});
2163+
// Let's make a new streams.
2164+
const message = Buffer.from('Hello!');
2165+
const streamsP = (async () => {
2166+
for (let i = 0; i < streamsNum; i++) {
2167+
const stream = client.connection.newStream();
2168+
const writer = stream.writable.getWriter();
2169+
await writer.write(message);
2170+
await writer.close();
2171+
}
2172+
})();
2173+
await expect(streamsP).rejects.toThrow(errors.ErrorQUICStreamLimit);
2174+
await client.destroy({ force: true });
2175+
await server.stop({ force: true });
2176+
});
2177+
test('ended streams do not contribute to limit', async () => {
2178+
const streamsNum = 10;
2179+
const connectionEventProm =
2180+
utils.promise<events.EventQUICServerConnection>();
2181+
const tlsConfig = await generateTLSConfig(defaultType);
2182+
const server = new QUICServer({
2183+
crypto: {
2184+
key,
2185+
ops: serverCrypto,
2186+
},
2187+
logger: logger.getChild(QUICServer.name),
2188+
config: {
2189+
initialMaxStreamsBidi: 5,
2190+
key: tlsConfig.leafKeyPairPEM.privateKey,
2191+
cert: tlsConfig.leafCertPEM,
2192+
verifyPeer: false,
2193+
},
2194+
});
2195+
socketCleanMethods.extractSocket(server);
2196+
server.addEventListener(
2197+
events.EventQUICServerConnection.name,
2198+
(e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e),
2199+
);
2200+
await server.start({
2201+
host: localhost,
2202+
});
2203+
const client = await QUICClient.createQUICClient({
2204+
host: localhost,
2205+
port: server.port,
2206+
localHost: localhost,
2207+
crypto: {
2208+
ops: clientCrypto,
2209+
},
2210+
logger: logger.getChild(QUICClient.name),
2211+
config: {
2212+
initialMaxStreamsBidi: 5,
2213+
verifyPeer: false,
2214+
},
2215+
});
2216+
socketCleanMethods.extractSocket(client);
2217+
const conn = (await connectionEventProm.p).detail;
2218+
// Do the test
2219+
let streamCount = 0;
2220+
const streamCreationProm = utils.promise();
2221+
conn.addEventListener(
2222+
events.EventQUICConnectionStream.name,
2223+
(evt: events.EventQUICConnectionStream) => {
2224+
const stream = evt.detail;
2225+
void stream.readable.pipeTo(stream.writable).catch(() => {});
2226+
streamCount += 1;
2227+
if (streamCount >= streamsNum) streamCreationProm.resolveP();
2228+
},
2229+
);
2230+
// Let's make a new streams.
2231+
const message = Buffer.from('Hello!');
2232+
const streamsP = (async () => {
2233+
for (let i = 0; i < streamsNum; i++) {
2234+
const stream = client.connection.newStream();
2235+
const writer = stream.writable.getWriter();
2236+
await writer.write(message);
2237+
await writer.close();
2238+
for await (const _ of stream.readable) {
2239+
// Just consume
2240+
}
2241+
}
2242+
})();
2243+
await expect(streamsP).resolves.toBe(undefined);
2244+
await streamCreationProm.p;
2245+
expect(streamCount).toBe(streamsNum);
2246+
await client.destroy({ force: true });
2247+
await server.stop({ force: true });
2248+
});
21152249
});

0 commit comments

Comments
 (0)