Skip to content

Commit 8f7c817

Browse files
committed
fix: Fixed bug with concurrent stream cancellation
1 parent 14fbd7d commit 8f7c817

File tree

3 files changed

+100
-5
lines changed

3 files changed

+100
-5
lines changed

src/QUICConnection.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -907,9 +907,22 @@ class QUICConnection extends EventTarget {
907907
// The QUICStream will always exist before processing it's writable.
908908
// 1. First time it is seen in the readable iterator
909909
// 2. created using `streamNew()`
910-
never();
910+
911+
// There is one condition where this can happen. That is when both sides of the stream cancel concurrently.
912+
// Local state is cleaned up while the remote side still sends a closing frame.
913+
try {
914+
this.conn.streamWritable(streamId, 0);
915+
this.logger.debug(
916+
`streamId ${streamId} was writable without an existing stream`,
917+
);
918+
} catch (e) {
919+
this.logger.debug(
920+
`streamId ${streamId} was writable without an existing stream and error ${e.message}`,
921+
);
922+
}
923+
} else {
924+
quicStream.write();
911925
}
912-
quicStream.write();
913926
}
914927
}
915928

src/QUICStream.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,7 @@ class QUICStream
327327
this.readableController.close();
328328
}
329329
} catch (e) {
330-
if (e.message === 'Done') {
331-
never();
332-
} else {
330+
if (e.message !== 'Done') {
333331
this.logger.debug(`Stream recv reported: error ${e.message}`);
334332
if (!this._recvClosed) {
335333
// Close stream in background

tests/QUICStream.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,90 @@ describe(QUICStream.name, () => {
963963
await client.destroy({ force: true });
964964
await server.stop({ force: true });
965965
});
966+
test('streams can be cancelled concurrently after data sent', async () => {
967+
const cancelReason = Symbol('CancelReason');
968+
const connectionEventProm =
969+
utils.promise<events.QUICServerConnectionEvent>();
970+
const tlsConfig1 = await generateConfig(defaultType);
971+
const tlsConfig2 = await generateConfig(defaultType);
972+
const reasonConverters = testsUtils.createReasonConverters();
973+
const server = new QUICServer({
974+
crypto: {
975+
key,
976+
ops: serverCrypto,
977+
},
978+
logger: logger.getChild(QUICServer.name),
979+
config: {
980+
key: tlsConfig1.key,
981+
cert: tlsConfig1.cert,
982+
verifyPeer: true,
983+
ca: tlsConfig2.ca,
984+
},
985+
...reasonConverters,
986+
});
987+
testsUtils.extractSocket(server, sockets);
988+
server.addEventListener(
989+
'serverConnection',
990+
(e: events.QUICServerConnectionEvent) => connectionEventProm.resolveP(e),
991+
);
992+
await server.start({
993+
host: localhost,
994+
});
995+
const client = await QUICClient.createQUICClient({
996+
host: localhost,
997+
port: server.port,
998+
localHost: localhost,
999+
crypto: {
1000+
ops: clientCrypto,
1001+
},
1002+
logger: logger.getChild(QUICClient.name),
1003+
config: {
1004+
verifyPeer: false,
1005+
key: tlsConfig2.key,
1006+
cert: tlsConfig2.cert,
1007+
},
1008+
...reasonConverters,
1009+
});
1010+
testsUtils.extractSocket(client, sockets);
1011+
const conn = (await connectionEventProm.p).detail;
1012+
// Do the test
1013+
const serverStreamProm = utils.promise<QUICStream>();
1014+
conn.addEventListener(
1015+
'connectionStream',
1016+
(event: events.QUICConnectionStreamEvent) => {
1017+
serverStreamProm.resolveP(event.detail);
1018+
},
1019+
);
1020+
// Let's make a new streams.
1021+
const message = Buffer.from('Hello!');
1022+
const clientStream = await client.connection.streamNew();
1023+
const writer = clientStream.writable.getWriter();
1024+
await writer.write(message);
1025+
writer.releaseLock();
1026+
const serverStream = await serverStreamProm.p;
1027+
serverStream.cancel(cancelReason);
1028+
clientStream.cancel(cancelReason);
1029+
1030+
// Checking stream states
1031+
await expect(clientStream.readable.getReader().read()).rejects.toBe(
1032+
cancelReason,
1033+
);
1034+
await expect(clientStream.writable.getWriter().write()).rejects.toBe(
1035+
cancelReason,
1036+
);
1037+
await expect(serverStream.readable.getReader().read()).rejects.toBe(
1038+
cancelReason,
1039+
);
1040+
await expect(serverStream.writable.getWriter().write()).rejects.toBe(
1041+
cancelReason,
1042+
);
1043+
1044+
// And client stream should've cleaned up
1045+
await testsUtils.sleep(100);
1046+
expect(clientStream[destroyed]).toBeTrue();
1047+
await client.destroy({ force: true });
1048+
await server.stop({ force: true });
1049+
});
9661050
test('stream will end when waiting for more data', async () => {
9671051
// Needed to check that the pull based reading of data doesn't break when we
9681052
// temporarily run out of data to read

0 commit comments

Comments
 (0)