Skip to content

Commit 5b16a53

Browse files
authored
Merge pull request #872 from MatrixAI/feature-agent-stop-termination
Adding cancellation to background handlers to prevent agent from being held open
2 parents 7dcd635 + 584af7f commit 5b16a53

File tree

11 files changed

+255
-80
lines changed

11 files changed

+255
-80
lines changed

src/discovery/Discovery.ts

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ class Discovery {
161161
this.dispatchEvent(
162162
new discoveryEvents.EventDiscoveryVertexProcessed({
163163
detail: {
164-
vertex,
164+
vertex: vertex,
165165
parent: parent ?? undefined,
166166
},
167167
}),
@@ -190,7 +190,7 @@ class Discovery {
190190
this.dispatchEvent(
191191
new discoveryEvents.EventDiscoveryVertexFailed({
192192
detail: {
193-
vertex,
193+
vertex: vertex,
194194
parent: parent ?? undefined,
195195
message: e.message,
196196
code: e.code,
@@ -206,9 +206,13 @@ class Discovery {
206206
/**
207207
* This handler is run periodically to check if nodes are ready to be rediscovered
208208
*/
209-
protected checkRediscoveryHandler: TaskHandler = async () => {
209+
protected checkRediscoveryHandler: TaskHandler = async (
210+
ctx: ContextTimed,
211+
) => {
210212
await this.checkRediscovery(
211213
Date.now() - this.rediscoverVertexThresholdTime,
214+
undefined,
215+
ctx,
212216
);
213217
await this.taskManager.scheduleTask({
214218
handlerId: this.checkRediscoveryHandlerId,
@@ -407,18 +411,18 @@ class Discovery {
407411
const [type, id] = vertexId;
408412
switch (type) {
409413
case 'node':
410-
return await this.processNode(id, ctx, lastProcessedCutoffTime);
414+
return await this.processNode(id, lastProcessedCutoffTime, ctx);
411415
case 'identity':
412-
return await this.processIdentity(id, ctx, lastProcessedCutoffTime);
416+
return await this.processIdentity(id, lastProcessedCutoffTime, ctx);
413417
default:
414418
never(`type must be either "node" or "identity" got "${type}"`);
415419
}
416420
}
417421

418422
protected async processNode(
419423
nodeId: NodeId,
424+
lastProcessedCutoffTime: number | undefined,
420425
ctx: ContextTimed,
421-
lastProcessedCutoffTime?: number,
422426
) {
423427
// If the vertex we've found is our own node, we simply get our own chain
424428
const processedTime = Date.now();
@@ -456,7 +460,6 @@ class Discovery {
456460
}
457461
// Iterate over each of the claims in the chain (already verified).
458462
for (const signedClaim of Object.values(vertexChainData)) {
459-
if (ctx.signal.aborted) throw ctx.signal.reason;
460463
switch (signedClaim.payload.typ) {
461464
case 'ClaimLinkNode':
462465
await this.processClaimLinkNode(
@@ -469,8 +472,8 @@ class Discovery {
469472
await this.processClaimLinkIdentity(
470473
signedClaim as SignedClaim<ClaimLinkIdentity>,
471474
nodeId,
472-
ctx,
473475
lastProcessedCutoffTime,
476+
ctx,
474477
);
475478
break;
476479
default:
@@ -553,8 +556,8 @@ class Discovery {
553556
protected async processClaimLinkIdentity(
554557
signedClaim: SignedClaim<ClaimLinkIdentity>,
555558
nodeId: NodeId,
556-
ctx: ContextTimed,
557559
lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime,
560+
ctx: ContextTimed,
558561
): Promise<void> {
559562
// Checking the claim is valid
560563
const publicKey = keysUtils.publicKeyFromNodeId(nodeId);
@@ -655,8 +658,8 @@ class Discovery {
655658

656659
protected async processIdentity(
657660
id: ProviderIdentityId,
658-
ctx: ContextTimed,
659661
lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime,
662+
ctx: ContextTimed,
660663
) {
661664
// If the next vertex is an identity, perform a social discovery
662665
// Firstly get the identity info of this identity
@@ -789,7 +792,7 @@ class Discovery {
789792
parent?: GestaltId,
790793
ignoreActive: boolean = false,
791794
tran?: DBTransaction,
792-
) {
795+
): Promise<void> {
793796
if (tran == null) {
794797
return this.db.withTransactionF((tran) =>
795798
this.scheduleDiscoveryForVertex(
@@ -852,7 +855,7 @@ class Discovery {
852855
],
853856
lazy: true,
854857
deadline: this.discoverVertexTimeoutTime,
855-
delay,
858+
delay: delay,
856859
},
857860
tran,
858861
);
@@ -1034,10 +1037,17 @@ class Discovery {
10341037
public async checkRediscovery(
10351038
lastProcessedCutoffTime: number,
10361039
tran?: DBTransaction,
1040+
ctx?: Partial<ContextTimedInput>,
1041+
): Promise<void>;
1042+
@timedCancellable(true)
1043+
public async checkRediscovery(
1044+
lastProcessedCutoffTime: number,
1045+
tran: DBTransaction | undefined,
1046+
@context ctx: ContextTimed,
10371047
): Promise<void> {
10381048
if (tran == null) {
10391049
return this.db.withTransactionF((tran) =>
1040-
this.checkRediscovery(lastProcessedCutoffTime, tran),
1050+
this.checkRediscovery(lastProcessedCutoffTime, tran, ctx),
10411051
);
10421052
}
10431053

@@ -1055,6 +1065,7 @@ class Discovery {
10551065
},
10561066
tran,
10571067
)) {
1068+
ctx.signal.throwIfAborted();
10581069
gestaltIds.push([
10591070
gestaltsUtils.encodeGestaltId(gestaltId),
10601071
lastProcessedTime,
@@ -1091,6 +1102,7 @@ class Discovery {
10911102
[this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded],
10921103
tran,
10931104
)) {
1105+
ctx.signal.throwIfAborted();
10941106
if (taskExisting == null) {
10951107
taskExisting = task;
10961108
continue;

src/nodes/NodeConnectionManager.ts

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ import {
4343
status,
4444
} from '@matrixai/async-init/dist/StartStop';
4545
import { AbstractEvent, EventAll } from '@matrixai/events';
46-
import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
46+
import {
47+
context,
48+
timed,
49+
timedCancellable,
50+
} from '@matrixai/contexts/dist/decorators';
4751
import { Semaphore } from '@matrixai/async-locks';
4852
import { PromiseCancellable } from '@matrixai/async-cancellable';
4953
import NodeConnection from './NodeConnection';
@@ -768,13 +772,15 @@ class NodeConnectionManager {
768772
* itself is such that we can pass targetNodeId as a parameter (as opposed to
769773
* an acquire function with no parameters).
770774
* @param targetNodeId Id of target node to communicate with
775+
* @param ctx
771776
* @returns ResourceAcquire Resource API for use in with contexts
772777
*/
773778
public acquireConnection(
774779
targetNodeId: NodeId,
780+
ctx: ContextTimed,
775781
): ResourceAcquire<NodeConnection> {
776782
return async () => {
777-
await this.isAuthenticatedP(targetNodeId);
783+
await this.isAuthenticatedP(targetNodeId, ctx);
778784
return await this.acquireConnectionInternal(targetNodeId)();
779785
};
780786
}
@@ -785,14 +791,22 @@ class NodeConnectionManager {
785791
* doesn't exist.
786792
* for use with normal arrow function
787793
* @param targetNodeId Id of target node to communicate with
794+
* @param ctx
788795
* @param f Function to handle communication
789796
*/
790797
public async withConnF<T>(
791798
targetNodeId: NodeId,
799+
ctx: Partial<ContextTimedInput> | undefined,
800+
f: (conn: NodeConnection) => Promise<T>,
801+
): Promise<T>;
802+
@timedCancellable(true)
803+
public async withConnF<T>(
804+
targetNodeId: NodeId,
805+
@context ctx: ContextTimed,
792806
f: (conn: NodeConnection) => Promise<T>,
793807
): Promise<T> {
794808
return await withF(
795-
[this.acquireConnection(targetNodeId)],
809+
[this.acquireConnection(targetNodeId, ctx)],
796810
async ([conn]) => {
797811
return await f(conn);
798812
},
@@ -805,14 +819,22 @@ class NodeConnectionManager {
805819
* doesn't exist.
806820
* for use with a generator function
807821
* @param targetNodeId Id of target node to communicate with
822+
* @param ctx
808823
* @param g Generator function to handle communication
809824
*/
825+
public withConnG<T, TReturn, TNext>(
826+
targetNodeId: NodeId,
827+
ctx: Partial<ContextTimedInput> | undefined,
828+
g: (conn: NodeConnection) => AsyncGenerator<T, TReturn, TNext>,
829+
): AsyncGenerator<T, TReturn, TNext>;
810830
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
831+
@timed()
811832
public async *withConnG<T, TReturn, TNext>(
812833
targetNodeId: NodeId,
834+
@context ctx: ContextTimed,
813835
g: (conn: NodeConnection) => AsyncGenerator<T, TReturn, TNext>,
814836
): AsyncGenerator<T, TReturn, TNext> {
815-
const acquire = this.acquireConnection(targetNodeId);
837+
const acquire = this.acquireConnection(targetNodeId, ctx);
816838
const [release, conn] = await acquire();
817839
let caughtError: Error | undefined;
818840
try {
@@ -975,6 +997,7 @@ class NodeConnectionManager {
975997
}
976998
const { host, port } = await this.withConnF(
977999
nodeIdSignaller,
1000+
ctx,
9781001
async (conn) => {
9791002
const client = conn.getClient();
9801003
const nodeIdSource = this.keyRing.getNodeId();
@@ -1440,8 +1463,27 @@ class NodeConnectionManager {
14401463
* @param targetNodeId - NodeId of the node that needs to initiate hole punching.
14411464
* @param address - Address the target needs to punch to.
14421465
* @param requestSignature - `base64url` encoded signature
1466+
* @param ctx
14431467
*/
1468+
public async handleNodesConnectionSignalInitial(
1469+
sourceNodeId: NodeId,
1470+
targetNodeId: NodeId,
1471+
address: {
1472+
host: Host;
1473+
port: Port;
1474+
},
1475+
requestSignature: string,
1476+
ctx?: Partial<ContextTimedInput>,
1477+
): Promise<{
1478+
host: Host;
1479+
port: Port;
1480+
}>;
14441481
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
1482+
@timedCancellable(
1483+
true,
1484+
(nodeConnectionManager: NodeConnectionManager) =>
1485+
nodeConnectionManager.connectionConnectTimeoutTime,
1486+
)
14451487
public async handleNodesConnectionSignalInitial(
14461488
sourceNodeId: NodeId,
14471489
targetNodeId: NodeId,
@@ -1450,6 +1492,7 @@ class NodeConnectionManager {
14501492
port: Port;
14511493
},
14521494
requestSignature: string,
1495+
@context ctx: ContextTimed,
14531496
): Promise<{
14541497
host: Host;
14551498
port: Port;
@@ -1479,16 +1522,20 @@ class NodeConnectionManager {
14791522
this.keyRing.keyPair,
14801523
data,
14811524
);
1482-
const connectionSignalP = this.withConnF(targetNodeId, async (conn) => {
1483-
const client = conn.getClient();
1484-
await client.methods.nodesConnectionSignalFinal({
1485-
sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId),
1486-
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
1487-
address,
1488-
requestSignature: requestSignature,
1489-
relaySignature: relaySignature.toString('base64url'),
1490-
});
1491-
})
1525+
const connectionSignalP = this.withConnF(
1526+
targetNodeId,
1527+
ctx,
1528+
async (conn) => {
1529+
const client = conn.getClient();
1530+
await client.methods.nodesConnectionSignalFinal({
1531+
sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId),
1532+
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
1533+
address: address,
1534+
requestSignature: requestSignature,
1535+
relaySignature: relaySignature.toString('base64url'),
1536+
});
1537+
},
1538+
)
14921539
// Ignore results and failures, then are expected to happen and are allowed
14931540
.then(
14941541
() => {},
@@ -1745,19 +1792,40 @@ class NodeConnectionManager {
17451792
* Returns a promise that resolves once the connection has authenticated,
17461793
* otherwise it rejects with the authentication failure
17471794
* @param nodeId
1795+
* @param ctx
17481796
*/
1749-
public async isAuthenticatedP(nodeId: NodeId): Promise<void> {
1797+
public async isAuthenticatedP(
1798+
nodeId: NodeId,
1799+
ctx?: Partial<ContextTimedInput>,
1800+
): Promise<void>;
1801+
@timedCancellable(
1802+
true,
1803+
(nodeConnectionManager: NodeConnectionManager) =>
1804+
nodeConnectionManager.connectionConnectTimeoutTime,
1805+
)
1806+
public async isAuthenticatedP(
1807+
nodeId: NodeId,
1808+
@context ctx: ContextTimed,
1809+
): Promise<void> {
1810+
ctx.signal.throwIfAborted();
17501811
const targetNodeIdString = nodeId.toString() as NodeIdString;
17511812
const connectionsEntry = this.connections.get(targetNodeIdString);
17521813
if (connectionsEntry == null) {
17531814
throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound();
17541815
}
1816+
const { p: abortP, rejectP: rejectAbortP } = utils.promise<never>();
1817+
const abortHandler = () => {
1818+
rejectAbortP(ctx.signal.reason);
1819+
};
1820+
ctx.signal.addEventListener('abort', abortHandler, { once: true });
17551821
try {
1756-
return await connectionsEntry.authenticatedP;
1822+
return await Promise.race([connectionsEntry.authenticatedP, abortP]);
17571823
} catch (e) {
17581824
// Capture the stacktrace here since knowing where we're waiting for authentication is more useful
17591825
Error.captureStackTrace(e);
17601826
throw e;
1827+
} finally {
1828+
ctx.signal.removeEventListener('abort', abortHandler);
17611829
}
17621830
}
17631831

0 commit comments

Comments
 (0)