Skip to content

Commit 615c5da

Browse files
committed
chore: added cancellation to discovery background task
chore: rebased onto staging for esm fix: build wip: cleaning up duplex auth handler [ci skip]
1 parent 11972b9 commit 615c5da

File tree

9 files changed

+212
-65
lines changed

9 files changed

+212
-65
lines changed

src/nodes/NodeConnection.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,9 @@ class NodeConnection {
169169
targetHostname,
170170
tlsConfig,
171171
connectionKeepAliveIntervalTime,
172-
connectionKeepAliveTimeoutTime = config.defaultsSystem
173-
.nodesConnectionIdleTimeoutTimeMin,
174-
connectionInitialMaxStreamsBidi = config.defaultsSystem
175-
.nodesConnectionInitialMaxStreamsBidi,
176-
connectionInitialMaxStreamsUni = config.defaultsSystem
177-
.nodesConnectionInitialMaxStreamsUni,
172+
connectionKeepAliveTimeoutTime,
173+
connectionInitialMaxStreamsBidi,
174+
connectionInitialMaxStreamsUni,
178175
quicSocket,
179176
manifest,
180177
logger,
@@ -301,9 +298,7 @@ class NodeConnection {
301298
const rpcClient = new RPCClient<AgentClientManifest>({
302299
manifest,
303300
middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper(),
304-
streamFactory: async () => {
305-
return quicConnection.newStream();
306-
},
301+
streamFactory: async () => quicConnection.newStream(),
307302
toError: networkUtils.toError,
308303
logger: logger.getChild(RPCClient.name),
309304
});

src/nodes/NodeConnectionManager.ts

Lines changed: 109 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -654,13 +654,10 @@ class NodeConnectionManager {
654654
const cancelAuthenticationPs: Array<PromiseCancellable<void>> = [];
655655
const cancelReason = new nodesErrors.ErrorNodeConnectionManagerStopping();
656656
for (const [nodeIdString] of this.connections) {
657-
const destroyP = this.authenticateCancel(nodeIdString, cancelReason).then(
658-
async () => {
659-
return await this.destroyConnection(
660-
IdInternal.fromString<NodeId>(nodeIdString),
661-
force,
662-
);
663-
},
657+
this.authenticateCancel(nodeIdString, cancelReason);
658+
const destroyP = this.destroyConnection(
659+
IdInternal.fromString<NodeId>(nodeIdString),
660+
force,
664661
);
665662
destroyConnectionPs.push(destroyP);
666663
}
@@ -738,12 +735,13 @@ class NodeConnectionManager {
738735
connectionAndTimer.connection.connectionId,
739736
);
740737
connectionAndTimer.timer = new Timer({
741-
handler: async () =>
738+
handler: async () => {
742739
await this.destroyConnection(
743740
targetNodeId,
744741
false,
745742
connectionAndTimer.connection.connectionId,
746-
),
743+
);
744+
},
747745
delay,
748746
});
749747
// Prevent unhandled exceptions when cancelling
@@ -878,6 +876,7 @@ class NodeConnectionManager {
878876
ctx,
879877
);
880878
this.addConnection(nodeConnection.validatedNodeId, nodeConnection);
879+
this.initiateForwardAuthenticate(nodeConnection.nodeId);
881880
// Dispatch the connection event
882881
const connectionData: ConnectionData = {
883882
remoteNodeId: nodeConnection.nodeId,
@@ -1050,6 +1049,7 @@ class NodeConnectionManager {
10501049

10511050
// Creating TTL timeout.
10521051
// Add to map
1052+
// TODO: update type to something like ConnectionDetails
10531053
const newConnAndTimer: ConnectionAndTimer = {
10541054
connection: nodeConnection,
10551055
timer: null,
@@ -1091,7 +1091,6 @@ class NodeConnectionManager {
10911091
authenticatedRejectP,
10921092
};
10931093
this.connections.set(nodeIdString, entry);
1094-
this.initiateForwardAuthenticate(nodeId);
10951094
} else {
10961095
// Adding connection to existing entry
10971096
newConnAndTimer.timer = new Timer({
@@ -1177,7 +1176,7 @@ class NodeConnectionManager {
11771176
const remainingKeys = Object.keys(connectionsEntry.connections);
11781177
if (remainingKeys.length === 0) {
11791178
// Clean up authentication
1180-
await this.authenticateCancel(
1179+
this.authenticateCancel(
11811180
targetNodeIdString,
11821181
new nodesErrors.ErrorNodeManagerAuthenticationFailed(
11831182
'Connection destroyed before authentication could complete',
@@ -1663,10 +1662,52 @@ class NodeConnectionManager {
16631662
const authenticateMessage =
16641663
await this.authenticateNetworkForwardCallback(ctx);
16651664
await withF([this.acquireConnectionInternal(nodeId)], async ([conn]) => {
1666-
await conn.rpcClient.methods.nodesAuthenticateConnection(
1667-
authenticateMessage,
1668-
ctx,
1669-
);
1665+
const authStream =
1666+
await conn.rpcClient.methods.nodesAuthenticateConnection(ctx);
1667+
const writer = authStream.writable.getWriter();
1668+
const reader = authStream.readable.getReader();
1669+
await writer.write(authenticateMessage);
1670+
const reverseMessageIn = (await reader.read()).value;
1671+
// If the reverse auth was unsuccessful, error out gracefully.
1672+
if (
1673+
reverseMessageIn == null ||
1674+
reverseMessageIn.type !== 'success' ||
1675+
!reverseMessageIn.success
1676+
) {
1677+
throw new nodesErrors.ErrorNodeManagerAuthenticationFailedForward(
1678+
'Unsuccessful forward response',
1679+
);
1680+
}
1681+
const forwardMessageIn = (await reader.read()).value;
1682+
// If the forward message retrieval was unsuccessful, error out.
1683+
if (forwardMessageIn == null || forwardMessageIn.type === 'success') {
1684+
throw new nodesErrors.ErrorNodeManagerAuthenticationFailedForward(
1685+
'Invalid forward message',
1686+
);
1687+
}
1688+
try {
1689+
await this.handleReverseAuthenticate(
1690+
conn.nodeId,
1691+
forwardMessageIn,
1692+
ctx,
1693+
);
1694+
} catch (e) {
1695+
await writer.close();
1696+
await reader.cancel();
1697+
throw e;
1698+
}
1699+
// If reverse authentication finished without errors, then we continue
1700+
await writer.write({ type: 'success', success: true });
1701+
const ack = (await reader.read()).value;
1702+
1703+
if (ack == null || ack.type !== 'success' || !ack.success) {
1704+
throw new nodesErrors.ErrorNodeManagerAuthenticationFailedForward(
1705+
'Invalid ack response',
1706+
);
1707+
}
1708+
1709+
await writer.close();
1710+
await reader.cancel();
16701711
});
16711712
connectionsEntry.authenticatedForward = AuthenticatingState.SUCCESS;
16721713
} catch (e) {
@@ -1743,7 +1784,7 @@ class NodeConnectionManager {
17431784
/**
17441785
* Will initiate a forward authentication call and coalesce
17451786
*/
1746-
public initiateForwardAuthenticate(nodeId: NodeId) {
1787+
public initiateForwardAuthenticate(nodeId: NodeId): void {
17471788
// Needs check the map if one is already running, otherwise it needs to start one and manage it.
17481789
const nodeIdString = nodeId.toString() as NodeIdString;
17491790
const authenticationEntry = this.connections.get(nodeIdString);
@@ -1752,8 +1793,12 @@ class NodeConnectionManager {
17521793
}
17531794
const existingAuthenticate =
17541795
this.activeForwardAuthenticateCalls.get(nodeIdString);
1796+
17551797
// If it exists in the map then we don't need to start one and can just return
17561798
if (existingAuthenticate != null) return;
1799+
// TODO: retry if fail
1800+
// create retry authenticatoin on new connections if the existing connection
1801+
// had a failed authentication. put a limit - soemwhere around 3.
17571802
if (
17581803
authenticationEntry.authenticatedForward !==
17591804
AuthenticatingState.PENDING ||
@@ -1762,10 +1807,8 @@ class NodeConnectionManager {
17621807
return;
17631808
}
17641809
// Otherwise we need to start one and add it to the map
1765-
const forwardAuthenticateP = this.forwardAuthenticate(nodeId).finally(
1766-
() => {
1767-
this.activeForwardAuthenticateCalls.delete(nodeIdString);
1768-
},
1810+
const forwardAuthenticateP = this.forwardAuthenticate(nodeId).finally(() =>
1811+
this.activeForwardAuthenticateCalls.delete(nodeIdString),
17691812
);
17701813
// Prevent unhandled errors
17711814
forwardAuthenticateP.then(
@@ -1775,6 +1818,40 @@ class NodeConnectionManager {
17751818
this.activeForwardAuthenticateCalls.set(nodeIdString, forwardAuthenticateP);
17761819
}
17771820

1821+
public handleAuthentication(
1822+
requestingNodeId: NodeId,
1823+
forwardMessage: NodesAuthenticateConnectionMessage,
1824+
ctx: ContextTimed,
1825+
): PromiseCancellable<NodesAuthenticateConnectionMessage>;
1826+
@decorators.timedCancellable(true)
1827+
public async handleAuthentication(
1828+
requestingNodeId: NodeId,
1829+
forwardMessage: NodesAuthenticateConnectionMessage,
1830+
@decorators.context ctx: ContextTimed,
1831+
): Promise<NodesAuthenticateConnectionMessage> {
1832+
const requestingNodeIdString = requestingNodeId.toString() as NodeIdString;
1833+
const connectionEntry = this.connections.get(requestingNodeIdString);
1834+
if (connectionEntry == null) utils.never('Connection should be defined');
1835+
1836+
await this.handleReverseAuthenticate(requestingNodeId, forwardMessage, ctx);
1837+
connectionEntry.authenticatedReverse = AuthenticatingState.SUCCESS;
1838+
1839+
return await this.authenticateNetworkForwardCallback(ctx);
1840+
}
1841+
1842+
public finalizeAuthentication(requestingNodeId: NodeId, success: boolean) {
1843+
const requestingNodeIdString = requestingNodeId.toString() as NodeIdString;
1844+
const connectionEntry = this.connections.get(requestingNodeIdString);
1845+
if (connectionEntry == null) utils.never('Connection should be defined');
1846+
if (success) {
1847+
connectionEntry.authenticatedReverse = AuthenticatingState.SUCCESS;
1848+
this.authenticateSuccess(requestingNodeIdString);
1849+
} else {
1850+
connectionEntry.authenticatedReverse = AuthenticatingState.FAIL;
1851+
this.authenticateFail(requestingNodeIdString, new Error('temp'));
1852+
}
1853+
}
1854+
17781855
/**
17791856
* Returns true if the connection has been authenticated
17801857
*/
@@ -1808,7 +1885,6 @@ class NodeConnectionManager {
18081885
nodeId: NodeId,
18091886
@decorators.context ctx: ContextTimed,
18101887
): Promise<void> {
1811-
ctx.signal.throwIfAborted();
18121888
const targetNodeIdString = nodeId.toString() as NodeIdString;
18131889
const connectionsEntry = this.connections.get(targetNodeIdString);
18141890
if (connectionsEntry == null) {
@@ -1818,11 +1894,20 @@ class NodeConnectionManager {
18181894
const abortHandler = () => {
18191895
rejectAbortP(ctx.signal.reason);
18201896
};
1821-
ctx.signal.addEventListener('abort', abortHandler, { once: true });
1897+
if (ctx.signal.aborted) {
1898+
abortHandler();
1899+
} else {
1900+
ctx.signal.addEventListener('abort', abortHandler, { once: true });
1901+
}
1902+
// If the connection isn't already authenticated, then try authenticating
1903+
if (!connectionsEntry.authenticateComplete) {
1904+
this.initiateForwardAuthenticate(nodeId);
1905+
}
18221906
try {
18231907
return await Promise.race([connectionsEntry.authenticatedP, abortP]);
18241908
} catch (e) {
1825-
// Capture the stacktrace here since knowing where we're waiting for authentication is more useful
1909+
// Capture the stacktrace here since knowing where we're waiting for
1910+
// authentication is more useful.
18261911
Error.captureStackTrace(e);
18271912
throw e;
18281913
} finally {
@@ -1881,7 +1966,7 @@ class NodeConnectionManager {
18811966
}
18821967
}
18831968

1884-
protected async authenticateCancel(
1969+
protected authenticateCancel(
18851970
targetNodeIdString: NodeIdString,
18861971
reason: Error,
18871972
) {

src/nodes/NodeGraph.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import type {
1414
import type KeyRing from '../keys/KeyRing.js';
1515
import Logger from '@matrixai/logger';
1616
import { createDestroyStartStop } from '@matrixai/async-init';
17-
import { IdInternal } from '@matrixai/id';
1817
import { decorators } from '@matrixai/contexts';
18+
import { IdInternal } from '@matrixai/id';
1919
import * as nodesUtils from './utils.js';
2020
import * as nodesErrors from './errors.js';
2121
import * as nodesEvents from './events.js';
@@ -291,18 +291,20 @@ class NodeGraph {
291291
@createDestroyStartStop.ready(new nodesErrors.ErrorNodeGraphNotRunning())
292292
public async *getNodeContacts(
293293
order: 'asc' | 'desc' = 'asc',
294-
tran?: DBTransaction,
294+
tran: DBTransaction | undefined,
295+
ctx: ContextTimed,
295296
): AsyncGenerator<[NodeId, NodeContact]> {
296297
if (tran == null) {
297298
// Lambda generators don't grab the `this` context, so we need to bind it
298-
const getNodeContacts = (tran) => this.getNodeContacts(order, tran);
299+
const getNodeContacts = (tran) => this.getNodeContacts(order, tran, ctx);
299300
return yield* this.db.withTransactionG(async function* (tran) {
300301
return yield* getNodeContacts(tran);
301302
});
302303
}
303304
return yield* nodesUtils.collectNodeContacts(
304305
[...this.nodeGraphBucketsDbPath],
305306
tran,
307+
ctx,
306308
{ reverse: order !== 'asc' },
307309
);
308310
}
@@ -659,6 +661,7 @@ class NodeGraph {
659661
for await (const result of nodesUtils.collectNodeContacts(
660662
[...this.nodeGraphBucketsDbPath, bucketKey],
661663
tran,
664+
ctx,
662665
{
663666
reverse: order !== 'asc',
664667
limit,
@@ -736,10 +739,18 @@ class NodeGraph {
736739
* Resets the bucket according to the new node ID.
737740
* Run this after new node ID is generated via renewal or reset.
738741
*/
742+
public async resetBuckets(
743+
tran?: DBTransaction,
744+
ctx?: ContextTimed,
745+
): Promise<void>;
739746
@createDestroyStartStop.ready(new nodesErrors.ErrorNodeGraphNotRunning())
740-
public async resetBuckets(tran?: DBTransaction): Promise<void> {
747+
@decorators.timedCancellable(true)
748+
public async resetBuckets(
749+
tran: DBTransaction | undefined,
750+
@decorators.context ctx: ContextTimed,
751+
): Promise<void> {
741752
if (tran == null) {
742-
return this.db.withTransactionF((tran) => this.resetBuckets(tran));
753+
return this.db.withTransactionF((tran) => this.resetBuckets(tran, ctx));
743754
}
744755
// Setup new space
745756
const spaceNew = this.space === '0' ? '1' : '0';
@@ -760,6 +771,7 @@ class NodeGraph {
760771
for await (const [nodeId, nodeContact] of nodesUtils.collectNodeContacts(
761772
[...this.nodeGraphBucketsDbPath],
762773
tran,
774+
ctx,
763775
)) {
764776
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
765777
const nodeIdOwn = this.keyRing.getNodeId();
@@ -964,12 +976,12 @@ class NodeGraph {
964976
for await (const nodeEntry of nodesUtils.collectNodeContacts(
965977
this.nodeGraphBucketsDbPath,
966978
tran,
979+
ctx,
967980
{
968981
lt: [bucketIdKey, ''],
969982
limit: remainingLimit,
970983
},
971984
)) {
972-
ctx.signal.throwIfAborted();
973985
nodes.push(nodeEntry);
974986
}
975987
}
@@ -981,12 +993,12 @@ class NodeGraph {
981993
for await (const nodeEntry of nodesUtils.collectNodeContacts(
982994
this.nodeGraphBucketsDbPath,
983995
tran,
996+
ctx,
984997
{
985998
gt: [bucketId, ''],
986999
limit: remainingLimit,
9871000
},
9881001
)) {
989-
ctx.signal.throwIfAborted();
9901002
nodes.push(nodeEntry);
9911003
}
9921004
}

0 commit comments

Comments
 (0)