Skip to content

Commit e64ed4a

Browse files
committed
feat: added forceDestroyStreams to connections, server and client to allow force closing streams
Allows us to destroy a connection with `force: false` for force the streams to end later if needed. [ci skip]
1 parent 0183736 commit e64ed4a

File tree

4 files changed

+108
-7
lines changed

4 files changed

+108
-7
lines changed

src/QUICConnection.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import { Shutdown } from './native/types';
3131
import * as utils from './utils';
3232
import * as events from './events';
3333
import * as errors from './errors';
34-
import { sleep } from '../tests/utils';
3534

3635
interface QUICConnection extends StartStop {}
3736
@StartStop({
@@ -621,7 +620,7 @@ class QUICConnection {
621620

622621
// Yield to allow any background processing to settle before proceeding.
623622
// This will allow any streams to process buffers before continuing
624-
await sleep(0);
623+
await utils.yieldMicro();
625624

626625
// Destroy all streams
627626
const streamsDestroyP: Array<Promise<void>> = [];
@@ -1155,6 +1154,21 @@ class QUICConnection {
11551154
return quicStream;
11561155
}
11571156

1157+
/**
1158+
* Destroys all active streams without closing the connection.
1159+
*
1160+
* If there are no active streams then it will do nothing.
1161+
* If the connection is stopped with `force: false` then this can be used
1162+
* to force close any streams `stop` is waiting for to end.
1163+
*
1164+
* Destruction will occur in the background.
1165+
*/
1166+
public destroyStreams(reason?: any) {
1167+
for (const quicStream of this.streamMap.values()) {
1168+
quicStream.cancel(reason);
1169+
}
1170+
}
1171+
11581172
/**
11591173
* Starts the keep alive interval timer.
11601174
*

src/utils.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ import * as errors from './errors';
1717
const textEncoder = new TextEncoder();
1818
const textDecoder = new TextDecoder('utf-8');
1919

20+
/**
21+
* Used to yield to the event loop to allow other micro tasks to process
22+
*/
23+
async function yieldMicro(): Promise<void> {
24+
return await new Promise<void>((r) => queueMicrotask(r));
25+
}
26+
2027
/**
2128
* Convert callback-style to promise-style
2229
* If this is applied to overloaded function
@@ -550,6 +557,7 @@ function isStreamReset(e: Error): number | false {
550557
export {
551558
textEncoder,
552559
textDecoder,
560+
yieldMicro,
553561
promisify,
554562
promise,
555563
bufferWrap,

tests/QUICStream.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,8 @@ describe(QUICStream.name, () => {
13131313
})(),
13141314
);
13151315
}
1316+
// Yield to allow streams to propagate
1317+
await sleep(0);
13161318

13171319
// Start unforced close of client
13181320
const clientDestroyP = client.destroy({ force: false });
@@ -1402,6 +1404,8 @@ describe(QUICStream.name, () => {
14021404
})(),
14031405
);
14041406
}
1407+
// Yield to allow streams to propagate
1408+
await sleep(0);
14051409

14061410
// Start unforced close of server
14071411
const serverStopP = server.stop({ force: false });
@@ -1489,7 +1493,87 @@ describe(QUICStream.name, () => {
14891493
await expect(asd).rejects.toThrow('read 1');
14901494

14911495
waitResolveP();
1496+
await Promise.all(activeServerStreams);
1497+
await clientDestroyP;
1498+
await server.stop({ force: true });
1499+
});
1500+
test('connection can be forced closed after unforced destroy', async () => {
1501+
const message = Buffer.from('The Quick Brown Fox Jumped Over The Lazy Dog');
1502+
const connectionEventProm =
1503+
utils.promise<events.EventQUICServerConnection>();
1504+
const tlsConfig = await generateTLSConfig(defaultType);
1505+
const server = new QUICServer({
1506+
crypto: {
1507+
key,
1508+
ops: serverCrypto,
1509+
},
1510+
logger: logger.getChild(QUICServer.name),
1511+
config: {
1512+
key: tlsConfig.leafKeyPairPEM.privateKey,
1513+
cert: tlsConfig.leafCertPEM,
1514+
verifyPeer: false,
1515+
},
1516+
});
1517+
socketCleanMethods.extractSocket(server);
1518+
server.addEventListener(
1519+
events.EventQUICServerConnection.name,
1520+
(e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e),
1521+
);
1522+
await server.start({
1523+
host: localhost,
1524+
});
1525+
const client = await QUICClient.createQUICClient({
1526+
host: localhost,
1527+
port: server.port,
1528+
localHost: localhost,
1529+
crypto: {
1530+
ops: clientCrypto,
1531+
},
1532+
logger: logger.getChild(QUICClient.name),
1533+
config: {
1534+
verifyPeer: false,
1535+
},
1536+
});
1537+
socketCleanMethods.extractSocket(client);
1538+
const conn = (await connectionEventProm.p).detail;
1539+
1540+
// Do the test
1541+
const { p: waitP, resolveP: waitResolveP } = utils.promise();
1542+
const activeServerStreams: Array<Promise<void>> = [];
1543+
conn.addEventListener(
1544+
events.EventQUICConnectionStream.name,
1545+
async (streamEvent: events.EventQUICConnectionStream) => {
1546+
const stream = streamEvent.detail;
1547+
await waitP;
1548+
const streamProm = stream.readable
1549+
.pipeTo(stream.writable)
1550+
.catch(() => {});
1551+
activeServerStreams.push(streamProm);
1552+
},
1553+
);
1554+
1555+
const stream = client.connection.newStream();
1556+
const writer = stream.writable.getWriter();
1557+
await writer.write(message);
1558+
await writer.close();
1559+
1560+
// Start unforced close of client
1561+
const clientDestroyP = client.destroy({ force: false });
1562+
1563+
const result = await Promise.race([
1564+
clientDestroyP.then(() => true),
1565+
sleep(500).then(() => false),
1566+
]);
1567+
1568+
expect(result).toBe(false);
1569+
1570+
// We can force close the streams causing client destruction to end
1571+
client.connection.destroyStreams();
14921572
await clientDestroyP;
1573+
await Promise.allSettled(activeServerStreams);
1574+
14931575
await server.stop({ force: true });
1576+
waitResolveP();
1577+
await waitP;
14941578
});
14951579
});

tests/utils.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ async function sleep(ms: number): Promise<void> {
2323
return await new Promise<void>((r) => setTimeout(r, ms));
2424
}
2525

26-
async function yieldMicro(): Promise<void> {
27-
return await new Promise<void>((r) => queueMicrotask(r));
28-
}
29-
3026
async function randomBytes(data: ArrayBuffer) {
3127
webcrypto.getRandomValues(new Uint8Array(data));
3228
}
@@ -853,7 +849,6 @@ function createReasonConverters() {
853849

854850
export {
855851
sleep,
856-
yieldMicro,
857852
randomBytes,
858853
generateKeyPairRSA,
859854
generateKeyPairECDSA,

0 commit comments

Comments
 (0)