Skip to content

Commit 17638a1

Browse files
authored
Merge pull request #914 from MatrixAI/feat-general-fixes
Small fixes and optimisations
2 parents 7701da6 + 7be67ce commit 17638a1

File tree

4 files changed

+165
-81
lines changed

4 files changed

+165
-81
lines changed

src/discovery/Discovery.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,11 @@ class Discovery {
460460
return;
461461
}
462462
// Iterate over each of the claims in the chain (already verified).
463-
for (const signedClaim of Object.values(vertexChainData)) {
463+
const processedClaimIds: Set<string> = new Set();
464+
for (const [claimIdString, signedClaim] of Object.entries(
465+
vertexChainData,
466+
)) {
467+
processedClaimIds.add(claimIdString);
464468
switch (signedClaim.payload.typ) {
465469
case 'ClaimLinkNode':
466470
await this.processClaimLinkNode(
@@ -483,6 +487,23 @@ class Discovery {
483487
);
484488
}
485489
}
490+
// Queue up known linked vertices that weren't just processed
491+
for await (const [gestaltId, gestaltLink] of this.gestaltGraph.getLinks([
492+
'node',
493+
nodeId,
494+
])) {
495+
const claimIdString = decodeClaimId(
496+
gestaltLink[1].claim.payload.jti,
497+
)!.toString();
498+
if (!processedClaimIds.has(claimIdString)) {
499+
await this.scheduleDiscoveryForVertex(
500+
gestaltId,
501+
undefined,
502+
lastProcessedCutoffTime,
503+
['node', nodeId],
504+
);
505+
}
506+
}
486507
await this.gestaltGraph.setVertexProcessedTime(
487508
gestaltNodeId,
488509
processedTime,

src/nodes/NodeManager.ts

Lines changed: 134 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import type {
3434
NodeAddress,
3535
NodeBucket,
3636
NodeBucketIndex,
37+
NodeContact,
3738
NodeContactAddressData,
3839
NodeId,
3940
NodeIdEncoded,
@@ -1145,48 +1146,68 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
11451146
nodeConnectionsQueue: NodeConnectionQueue,
11461147
ctx: ContextTimed,
11471148
) {
1148-
await this.nodeConnectionManager.withConnF(nodeId, ctx, async (conn) => {
1149-
const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget);
1150-
const closestConnectionsRequestP = (async () => {
1151-
const resultStream =
1152-
await conn.rpcClient.methods.nodesClosestActiveConnectionsGet(
1153-
{
1154-
nodeIdEncoded: nodeIdEncoded,
1155-
},
1156-
ctx,
1157-
);
1158-
// Collecting results
1159-
for await (const result of resultStream) {
1160-
ctx.signal.throwIfAborted();
1161-
const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId);
1162-
if (nodeIdNew == null) {
1163-
utils.never(`failed to decode NodeId "${result.nodeId}"`);
1149+
const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget);
1150+
const closestConnectionsRequestP = (async () => {
1151+
const data = await this.nodeConnectionManager.withConnF(
1152+
nodeId,
1153+
ctx,
1154+
async (conn) => {
1155+
const resultStream =
1156+
await conn.rpcClient.methods.nodesClosestActiveConnectionsGet(
1157+
{
1158+
nodeIdEncoded: nodeIdEncoded,
1159+
},
1160+
ctx,
1161+
);
1162+
const connections: Array<NodeId> = [];
1163+
// Collecting results
1164+
for await (const result of resultStream) {
1165+
ctx.signal.throwIfAborted();
1166+
const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId);
1167+
if (nodeIdNew == null) {
1168+
utils.never(`failed to decode NodeId "${result.nodeId}"`);
1169+
}
1170+
connections.push(nodeIdNew);
11641171
}
1165-
nodeConnectionsQueue.queueNodeSignal(nodeIdNew, nodeId);
1166-
}
1167-
})();
1168-
const closestNodesRequestP = (async () => {
1169-
const resultStream =
1170-
await conn.rpcClient.methods.nodesClosestLocalNodesGet(
1171-
{
1172-
nodeIdEncoded: nodeIdEncoded,
1173-
},
1174-
ctx,
1175-
);
1176-
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
1177-
ctx.signal.throwIfAborted();
1178-
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
1179-
if (nodeId == null) {
1180-
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);
1172+
return connections;
1173+
},
1174+
);
1175+
for (const nodeIdNew of data) {
1176+
nodeConnectionsQueue.queueNodeSignal(nodeIdNew, nodeId);
1177+
}
1178+
})();
1179+
const closestNodesRequestP = (async () => {
1180+
const data = await this.nodeConnectionManager.withConnF(
1181+
nodeId,
1182+
ctx,
1183+
async (conn) => {
1184+
const resultStream =
1185+
await conn.rpcClient.methods.nodesClosestLocalNodesGet(
1186+
{
1187+
nodeIdEncoded: nodeIdEncoded,
1188+
},
1189+
ctx,
1190+
);
1191+
const data: Array<[NodeId, NodeContact]> = [];
1192+
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
1193+
ctx.signal.throwIfAborted();
1194+
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
1195+
if (nodeId == null) {
1196+
utils.never(`failed to decode NodeId "${nodeIdEncoded}"`);
1197+
}
1198+
data.push([nodeId, nodeContact]);
11811199
}
1182-
nodeConnectionsQueue.queueNodeDirect(nodeId, nodeContact);
1183-
}
1184-
})();
1185-
await Promise.allSettled([
1186-
closestConnectionsRequestP,
1187-
closestNodesRequestP,
1188-
]);
1189-
});
1200+
return data;
1201+
},
1202+
);
1203+
for (const [nodeId, nodeContact] of data) {
1204+
nodeConnectionsQueue.queueNodeDirect(nodeId, nodeContact);
1205+
}
1206+
})();
1207+
await Promise.allSettled([
1208+
closestConnectionsRequestP,
1209+
closestNodesRequestP,
1210+
]);
11901211
}
11911212

11921213
/**
@@ -1256,44 +1277,75 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
12561277
}
12571278
}
12581279

1280+
/**
1281+
* Will attempt to make a direct connection without ICE.
1282+
* This will only succeed due to these conditions
1283+
* 1. connection already exists to target.
1284+
* 2. Nat already allows port due to already being punched.
1285+
* 3. Port is publicly accessible due to nat configuration .
1286+
* Will return true if connection was established or already exists, false otherwise.
1287+
*/
1288+
public pingNodeAddressMultiple(
1289+
nodeId: NodeId,
1290+
addresses: Array<[Host, Port]>,
1291+
ctx?: Partial<ContextTimedInput>,
1292+
): PromiseCancellable<boolean>;
1293+
@startStop.ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
1294+
@decorators.timedCancellable(
1295+
true,
1296+
(nodeConnectionManager: NodeConnectionManager<Manifest>) =>
1297+
nodeConnectionManager.connectionConnectTimeoutTime,
1298+
)
1299+
public async pingNodeAddressMultiple(
1300+
nodeId: NodeId,
1301+
addresses: Array<[Host, Port]>,
1302+
@decorators.context ctx: ContextTimed,
1303+
): Promise<boolean> {
1304+
if (this.nodeConnectionManager.hasConnection(nodeId)) return true;
1305+
try {
1306+
await this.nodeConnectionManager.createConnectionMultiple(
1307+
[nodeId],
1308+
addresses,
1309+
ctx,
1310+
);
1311+
return true;
1312+
} catch (e) {
1313+
if (!nodesUtils.isConnectionError(e)) throw e;
1314+
return false;
1315+
}
1316+
}
1317+
12591318
/**
12601319
* Connects to the target node, and retrieves its sigchain data.
12611320
* Verifies and returns the decoded chain as ChainData. Note: this will drop
12621321
* any unverifiable claims.
12631322
* For node1 -> node2 claims, the verification process also involves connecting
12641323
* to node2 to verify the claim (to retrieve its signing public key).
12651324
* @param targetNodeId Id of the node to connect request the chain data of.
1266-
* @param _claimId If set then we get the claims newer that this claim ID.
1325+
* @param claimId If set then we get the claims newer that this claim ID.
12671326
* @param ctx
12681327
*/
12691328
public requestChainData(
12701329
targetNodeId: NodeId,
1271-
_claimId?: ClaimId,
1330+
claimId?: ClaimId,
12721331
ctx?: Partial<ContextTimed>,
12731332
): PromiseCancellable<Record<ClaimId, SignedClaim>>;
12741333
@decorators.timedCancellable(true)
12751334
public async requestChainData(
12761335
targetNodeId: NodeId,
1277-
_claimId: ClaimId | undefined,
1336+
claimId: ClaimId | undefined,
12781337
@decorators.context ctx: ContextTimed,
12791338
): Promise<Record<ClaimId, SignedClaim>> {
12801339
// Verify the node's chain with its own public key
12811340
return await this.withConnF(targetNodeId, ctx, async (connection) => {
12821341
const claims: Record<ClaimId, SignedClaim> = {};
12831342
const client = connection.getClient();
12841343

1285-
// Let claimIdEncoded: ClaimIdEncoded | undefined;
1286-
1287-
// if (claimId != null) {
1288-
// claimIdEncoded = claimsUtils.encodeClaimId(claimId);
1289-
// } else {
1290-
// claimIdEncoded = undefined;
1291-
// }
1292-
1344+
const claimIdEncoded: ClaimIdEncoded | undefined =
1345+
claimId != null ? claimsUtils.encodeClaimId(claimId) : undefined;
12931346
for await (const agentClaim of await client.methods.nodesClaimsGet(
12941347
{
1295-
// Needs to be addressed later - causes test failures in Discovery.test.ts
1296-
// seek: claimIdEncoded,
1348+
seek: claimIdEncoded,
12971349
},
12981350
ctx,
12991351
)) {
@@ -2339,7 +2391,7 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
23392391
let removedNodes = 0;
23402392
const unsetLock = new Lock();
23412393
const pendingPromises: Array<Promise<void>> = [];
2342-
for (const [nodeId] of bucket) {
2394+
for (const [nodeId, nodeContact] of bucket) {
23432395
if (removedNodes >= pendingNodes.size) break;
23442396
await semaphore.waitForUnlock(ctx);
23452397
if (ctx.signal?.aborted === true) break;
@@ -2351,21 +2403,34 @@ class NodeManager<Manifest extends AgentClientManifestNodeManager> {
23512403
signal: ctx.signal,
23522404
timer: connectionConnectTimeoutTime,
23532405
};
2354-
const pingResult = await this.pingNode(nodeId, pingCtx);
2355-
if (pingResult != null) {
2356-
// Succeeded so update
2357-
const [nodeAddress, nodeContactAddressData] = pingResult;
2358-
await this.setNode(
2359-
nodeId,
2360-
nodeAddress,
2361-
nodeContactAddressData,
2362-
false,
2363-
false,
2364-
undefined,
2365-
tran,
2366-
ctx,
2367-
);
2368-
} else {
2406+
// Getting known addresses for the ping
2407+
const desiredAddresses: Array<NodeAddress> = [];
2408+
for (const [
2409+
nodeContactAddress,
2410+
nodeContactAddressData,
2411+
] of Object.entries(nodeContact)) {
2412+
if (nodeContactAddressData.mode === 'direct') {
2413+
desiredAddresses.push(
2414+
nodesUtils.parseNodeContactAddress(nodeContactAddress),
2415+
);
2416+
}
2417+
}
2418+
2419+
const resolvedAddresses = await networkUtils.resolveHostnames(
2420+
desiredAddresses,
2421+
undefined,
2422+
this.dnsServers,
2423+
ctx,
2424+
);
2425+
2426+
const pingResult = await this.pingNodeAddressMultiple(
2427+
nodeId,
2428+
resolvedAddresses,
2429+
pingCtx,
2430+
);
2431+
2432+
// If ping fails we remove it, otherwise we don't update
2433+
if (!pingResult) {
23692434
// We don't remove node the ping was aborted
23702435
if (ctx.signal.aborted) return;
23712436
// We need to lock this since it's concurrent

src/utils/utils.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,6 @@ function sleepCancellable(ms: number): PromiseCancellable<void> {
114114
/**
115115
* Checks if value is an object.
116116
* Arrays are also considered objects.
117-
* The type guard here says `o is any`.
118-
* TODO: When TS 4.9.x is released, change this to `o is object`.
119117
* At that point `'x' in o` checks become type guards that
120118
* can assert the property's existence.
121119
*/
@@ -298,8 +296,8 @@ function promise<T = void>(): PromiseDeconstructed<T> {
298296
* Promise constructed from signal
299297
* This rejects when the signal is aborted
300298
*/
301-
// fixme: There is also a one signal to many `signalPromise` relationship in the NM connection queue that needs to be fixed.
302299
function signalPromise(signal: AbortSignal): PromiseCancellable<void> {
300+
setMaxListeners(signal);
303301
return new PromiseCancellable((resolve, _, signalCancel) => {
304302
// Short circuit if signal already aborted
305303
if (signal.aborted) return resolve();

tests/nodes/NodeManager.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ describe(`${NodeManager.name}`, () => {
350350
);
351351
});
352352
test('should not add new node if bucket is full and old nodes are responsive', async () => {
353-
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
353+
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
354354
// Fill bucket
355355
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
356356
for (let i = 0; i < 20; i++) {
@@ -362,7 +362,7 @@ describe(`${NodeManager.name}`, () => {
362362
});
363363
}
364364

365-
mockedPingNode.mockResolvedValue([nodeAddress, nodeContactAddressData]);
365+
mockedPingNode.mockResolvedValue(true);
366366
// Add 21st node
367367
await nodeManager.setNode(
368368
nodeId,
@@ -374,7 +374,7 @@ describe(`${NodeManager.name}`, () => {
374374
expect(await nodeGraph.getNodeContact(nodeId)).toBeUndefined();
375375
});
376376
test('should add new node if bucket is full and old nodes are responsive but force is set', async () => {
377-
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
377+
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
378378
// Fill bucket
379379
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
380380
for (let i = 0; i < 20; i++) {
@@ -386,7 +386,7 @@ describe(`${NodeManager.name}`, () => {
386386
});
387387
}
388388

389-
mockedPingNode.mockResolvedValue([nodeAddress, nodeContactAddressData]);
389+
mockedPingNode.mockResolvedValue(true);
390390
// Add 21st node
391391
await nodeManager.setNode(
392392
nodeId,
@@ -399,15 +399,15 @@ describe(`${NodeManager.name}`, () => {
399399
expect(await nodeGraph.getNodeContact(nodeId)).toBeDefined();
400400
});
401401
test('should add new node if bucket is full and old nodes are unresponsive', async () => {
402-
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
402+
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
403403
// Fill bucket
404404
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
405405
for (let i = 0; i < 20; i++) {
406406
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, i + 1);
407407
await nodeManager.setNode(nodeId, nodeAddress, nodeContactAddressData);
408408
}
409409

410-
mockedPingNode.mockResolvedValue(undefined);
410+
mockedPingNode.mockResolvedValue(false);
411411
// Add 21st node
412412
await nodeManager.setNode(
413413
nodeId,
@@ -419,7 +419,7 @@ describe(`${NodeManager.name}`, () => {
419419
expect(await nodeGraph.getNodeContact(nodeId)).toBeDefined();
420420
});
421421
test('should not block when bucket is full', async () => {
422-
const mockedPingNode = jest.spyOn(nodeManager, 'pingNode');
422+
const mockedPingNode = jest.spyOn(nodeManager, 'pingNodeAddressMultiple');
423423
// Fill bucket
424424
const nodeId = generateNodeIdForBucket(keyRing.getNodeId(), 255, 0);
425425
for (let i = 0; i < 20; i++) {
@@ -436,7 +436,7 @@ describe(`${NodeManager.name}`, () => {
436436
mockedPingNode.mockImplementation(() => {
437437
return new PromiseCancellable(async (resolve) => {
438438
await waitP;
439-
resolve(undefined);
439+
resolve(false);
440440
});
441441
});
442442
// Add 21st node

0 commit comments

Comments
 (0)