Skip to content

Commit 830a1b7

Browse files
committed
chore: added cancellation to discovery background task
1 parent afe8cc8 commit 830a1b7

File tree

6 files changed

+50
-36
lines changed

6 files changed

+50
-36
lines changed

src/nodes/NodeConnection.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,9 @@ class NodeConnection {
169169
targetHostname,
170170
tlsConfig,
171171
connectionKeepAliveIntervalTime,
172-
connectionKeepAliveTimeoutTime = config.defaultsSystem
173-
.nodesConnectionIdleTimeoutTimeMin,
174-
connectionInitialMaxStreamsBidi = config.defaultsSystem
175-
.nodesConnectionInitialMaxStreamsBidi,
176-
connectionInitialMaxStreamsUni = config.defaultsSystem
177-
.nodesConnectionInitialMaxStreamsUni,
172+
connectionKeepAliveTimeoutTime,
173+
connectionInitialMaxStreamsBidi,
174+
connectionInitialMaxStreamsUni,
178175
quicSocket,
179176
manifest,
180177
logger,

src/nodes/NodeConnectionManager.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -663,13 +663,10 @@ class NodeConnectionManager {
663663
const cancelAuthenticationPs: Array<PromiseCancellable<void>> = [];
664664
const cancelReason = new nodesErrors.ErrorNodeConnectionManagerStopping();
665665
for (const [nodeIdString] of this.connections) {
666-
const destroyP = this.authenticateCancel(nodeIdString, cancelReason).then(
667-
async () => {
668-
return await this.destroyConnection(
669-
IdInternal.fromString<NodeId>(nodeIdString),
670-
force,
671-
);
672-
},
666+
this.authenticateCancel(nodeIdString, cancelReason);
667+
const destroyP = this.destroyConnection(
668+
IdInternal.fromString<NodeId>(nodeIdString),
669+
force,
673670
);
674671
destroyConnectionPs.push(destroyP);
675672
}
@@ -1186,7 +1183,7 @@ class NodeConnectionManager {
11861183
const remainingKeys = Object.keys(connectionsEntry.connections);
11871184
if (remainingKeys.length === 0) {
11881185
// Clean up authentication
1189-
await this.authenticateCancel(
1186+
this.authenticateCancel(
11901187
targetNodeIdString,
11911188
new nodesErrors.ErrorNodeManagerAuthenticationFailed(
11921189
'Connection destroyed before authentication could complete',
@@ -1817,7 +1814,6 @@ class NodeConnectionManager {
18171814
nodeId: NodeId,
18181815
@context ctx: ContextTimed,
18191816
): Promise<void> {
1820-
ctx.signal.throwIfAborted();
18211817
const targetNodeIdString = nodeId.toString() as NodeIdString;
18221818
const connectionsEntry = this.connections.get(targetNodeIdString);
18231819
if (connectionsEntry == null) {
@@ -1827,11 +1823,16 @@ class NodeConnectionManager {
18271823
const abortHandler = () => {
18281824
rejectAbortP(ctx.signal.reason);
18291825
};
1830-
ctx.signal.addEventListener('abort', abortHandler, { once: true });
1826+
if (ctx.signal.aborted) {
1827+
abortHandler();
1828+
} else {
1829+
ctx.signal.addEventListener('abort', abortHandler, { once: true });
1830+
}
18311831
try {
18321832
return await Promise.race([connectionsEntry.authenticatedP, abortP]);
18331833
} catch (e) {
1834-
// Capture the stacktrace here since knowing where we're waiting for authentication is more useful
1834+
// Capture the stacktrace here since knowing where we're waiting for
1835+
// authentication is more useful.
18351836
Error.captureStackTrace(e);
18361837
throw e;
18371838
} finally {
@@ -1890,7 +1891,7 @@ class NodeConnectionManager {
18901891
}
18911892
}
18921893

1893-
protected async authenticateCancel(
1894+
protected authenticateCancel(
18941895
targetNodeIdString: NodeIdString,
18951896
reason: Error,
18961897
) {

src/nodes/NodeGraph.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -295,18 +295,20 @@ class NodeGraph {
295295
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
296296
public async *getNodeContacts(
297297
order: 'asc' | 'desc' = 'asc',
298-
tran?: DBTransaction,
298+
tran: DBTransaction | undefined,
299+
ctx: ContextTimed,
299300
): AsyncGenerator<[NodeId, NodeContact]> {
300301
if (tran == null) {
301302
// Lambda generators don't grab the `this` context, so we need to bind it
302-
const getNodeContacts = (tran) => this.getNodeContacts(order, tran);
303+
const getNodeContacts = (tran) => this.getNodeContacts(order, tran, ctx);
303304
return yield* this.db.withTransactionG(async function* (tran) {
304305
return yield* getNodeContacts(tran);
305306
});
306307
}
307308
return yield* nodesUtils.collectNodeContacts(
308309
[...this.nodeGraphBucketsDbPath],
309310
tran,
311+
ctx,
310312
{ reverse: order !== 'asc' },
311313
);
312314
}
@@ -663,6 +665,7 @@ class NodeGraph {
663665
for await (const result of nodesUtils.collectNodeContacts(
664666
[...this.nodeGraphBucketsDbPath, bucketKey],
665667
tran,
668+
ctx,
666669
{
667670
reverse: order !== 'asc',
668671
limit,
@@ -740,10 +743,18 @@ class NodeGraph {
740743
* Resets the bucket according to the new node ID.
741744
* Run this after new node ID is generated via renewal or reset.
742745
*/
746+
public async resetBuckets(
747+
tran?: DBTransaction,
748+
ctx?: ContextTimed,
749+
): Promise<void>;
743750
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
744-
public async resetBuckets(tran?: DBTransaction): Promise<void> {
751+
@timedCancellable(true)
752+
public async resetBuckets(
753+
tran: DBTransaction | undefined,
754+
@context ctx: ContextTimed,
755+
): Promise<void> {
745756
if (tran == null) {
746-
return this.db.withTransactionF((tran) => this.resetBuckets(tran));
757+
return this.db.withTransactionF((tran) => this.resetBuckets(tran, ctx));
747758
}
748759
// Setup new space
749760
const spaceNew = this.space === '0' ? '1' : '0';
@@ -764,6 +775,7 @@ class NodeGraph {
764775
for await (const [nodeId, nodeContact] of nodesUtils.collectNodeContacts(
765776
[...this.nodeGraphBucketsDbPath],
766777
tran,
778+
ctx,
767779
)) {
768780
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
769781
const nodeIdOwn = this.keyRing.getNodeId();
@@ -965,12 +977,12 @@ class NodeGraph {
965977
for await (const nodeEntry of nodesUtils.collectNodeContacts(
966978
this.nodeGraphBucketsDbPath,
967979
tran,
980+
ctx,
968981
{
969982
lt: [bucketIdKey, ''],
970983
limit: remainingLimit,
971984
},
972985
)) {
973-
ctx.signal.throwIfAborted();
974986
nodes.push(nodeEntry);
975987
}
976988
}
@@ -982,12 +994,12 @@ class NodeGraph {
982994
for await (const nodeEntry of nodesUtils.collectNodeContacts(
983995
this.nodeGraphBucketsDbPath,
984996
tran,
997+
ctx,
985998
{
986999
gt: [bucketId, ''],
9871000
limit: remainingLimit,
9881001
},
9891002
)) {
990-
ctx.signal.throwIfAborted();
9911003
nodes.push(nodeEntry);
9921004
}
9931005
}

src/nodes/NodeManager.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,7 @@ class NodeManager {
136136
_taskInfo,
137137
bucketIndex: NodeBucketIndex,
138138
) => {
139-
// Don't use defaults like this
140-
// if a default is to be used
141-
// provide it directly
142-
143-
await this.refreshBucket(
144-
bucketIndex,
145-
this.connectionConnectTimeoutTime,
146-
ctx,
147-
);
139+
await this.refreshBucket(bucketIndex, undefined, ctx);
148140
// When completed reschedule the task
149141
// if refreshBucketDelay is 0 then it's considered disabled
150142
if (this.refreshBucketDelayTime > 0) {
@@ -722,6 +714,7 @@ class NodeManager {
722714
}
723715

724716
while (true) {
717+
ctx.signal.throwIfAborted();
725718
const isDone = await nodeConnectionsQueue.withNodeSignal(
726719
async (nodeIdTarget, nodeIdSignaller) => {
727720
let nodeConnection: NodeConnection | undefined;
@@ -863,6 +856,7 @@ class NodeManager {
863856
}
864857

865858
while (true) {
859+
ctx.signal.throwIfAborted();
866860
const isDone = await nodeConnectionsQueue.withNodeDirect(
867861
async (nodeIdTarget, nodeContact) => {
868862
if (!this.nodeConnectionManager.hasConnection(nodeIdTarget)) {
@@ -1135,6 +1129,7 @@ class NodeManager {
11351129
);
11361130
// Collecting results
11371131
for await (const result of resultStream) {
1132+
ctx.signal.throwIfAborted();
11381133
const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId);
11391134
if (nodeIdNew == null) {
11401135
utils.never(`failed to decode NodeId "${result.nodeId}"`);
@@ -1151,6 +1146,7 @@ class NodeManager {
11511146
ctx,
11521147
);
11531148
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
1149+
ctx.signal.throwIfAborted();
11541150
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
11551151
if (nodeId == null) {
11561152
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);
@@ -2026,8 +2022,7 @@ class NodeManager {
20262022
@timedCancellable(true)
20272023
public async refreshBucket(
20282024
bucketIndex: NodeBucketIndex,
2029-
connectionConnectTimeoutTime: number | undefined = this
2030-
.connectionConnectTimeoutTime,
2025+
connectionConnectTimeoutTime: number = this.connectionConnectTimeoutTime,
20312026
@context ctx: ContextTimed,
20322027
): Promise<void> {
20332028
// We need to generate a random nodeId for this bucket

src/nodes/utils.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import type {
1818
NodesAuthenticateConnectionMessageBasicPublic,
1919
NodesAuthenticateConnectionMessageNone,
2020
} from './agent/types';
21+
import type { ContextTimed } from '@matrixai/contexts';
2122
import dns from 'dns';
2223
import { utils as dbUtils } from '@matrixai/db';
2324
import { IdInternal } from '@matrixai/id';
@@ -743,6 +744,7 @@ const quicServerCrypto: QUICServerCrypto = {
743744
async function* collectNodeContacts(
744745
levelPath: LevelPath,
745746
tran: DBTransaction,
747+
ctx: ContextTimed,
746748
options: {
747749
reverse?: boolean;
748750
lt?: LevelPath;
@@ -763,6 +765,7 @@ async function* collectNodeContacts(
763765
gt: options.gt,
764766
valueAsBuffer: false,
765767
})) {
768+
ctx.signal.throwIfAborted();
766769
const { nodeId: nodeIdCurrent, nodeContactAddress } = parseBucketsDbKey([
767770
...(options.pathAdjust ?? []),
768771
...keyPath,

tests/nodes/NodeGraph.test.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { ContextTimed } from '@matrixai/contexts';
12
import type {
23
NodeContactAddress,
34
NodeContact,
@@ -321,7 +322,12 @@ describe(`${NodeGraph.name} test`, () => {
321322
await nodeGraph.setNodeContact(nodeId2, nodeContact2);
322323

323324
const results: Array<[NodeId, NodeContact]> = [];
324-
for await (const result of nodeGraph.getNodeContacts()) {
325+
const abortController = new AbortController();
326+
for await (const result of nodeGraph.getNodeContacts(
327+
undefined,
328+
undefined,
329+
{ signal: abortController.signal } as ContextTimed,
330+
)) {
325331
results.push(result);
326332
}
327333
expect(results.length).toBe(2);

0 commit comments

Comments
 (0)