Skip to content

Commit 5abf687

Browse files
authored
Merge pull request #75 from MatrixAI/feature-unforced-end
fix ending `QUICConnection` with `force: false`
2 parents c06bf7f + e64ed4a commit 5abf687

File tree

6 files changed

+452
-23
lines changed

6 files changed

+452
-23
lines changed

package-lock.json

Lines changed: 48 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/QUICConnection.ts

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { buildQuicheConfig, minIdleTimeout } from './config';
2727
import QUICConnectionId from './QUICConnectionId';
2828
import QUICStream from './QUICStream';
2929
import { quiche, ConnectionErrorCode } from './native';
30+
import { Shutdown } from './native/types';
3031
import * as utils from './utils';
3132
import * as events from './events';
3233
import * as errors from './errors';
@@ -616,7 +617,27 @@ class QUICConnection {
616617
} = {}) {
617618
this.logger.info(`Stop ${this.constructor.name}`);
618619
this.stopKeepAliveIntervalTimer();
619-
// Closing the connection first to avoid accepting new streams
620+
621+
// Yield to allow any background processing to settle before proceeding.
622+
// This will allow any streams to process buffers before continuing
623+
await utils.yieldMicro();
624+
625+
// Destroy all streams
626+
const streamsDestroyP: Array<Promise<void>> = [];
627+
for (const quicStream of this.streamMap.values()) {
628+
// The reason is only used if `force` is `true`
629+
// If `force` is not true, this will gracefully wait for
630+
// both readable and writable to gracefully close
631+
streamsDestroyP.push(
632+
quicStream.destroy({
633+
reason: this.errorLast,
634+
force: force || this.conn.isDraining() || this.conn.isClosed(),
635+
}),
636+
);
637+
}
638+
await Promise.all(streamsDestroyP);
639+
640+
// Close after processing all streams
620641
if (!this.conn.isDraining() && !this.conn.isClosed()) {
621642
// If `this.conn.close` is already called, the connection will be draining,
622643
// in that case we just skip doing this local close.
@@ -632,20 +653,7 @@ class QUICConnection {
632653
});
633654
this.dispatchEvent(new events.EventQUICConnectionError({ detail: e }));
634655
}
635-
// Destroy all streams
636-
const streamsDestroyP: Array<Promise<void>> = [];
637-
for (const quicStream of this.streamMap.values()) {
638-
// The reason is only used if `force` is `true`
639-
// If `force` is not true, this will gracefully wait for
640-
// both readable and writable to gracefully close
641-
streamsDestroyP.push(
642-
quicStream.destroy({
643-
reason: this.errorLast,
644-
force: force || this.conn.isDraining() || this.conn.isClosed(),
645-
}),
646-
);
647-
}
648-
await Promise.all(streamsDestroyP);
656+
649657
// Waiting for `closedP` to resolve
650658
// Only the `this.connTimeoutTimer` will resolve this promise
651659
await this.closedP;
@@ -940,6 +948,13 @@ class QUICConnection {
940948
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
941949
let quicStream = this.streamMap.get(streamId);
942950
if (quicStream == null) {
951+
if (this[running] === false || this[status] === 'stopping') {
952+
// We should reject new connections when stopping
953+
this.conn.streamShutdown(streamId, Shutdown.Write, 1);
954+
this.conn.streamShutdown(streamId, Shutdown.Read, 1);
955+
continue;
956+
}
957+
943958
quicStream = QUICStream.createQUICStream({
944959
initiated: 'peer',
945960
streamId,
@@ -969,6 +984,13 @@ class QUICConnection {
969984
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
970985
let quicStream = this.streamMap.get(streamId);
971986
if (quicStream == null) {
987+
if (this[running] === false || this[status] === 'stopping') {
988+
// We should reject new connections when stopping
989+
this.conn.streamShutdown(streamId, Shutdown.Write, 1);
990+
this.conn.streamShutdown(streamId, Shutdown.Read, 1);
991+
continue;
992+
}
993+
972994
quicStream = QUICStream.createQUICStream({
973995
initiated: 'peer',
974996
streamId,
@@ -1132,6 +1154,21 @@ class QUICConnection {
11321154
return quicStream;
11331155
}
11341156

1157+
/**
1158+
* Destroys all active streams without closing the connection.
1159+
*
1160+
* If there are no active streams then it will do nothing.
1161+
* If the connection is stopped with `force: false` then this can be used
1162+
* to force close any streams `stop` is waiting for to end.
1163+
*
1164+
* Destruction will occur in the background.
1165+
*/
1166+
public destroyStreams(reason?: any) {
1167+
for (const quicStream of this.streamMap.values()) {
1168+
quicStream.cancel(reason);
1169+
}
1170+
}
1171+
11351172
/**
11361173
* Starts the keep alive interval timer.
11371174
*

src/utils.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ import * as errors from './errors';
1717
const textEncoder = new TextEncoder();
1818
const textDecoder = new TextDecoder('utf-8');
1919

20+
/**
21+
* Used to yield to the event loop to allow other micro tasks to process
22+
*/
23+
async function yieldMicro(): Promise<void> {
24+
return await new Promise<void>((r) => queueMicrotask(r));
25+
}
26+
2027
/**
2128
* Convert callback-style to promise-style
2229
* If this is applied to overloaded function
@@ -550,6 +557,7 @@ function isStreamReset(e: Error): number | false {
550557
export {
551558
textEncoder,
552559
textDecoder,
560+
yieldMicro,
553561
promisify,
554562
promise,
555563
bufferWrap,

tests/QUICClient.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,6 +1775,7 @@ describe(QUICClient.name, () => {
17751775

17761776
// Handling client error event
17771777
const clientErrorProm = promise<never>();
1778+
void clientErrorProm.p.catch(() => {}); // Ignore unhandled rejection
17781779
client.addEventListener(
17791780
events.EventQUICClientError.name,
17801781
(evt: events.EventQUICClientError) => clientErrorProm.rejectP(evt.detail),
@@ -1783,6 +1784,7 @@ describe(QUICClient.name, () => {
17831784

17841785
// Handling client destroy event
17851786
const clientDestroyedProm = promise<void>();
1787+
void clientDestroyedProm.p.catch(() => {}); // Ignore unhandled rejection
17861788
client.addEventListener(
17871789
events.EventQUICClientDestroyed.name,
17881790
() => clientDestroyedProm.resolveP(),

0 commit comments

Comments
 (0)