diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts
index 7d32add22..d5f3865bd 100644
--- a/src/discovery/Discovery.ts
+++ b/src/discovery/Discovery.ts
@@ -161,7 +161,7 @@ class Discovery {
       this.dispatchEvent(
         new discoveryEvents.EventDiscoveryVertexProcessed({
           detail: {
-            vertex,
+            vertex: vertex,
             parent: parent ?? undefined,
           },
         }),
@@ -190,7 +190,7 @@ class Discovery {
       this.dispatchEvent(
         new discoveryEvents.EventDiscoveryVertexFailed({
           detail: {
-            vertex,
+            vertex: vertex,
             parent: parent ?? undefined,
             message: e.message,
             code: e.code,
@@ -206,9 +206,13 @@ class Discovery {
   /**
    * This handler is run periodically to check if nodes are ready to be rediscovered
    */
-  protected checkRediscoveryHandler: TaskHandler = async () => {
+  protected checkRediscoveryHandler: TaskHandler = async (
+    ctx: ContextTimed,
+  ) => {
     await this.checkRediscovery(
       Date.now() - this.rediscoverVertexThresholdTime,
+      undefined,
+      ctx,
     );
     await this.taskManager.scheduleTask({
       handlerId: this.checkRediscoveryHandlerId,
@@ -407,9 +411,9 @@ class Discovery {
     const [type, id] = vertexId;
     switch (type) {
       case 'node':
-        return await this.processNode(id, ctx, lastProcessedCutoffTime);
+        return await this.processNode(id, lastProcessedCutoffTime, ctx);
       case 'identity':
-        return await this.processIdentity(id, ctx, lastProcessedCutoffTime);
+        return await this.processIdentity(id, lastProcessedCutoffTime, ctx);
       default:
         never(`type must be either "node" or "identity" got "${type}"`);
     }
@@ -417,8 +421,8 @@ class Discovery {
 
   protected async processNode(
     nodeId: NodeId,
+    lastProcessedCutoffTime: number | undefined,
     ctx: ContextTimed,
-    lastProcessedCutoffTime?: number,
   ) {
     // If the vertex we've found is our own node, we simply get our own chain
     const processedTime = Date.now();
@@ -456,7 +460,6 @@ class Discovery {
     }
     // Iterate over each of the claims in the chain (already verified).
     for (const signedClaim of Object.values(vertexChainData)) {
-      if (ctx.signal.aborted) throw ctx.signal.reason;
       switch (signedClaim.payload.typ) {
         case 'ClaimLinkNode':
           await this.processClaimLinkNode(
@@ -469,8 +472,8 @@ class Discovery {
           await this.processClaimLinkIdentity(
             signedClaim as SignedClaim,
             nodeId,
-            ctx,
             lastProcessedCutoffTime,
+            ctx,
           );
           break;
         default:
@@ -553,8 +556,8 @@ class Discovery {
   protected async processClaimLinkIdentity(
     signedClaim: SignedClaim,
     nodeId: NodeId,
-    ctx: ContextTimed,
     lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime,
+    ctx: ContextTimed,
   ): Promise {
     // Checking the claim is valid
     const publicKey = keysUtils.publicKeyFromNodeId(nodeId);
@@ -655,8 +658,8 @@ class Discovery {
   protected async processIdentity(
     id: ProviderIdentityId,
-    ctx: ContextTimed,
     lastProcessedCutoffTime = Date.now() - this.rediscoverSkipTime,
+    ctx: ContextTimed,
   ) {
     // If the next vertex is an identity, perform a social discovery
     // Firstly get the identity info of this identity
@@ -789,7 +792,7 @@ class Discovery {
     parent?: GestaltId,
     ignoreActive: boolean = false,
     tran?: DBTransaction,
-  ) {
+  ): Promise {
     if (tran == null) {
       return this.db.withTransactionF((tran) =>
         this.scheduleDiscoveryForVertex(
@@ -852,7 +855,7 @@ class Discovery {
         ],
         lazy: true,
         deadline: this.discoverVertexTimeoutTime,
-        delay,
+        delay: delay,
       },
       tran,
     );
@@ -1034,10 +1037,17 @@ class Discovery {
   public async checkRediscovery(
     lastProcessedCutoffTime: number,
     tran?: DBTransaction,
+    ctx?: Partial,
+  ): Promise;
+  @timedCancellable(true)
+  public async checkRediscovery(
+    lastProcessedCutoffTime: number,
+    tran: DBTransaction | undefined,
+    @context ctx: ContextTimed,
   ): Promise {
     if (tran == null) {
       return this.db.withTransactionF((tran) =>
-        this.checkRediscovery(lastProcessedCutoffTime, tran),
+        this.checkRediscovery(lastProcessedCutoffTime, tran, ctx),
       );
     }
@@ -1055,6 +1065,7 @@ class Discovery {
       },
       tran,
     )) {
+      ctx.signal.throwIfAborted();
       gestaltIds.push([
         gestaltsUtils.encodeGestaltId(gestaltId),
         lastProcessedTime,
@@ -1091,6 +1102,7 @@ class Discovery {
       [this.constructor.name, this.discoverVertexHandlerId, gestaltIdEncoded],
       tran,
     )) {
+      ctx.signal.throwIfAborted();
       if (taskExisting == null) {
         taskExisting = task;
         continue;
diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts
index 27bd2e6f6..bcafb1f6e 100644
--- a/src/nodes/NodeConnectionManager.ts
+++ b/src/nodes/NodeConnectionManager.ts
@@ -43,7 +43,11 @@ import {
   status,
 } from '@matrixai/async-init/dist/StartStop';
 import { AbstractEvent, EventAll } from '@matrixai/events';
-import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
+import {
+  context,
+  timed,
+  timedCancellable,
+} from '@matrixai/contexts/dist/decorators';
 import { Semaphore } from '@matrixai/async-locks';
 import { PromiseCancellable } from '@matrixai/async-cancellable';
 import NodeConnection from './NodeConnection';
@@ -768,13 +772,15 @@ class NodeConnectionManager {
    * itself is such that we can pass targetNodeId as a parameter (as opposed to
    * an acquire function with no parameters).
    * @param targetNodeId Id of target node to communicate with
+   * @param ctx
    * @returns ResourceAcquire Resource API for use in with contexts
    */
   public acquireConnection(
     targetNodeId: NodeId,
+    ctx: ContextTimed,
   ): ResourceAcquire {
     return async () => {
-      await this.isAuthenticatedP(targetNodeId);
+      await this.isAuthenticatedP(targetNodeId, ctx);
       return await this.acquireConnectionInternal(targetNodeId)();
     };
   }
@@ -785,14 +791,22 @@ class NodeConnectionManager {
    * doesn't exist.
    * for use with normal arrow function
    * @param targetNodeId Id of target node to communicate with
+   * @param ctx
    * @param f Function to handle communication
    */
   public async withConnF(
     targetNodeId: NodeId,
+    ctx: Partial | undefined,
+    f: (conn: NodeConnection) => Promise,
+  ): Promise;
+  @timedCancellable(true)
+  public async withConnF(
+    targetNodeId: NodeId,
+    @context ctx: ContextTimed,
     f: (conn: NodeConnection) => Promise,
   ): Promise {
     return await withF(
-      [this.acquireConnection(targetNodeId)],
+      [this.acquireConnection(targetNodeId, ctx)],
       async ([conn]) => {
         return await f(conn);
       },
@@ -805,14 +819,22 @@ class NodeConnectionManager {
    * doesn't exist.
    * for use with a generator function
    * @param targetNodeId Id of target node to communicate with
+   * @param ctx
    * @param g Generator function to handle communication
    */
+  public withConnG(
+    targetNodeId: NodeId,
+    ctx: Partial | undefined,
+    g: (conn: NodeConnection) => AsyncGenerator,
+  ): AsyncGenerator;
   @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
+  @timed()
   public async *withConnG(
     targetNodeId: NodeId,
+    @context ctx: ContextTimed,
     g: (conn: NodeConnection) => AsyncGenerator,
   ): AsyncGenerator {
-    const acquire = this.acquireConnection(targetNodeId);
+    const acquire = this.acquireConnection(targetNodeId, ctx);
     const [release, conn] = await acquire();
     let caughtError: Error | undefined;
     try {
@@ -975,6 +997,7 @@ class NodeConnectionManager {
     }
     const { host, port } = await this.withConnF(
       nodeIdSignaller,
+      ctx,
       async (conn) => {
         const client = conn.getClient();
         const nodeIdSource = this.keyRing.getNodeId();
@@ -1440,8 +1463,27 @@ class NodeConnectionManager {
    * @param targetNodeId - NodeId of the node that needs to initiate hole punching.
    * @param address - Address the target needs to punch to.
    * @param requestSignature - `base64url` encoded signature
+   * @param ctx
    */
+  public async handleNodesConnectionSignalInitial(
+    sourceNodeId: NodeId,
+    targetNodeId: NodeId,
+    address: {
+      host: Host;
+      port: Port;
+    },
+    requestSignature: string,
+    ctx?: Partial,
+  ): Promise<{
+    host: Host;
+    port: Port;
+  }>;
   @ready(new nodesErrors.ErrorNodeManagerNotRunning())
+  @timedCancellable(
+    true,
+    (nodeConnectionManager: NodeConnectionManager) =>
+      nodeConnectionManager.connectionConnectTimeoutTime,
+  )
   public async handleNodesConnectionSignalInitial(
     sourceNodeId: NodeId,
     targetNodeId: NodeId,
@@ -1450,6 +1492,7 @@ class NodeConnectionManager {
       port: Port;
     },
     requestSignature: string,
+    @context ctx: ContextTimed,
   ): Promise<{
     host: Host;
     port: Port;
@@ -1479,16 +1522,20 @@ class NodeConnectionManager {
       this.keyRing.keyPair,
       data,
     );
-    const connectionSignalP = this.withConnF(targetNodeId, async (conn) => {
-      const client = conn.getClient();
-      await client.methods.nodesConnectionSignalFinal({
-        sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId),
-        targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
-        address,
-        requestSignature: requestSignature,
-        relaySignature: relaySignature.toString('base64url'),
-      });
-    })
+    const connectionSignalP = this.withConnF(
+      targetNodeId,
+      ctx,
+      async (conn) => {
+        const client = conn.getClient();
+        await client.methods.nodesConnectionSignalFinal({
+          sourceNodeIdEncoded: nodesUtils.encodeNodeId(sourceNodeId),
+          targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
+          address: address,
+          requestSignature: requestSignature,
+          relaySignature: relaySignature.toString('base64url'),
+        });
+      },
+    )
       // Ignore results and failures, then are expected to happen and are allowed
       .then(
         () => {},
@@ -1745,19 +1792,40 @@ class NodeConnectionManager {
    * Returns a promise that resolves once the connection has authenticated,
    * otherwise it rejects with the authentication failure
    * @param nodeId
+   * @param ctx
    */
-  public async isAuthenticatedP(nodeId: NodeId): Promise {
+  public async isAuthenticatedP(
+    nodeId: NodeId,
+    ctx?: Partial,
+  ): Promise;
+  @timedCancellable(
+    true,
+    (nodeConnectionManager: NodeConnectionManager) =>
+      nodeConnectionManager.connectionConnectTimeoutTime,
+  )
+  public async isAuthenticatedP(
+    nodeId: NodeId,
+    @context ctx: ContextTimed,
+  ): Promise {
+    ctx.signal.throwIfAborted();
     const targetNodeIdString = nodeId.toString() as NodeIdString;
     const connectionsEntry = this.connections.get(targetNodeIdString);
     if (connectionsEntry == null) {
       throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound();
     }
+    const { p: abortP, rejectP: rejectAbortP } = utils.promise();
+    const abortHandler = () => {
+      rejectAbortP(ctx.signal.reason);
+    };
+    ctx.signal.addEventListener('abort', abortHandler, { once: true });
     try {
-      return await connectionsEntry.authenticatedP;
+      return await Promise.race([connectionsEntry.authenticatedP, abortP]);
     } catch (e) {
       // Capture the stacktrace here since knowing where we're waiting for authentication is more useful
       Error.captureStackTrace(e);
       throw e;
+    } finally {
+      ctx.signal.removeEventListener('abort', abortHandler);
     }
   }
diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts
index 61a9b4f86..4af01f14f 100644
--- a/src/nodes/NodeGraph.ts
+++ b/src/nodes/NodeGraph.ts
@@ -1,5 +1,5 @@
 import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
-import type { ContextTimed } from '@matrixai/contexts';
+import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
 import type {
   NodeId,
   NodeAddress,
@@ -18,6 +18,8 @@ import {
   ready,
 } from '@matrixai/async-init/dist/CreateDestroyStartStop';
 import { IdInternal } from '@matrixai/id';
+import { timedCancellable } from '@matrixai/contexts/dist/decorators';
+import { context } from '@matrixai/contexts/dist/decorators';
 import * as nodesUtils from './utils';
 import * as nodesErrors from './errors';
 import * as nodesEvents from './events';
@@ -26,7 +28,7 @@ import config from '../config';
 /**
  * NodeGraph is an implementation of Kademlia for maintaining peer to peer
- * information about Polkey nodes.
+ * information about Polykey nodes.
  *
  * It is a database of fixed-size buckets, where each bucket
  * contains NodeId -> NodeData. The bucket index is a prefix key.
@@ -241,14 +243,21 @@ class NodeGraph {
   /**
    * Get a single `NodeContact`
    */
-  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
   public async getNodeContact(
     nodeId: NodeId,
     tran?: DBTransaction,
+    ctx?: Partial,
+  ): Promise;
+  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
+  @timedCancellable(true)
+  public async getNodeContact(
+    nodeId: NodeId,
+    tran: DBTransaction | undefined,
+    @context ctx: ContextTimed,
   ): Promise {
     if (tran == null) {
-      return this.db.withTransactionF((tran) =>
-        this.getNodeContact(nodeId, tran),
+      return await this.db.withTransactionF(
+        async (tran) => await this.getNodeContact(nodeId, tran, ctx),
       );
     }
     const [bucketIndex] = this.bucketIndex(nodeId);
@@ -266,6 +275,7 @@ class NodeGraph {
         valueAsBuffer: false,
       },
     )) {
+      ctx.signal.throwIfAborted();
       const nodeContactAddress = keyPath[0].toString();
       contact[nodeContactAddress] = nodeContactAddressData;
     }
@@ -615,18 +625,30 @@ class NodeGraph {
    * @param limit Limit the number of nodes returned, note that `-1` means
    * no limit, but `Infinity` means `0`.
    * @param tran
+   * @param ctx
    */
+  public async getBucket(
+    bucketIndex: NodeBucketIndex,
+    sort?: 'nodeId' | 'distance' | 'connected',
+    order?: 'asc' | 'desc',
+    limit?: number,
+    tran?: DBTransaction,
+    ctx?: Partial,
+  ): Promise;
+  @timedCancellable(true)
   @ready(new nodesErrors.ErrorNodeGraphNotRunning())
   public async getBucket(
     bucketIndex: NodeBucketIndex,
     sort: 'nodeId' | 'distance' | 'connected' = 'nodeId',
     order: 'asc' | 'desc' = 'asc',
-    limit?: number,
-    tran?: DBTransaction,
+    limit: number | undefined,
+    tran: DBTransaction | undefined,
+    @context ctx: ContextTimed,
   ): Promise {
     if (tran == null) {
-      return this.db.withTransactionF((tran) =>
-        this.getBucket(bucketIndex, sort, order, limit, tran),
+      return await this.db.withTransactionF(
+        async (tran) =>
+          await this.getBucket(bucketIndex, sort, order, limit, tran, ctx),
       );
     }
     if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
@@ -647,6 +669,7 @@ class NodeGraph {
         pathAdjust: [''],
       },
     )) {
+      ctx.signal.throwIfAborted();
       bucket.push(result);
     }
     if (sort === 'distance') {
@@ -660,6 +683,7 @@ class NodeGraph {
         limit,
       },
     )) {
+      ctx.signal.throwIfAborted();
       const nodeId = IdInternal.fromBuffer(nodeIdBuffer);
       const nodeContact = await this.getNodeContact(
         IdInternal.fromBuffer(nodeIdBuffer),
@@ -883,15 +907,23 @@ class NodeGraph {
    * @returns The `NodeBucket` which could have less than `limit` nodes if the
    * node graph has less than the requested limit.
    */
+  public async getClosestNodes(
+    nodeId: NodeId,
+    limit?: number,
+    tran?: DBTransaction,
+    ctx?: Partial,
+  ): Promise;
+  @timedCancellable(true)
   @ready(new nodesErrors.ErrorNodeGraphNotRunning())
   public async getClosestNodes(
     nodeId: NodeId,
     limit: number = this.nodeBucketLimit,
-    tran?: DBTransaction,
+    tran: DBTransaction | undefined,
+    @context ctx: ContextTimed,
   ): Promise {
     if (tran == null) {
-      return this.db.withTransactionF((tran) =>
-        this.getClosestNodes(nodeId, limit, tran),
+      return await this.db.withTransactionF(
+        async (tran) => await this.getClosestNodes(nodeId, limit, tran),
       );
     }
     // Buckets map to the target node in the following way;
@@ -915,6 +947,7 @@ class NodeGraph {
       undefined,
       undefined,
       tran,
+      ctx,
     );
     // We need to iterate over the key stream
     // When streaming we want all nodes in the starting bucket
@@ -937,6 +970,7 @@ class NodeGraph {
         limit: remainingLimit,
       },
     )) {
+      ctx.signal.throwIfAborted();
       nodes.push(nodeEntry);
     }
   }
@@ -953,6 +987,7 @@ class NodeGraph {
         limit: remainingLimit,
       },
     )) {
+      ctx.signal.throwIfAborted();
       nodes.push(nodeEntry);
     }
   }
@@ -969,6 +1004,7 @@ class NodeGraph {
       undefined,
       undefined,
       tran,
+      ctx,
     );
     // Pop off elements of the same bucket to avoid duplicates
     let element = nodes.pop();
diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts
index 363ad2597..784832bd7 100644
--- a/src/nodes/NodeManager.ts
+++ b/src/nodes/NodeManager.ts
@@ -44,7 +44,11 @@ import Logger from '@matrixai/logger';
 import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
 import { Lock, LockBox, Semaphore } from '@matrixai/async-locks';
 import { IdInternal } from '@matrixai/id';
-import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
+import {
+  context,
+  timed,
+  timedCancellable,
+} from '@matrixai/contexts/dist/decorators';
 import * as nodesUtils from './utils';
 import * as nodesEvents from './events';
 import * as nodesErrors from './errors';
@@ -195,7 +199,7 @@ class NodeManager {
     }
     if (connectionCount > 0) {
       this.logger.debug('triggering bucket refresh for bucket 255');
-      await this.updateRefreshBucketDelay(255, 0);
+      await this.updateRefreshBucketDelay(255, 0, undefined, undefined, ctx);
     }
     try {
       this.logger.debug(
@@ -285,6 +289,7 @@ class NodeManager {
     // Getting the closest node from the `NodeGraph`
     let bucketIndex: number | undefined;
     for await (const bucket of this.nodeGraph.getBuckets('distance', 'asc')) {
+      if (ctx.signal.aborted) return;
       bucketIndex = bucket[0];
     }
     // If no buckets then end here
@@ -471,7 +476,7 @@ class NodeManager {
    */
   public acquireConnection(
     nodeId: NodeId,
-    ctx?: Partial,
+    ctx: ContextTimed,
   ): ResourceAcquire {
     if (this.keyRing.getNodeId().equals(nodeId)) {
       throw new nodesErrors.ErrorNodeManagerNodeIdOwn();
@@ -483,18 +488,16 @@ class NodeManager {
         // Checking if connection already exists
         if (!this.nodeConnectionManager.hasConnection(nodeId)) {
           // Establish the connection
-          const result = await this.findNode(
-            {
-              nodeId: nodeId,
-            },
-            ctx,
-          );
+          const result = await this.findNode({ nodeId: nodeId }, ctx);
           if (result == null) {
             throw new nodesErrors.ErrorNodeManagerConnectionFailed();
           }
         }
         // Initiate authentication and await
-        return await this.nodeConnectionManager.acquireConnection(nodeId)();
+        return await this.nodeConnectionManager.acquireConnection(
+          nodeId,
+          ctx,
+        )();
       },
     );
   };
@@ -537,11 +540,17 @@ class NodeManager {
    * @param g Generator function to handle communication
    * @param ctx
    */
+  public withConnG(
+    nodeId: NodeId,
+    ctx: Partial | undefined,
+    g: (conn: NodeConnection) => AsyncGenerator,
+  ): AsyncGenerator;
   @ready(new nodesErrors.ErrorNodeManagerNotRunning())
+  @timed()
   public async *withConnG(
     nodeId: NodeId,
+    @context ctx: ContextTimed,
     g: (conn: NodeConnection) => AsyncGenerator,
-    ctx?: Partial,
   ): AsyncGenerator {
     const acquire = this.acquireConnection(nodeId, ctx);
     const [release, conn] = await acquire();
@@ -847,6 +856,8 @@ class NodeManager {
     ] of await this.nodeGraph.getClosestNodes(
       nodeId,
       this.nodeGraph.nodeBucketLimit,
+      undefined,
+      ctx,
     )) {
       nodeConnectionsQueue.queueNodeDirect(nodeIdTarget, nodeContact);
     }
@@ -975,13 +986,17 @@ class NodeManager {
     if (service == null) {
       // Setup promises
       const { p: endedP, resolveP: resolveEndedP } = utils.promise();
+      const {
+        p: serviceP,
+        resolveP: resolveServiceP,
+        rejectP: rejectServiceP,
+      } = utils.promise();
       const abortHandler = () => {
         resolveEndedP();
+        rejectServiceP();
       };
       ctx.signal.addEventListener('abort', abortHandler, { once: true });
       ctx.timer.catch(() => {}).finally(() => abortHandler());
-      const { p: serviceP, resolveP: resolveServiceP } =
-        utils.promise();
       const handleEventMDNSService = (evt: mdnsEvents.EventMDNSService) => {
         if (evt.detail.name === encodedNodeId) {
           resolveServiceP(evt.detail);
@@ -1108,7 +1123,7 @@ class NodeManager {
     nodeConnectionsQueue: NodeConnectionQueue,
     ctx: ContextTimed,
   ) {
-    await this.nodeConnectionManager.withConnF(nodeId, async (conn) => {
+    await this.nodeConnectionManager.withConnF(nodeId, ctx, async (conn) => {
       const nodeIdEncoded = nodesUtils.encodeNodeId(nodeIdTarget);
       const closestConnectionsRequestP = (async () => {
         const resultStream =
@@ -1242,13 +1257,16 @@ class NodeManager {
     return await this.withConnF(targetNodeId, ctx, async (connection) => {
       const claims: Record = {};
       const client = connection.getClient();
-      for await (const agentClaim of await client.methods.nodesClaimsGet({
-        claimIdEncoded:
-          claimId != null
-            ? claimsUtils.encodeClaimId(claimId)
-            : ('' as ClaimIdEncoded),
-      })) {
-        if (ctx.signal.aborted) throw ctx.signal.reason;
+      for await (const agentClaim of await client.methods.nodesClaimsGet(
+        {
+          claimIdEncoded:
+            claimId != null
+              ? claimsUtils.encodeClaimId(claimId)
+              : ('' as ClaimIdEncoded),
+        },
+        ctx,
+      )) {
+        ctx.signal.throwIfAborted();
         // Need to re-construct each claim
         const claimId: ClaimId = claimsUtils.decodeClaimId(
           agentClaim.claimIdEncoded,
@@ -2112,16 +2130,25 @@ class NodeManager {
     await Promise.allSettled(taskPs);
   }
 
+  public async updateRefreshBucketDelay(
+    bucketIndex: number,
+    delay?: number,
+    lazy?: boolean,
+    tran?: DBTransaction,
+    ctx?: Partial,
+  ): Promise;
   @ready(new nodesErrors.ErrorNodeManagerNotRunning(), true, ['stopping'])
+  @timedCancellable(true)
   public async updateRefreshBucketDelay(
     bucketIndex: number,
     delay: number = this.refreshBucketDelayTime,
     lazy: boolean = true,
-    tran?: DBTransaction,
+    tran: DBTransaction | undefined,
+    @context ctx: ContextTimed,
   ): Promise {
     if (tran == null) {
       return this.db.withTransactionF((tran) =>
-        this.updateRefreshBucketDelay(bucketIndex, delay, lazy, tran),
+        this.updateRefreshBucketDelay(bucketIndex, delay, lazy, tran, ctx),
       );
     }
@@ -2137,6 +2164,7 @@ class NodeManager {
       [this.tasksPath, this.refreshBucketHandlerId, `${bucketIndex}`],
       tran,
     )) {
+      ctx.signal.throwIfAborted();
       if (!existingTask) {
         foundTask = task;
         // Update the first one
diff --git a/src/tasks/TaskManager.ts b/src/tasks/TaskManager.ts
index 208a08e31..435f04db2 100644
--- a/src/tasks/TaskManager.ts
+++ b/src/tasks/TaskManager.ts
@@ -382,8 +382,8 @@ class TaskManager {
     return {
       id: taskId,
       status: taskStatus!,
-      promise,
-      cancel,
+      promise: promise,
+      cancel: cancel,
       handlerId: taskData.handlerId,
       parameters: taskData.parameters,
       delay: tasksUtils.fromDelay(taskData.delay),
diff --git a/src/vaults/VaultManager.ts b/src/vaults/VaultManager.ts
index 62c6b2c00..b1c7faf8c 100644
--- a/src/vaults/VaultManager.ts
+++ b/src/vaults/VaultManager.ts
@@ -540,7 +540,7 @@ class VaultManager {
   public async renameVault(
     vaultId: VaultId,
     newVaultName: VaultName,
-    tran: DBTransaction,
+    tran?: DBTransaction,
   ): Promise {
     if (tran == null) {
       return this.db.withTransactionF((tran) =>
@@ -978,6 +978,7 @@ class VaultManager {
       // Create a connection to another node
       return yield* this.nodeManager.withConnG(
         targetNodeId,
+        ctx,
         async function* (connection): AsyncGenerator<{
           vaultName: VaultName;
           vaultIdEncoded: VaultIdEncoded;
@@ -994,7 +995,6 @@ class VaultManager {
             };
           }
         },
-        ctx,
       );
     }
diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts
index 25258ad1e..857f9e406 100644
--- a/tests/discovery/Discovery.test.ts
+++ b/tests/discovery/Discovery.test.ts
@@ -293,10 +293,17 @@ describe('Discovery', () => {
     await discovery.stop();
     await discovery.destroy();
     await expect(
-      discovery.queueDiscoveryByIdentity('' as ProviderId, '' as IdentityId),
+      async () =>
+        await discovery.queueDiscoveryByIdentity(
+          '' as ProviderId,
+          '' as IdentityId,
+        ),
     ).rejects.toThrow(discoveryErrors.ErrorDiscoveryNotRunning);
     await expect(
-      discovery.queueDiscoveryByNode(testNodesUtils.generateRandomNodeId()),
+      async () =>
+        await discovery.queueDiscoveryByNode(
+          testNodesUtils.generateRandomNodeId(),
+        ),
     ).rejects.toThrow(discoveryErrors.ErrorDiscoveryNotRunning);
   });
   test('discovery by node', async () => {
diff --git a/tests/nodes/NodeConnectionManager.test.ts b/tests/nodes/NodeConnectionManager.test.ts
index ef7c2ee22..483d62cb0 100644
--- a/tests/nodes/NodeConnectionManager.test.ts
+++ b/tests/nodes/NodeConnectionManager.test.ts
@@ -363,6 +363,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     );
     await ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         expect(connectionAndTimer?.usageCount).toBe(1);
         expect(connectionAndTimer?.timer).toBeNull();
@@ -391,6 +392,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     await ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async (connection) => {
         expect(connection.connectionId).toBe(connectionIds[0]);
       },
@@ -401,6 +403,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     // Lowest connection is deterministically the same for the peer too
     await ncmPeer1.nodeConnectionManager.withConnF(
       ncmLocal.nodeId,
+      undefined,
       async (connection) => {
         expect(connection.connectionId).toBe(connectionIds[0]);
       },
@@ -421,6 +424,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     for (const connectionId of connectionIds) {
       await ncmLocal.nodeConnectionManager.withConnF(
         ncmPeer1.nodeId,
+        undefined,
         async (connection) => {
           // Should always be the lowest alive connectionId
           expect(connection.connectionId).toBe(connectionId);
@@ -437,6 +441,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     await expect(
       ncmLocal.nodeConnectionManager.withConnF(
         ncmPeer1.nodeId,
+        undefined,
         async () => {},
       ),
     ).rejects.toThrow(
@@ -569,6 +574,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     // Wait for timeout.
     await ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         expect(ncmLocal.nodeConnectionManager.connectionsActive()).toBe(3);
         await connectionDestroyProm1;
@@ -717,12 +723,14 @@ describe(`${NodeConnectionManager.name}`, () => {
     // Checking authentication result
     await ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
     );
     await ncmPeer1.nodeConnectionManager.withConnF(
       ncmLocal.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -759,6 +767,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     // Checking authentication result
     const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -769,6 +778,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     const authenticationAttemptP2 = ncmPeer1.nodeConnectionManager.withConnF(
       ncmLocal.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -806,6 +816,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -815,6 +826,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     );
     const forwardAuthenticateP = ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -824,6 +836,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     );
     const reverseAuthenticateP = ncmPeer1.nodeConnectionManager.withConnF(
       ncmLocal.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -861,6 +874,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -898,6 +912,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     const authenticationAttemptP = ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -946,6 +961,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     const forwardAuthenticateP = ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -953,6 +969,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     await expect(forwardAuthenticateP).toResolve();
     const reverseAuthenticateP = ncmPeer1.nodeConnectionManager.withConnF(
       ncmLocal.nodeId,
+      undefined,
       async () => {
         // Do nothing
       },
@@ -962,6 +979,7 @@ describe(`${NodeConnectionManager.name}`, () => {
     // Checking RPC again
     await ncmLocal.nodeConnectionManager.withConnF(
       ncmPeer1.nodeId,
+      undefined,
       async (conn) => {
         await expect(
           conn.rpcClient.unaryCaller('dummyMethod', {}),
diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts
index aba4942ed..8d6aa1e92 100644
--- a/tests/nodes/NodeManager.test.ts
+++ b/tests/nodes/NodeManager.test.ts
@@ -647,17 +647,21 @@ describe(`${NodeManager.name}`, () => {
         scopes: ['global'],
       },
     );
+    const abortController = new AbortController();
+    const ctx = { signal: abortController.signal } as ContextTimed;
     const [resourceReleaser, nodeConnection] =
-      await nodeManager.acquireConnection(nodeId)();
+      await nodeManager.acquireConnection(nodeId, ctx)();
     expect(nodeConnection).toBeInstanceOf(NodeConnection);
     expect(nodeConnectionManager.hasConnection(nodeId)).toBeTrue();
     await resourceReleaser();
   });
   test('acquire Connection fails', async () => {
+    const abortController = new AbortController();
+    const ctx = { signal: abortController.signal } as ContextTimed;
     const nodeId = keyRingPeer.getNodeId();
-    await expect(nodeManager.acquireConnection(nodeId)()).rejects.toThrow(
-      nodesErrors.ErrorNodeManagerConnectionFailed,
-    );
+    await expect(
+      nodeManager.acquireConnection(nodeId, ctx)(),
+    ).rejects.toThrow(nodesErrors.ErrorNodeManagerConnectionFailed);
   });
   test('withConnF', async () => {
     const nodeId = keyRingPeer.getNodeId();
@@ -695,6 +699,7 @@ describe(`${NodeManager.name}`, () => {
     const gen = nodeManager.withConnG(
       nodeId,
+      undefined,
       async function* (
         conn,
       ): AsyncGenerator {
diff --git a/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts b/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts
index 1622f48f9..76f85f23b 100644
--- a/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts
+++ b/tests/nodes/agent/handlers/nodesClosestActiveConnectionsGet.test.ts
@@ -161,6 +161,7 @@ describe('nodesClosestLocalNode', () => {
     const results = await nodeConnectionManagerLocal.withConnF(
       nodeIdPeer1,
+      undefined,
       async () => {
         const resultStream =
           await connection.rpcClient.methods.nodesClosestActiveConnectionsGet({
diff --git a/tests/vaults/VaultInternal.test.ts b/tests/vaults/VaultInternal.test.ts
index 4a1ad59e5..aacc9c7c9 100644
--- a/tests/vaults/VaultInternal.test.ts
+++ b/tests/vaults/VaultInternal.test.ts
@@ -117,13 +117,13 @@ describe('VaultInternal', () => {
   test('VaultInternal readiness', async () => {
     await vault.stop();
-    await expect(async () => {
-      await vault.log();
-    }).rejects.toThrow(vaultsErrors.ErrorVaultNotRunning);
+    await expect(async () => await vault.log()).rejects.toThrow(
+      vaultsErrors.ErrorVaultNotRunning,
+    );
     await vault.destroy();
-    await expect(async () => {
-      await vault.start();
-    }).rejects.toThrow(vaultsErrors.ErrorVaultDestroyed);
+    await expect(async () => await vault.start()).rejects.toThrow(
+      vaultsErrors.ErrorVaultDestroyed,
+    );
   });
   test('is type correct', async () => {
     expect(vault).toBeInstanceOf(VaultInternal);