Skip to content

Commit a99c0e5

Browse files
committed
fix: better handling for stream cancellation
1 parent b8fa1eb commit a99c0e5

File tree

5 files changed

+103
-19
lines changed

5 files changed

+103
-19
lines changed

src/QUICConnection.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,8 +895,10 @@ class QUICConnection extends EventTarget {
895895
this.dispatchEvent(
896896
new events.QUICConnectionStreamEvent({ detail: quicStream }),
897897
);
898+
// No need to read after creation, doing so will throw during early cancellation
899+
} else {
900+
quicStream.read();
898901
}
899-
quicStream.read();
900902
}
901903
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
902904
const quicStream = this.streamMap.get(streamId);

src/QUICStream.ts

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ class QUICStream
8888
// create Peer state.
8989
try {
9090
connection.conn.streamSend(streamId, new Uint8Array(0), false);
91-
} catch {
92-
// FIXME: If there is an error here then stream will not create? Maybe we should abort?
91+
} catch (e) {
92+
// We ignore any errors here, if this is a server side stream then state already exists.
93+
// But it's possible for the stream to already be closed or have an error here.
94+
// These errors will be handled by the QUICStream and not here.
9395
}
9496
const stream = new this({
9597
streamId,
@@ -163,6 +165,18 @@ class QUICStream
163165
// and we need to propagate the error up and down the stream
164166
controller.error(reason);
165167
await this.closeRecv(true, reason);
168+
// It is possible the stream was cancelled, let's check the writable state;
169+
try {
170+
this.conn.streamWritable(this.streamId, 0);
171+
} catch (e) {
172+
const match = e.message.match(/InvalidStreamState\((.+)\)/);
173+
if (match == null) {
174+
return never(
175+
'Errors besides [InvalidStreamState(StreamId)] are not expected here',
176+
);
177+
}
178+
this.writableController.error(reason);
179+
}
166180
}
167181
break;
168182
}
@@ -319,8 +333,9 @@ class QUICStream
319333
this.readableController.close();
320334
}
321335
} catch (e) {
322-
// Ignore if done, not normally meant to happen but possible in rare cases
323-
if (e.message !== 'Done') {
336+
if (e.message === 'Done') {
337+
never();
338+
} else {
324339
this.logger.debug(`Stream recv reported: error ${e.message}`);
325340
if (!this._recvClosed) {
326341
// Close stream in background
@@ -329,6 +344,18 @@ class QUICStream
329344
(await this.processSendStreamError(e, 'recv')) ?? e;
330345
this.readableController.error(reason);
331346
await this.closeRecv(true, reason);
347+
// It is possible the stream was cancelled, let's check the writable state;
348+
try {
349+
this.conn.streamWritable(this.streamId, 0);
350+
} catch (e) {
351+
const match = e.message.match(/InvalidStreamState\((.+)\)/);
352+
if (match == null) {
353+
return never(
354+
'Errors besides [InvalidStreamState(StreamId)] are not expected here',
355+
);
356+
}
357+
this.writableController.error(reason);
358+
}
332359
})();
333360
}
334361
}
@@ -488,7 +515,7 @@ class QUICStream
488515
match = e.message.match(/InvalidStreamState\((.+)\)/);
489516
if (match != null) {
490517
// `InvalidStreamState()` returns the stream ID and not any actual error code
491-
return await this.codeToReason(type, 0);
518+
return never('Should never reach an [InvalidState(StreamId)] error');
492519
}
493520
return null;
494521
}

src/utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,8 @@ function decodeConnectionId(connIdString: ConnectionIdString): ConnectionId {
335335
return Buffer.from(connIdString, 'hex') as ConnectionId;
336336
}
337337

338-
function never(): never {
339-
throw new errors.ErrorQUICUndefinedBehaviour();
338+
function never(message?: string): never {
339+
throw new errors.ErrorQUICUndefinedBehaviour(message);
340340
}
341341

342342
function certificateDERToPEM(der: Uint8Array): string {

tests/QUICStream.test.ts

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -791,12 +791,13 @@ describe(QUICStream.name, () => {
791791
await client.destroy({ force: true });
792792
await server.stop({ force: true });
793793
});
794-
test('streams can be cancelled', async () => {
794+
test('streams can be cancelled after data sent', async () => {
795795
const cancelReason = Symbol('CancelReason');
796796
const connectionEventProm =
797797
utils.promise<events.QUICServerConnectionEvent>();
798798
const tlsConfig1 = await generateConfig(defaultType);
799799
const tlsConfig2 = await generateConfig(defaultType);
800+
const reasonConverters = testsUtils.createReasonConverters();
800801
const server = new QUICServer({
801802
crypto: {
802803
key,
@@ -809,6 +810,7 @@ describe(QUICStream.name, () => {
809810
verifyPeer: true,
810811
ca: tlsConfig2.ca,
811812
},
813+
...reasonConverters,
812814
});
813815
testsUtils.extractSocket(server, sockets);
814816
server.addEventListener(
@@ -831,6 +833,7 @@ describe(QUICStream.name, () => {
831833
key: tlsConfig2.key,
832834
cert: tlsConfig2.cert,
833835
},
836+
...reasonConverters,
834837
});
835838
testsUtils.extractSocket(client, sockets);
836839
const conn = (await connectionEventProm.p).detail;
@@ -848,19 +851,28 @@ describe(QUICStream.name, () => {
848851
const writer = clientStream.writable.getWriter();
849852
await writer.write(message);
850853
writer.releaseLock();
851-
await serverStreamProm.p;
852854
clientStream.cancel(cancelReason);
853855
await expect(clientStream.readable.getReader().read()).rejects.toBe(
854856
cancelReason,
855857
);
856858
await expect(clientStream.writable.getWriter().write()).rejects.toBe(
857859
cancelReason,
858860
);
861+
859862
// Let's check that the server side ended
860863
const serverStream = await serverStreamProm.p;
861-
await expect(
862-
serverStream.readable.pipeTo(serverStream.writable),
863-
).rejects.toThrow();
864+
const serverReadProm = (async () => {
865+
for await (const _ of serverStream.readable) {
866+
// Just consume until stream throws
867+
}
868+
})();
869+
await expect(serverReadProm).rejects.toBe(cancelReason);
870+
const serverWriter = serverStream.writable.getWriter();
871+
// Should throw
872+
await expect(serverWriter.write(Buffer.from('hello'))).rejects.toBe(
873+
cancelReason,
874+
);
875+
864876
// And client stream should've cleaned up
865877
await testsUtils.sleep(100);
866878
expect(clientStream[destroyed]).toBeTrue();
@@ -873,6 +885,7 @@ describe(QUICStream.name, () => {
873885
utils.promise<events.QUICServerConnectionEvent>();
874886
const tlsConfig1 = await generateConfig(defaultType);
875887
const tlsConfig2 = await generateConfig(defaultType);
888+
const reasonConverters = testsUtils.createReasonConverters();
876889
const server = new QUICServer({
877890
crypto: {
878891
key,
@@ -885,6 +898,7 @@ describe(QUICStream.name, () => {
885898
verifyPeer: true,
886899
ca: tlsConfig2.ca,
887900
},
901+
...reasonConverters,
888902
});
889903
testsUtils.extractSocket(server, sockets);
890904
server.addEventListener(
@@ -907,6 +921,7 @@ describe(QUICStream.name, () => {
907921
key: tlsConfig2.key,
908922
cert: tlsConfig2.cert,
909923
},
924+
...reasonConverters,
910925
});
911926
testsUtils.extractSocket(client, sockets);
912927
const conn = (await connectionEventProm.p).detail;
@@ -921,25 +936,34 @@ describe(QUICStream.name, () => {
921936
// Let's make a new streams.
922937
const clientStream = await client.connection.streamNew();
923938
clientStream.cancel(cancelReason);
924-
925939
await expect(clientStream.readable.getReader().read()).rejects.toBe(
926940
cancelReason,
927941
);
928942
await expect(clientStream.writable.getWriter().write()).rejects.toBe(
929943
cancelReason,
930944
);
945+
931946
// Let's check that the server side ended
932947
const serverStream = await serverStreamProm.p;
933-
await expect(
934-
serverStream.readable.pipeTo(serverStream.writable),
935-
).rejects.toThrow('recv 0');
948+
const serverReadProm = (async () => {
949+
for await (const _ of serverStream.readable) {
950+
// Just consume until stream throws
951+
}
952+
})();
953+
await expect(serverReadProm).rejects.toBe(cancelReason);
954+
const serverWriter = serverStream.writable.getWriter();
955+
// Should throw
956+
await expect(serverWriter.write(Buffer.from('hello'))).rejects.toBe(
957+
cancelReason,
958+
);
959+
936960
// And client stream should've cleaned up
937961
await testsUtils.sleep(100);
938962
expect(clientStream[destroyed]).toBeTrue();
939963
await client.destroy({ force: true });
940964
await server.stop({ force: true });
941965
});
942-
test('Stream will end when waiting for more data', async () => {
966+
test('stream will end when waiting for more data', async () => {
943967
// Needed to check that the pull based reading of data doesn't break when we
944968
// temporarily run out of data to read
945969
const connectionEventProm =
@@ -1008,7 +1032,7 @@ describe(QUICStream.name, () => {
10081032
await client.destroy({ force: true });
10091033
await server.stop({ force: true });
10101034
});
1011-
test('Stream can error when blocked on data', async () => {
1035+
test('stream can error when blocked on data', async () => {
10121036
// This checks that if the readable web-stream is full and not pulling data,
10131037
// we will still respond to an error in the readable stream
10141038

tests/utils.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type QUICSocket from '@/QUICSocket';
44
import type QUICClient from '@/QUICClient';
55
import type QUICServer from '@/QUICServer';
66
import type QUICStream from '@/QUICStream';
7+
import type { StreamCodeToReason, StreamReasonToCode } from '@';
78
import { Crypto } from '@peculiar/webcrypto';
89
import * as x509 from '@peculiar/x509';
910
import { never } from '@/utils';
@@ -740,6 +741,35 @@ async function generateConfig(type: KeyTypes): Promise<TLSConfigs> {
740741
};
741742
}
742743

744+
/**
745+
* This will create a `reasonToCode` and `CodeToReason` function that will
746+
* allow errors to "jump" the network boundary. It does this by mapping the
747+
* errors to an incrementing code and returning them on the other end of the
748+
* connection.
749+
*
750+
* Note: this should ONLY be used for testing as it requires the client and
751+
* server to share the same instance of `reasonToCode` and `codeToReason`.
752+
*/
753+
function createReasonConverters() {
754+
const reasonMap = new Map<number, any>();
755+
let code = 0;
756+
757+
const reasonToCode: StreamReasonToCode = (_type, reason) => {
758+
code++;
759+
reasonMap.set(code, reason);
760+
return code;
761+
};
762+
763+
const codeToReason: StreamCodeToReason = (_type, code) => {
764+
return reasonMap.get(code) ?? new Error('Reason not found');
765+
};
766+
767+
return {
768+
reasonToCode,
769+
codeToReason,
770+
};
771+
}
772+
743773
export {
744774
sleep,
745775
randomBytes,
@@ -760,6 +790,7 @@ export {
760790
waitForTimeoutNull,
761791
connStats,
762792
generateConfig,
793+
createReasonConverters,
763794
};
764795

765796
export type { Messages, StreamData, KeyTypes, TLSConfigs };

0 commit comments

Comments
 (0)