Skip to content

Commit c8bdd7d

Browse files
committed
fix: streams allowed to gracefully end when ending connections with force: false
[ci skip]
1 parent c06bf7f commit c8bdd7d

File tree

4 files changed

+252
-17
lines changed

4 files changed

+252
-17
lines changed

package-lock.json

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/QUICConnection.ts

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { quiche, ConnectionErrorCode } from './native';
3030
import * as utils from './utils';
3131
import * as events from './events';
3232
import * as errors from './errors';
33+
import { sleep } from '../tests/utils';
3334

3435
interface QUICConnection extends StartStop {}
3536
@StartStop({
@@ -616,7 +617,27 @@ class QUICConnection {
616617
} = {}) {
617618
this.logger.info(`Stop ${this.constructor.name}`);
618619
this.stopKeepAliveIntervalTimer();
619-
// Closing the connection first to avoid accepting new streams
620+
621+
// Yield to allow any background processing to settle before proceeding.
622+
// This will allow any streams to process buffers before continuing
623+
await sleep(0);
624+
625+
// Destroy all streams
626+
const streamsDestroyP: Array<Promise<void>> = [];
627+
for (const quicStream of this.streamMap.values()) {
628+
// The reason is only used if `force` is `true`
629+
// If `force` is not true, this will gracefully wait for
630+
// both readable and writable to gracefully close
631+
streamsDestroyP.push(
632+
quicStream.destroy({
633+
reason: this.errorLast,
634+
force: force || this.conn.isDraining() || this.conn.isClosed(),
635+
}),
636+
);
637+
}
638+
await Promise.all(streamsDestroyP);
639+
640+
// Close after processing all streams
620641
if (!this.conn.isDraining() && !this.conn.isClosed()) {
621642
// If `this.conn.close` is already called, the connection will be draining,
622643
// in that case we just skip doing this local close.
@@ -632,20 +653,7 @@ class QUICConnection {
632653
});
633654
this.dispatchEvent(new events.EventQUICConnectionError({ detail: e }));
634655
}
635-
// Destroy all streams
636-
const streamsDestroyP: Array<Promise<void>> = [];
637-
for (const quicStream of this.streamMap.values()) {
638-
// The reason is only used if `force` is `true`
639-
// If `force` is not true, this will gracefully wait for
640-
// both readable and writable to gracefully close
641-
streamsDestroyP.push(
642-
quicStream.destroy({
643-
reason: this.errorLast,
644-
force: force || this.conn.isDraining() || this.conn.isClosed(),
645-
}),
646-
);
647-
}
648-
await Promise.all(streamsDestroyP);
656+
649657
// Waiting for `closedP` to resolve
650658
// Only the `this.connTimeoutTimer` will resolve this promise
651659
await this.closedP;

tests/QUICClient.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,6 +1775,7 @@ describe(QUICClient.name, () => {
17751775

17761776
// Handling client error event
17771777
const clientErrorProm = promise<never>();
1778+
void clientErrorProm.p.catch(() => {}); // Ignore unhandled rejection
17781779
client.addEventListener(
17791780
events.EventQUICClientError.name,
17801781
(evt: events.EventQUICClientError) => clientErrorProm.rejectP(evt.detail),
@@ -1783,6 +1784,7 @@ describe(QUICClient.name, () => {
17831784

17841785
// Handling client destroy event
17851786
const clientDestroyedProm = promise<void>();
1787+
void clientDestroyedProm.p.catch(() => {}); // Ignore unhandled rejection
17861788
client.addEventListener(
17871789
events.EventQUICClientDestroyed.name,
17881790
() => clientDestroyedProm.resolveP(),

tests/QUICStream.test.ts

Lines changed: 179 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,8 +672,6 @@ describe(QUICStream.name, () => {
672672
await streamEndedProm.p;
673673
expect(streamCreatedCount).toBe(streamsNum);
674674
expect(streamEndedCount).toBe(streamsNum);
675-
await client.destroy({ force: true });
676-
await server.stop({ force: true });
677675
});
678676
test('should clean up streams when connection times out', async () => {
679677
const streamsNum = 10;
@@ -1237,4 +1235,183 @@ describe(QUICStream.name, () => {
12371235
await client.destroy({ force: true });
12381236
await server.stop({ force: true });
12391237
});
1238+
1239+
test('streams are allowed to end when client is destroyed with force: false', async () => {
1240+
const message = Buffer.from('The Quick Brown Fox Jumped Over The Lazy Dog');
1241+
const numStreams = 10;
1242+
const numMessage = 10;
1243+
const connectionEventProm =
1244+
utils.promise<events.EventQUICServerConnection>();
1245+
const tlsConfig = await generateTLSConfig(defaultType);
1246+
const server = new QUICServer({
1247+
crypto: {
1248+
key,
1249+
ops: serverCrypto,
1250+
},
1251+
logger: logger.getChild(QUICServer.name),
1252+
config: {
1253+
key: tlsConfig.leafKeyPairPEM.privateKey,
1254+
cert: tlsConfig.leafCertPEM,
1255+
verifyPeer: false,
1256+
},
1257+
});
1258+
socketCleanMethods.extractSocket(server);
1259+
server.addEventListener(
1260+
events.EventQUICServerConnection.name,
1261+
(e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e),
1262+
);
1263+
await server.start({
1264+
host: localhost,
1265+
});
1266+
const client = await QUICClient.createQUICClient({
1267+
host: localhost,
1268+
port: server.port,
1269+
localHost: localhost,
1270+
crypto: {
1271+
ops: clientCrypto,
1272+
},
1273+
logger: logger.getChild(QUICClient.name),
1274+
config: {
1275+
verifyPeer: false,
1276+
},
1277+
});
1278+
socketCleanMethods.extractSocket(client);
1279+
const conn = (await connectionEventProm.p).detail;
1280+
1281+
// Do the test
1282+
const activeServerStreams: Array<Promise<void>> = [];
1283+
conn.addEventListener(
1284+
events.EventQUICConnectionStream.name,
1285+
(streamEvent: events.EventQUICConnectionStream) => {
1286+
const stream = streamEvent.detail;
1287+
const streamProm = stream.readable.pipeTo(stream.writable);
1288+
activeServerStreams.push(streamProm);
1289+
},
1290+
);
1291+
1292+
const { p: waitP, resolveP: waitResolveP } = utils.promise();
1293+
1294+
// Let's make a new streams.
1295+
const activeClientStreams: Array<Promise<void>> = [];
1296+
for (let i = 0; i < numStreams; i++) {
1297+
activeClientStreams.push(
1298+
(async () => {
1299+
const stream = client.connection.newStream();
1300+
const writer = stream.writable.getWriter();
1301+
const reader = stream.readable.getReader();
1302+
// Do write and read messages here.
1303+
for (let j = 0; j < numMessage; j++) {
1304+
await writer.write(message);
1305+
const readMessage = await reader.read();
1306+
expect(readMessage.done).toBeFalse();
1307+
expect(readMessage.value).toStrictEqual(message);
1308+
await waitP;
1309+
}
1310+
await writer.close();
1311+
const value = await reader.read();
1312+
expect(value.done).toBeTrue();
1313+
})(),
1314+
);
1315+
}
1316+
1317+
// Start unforced close of client
1318+
const clientDestroyP = client.destroy({ force: false });
1319+
waitResolveP();
1320+
1321+
await Promise.all([
1322+
Promise.all(activeClientStreams),
1323+
Promise.all(activeServerStreams),
1324+
clientDestroyP,
1325+
]);
1326+
await server.stop({ force: true });
1327+
});
1328+
test('streams are allowed to end when server is destroyed with force: false', async () => {
1329+
const message = Buffer.from('The Quick Brown Fox Jumped Over The Lazy Dog');
1330+
const numStreams = 10;
1331+
const numMessage = 10;
1332+
const connectionEventProm =
1333+
utils.promise<events.EventQUICServerConnection>();
1334+
const tlsConfig = await generateTLSConfig(defaultType);
1335+
const server = new QUICServer({
1336+
crypto: {
1337+
key,
1338+
ops: serverCrypto,
1339+
},
1340+
logger: logger.getChild(QUICServer.name),
1341+
config: {
1342+
key: tlsConfig.leafKeyPairPEM.privateKey,
1343+
cert: tlsConfig.leafCertPEM,
1344+
verifyPeer: false,
1345+
},
1346+
});
1347+
socketCleanMethods.extractSocket(server);
1348+
server.addEventListener(
1349+
events.EventQUICServerConnection.name,
1350+
(e: events.EventQUICServerConnection) => connectionEventProm.resolveP(e),
1351+
);
1352+
await server.start({
1353+
host: localhost,
1354+
});
1355+
const client = await QUICClient.createQUICClient({
1356+
host: localhost,
1357+
port: server.port,
1358+
localHost: localhost,
1359+
crypto: {
1360+
ops: clientCrypto,
1361+
},
1362+
logger: logger.getChild(QUICClient.name),
1363+
config: {
1364+
verifyPeer: false,
1365+
},
1366+
});
1367+
socketCleanMethods.extractSocket(client);
1368+
const conn = (await connectionEventProm.p).detail;
1369+
1370+
// Do the test
1371+
const activeServerStreams: Array<Promise<void>> = [];
1372+
conn.addEventListener(
1373+
events.EventQUICConnectionStream.name,
1374+
(streamEvent: events.EventQUICConnectionStream) => {
1375+
const stream = streamEvent.detail;
1376+
const streamProm = stream.readable.pipeTo(stream.writable);
1377+
activeServerStreams.push(streamProm);
1378+
},
1379+
);
1380+
1381+
const { p: waitP, resolveP: waitResolveP } = utils.promise();
1382+
1383+
// Let's make a new streams.
1384+
const activeClientStreams: Array<Promise<void>> = [];
1385+
for (let i = 0; i < numStreams; i++) {
1386+
activeClientStreams.push(
1387+
(async () => {
1388+
const stream = client.connection.newStream();
1389+
const writer = stream.writable.getWriter();
1390+
const reader = stream.readable.getReader();
1391+
// Do write and read messages here.
1392+
for (let j = 0; j < numMessage; j++) {
1393+
await writer.write(message);
1394+
const readMessage = await reader.read();
1395+
expect(readMessage.done).toBeFalse();
1396+
expect(readMessage.value).toStrictEqual(message);
1397+
await waitP;
1398+
}
1399+
await writer.close();
1400+
const value = await reader.read();
1401+
expect(value.done).toBeTrue();
1402+
})(),
1403+
);
1404+
}
1405+
1406+
// Start unforced close of server
1407+
const serverStopP = server.stop({ force: false });
1408+
waitResolveP();
1409+
1410+
await Promise.all([
1411+
Promise.all(activeClientStreams),
1412+
Promise.all(activeServerStreams),
1413+
serverStopP,
1414+
]);
1415+
await client.destroy({ force: true });
1416+
});
12401417
});

0 commit comments

Comments
 (0)