diff --git a/package-lock.json b/package-lock.json index bd6a4f280..c6eb4a3b2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,7 @@ "@matrixai/mdns": "^2.0.7", "@matrixai/quic": "^2.0.9", "@matrixai/resources": "^2.0.1", - "@matrixai/rpc": "^1.0.0", + "@matrixai/rpc": "^1.0.1", "@matrixai/timer": "^2.1.1", "@matrixai/workers": "^2.0.0", "@matrixai/ws": "^2.0.5", @@ -2043,9 +2043,9 @@ "integrity": "sha512-qP7wDz1HnQY7wV4NxybAE+A+488D7bGkkdgk2TIRaw8/fTWENi9Y/AFvOJrdKt3q5rDybB4OeTJIkN5qULE35A==" }, "node_modules/@matrixai/rpc": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/@matrixai/rpc/-/rpc-1.0.0.tgz", - "integrity": "sha512-VwplA8PLxoRs4YdkoVM5YsRWBtdpv/R4gookK3jmZSsymI4KSEHs/mP6ndotimy/qbajnzMtbJwHt7DEH0tW5w==", + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@matrixai/rpc/-/rpc-1.0.1.tgz", + "integrity": "sha512-4W3YGzB/N+MgV/Ha7Euowp+Scu/TYz6jo08plkp2zq80gFL9IkYurb2E77k7eXDrA0YzHVB3U0yM9mMFBvI1Lg==", "dependencies": { "@matrixai/async-init": "^2.1.2", "@matrixai/contexts": "^2.0.2", diff --git a/package.json b/package.json index 77d0b32fc..621e4100a 100644 --- a/package.json +++ b/package.json @@ -93,7 +93,7 @@ "@matrixai/mdns": "^2.0.7", "@matrixai/quic": "^2.0.9", "@matrixai/resources": "^2.0.1", - "@matrixai/rpc": "^1.0.0", + "@matrixai/rpc": "^1.0.1", "@matrixai/timer": "^2.1.1", "@matrixai/workers": "^2.0.0", "@matrixai/ws": "^2.0.5", diff --git a/src/nodes/NodeConnection.ts b/src/nodes/NodeConnection.ts index d977dde0c..605f45cb0 100644 --- a/src/nodes/NodeConnection.ts +++ b/src/nodes/NodeConnection.ts @@ -169,12 +169,9 @@ class NodeConnection { targetHostname, tlsConfig, connectionKeepAliveIntervalTime, - connectionKeepAliveTimeoutTime = config.defaultsSystem - .nodesConnectionIdleTimeoutTimeMin, - connectionInitialMaxStreamsBidi = config.defaultsSystem - .nodesConnectionInitialMaxStreamsBidi, - connectionInitialMaxStreamsUni = config.defaultsSystem - .nodesConnectionInitialMaxStreamsUni, + connectionKeepAliveTimeoutTime, + connectionInitialMaxStreamsBidi, + connectionInitialMaxStreamsUni, quicSocket, manifest, logger, @@ -301,9 +298,7 @@ class NodeConnection { const rpcClient = new RPCClient({ manifest, middlewareFactory: rpcUtilsMiddleware.defaultClientMiddlewareWrapper(), - streamFactory: async () => { - return quicConnection.newStream(); - }, + streamFactory: async () => quicConnection.newStream(), toError: networkUtils.toError, logger: logger.getChild(RPCClient.name), }); diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 333dfbed0..f0c92b4fe 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -8,7 +8,10 @@ import type { NodeId, NodeIdString, } from './types.js'; -import type { NodesAuthenticateConnectionMessage } from './agent/types.js'; +import type { + NodesAuthenticateConnectionMessage, + SuccessMessage, +} from './agent/types.js'; import type { AgentServerManifest } from './agent/handlers/index.js'; import type KeyRing from '../keys/KeyRing.js'; import type { CertificatePEM } from '../keys/types.js'; @@ -345,7 +348,9 @@ class NodeConnectionManager { * This should trigger the destruction of the `NodeConnection` through the * `EventNodeConnectionError` -> `EventNodeConnectionClose` event path. */ - protected handleEventQUICError = (evt: quicEvents.EventQUICSocketError) => { + protected handleEventQUICError = ( + evt: quicEvents.EventQUICSocketError, + ): void => { const err = new nodesErrors.ErrorNodeConnectionManagerInternalError( undefined, { cause: evt.detail }, @@ -361,7 +366,7 @@ class NodeConnectionManager { */ protected handleEventQUICSocketStopped = ( _evt: quicEvents.EventQUICSocketStopped, - ) => { + ): void => { const err = new nodesErrors.ErrorNodeConnectionManagerInternalError( 'QUICSocket stopped unexpectedly', ); @@ -376,7 +381,7 @@ class NodeConnectionManager { */ protected handleEventQUICServerStopped = ( _evt: quicEvents.EventQUICServerStopped, - ) => { + ): void => { const err = new nodesErrors.ErrorNodeConnectionManagerInternalError( 'QUICServer stopped unexpectedly', ); @@ -392,7 +397,7 @@ class NodeConnectionManager { */ protected handleEventQUICServerConnection = ( evt: quicEvents.EventQUICServerConnection, - ) => { + ): void => { this.handleConnectionReverse(evt.detail); }; @@ -604,10 +609,6 @@ class NodeConnectionManager { this.logger.info(`Started ${this.constructor.name}`); } - /** - * What doe stop do with force? - * Figure it out. - */ public async stop({ force = false, }: { @@ -654,13 +655,10 @@ class NodeConnectionManager { const cancelAuthenticationPs: Array> = []; const cancelReason = new nodesErrors.ErrorNodeConnectionManagerStopping(); for (const [nodeIdString] of this.connections) { - const destroyP = this.authenticateCancel(nodeIdString, cancelReason).then( - async () => { - return await this.destroyConnection( - IdInternal.fromString(nodeIdString), - force, - ); - }, + this.authenticateCancel(nodeIdString, cancelReason); + const destroyP = this.destroyConnection( + IdInternal.fromString(nodeIdString), + force, ); destroyConnectionPs.push(destroyP); } @@ -688,11 +686,14 @@ class NodeConnectionManager { } /** - * This is the internal acquireConnection for using connections without authentication. - * For usage with withF, to acquire a connection + * This is the internal acquireConnection for using connections without + * authentication. For usage with withF, to acquire a connection. To wait for + * authentication, use {@link acquireConnection}. + * * This unique acquire function structure of returning the ResourceAcquire * itself is such that we can pass targetNodeId as a parameter (as opposed to * an acquire function with no parameters). + * * @param targetNodeId Id of target node to communicate with * @returns ResourceAcquire Resource API for use in with contexts */ @@ -738,12 +739,13 @@ class NodeConnectionManager { connectionAndTimer.connection.connectionId, ); connectionAndTimer.timer = new Timer({ - handler: async () => + handler: async () => { await this.destroyConnection( targetNodeId, false, connectionAndTimer.connection.connectionId, - ), + ); + }, delay, }); // Prevent unhandled exceptions when cancelling @@ -756,10 +758,18 @@ class NodeConnectionManager { } /** - * For usage with withF, to acquire a connection * This unique acquire function structure of returning the ResourceAcquire * itself is such that we can pass targetNodeId as a parameter (as opposed to - * an acquire function with no parameters). + * an acquire function with no parameters). It waits for the connection to be + * authenticated, otherwise throws an error. See {@link acquireConnectionInternal} + * to connect to a node without waiting for authentication. + * + * If a connection exists but is not authenticated, the authentication is + * attempted. Authentication is reattempted if it has failed before but + * another attmept is being made to connect to a node. + * + * For usage with withF, to acquire a connection. + * * @param targetNodeId Id of target node to communicate with * @param ctx * @returns ResourceAcquire Resource API for use in with contexts @@ -839,7 +849,7 @@ class NodeConnectionManager { } /** - * Starts a connection. + * Starts a connection. This step also attemps to authenticate the connection. */ public createConnection( nodeIds: Array, @@ -878,6 +888,7 @@ class NodeConnectionManager { ctx, ); this.addConnection(nodeConnection.validatedNodeId, nodeConnection); + this.initiateForwardAuthenticate(nodeConnection.nodeId); // Dispatch the connection event const connectionData: ConnectionData = { remoteNodeId: nodeConnection.nodeId, @@ -1023,12 +1034,16 @@ class NodeConnectionManager { } /** - * Adds connection to the connections map. Preforms some checks and lifecycle hooks. - * This code is shared between the reverse and forward connection creation. + * Adds connection to the connections map. Preforms some checks and lifecycle + * hooks. This code sets up the authentication state machine, and must be run + * before attempting authentication. * * Multiple connections can be added for a single NodeId, but the connection * with the 'lowest' `connectionId` will be used. The remaining * connections will be left to timeout gracefully. + * + * @param nodeId The target NodeId to connect to + * @param nodeConnection The object corresponding to the node connection */ protected addConnection( nodeId: NodeId, @@ -1091,7 +1106,6 @@ class NodeConnectionManager { authenticatedRejectP, }; this.connections.set(nodeIdString, entry); - this.initiateForwardAuthenticate(nodeId); } else { // Adding connection to existing entry newConnAndTimer.timer = new Timer({ @@ -1177,7 +1191,7 @@ class NodeConnectionManager { const remainingKeys = Object.keys(connectionsEntry.connections); if (remainingKeys.length === 0) { // Clean up authentication - await this.authenticateCancel( + this.authenticateCancel( targetNodeIdString, new nodesErrors.ErrorNodeManagerAuthenticationFailed( 'Connection destroyed before authentication could complete', @@ -1288,10 +1302,11 @@ class NodeConnectionManager { /** * Open up a port in the NAT by sending packets to the target address. - * The packets will be sent in an exponential backoff dialing pattern and contain random data. + * The packets will be sent in an exponential backoff dialing pattern and + * contain random data. * - * This is only ever done used in the reverse direction to open up the nat for the connection to establish from the - * forward direction. + * This is only ever done used in the reverse direction to open up the nat + * for the connection to establish from the forward direction. * * This can't know it succeeded, it will continue until timed out or cancelled. * @@ -1315,7 +1330,8 @@ class NodeConnectionManager { port: Port, @decorators.context ctx: ContextTimed, ): Promise { - // We need to send a random data packet to the target until the process times out or a connection is established + // We need to send a random data packet to the target until the process + // times out or a connection is established. let ended = false; const { p: endedP, resolveP: endedResolveP } = utils.promise(); if (ctx.signal.aborted) { @@ -1333,8 +1349,9 @@ class NodeConnectionManager { try { while (true) { const message = keysUtils.getRandomBytes(32); - // Since the intention is to abstract away the success/failure of the hole-punch operation, - // We should catch any errors thrown out of this, as the caller does not expect the method to throw + // Since the intention is to abstract away the success/failure of the + // hole-punch operation, we should catch any errors thrown out of this, + // as the caller does not expect the method to throw. await this.quicSocket .send(Buffer.from(message), port, host) .catch(() => {}); @@ -1412,7 +1429,7 @@ class NodeConnectionManager { return size; } - public updateTlsConfig(tlsConfig: TLSConfig) { + public updateTlsConfig(tlsConfig: TLSConfig): void { this.tlsConfig = tlsConfig; this.quicServer.updateConfig({ key: tlsConfig.keyPrivatePem, @@ -1421,13 +1438,15 @@ class NodeConnectionManager { } /** - * This is used by the `NodesConnectionSignalFinal` to initiate the hole punch procedure. + * This is used by the `NodesConnectionSignalFinal` to initiate the hole punch + * procedure. * - * Will validate the message, and initiate hole punching in the background and return immediately. + * Will validate the message, and initiate hole punching in the background and + * return immediately. * Attempts to the same host and port are coalesced. * Attempts to the same host are limited by a semaphore. - * Active attempts are tracked inside the `activeHolePunchPs` set and are cancelled and awaited when the - * `NodeConnectionManager` stops. + * Active attempts are tracked inside the `activeHolePunchPs` set and are + * cancelled and awaited when the `NodeConnectionManager` stops. */ @startStop.ready(new nodesErrors.ErrorNodeManagerNotRunning()) public handleNodesConnectionSignalFinal(host: Host, port: Port) { @@ -1467,19 +1486,21 @@ class NodeConnectionManager { } /** - * This is used by the `NodesConnectionSignalInitial` to initiate a relay request. - * Requests can only be relayed to nodes this node is currently connected to. + * This is used by the `NodesConnectionSignalInitial` to initiate a relay + * request. Requests can only be relayed to nodes this node is currently + * connected to. * - * Requests made by the same node are rate limited, when the limit has been exceeded the request - * throws an `ErrorNodeConnectionManagerRequestRateExceeded` error. + * Requests made by the same node are rate limited, when the limit has been + * exceeded the request throws an `ErrorNodeConnectionManagerRequestRateExceeded` + * error. * - * Active relay attempts are tracked in `activeSignalFinalPs` and are cancelled and awaited when the - * `NodeConnectionManager` stops. + * Active relay attempts are tracked in `activeSignalFinalPs` and are cancelled + * and awaited when the `NodeConnectionManager` stops. * - * @param sourceNodeId - NodeId of the node making the request. Used for rate limiting. - * @param targetNodeId - NodeId of the node that needs to initiate hole punching. - * @param address - Address the target needs to punch to. - * @param requestSignature - `base64url` encoded signature + * @param sourceNodeId NodeId of the node making the request. Used for rate limiting. + * @param targetNodeId NodeId of the node that needs to initiate hole punching. + * @param address Address the target needs to punch to. + * @param requestSignature `base64url` encoded signature * @param ctx */ public async handleNodesConnectionSignalInitial( @@ -1569,8 +1590,9 @@ class NodeConnectionManager { .then( () => {}, (e) => { - // If it's a connection error or missing handler then it's a signalling failure, we ignore these since this - // is a fire and forget. Any unexpected errors should still be thrown + // If it's a connection error or missing handler then it's a signalling + // failure, we ignore these since this is a fire and forget. Any + // unexpected errors should still be thrown. if ( nodesUtils.isConnectionError(e) || e instanceof rpcErrors.ErrorRPCHandlerFailed @@ -1640,6 +1662,22 @@ class NodeConnectionManager { }); } + /** + * Handles the authentication for two nodes. This is done by triggering a RPC + * duplex method. The duplex stream writes messages to a writer and awaits the + * responses from a reader in real-time. The authentication follows a strict + * protocol. + * + * SEND Authentication message + * RECV Response message (reverse) + * RECV Authentication message from Node B + * SEND Response message (reverse) + * RECV Acknowledgement message + * + * @param nodeId The NodeId of the target node + * @param ctx + * @see {@link handleAuthentication} for RPC protocol + */ public forwardAuthenticate( nodeId: NodeId, ctx?: Partial, @@ -1659,24 +1697,92 @@ class NodeConnectionManager { throw new nodesErrors.ErrorNodeConnectionManagerConnectionNotFound(); } // Need to make an authenticate request here. Get the connection and RPC. + let rpcCancel: ((reason?: any) => void) | undefined; try { const authenticateMessage = await this.authenticateNetworkForwardCallback(ctx); await withF([this.acquireConnectionInternal(nodeId)], async ([conn]) => { - await conn.rpcClient.methods.nodesAuthenticateConnection( - authenticateMessage, + const authStream = + await conn.rpcClient.methods.nodesAuthenticateConnection(ctx); + const writer = authStream.writable.getWriter(); + const reader = authStream.readable.getReader(); + rpcCancel = (reason?: any) => authStream.cancel(reason); + + // Write the forward authentication message from this node + await writer.write(authenticateMessage); + const forwardMessageResultPair = await utils.raceSignal( + reader.read(), + ctx.signal, + ); + if (forwardMessageResultPair.done) { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Stream ended prematurely', + ); + } + const forwardMessageResult = forwardMessageResultPair.value; + if (forwardMessageResult.type !== 'success') { + throw new nodesErrors.ErrorNodeManagerAuthenticationFailedForward( + 'Expected success message but got authentication message', + ); + } + + // Read and process the authentication token sent by the connectee + const reverseMessageInPair = await utils.raceSignal( + reader.read(), + ctx.signal, + ); + if (reverseMessageInPair.done) { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Stream ended prematurely', + ); + } + const reverseMessageIn = reverseMessageInPair.value; + if (reverseMessageIn.type === 'success') { + throw new nodesErrors.ErrorNodeManagerAuthenticationFailedReverse( + 'Expected authentication message but got success message', + ); + } + await this.handleReverseAuthenticate( + conn.nodeId, + reverseMessageIn, ctx, ); + await writer.write({ type: 'success', success: true }); + + // Wait for other node to set its state + const ackPair = await utils.raceSignal(reader.read(), ctx.signal); + if (ackPair.done) { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Stream ended prematurely', + ); + } + const ack = ackPair.value; + if (ack.type !== 'success') { + throw new nodesErrors.ErrorNodeManagerAuthenticationFailed( + 'Expected success message but got authentication message', + ); + } + rpcCancel(); }); connectionsEntry.authenticatedForward = AuthenticatingState.SUCCESS; } catch (e) { - const err = new nodesErrors.ErrorNodeManagerAuthenticationFailedForward( + const err = new nodesErrors.ErrorNodeManagerAuthenticationFailed( undefined, { cause: e }, ); - connectionsEntry.authenticatedForward = AuthenticatingState.FAIL; + rpcCancel?.(err); + // Make sure any pending authentication is set to FAIL accordingly + if ( + connectionsEntry.authenticatedForward === AuthenticatingState.PENDING + ) { + connectionsEntry.authenticatedForward = AuthenticatingState.FAIL; + } + if ( + connectionsEntry.authenticatedReverse === AuthenticatingState.PENDING + ) { + connectionsEntry.authenticatedReverse = AuthenticatingState.FAIL; + } this.authenticateFail(targetNodeIdString, err); - return; } // Check the reverse result switch (connectionsEntry.authenticatedReverse) { @@ -1741,9 +1847,10 @@ class NodeConnectionManager { } /** - * Will initiate a forward authentication call and coalesce + * Will initiate a forward authentication call and coalesce. This method is + * idempotent. */ - public initiateForwardAuthenticate(nodeId: NodeId) { + public initiateForwardAuthenticate(nodeId: NodeId): void { // Needs check the map if one is already running, otherwise it needs to start one and manage it. const nodeIdString = nodeId.toString() as NodeIdString; const authenticationEntry = this.connections.get(nodeIdString); @@ -1752,20 +1859,25 @@ class NodeConnectionManager { } const existingAuthenticate = this.activeForwardAuthenticateCalls.get(nodeIdString); - // If it exists in the map then we don't need to start one and can just return - if (existingAuthenticate != null) return; + + // If it exists in the map then we don't need to start one and can just + // return. However, if the previous attmept failed, we can reattempt + // authentication. + if (existingAuthenticate != null) { + return; + } if ( - authenticationEntry.authenticatedForward !== - AuthenticatingState.PENDING || - authenticationEntry.authenticateComplete + authenticationEntry.authenticatedForward === + AuthenticatingState.SUCCESS || + (authenticationEntry.authenticateComplete && + authenticationEntry.authenticatedForward !== AuthenticatingState.FAIL && + authenticationEntry.authenticatedReverse !== AuthenticatingState.FAIL) ) { return; } // Otherwise we need to start one and add it to the map - const forwardAuthenticateP = this.forwardAuthenticate(nodeId).finally( - () => { - this.activeForwardAuthenticateCalls.delete(nodeIdString); - }, + const forwardAuthenticateP = this.forwardAuthenticate(nodeId).finally(() => + this.activeForwardAuthenticateCalls.delete(nodeIdString), ); // Prevent unhandled errors forwardAuthenticateP.then( @@ -1775,6 +1887,120 @@ class NodeConnectionManager { this.activeForwardAuthenticateCalls.set(nodeIdString, forwardAuthenticateP); } + /** + * Handles the authentication for two nodes. This is done by yielding messages + * and awaiting response for the messages in real-time. The messages are + * yielded by the async generator and the reponse is awaited for via the + * async iterator. The authentication follows a strict protocol. + * + * RECV Authentication message from Node A + * SEND Response message + * SEND Authentication message from Node B + * RECV Response message + * SEND Acknowledgement message + * + * @param requestingNodeId The NodeId of the requesting node + * @param inputIterator An iterator yielding responses for the sent messages + * @param ctx + * @see {@link forwardAuthenticate} for usage example + */ + public async *handleAuthentication( + requestingNodeId: NodeId, + inputIterator: AsyncIterableIterator< + SuccessMessage | NodesAuthenticateConnectionMessage + >, + ctx: ContextTimed, + ): AsyncGenerator< + SuccessMessage | NodesAuthenticateConnectionMessage, + void, + void + > { + const requestingNodeIdString = requestingNodeId.toString() as NodeIdString; + const connectionEntry = this.connections.get(requestingNodeIdString); + if (connectionEntry == null) utils.never('Connection should be defined'); + + try { + const reverseMessageInPair = await utils.raceSignal( + inputIterator.next(), + ctx.signal, + ); + if (reverseMessageInPair.done === true) { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Stream ended prematurely', + ); + } + const reverseMessageIn = reverseMessageInPair.value; + if (reverseMessageIn.type === 'success') { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Expected authentication message but got success message', + ); + } + + // If reverse authentication succeeded without errors, then authentication + // was successful. The error is not wrapped to ensure a useful stack trace + // in case of an error. + await this.handleReverseAuthenticate( + requestingNodeId, + reverseMessageIn, + ctx, + ); + + yield { + type: 'success', + success: true, + }; + + // Generate and yield the forward token from this node + yield await this.authenticateNetworkForwardCallback(ctx); + const forwardMessageResultPair = await utils.raceSignal( + inputIterator.next(), + ctx.signal, + ); + if (forwardMessageResultPair.done === true) { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Stream ended prematurely', + ); + } + const forwardMessageResult = forwardMessageResultPair.value; + if (forwardMessageResult.type !== 'success') { + throw new nodesErrors.ErrorNodeAuthenticationInvalidProtocol( + 'Expected success message but got authentication message', + ); + } + + // Success message should never return { success: false }. If there was an + // error with authentication, the RPC should be aborted immediately. + + // It is impossible to reach here without having the reverse connection + // being in a non-success state. + connectionEntry.authenticatedForward = AuthenticatingState.SUCCESS; + this.authenticateSuccess(requestingNodeIdString); + + // Yield a final acknowledgement message stating authentication has been + // completed and the state has been set. + yield { + type: 'success', + success: true, + }; + } catch (e) { + // Make sure any pending authentication is set to FAIL accordingly + if ( + connectionEntry.authenticatedForward === AuthenticatingState.PENDING + ) { + connectionEntry.authenticatedForward = AuthenticatingState.FAIL; + } + if ( + connectionEntry.authenticatedReverse === AuthenticatingState.PENDING + ) { + connectionEntry.authenticatedReverse = AuthenticatingState.FAIL; + } + this.authenticateFail(requestingNodeIdString, e); + throw new nodesErrors.ErrorNodeManagerAuthenticationFailed(undefined, { + cause: e, + }); + } + } + /** * Returns true if the connection has been authenticated */ @@ -1808,7 +2034,6 @@ class NodeConnectionManager { nodeId: NodeId, @decorators.context ctx: ContextTimed, ): Promise { - ctx.signal.throwIfAborted(); const targetNodeIdString = nodeId.toString() as NodeIdString; const connectionsEntry = this.connections.get(targetNodeIdString); if (connectionsEntry == null) { @@ -1818,11 +2043,24 @@ class NodeConnectionManager { const abortHandler = () => { rejectAbortP(ctx.signal.reason); }; - ctx.signal.addEventListener('abort', abortHandler, { once: true }); + if (ctx.signal.aborted) { + abortHandler(); + } else { + ctx.signal.addEventListener('abort', abortHandler, { once: true }); + } + // If the connection isn't already authenticated, then try authenticating + if ( + !connectionsEntry.authenticateComplete || + connectionsEntry.authenticatedForward === AuthenticatingState.FAIL || + connectionsEntry.authenticatedReverse === AuthenticatingState.FAIL + ) { + this.initiateForwardAuthenticate(nodeId); + } try { return await Promise.race([connectionsEntry.authenticatedP, abortP]); } catch (e) { - // Capture the stacktrace here since knowing where we're waiting for authentication is more useful + // Capture the stacktrace here since knowing where we're waiting for + // authentication is more useful. Error.captureStackTrace(e); throw e; } finally { @@ -1859,7 +2097,8 @@ class NodeConnectionManager { } connectionsEntry.authenticatedResolveP(); connectionsEntry.authenticateComplete = true; - // Resetting timeout delay for the active connection. The non-active connections would already have the min timeout. + // Resetting timeout delay for the active connection. The non-active + // connections would already have the min timeout. const connection = connectionsEntry.connections[connectionsEntry.activeConnection]; const nodeId = IdInternal.fromString(targetNodeIdString); @@ -1881,7 +2120,7 @@ class NodeConnectionManager { } } - protected async authenticateCancel( + protected authenticateCancel( targetNodeIdString: NodeIdString, reason: Error, ) { diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index e3cfdc45c..1a2e3f879 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -14,8 +14,8 @@ import type { import type KeyRing from '../keys/KeyRing.js'; import Logger from '@matrixai/logger'; import { createDestroyStartStop } from '@matrixai/async-init'; -import { IdInternal } from '@matrixai/id'; import { decorators } from '@matrixai/contexts'; +import { IdInternal } from '@matrixai/id'; import * as nodesUtils from './utils.js'; import * as nodesErrors from './errors.js'; import * as nodesEvents from './events.js'; @@ -291,11 +291,12 @@ class NodeGraph { @createDestroyStartStop.ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async *getNodeContacts( order: 'asc' | 'desc' = 'asc', - tran?: DBTransaction, + tran: DBTransaction | undefined, + ctx: ContextTimed, ): AsyncGenerator<[NodeId, NodeContact]> { if (tran == null) { // Lambda generators don't grab the `this` context, so we need to bind it - const getNodeContacts = (tran) => this.getNodeContacts(order, tran); + const getNodeContacts = (tran) => this.getNodeContacts(order, tran, ctx); return yield* this.db.withTransactionG(async function* (tran) { return yield* getNodeContacts(tran); }); @@ -303,6 +304,7 @@ class NodeGraph { return yield* nodesUtils.collectNodeContacts( [...this.nodeGraphBucketsDbPath], tran, + ctx, { reverse: order !== 'asc' }, ); } @@ -659,6 +661,7 @@ class NodeGraph { for await (const result of nodesUtils.collectNodeContacts( [...this.nodeGraphBucketsDbPath, bucketKey], tran, + ctx, { reverse: order !== 'asc', limit, @@ -736,10 +739,18 @@ class NodeGraph { * Resets the bucket according to the new node ID. * Run this after new node ID is generated via renewal or reset. */ + public async resetBuckets( + tran?: DBTransaction, + ctx?: ContextTimed, + ): Promise; @createDestroyStartStop.ready(new nodesErrors.ErrorNodeGraphNotRunning()) - public async resetBuckets(tran?: DBTransaction): Promise { + @decorators.timedCancellable(true) + public async resetBuckets( + tran: DBTransaction | undefined, + @decorators.context ctx: ContextTimed, + ): Promise { if (tran == null) { - return this.db.withTransactionF((tran) => this.resetBuckets(tran)); + return this.db.withTransactionF((tran) => this.resetBuckets(tran, ctx)); } // Setup new space const spaceNew = this.space === '0' ? '1' : '0'; @@ -760,6 +771,7 @@ class NodeGraph { for await (const [nodeId, nodeContact] of nodesUtils.collectNodeContacts( [...this.nodeGraphBucketsDbPath], tran, + ctx, )) { const nodeIdKey = nodesUtils.bucketDbKey(nodeId); const nodeIdOwn = this.keyRing.getNodeId(); @@ -964,12 +976,12 @@ class NodeGraph { for await (const nodeEntry of nodesUtils.collectNodeContacts( this.nodeGraphBucketsDbPath, tran, + ctx, { lt: [bucketIdKey, ''], limit: remainingLimit, }, )) { - ctx.signal.throwIfAborted(); nodes.push(nodeEntry); } } @@ -981,12 +993,12 @@ class NodeGraph { for await (const nodeEntry of nodesUtils.collectNodeContacts( this.nodeGraphBucketsDbPath, tran, + ctx, { gt: [bucketId, ''], limit: remainingLimit, }, )) { - ctx.signal.throwIfAborted(); nodes.push(nodeEntry); } } diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index ff0ebccd5..fd272dfe7 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -14,12 +14,7 @@ import type { } from '../tasks/types.js'; import type { SignedTokenEncoded } from '../tokens/types.js'; import type { Host, Port } from '../network/types.js'; -import type { - Claim, - ClaimId, - // ClaimIdEncoded, - SignedClaim, -} from '../claims/types.js'; +import type { Claim, ClaimId, SignedClaim } from '../claims/types.js'; import type { ClaimLinkNode } from '../claims/payloads/index.js'; import type NodeConnection from '../nodes/NodeConnection.js'; import type { @@ -48,8 +43,8 @@ import { decorators } from '@matrixai/contexts'; import * as nodesUtils from './utils.js'; import * as nodesEvents from './events.js'; import * as nodesErrors from './errors.js'; -import * as agentErrors from './agent/errors.js'; import NodeConnectionQueue from './NodeConnectionQueue.js'; +import config from '../config.js'; import { assertClaimNetworkAuthority } from '../claims/payloads/claimNetworkAuthority.js'; import { assertClaimNetworkAccess } from '../claims/payloads/claimNetworkAccess.js'; import Token from '../tokens/Token.js'; @@ -58,7 +53,6 @@ import * as tasksErrors from '../tasks/errors.js'; import * as claimsUtils from '../claims/utils.js'; import * as claimsErrors from '../claims/errors.js'; import * as utils from '../utils/utils.js'; -import config from '../config.js'; import * as networkUtils from '../network/utils.js'; const abortEphemeralTaskReason = Symbol('abort ephemeral task reason'); @@ -132,15 +126,7 @@ class NodeManager { _taskInfo, bucketIndex: NodeBucketIndex, ) => { - // Don't use defaults like this - // if a default is to be used - // provide it directly - - await this.refreshBucket( - bucketIndex, - this.connectionConnectTimeoutTime, - ctx, - ); + await this.refreshBucket(bucketIndex, undefined, ctx); // When completed reschedule the task // if refreshBucketDelay is 0 then it's considered disabled if (this.refreshBucketDelayTime > 0) { @@ -718,6 +704,7 @@ class NodeManager { } while (true) { + ctx.signal.throwIfAborted(); const isDone = await nodeConnectionsQueue.withNodeSignal( async (nodeIdTarget, nodeIdSignaller) => { let nodeConnection: NodeConnection | undefined; @@ -859,6 +846,7 @@ class NodeManager { } while (true) { + ctx.signal.throwIfAborted(); const isDone = await nodeConnectionsQueue.withNodeDirect( async (nodeIdTarget, nodeContact) => { if (!this.nodeConnectionManager.hasConnection(nodeIdTarget)) { @@ -1131,6 +1119,7 @@ class NodeManager { ); // Collecting results for await (const result of resultStream) { + ctx.signal.throwIfAborted(); const nodeIdNew = nodesUtils.decodeNodeId(result.nodeId); if (nodeIdNew == null) { utils.never(`failed to decode NodeId "${result.nodeId}"`); @@ -1147,6 +1136,7 @@ class NodeManager { ctx, ); for await (const { nodeIdEncoded, nodeContact } of resultStream) { + ctx.signal.throwIfAborted(); const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded); if (nodeId == null) { utils.never(`failed to decode NodeId "${nodeIdEncoded}"`); @@ -1632,7 +1622,7 @@ class NodeManager { } if (!success) { - throw new agentErrors.ErrorNodesClaimNetworkVerificationFailed(); + throw new nodesErrors.ErrorNodeClaimNetworkVerificationFailed(); } return { diff --git a/src/nodes/agent/callers/nodesAuthenticateConnection.ts b/src/nodes/agent/callers/nodesAuthenticateConnection.ts index 940991485..5440a4267 100644 --- a/src/nodes/agent/callers/nodesAuthenticateConnection.ts +++ b/src/nodes/agent/callers/nodesAuthenticateConnection.ts @@ -1,10 +1,10 @@ import type { HandlerTypes } from '@matrixai/rpc'; import type NodesAuthenticateConnection from '../handlers/NodesAuthenticateConnection.js'; -import { UnaryCaller } from '@matrixai/rpc'; +import { DuplexCaller } from '@matrixai/rpc'; type CallerTypes = HandlerTypes; -const nodesAuthenticateConnection = new UnaryCaller< +const nodesAuthenticateConnection = new DuplexCaller< CallerTypes['input'], CallerTypes['output'] >(); diff --git a/src/nodes/agent/errors.ts b/src/nodes/agent/errors.ts deleted file mode 100644 index c60bb368a..000000000 --- a/src/nodes/agent/errors.ts +++ /dev/null @@ -1,35 +0,0 @@ -import ErrorPolykey from '../../ErrorPolykey.js'; -import sysexits from '../../utils/sysexits.js'; - -class ErrorAgent extends ErrorPolykey {} - -class ErrorAgentNodeIdMissing extends ErrorAgent { - static description = 'Unable to obtain NodeId from connection certificates'; - exitCode = sysexits.UNAVAILABLE; -} - -class ErrorNodesConnectionSignalRequestVerificationFailed< - T, -> extends ErrorAgent { - static description = 'Failed to verify request message signature'; - exitCode = sysexits.UNAVAILABLE; -} - -class ErrorNodesConnectionSignalRelayVerificationFailed< - T, -> extends ErrorAgent { - static description = 'Failed to verify relay message signature'; - exitCode = sysexits.UNAVAILABLE; -} - -class ErrorNodesClaimNetworkVerificationFailed extends ErrorAgent { - static description = 'Failed to verify claim network message'; - exitCode = sysexits.UNAVAILABLE; -} - -export { - ErrorAgentNodeIdMissing, - ErrorNodesConnectionSignalRequestVerificationFailed, - ErrorNodesConnectionSignalRelayVerificationFailed, - ErrorNodesClaimNetworkVerificationFailed, -}; diff --git a/src/nodes/agent/handlers/NodesAuthenticateConnection.ts b/src/nodes/agent/handlers/NodesAuthenticateConnection.ts index a20ee6631..2475a940a 100644 --- a/src/nodes/agent/handlers/NodesAuthenticateConnection.ts +++ b/src/nodes/agent/handlers/NodesAuthenticateConnection.ts @@ -1,3 +1,4 @@ +import type { ContextTimed } from '@matrixai/contexts'; import type { AgentRPCRequestParams, AgentRPCResponseResult, @@ -6,39 +7,48 @@ import type { } from '../types.js'; import type NodeConnectionManager from '../../../nodes/NodeConnectionManager.js'; import type { JSONValue } from '../../../types.js'; -import type { ContextTimed } from '@matrixai/contexts'; -import { UnaryHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; +import { DuplexHandler } from '@matrixai/rpc'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; -class NodesAuthenticateConnection extends UnaryHandler< +class NodesAuthenticateConnection extends DuplexHandler< { nodeConnectionManager: NodeConnectionManager; }, - AgentRPCRequestParams, - AgentRPCResponseResult + AgentRPCRequestParams, + AgentRPCResponseResult > { - public handle = async ( - input: AgentRPCRequestParams, - _cancel, + public handle = async function* ( + input: AsyncIterableIterator< + AgentRPCRequestParams + >, + _cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): Promise> => { - const { nodeConnectionManager } = this.container; + ): AsyncGenerator< + AgentRPCResponseResult, + void, + void + > { + const { + nodeConnectionManager, + }: { + nodeConnectionManager: NodeConnectionManager; + } = this.container; + // Connections should always be validated const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } - await nodeConnectionManager.handleReverseAuthenticate( + + // This async generator handles the back-and-forth communication to + // authenticate a connection. + yield* nodeConnectionManager.handleAuthentication( requestingNodeId, input, ctx, ); - return { - type: 'success', - success: true, - }; }; } diff --git a/src/nodes/agent/handlers/NodesClaimNetworkSign.ts b/src/nodes/agent/handlers/NodesClaimNetworkSign.ts index 3e137066f..0f588b7a4 100644 --- a/src/nodes/agent/handlers/NodesClaimNetworkSign.ts +++ b/src/nodes/agent/handlers/NodesClaimNetworkSign.ts @@ -6,8 +6,8 @@ import type { import type NodeManager from '../../../nodes/NodeManager.js'; import type { JSONValue } from '../../../types.js'; import { UnaryHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; class NodesClaimNetworkSign extends UnaryHandler< { @@ -25,7 +25,7 @@ class NodesClaimNetworkSign extends UnaryHandler< // Connections should always be validated const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } return nodeManager.handleClaimNetwork(requestingNodeId, input); }; diff --git a/src/nodes/agent/handlers/NodesClaimNetworkVerify.ts b/src/nodes/agent/handlers/NodesClaimNetworkVerify.ts index f22d425c8..81fd83b86 100644 --- a/src/nodes/agent/handlers/NodesClaimNetworkVerify.ts +++ b/src/nodes/agent/handlers/NodesClaimNetworkVerify.ts @@ -6,8 +6,8 @@ import type { import type NodeManager from '../../../nodes/NodeManager.js'; import type { JSONValue } from '../../../types.js'; import { UnaryHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; class NodesClaimNetworkVerify extends UnaryHandler< { @@ -24,7 +24,7 @@ class NodesClaimNetworkVerify extends UnaryHandler< const { nodeManager }: { nodeManager: NodeManager } = this.container; const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } return nodeManager.handleVerifyClaimNetwork(requestingNodeId, input); }; diff --git a/src/nodes/agent/handlers/NodesConnectionSignalFinal.ts b/src/nodes/agent/handlers/NodesConnectionSignalFinal.ts index d96fc460b..58edc337c 100644 --- a/src/nodes/agent/handlers/NodesConnectionSignalFinal.ts +++ b/src/nodes/agent/handlers/NodesConnectionSignalFinal.ts @@ -13,8 +13,8 @@ import { validateSync } from '../../../validation/index.js'; import { matchSync } from '../../../utils/index.js'; import * as keysUtils from '../../../keys/utils/index.js'; import * as ids from '../../../ids/index.js'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; class NodesConnectionSignalFinal extends UnaryHandler< { @@ -55,7 +55,7 @@ class NodesConnectionSignalFinal extends UnaryHandler< ); const relayingNodeId = agentUtils.nodeIdFromMeta(meta); if (relayingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } const requestSignature = Buffer.from(input.requestSignature, 'base64url'); // Checking request requestSignature, requestData is just `` concatenated @@ -68,7 +68,7 @@ class NodesConnectionSignalFinal extends UnaryHandler< requestSignature, ) ) { - throw new agentErrors.ErrorNodesConnectionSignalRequestVerificationFailed(); + throw new nodesErrors.ErrorNodeConnectionSignalRequestVerificationFailed(); } // Checking relay message relaySignature. // relayData is just `
` concatenated. @@ -83,7 +83,7 @@ class NodesConnectionSignalFinal extends UnaryHandler< if ( !keysUtils.verifyWithPublicKey(relayPublicKey, relayData, relaySignature) ) { - throw new agentErrors.ErrorNodesConnectionSignalRelayVerificationFailed(); + throw new nodesErrors.ErrorNodeConnectionSignalRelayVerificationFailed(); } const host = input.address.host as Host; diff --git a/src/nodes/agent/handlers/NodesConnectionSignalInitial.ts b/src/nodes/agent/handlers/NodesConnectionSignalInitial.ts index cd3b56dd6..1a8cabbe4 100644 --- a/src/nodes/agent/handlers/NodesConnectionSignalInitial.ts +++ b/src/nodes/agent/handlers/NodesConnectionSignalInitial.ts @@ -12,8 +12,8 @@ import { UnaryHandler } from '@matrixai/rpc'; import { validateSync } from '../../../validation/index.js'; import { matchSync } from '../../../utils/index.js'; import { never } from '../../../utils/index.js'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; import * as keysUtils from '../../../keys/utils/index.js'; import * as ids from '../../../ids/index.js'; @@ -37,7 +37,7 @@ class NodesConnectionSignalInitial extends UnaryHandler< // Connections should always be validated const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } const { targetNodeId }: { targetNodeId: NodeId } = validateSync( (keyPath, value) => { @@ -55,7 +55,7 @@ class NodesConnectionSignalInitial extends UnaryHandler< const data = Buffer.concat([requestingNodeId, targetNodeId]); const sourcePublicKey = keysUtils.publicKeyFromNodeId(requestingNodeId); if (!keysUtils.verifyWithPublicKey(sourcePublicKey, data, signature)) { - throw new agentErrors.ErrorNodesConnectionSignalRelayVerificationFailed(); + throw new nodesErrors.ErrorNodeConnectionSignalRelayVerificationFailed(); } if (meta == null) never('Missing metadata from stream'); const remoteHost = meta.remoteHost; diff --git a/src/nodes/agent/handlers/NodesCrossSignClaim.ts b/src/nodes/agent/handlers/NodesCrossSignClaim.ts index dd089a971..a1cef64b8 100644 --- a/src/nodes/agent/handlers/NodesCrossSignClaim.ts +++ b/src/nodes/agent/handlers/NodesCrossSignClaim.ts @@ -7,7 +7,6 @@ import type { import type NodeManager from '../../NodeManager.js'; import type ACL from '../../../acl/ACL.js'; import { DuplexHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; import * as nodesErrors from '../../errors.js'; @@ -36,7 +35,7 @@ class NodesCrossSignClaim extends DuplexHandler< } = this.container; const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } // Check the ACL for permissions const permissions = await acl.getNodePerm(requestingNodeId); diff --git a/src/nodes/agent/handlers/VaultsGitInfoGet.ts b/src/nodes/agent/handlers/VaultsGitInfoGet.ts index 6dc6d3ce6..cb7d55a4b 100644 --- a/src/nodes/agent/handlers/VaultsGitInfoGet.ts +++ b/src/nodes/agent/handlers/VaultsGitInfoGet.ts @@ -7,11 +7,11 @@ import type VaultManager from '../../../vaults/VaultManager.js'; import type { JSONValue } from '../../../types.js'; import { ReadableStream } from 'stream/web'; import { RawHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; +import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; +import * as nodesUtils from '../../utils.js'; import * as vaultsUtils from '../../../vaults/utils.js'; import * as vaultsErrors from '../../../vaults/errors.js'; -import * as nodesUtils from '../../utils.js'; -import * as agentUtils from '../utils.js'; import * as utils from '../../../utils/index.js'; /** @@ -65,7 +65,7 @@ class VaultsGitInfoGet extends RawHandler<{ // Getting the NodeId from the connection metadata const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } const nodeIdEncoded = nodesUtils.encodeNodeId(requestingNodeId); const permissions = await acl.getNodePerm(requestingNodeId, tran); diff --git a/src/nodes/agent/handlers/VaultsGitPackGet.ts b/src/nodes/agent/handlers/VaultsGitPackGet.ts index 492e3ee08..4a2dc87d9 100644 --- a/src/nodes/agent/handlers/VaultsGitPackGet.ts +++ b/src/nodes/agent/handlers/VaultsGitPackGet.ts @@ -6,8 +6,8 @@ import type ACL from '../../../acl/ACL.js'; import type VaultManager from '../../../vaults/VaultManager.js'; import { ReadableStream } from 'stream/web'; import { RawHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; import * as nodesUtils from '../../utils.js'; import * as vaultsUtils from '../../../vaults/utils.js'; import * as vaultsErrors from '../../../vaults/errors.js'; @@ -31,7 +31,7 @@ class VaultsGitPackGet extends RawHandler<{ const [headerMessage, inputStream] = input; const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } const nodeIdEncoded = nodesUtils.encodeNodeId(requestingNodeId); const params = headerMessage.params; diff --git a/src/nodes/agent/handlers/VaultsScan.ts b/src/nodes/agent/handlers/VaultsScan.ts index a4e950d04..19412e304 100644 --- a/src/nodes/agent/handlers/VaultsScan.ts +++ b/src/nodes/agent/handlers/VaultsScan.ts @@ -8,8 +8,8 @@ import type { import type VaultManager from '../../../vaults/VaultManager.js'; import type { JSONValue } from '@matrixai/rpc'; import { ServerHandler } from '@matrixai/rpc'; -import * as agentErrors from '../errors.js'; import * as agentUtils from '../utils.js'; +import * as nodesErrors from '../../errors.js'; import * as vaultsUtils from '../../../vaults/utils.js'; /** @@ -33,7 +33,7 @@ class VaultsScan extends ServerHandler< this.container; const requestingNodeId = agentUtils.nodeIdFromMeta(meta); if (requestingNodeId == null) { - throw new agentErrors.ErrorAgentNodeIdMissing(); + throw new nodesErrors.ErrorNodeConnectionInvalidIdentity(); } yield* db.withTransactionG(async function* (tran): AsyncGenerator< AgentRPCResponseResult diff --git a/src/nodes/agent/index.ts b/src/nodes/agent/index.ts index cae058d26..a02e2b9d0 100644 --- a/src/nodes/agent/index.ts +++ b/src/nodes/agent/index.ts @@ -3,5 +3,4 @@ export { default as manifestServer } from './handlers/index.js'; export * as callers from './callers/index.js'; export * as handlers from './handlers/index.js'; export * as utils from './utils.js'; -export * as errors from './errors.js'; export * as types from './types.js'; diff --git a/src/nodes/errors.ts b/src/nodes/errors.ts index 1eaee4340..d24de831d 100644 --- a/src/nodes/errors.ts +++ b/src/nodes/errors.ts @@ -48,17 +48,21 @@ class ErrorNodeManagerAuthenticationFailed extends ErrorNodeManager { exitCode = sysexits.NOPERM; } -class ErrorNodeManagerAuthenticationFailedForward extends ErrorNodes { +class ErrorNodeManagerAuthenticationFailedForward< + T, +> extends ErrorNodeManager { static description = 'Failed to complete forward authentication'; exitCode = sysexits.USAGE; } -class ErrorNodeManagerAuthenticationFailedReverse extends ErrorNodes { +class ErrorNodeManagerAuthenticationFailedReverse< + T, +> extends ErrorNodeManager { static description = 'Failed to complete reverse authentication'; exitCode = sysexits.USAGE; } -class ErrorNodeManagerAuthenticatonTimedOut extends ErrorNodes { +class ErrorNodeManagerAuthenticationTimedOut extends ErrorNodeManager { static description = 'Failed to complete authentication before timing out'; exitCode = sysexits.USAGE; } @@ -154,7 +158,7 @@ class ErrorNodeConnectionTransportGenericError< exitCode = sysexits.USAGE; } -class ErrorConnectionNodesEmpty extends ErrorNodeConnection { +class ErrorNodeConnectionEmpty extends ErrorNodeConnection { static description = 'Nodes list to verify against was empty'; exitCode = sysexits.USAGE; } @@ -256,6 +260,34 @@ class ErrorNodeAuthenticationFailed extends ErrorNodes { exitCode = sysexits.NOPERM; } +class ErrorNodeAuthenticationInvalidProtocol extends ErrorNodes { + static description = 'Invalid protocol used for node authentication'; + exitCode = sysexits.USAGE; +} + +class ErrorNodeConnectionSignalRequestVerificationFailed< + T, +> extends ErrorNodeConnection { + static description = 'Failed to verify request message signature'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorNodeConnectionSignalRelayVerificationFailed< + T, +> extends ErrorNodeConnection { + static description = 'Failed to verify relay message signature'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorNodeClaimNetworkVerificationFailed extends ErrorNodes { + static description = 'Failed to verify claim network message'; + exitCode = sysexits.UNAVAILABLE; +} + +class ErrorNodeConnectionInvalidIdentity extends ErrorNodeConnection { + static description = 'Failed to verify connection identity'; +} + export { ErrorNodes, ErrorNodeManager, @@ -269,7 +301,7 @@ export { ErrorNodeManagerAuthenticationFailed, ErrorNodeManagerAuthenticationFailedForward, ErrorNodeManagerAuthenticationFailedReverse, - ErrorNodeManagerAuthenticatonTimedOut, + ErrorNodeManagerAuthenticationTimedOut, ErrorNodeGraph, ErrorNodeGraphRunning, ErrorNodeGraphNotRunning, @@ -288,7 +320,7 @@ export { ErrorNodeConnectionInternalError, ErrorNodeConnectionTransportUnknownError, ErrorNodeConnectionTransportGenericError, - ErrorConnectionNodesEmpty, + ErrorNodeConnectionEmpty, ErrorNodeConnectionManager, ErrorNodeConnectionManagerNotRunning, ErrorNodeConnectionManagerStopping, @@ -304,4 +336,9 @@ export { ErrorNodePermissionDenied, ErrorNodeLookupNotFound, ErrorNodeAuthenticationFailed, + ErrorNodeAuthenticationInvalidProtocol, + ErrorNodeClaimNetworkVerificationFailed, + ErrorNodeConnectionSignalRelayVerificationFailed, + ErrorNodeConnectionSignalRequestVerificationFailed, + ErrorNodeConnectionInvalidIdentity, }; diff --git a/src/nodes/utils.ts b/src/nodes/utils.ts index f7b45909e..b8a09bd4a 100644 --- a/src/nodes/utils.ts +++ b/src/nodes/utils.ts @@ -1,6 +1,7 @@ +import type { ContextTimed } from '@matrixai/contexts'; import type { DBTransaction, KeyPath, LevelPath } from '@matrixai/db'; -import type { X509Certificate } from '@peculiar/x509'; import type { QUICClientCrypto, QUICServerCrypto } from '@matrixai/quic'; +import type { X509Certificate } from '@peculiar/x509'; import type { Key, Certificate, CertificatePEM } from '../keys/types.js'; import type { Hostname, Port } from '../network/types.js'; import type { @@ -565,7 +566,7 @@ async function verifyServerCertificateChain( }; } if (nodeIds.length === 0) { - throw new nodesErrors.ErrorConnectionNodesEmpty(); + throw new nodesErrors.ErrorNodeConnectionEmpty(); } const certChain: Array> = []; for (const certPEM of certPEMChain) { @@ -753,6 +754,7 @@ const quicServerCrypto: QUICServerCrypto = { async function* collectNodeContacts( levelPath: LevelPath, tran: DBTransaction, + ctx: ContextTimed, options: { reverse?: boolean; lt?: LevelPath; @@ -773,6 +775,7 @@ async function* collectNodeContacts( gt: options.gt, valueAsBuffer: false, })) { + ctx.signal.throwIfAborted(); const { nodeId: nodeIdCurrent, nodeContactAddress } = parseBucketsDbKey([ ...(options.pathAdjust ?? []), ...keyPath, diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 3c28acd31..803ef16f3 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -547,6 +547,42 @@ async function importFS(fs?: FileSystem): Promise { return fsImported; } +/** + * Races a promise against a context. If the context is aborted, then the promise + * rejects with the reason. Otherwise, the original value is returned upon + * resolving. + * @param prom The promise to resolve + * @param ctx The ctx with which to race the promise against + * @returns A promise which resolves to the prom value or errors if ctx aborts + */ +async function raceSignal( + prom: Promise, + abortSignal: AbortSignal, +): Promise { + // Create an abort promise which rejects when ctx is aborted + const { p: abortP, rejectP: rejectAbortP } = promise(); + const abortHandler = () => { + rejectAbortP(abortSignal.reason); + }; + if (abortSignal.aborted) { + abortHandler(); + } else { + abortSignal.addEventListener('abort', abortHandler, { once: true }); + } + + // Race the original promise and abortP. If the original promise resolves + // first, then it is returned. If the context aborts first, then the + // promise is rejected. + try { + return await Promise.race([prom, abortP]); + } catch (e) { + Error.captureStackTrace(e); + throw e; + } finally { + abortSignal.removeEventListener('abort', abortHandler); + } +} + export { AsyncFunction, GeneratorFunction, @@ -588,4 +624,5 @@ export { yieldMicro, setMaxListeners, importFS, + raceSignal, }; diff --git a/tests/identities/IdentitiesManager.test.ts b/tests/identities/IdentitiesManager.test.ts index 668e0b169..45a20d12c 100644 --- a/tests/identities/IdentitiesManager.test.ts +++ b/tests/identities/IdentitiesManager.test.ts @@ -86,7 +86,7 @@ describe('IdentitiesManager', () => { await identitiesManager.getTokens('abc' as ProviderId); }).rejects.toThrow(identitiesErrors.ErrorIdentitiesManagerNotRunning); }); - test.only.prop([ + test.prop([ identitiesTestUtils.identitiyIdArb, identitiesTestUtils.providerTokenArb, ])('get, set and unset tokens', async (identityId, providerToken) => { diff --git a/tests/nodes/NodeConnection.test.ts b/tests/nodes/NodeConnection.test.ts index 7925d002b..020b986e2 100644 --- a/tests/nodes/NodeConnection.test.ts +++ b/tests/nodes/NodeConnection.test.ts @@ -211,7 +211,7 @@ describe(`${NodeConnection.name}`, () => { nodesUtils.encodeNodeId(nodeConnection.nodeId), ); }); - test('Should fail due to server rejecting client certificate (no certs)', async () => { + test('should fail due to server rejecting client certificate (no certs)', async () => { const nodeConnection = await NodeConnection.createNodeConnection({ handleStream: () => {}, targetNodeIds: [serverNodeId], @@ -237,7 +237,7 @@ describe(`${NodeConnection.name}`, () => { quicErrors.ErrorQUICConnectionPeerTLS, ); }); - test('Should fail due to client rejecting server certificate (missing NodeId)', async () => { + test('should fail due to client rejecting server certificate (missing NodeId)', async () => { const nodeConnectionProm = NodeConnection.createNodeConnection({ targetNodeIds: [clientNodeId], targetHost: localHost as Host, @@ -249,7 +249,7 @@ describe(`${NodeConnection.name}`, () => { }).then(extractNodeConnection); await expect(nodeConnectionProm).rejects.toThrow(); }); - test('Should fail and destroy due to connection failure', async () => { + test('should fail and destroy due to connection failure', async () => { const nodeConnection = await NodeConnection.createNodeConnection( { targetNodeIds: [serverNodeId], @@ -271,7 +271,7 @@ describe(`${NodeConnection.name}`, () => { await serverSocket.stop({ force: true }); await destroyP; }); - test('Should fail and destroy due to connection ending local', async () => { + test('should fail and destroy due to connection ending local', async () => { const nodeConnection = await NodeConnection.createNodeConnection( { targetNodeIds: [serverNodeId], @@ -297,7 +297,7 @@ describe(`${NodeConnection.name}`, () => { }); await destroyP; }); - test('Should fail and destroy due to connection ending remote', async () => { + test('should fail and destroy due to connection ending remote', async () => { const nodeConnection = await NodeConnection.createNodeConnection( { targetNodeIds: [serverNodeId], diff --git a/tests/nodes/NodeConnectionManager.test.ts b/tests/nodes/NodeConnectionManager.test.ts index b798379c6..23fe89d2f 100644 --- a/tests/nodes/NodeConnectionManager.test.ts +++ b/tests/nodes/NodeConnectionManager.test.ts @@ -107,11 +107,12 @@ describe(`${NodeConnectionManager.name}`, () => { }); // With constructed NCM and 1 peer - describe('With 1 peer', () => { + describe('with 1 peer', () => { let ncmLocal: NCMState; let ncmPeer1: NCMState; beforeEach(async () => { + jest.restoreAllMocks(); ncmLocal = await nodesTestUtils.nodeConnectionManagerFactory({ keyRing: keysTestUtils.createDummyKeyRing(), createOptions: { @@ -928,6 +929,138 @@ describe(`${NodeConnectionManager.name}`, () => { nodesErrors.ErrorNodeManagerAuthenticationFailed, ); }); + test('can reauthenticate if previous reverse auth failed', async () => { + ncmLocal.nodeConnectionManager.setAuthenticateNetworkForwardCallback( + nodesUtils.nodesAuthenticateConnectionForwardDefault, + ); + ncmPeer1.nodeConnectionManager.setAuthenticateNetworkForwardCallback( + nodesUtils.nodesAuthenticateConnectionForwardBasicPublicFactory( + 'someNetwork', + ), + ); + ncmLocal.nodeConnectionManager.setAuthenticateNetworkReverseCallback( + nodesUtils.nodesAuthenticateConnectionReverseBasicPublicFactory( + 'someNetwork', + ), + ); + ncmPeer1.nodeConnectionManager.setAuthenticateNetworkReverseCallback( + nodesUtils.nodesAuthenticateConnectionReverseBasicPublicFactory( + 'someNetwork', + ), + ); + + // Create the first connection + await ncmLocal.nodeConnectionManager.createConnection( + [ncmPeer1.nodeId], + localHost, + ncmPeer1.port, + ); + const authenticationAttemptFailP = + ncmLocal.nodeConnectionManager.withConnF( + ncmPeer1.nodeId, + undefined, + async () => { + // Do nothing + }, + ); + await expect(authenticationAttemptFailP).rejects.toThrow( + nodesErrors.ErrorNodeManagerAuthenticationFailed, + ); + await ncmLocal.nodeConnectionManager.destroyConnection( + ncmPeer1.nodeId, + false, + ); + + // Change the reverse callback to accept the authentication + ncmLocal.nodeConnectionManager.setAuthenticateNetworkForwardCallback( + nodesUtils.nodesAuthenticateConnectionForwardBasicPublicFactory( + 'someNetwork', + ), + ); + + // Another connection should be made as the previous connection has + // already failed authentication. + await ncmLocal.nodeConnectionManager.createConnection( + [ncmPeer1.nodeId], + localHost, + ncmPeer1.port, + ); + const authenticationAttemptPassP = + ncmLocal.nodeConnectionManager.withConnF( + ncmPeer1.nodeId, + undefined, + async () => { + // Do nothing + }, + ); + await expect(authenticationAttemptPassP).toResolve(); + }); + test('can reauthenticate if previous forward auth failed', async () => { + ncmLocal.nodeConnectionManager.setAuthenticateNetworkForwardCallback( + nodesUtils.nodesAuthenticateConnectionForwardBasicPublicFactory( + 'someNetwork', + ), + ); + ncmPeer1.nodeConnectionManager.setAuthenticateNetworkForwardCallback( + nodesUtils.nodesAuthenticateConnectionForwardDefault, + ); + ncmLocal.nodeConnectionManager.setAuthenticateNetworkReverseCallback( + nodesUtils.nodesAuthenticateConnectionReverseBasicPublicFactory( + 'someNetwork', + ), + ); + ncmPeer1.nodeConnectionManager.setAuthenticateNetworkReverseCallback( + nodesUtils.nodesAuthenticateConnectionReverseBasicPublicFactory( + 'someNetwork', + ), + ); + + // Create the first connection + await ncmLocal.nodeConnectionManager.createConnection( + [ncmPeer1.nodeId], + localHost, + ncmPeer1.port, + ); + const authenticationAttemptFailP = + ncmLocal.nodeConnectionManager.withConnF( + ncmPeer1.nodeId, + undefined, + async () => { + // Do nothing + }, + ); + await expect(authenticationAttemptFailP).rejects.toThrow( + nodesErrors.ErrorNodeManagerAuthenticationFailed, + ); + await ncmLocal.nodeConnectionManager.destroyConnection( + ncmPeer1.nodeId, + false, + ); + + // Change the reverse callback to accept the authentication + ncmPeer1.nodeConnectionManager.setAuthenticateNetworkForwardCallback( + nodesUtils.nodesAuthenticateConnectionForwardBasicPublicFactory( + 'someNetwork', + ), + ); + + // Another connection should be made as the previous connection has + // already failed authentication. + await ncmLocal.nodeConnectionManager.createConnection( + [ncmPeer1.nodeId], + localHost, + ncmPeer1.port, + ); + const authenticationAttemptPassP = + ncmLocal.nodeConnectionManager.withConnF( + ncmPeer1.nodeId, + undefined, + async () => { + // Do nothing + }, + ); + await expect(authenticationAttemptPassP).toResolve(); + }); test('non whitelisted RPC calls are prevented', async () => { ncmLocal.nodeConnectionManager.setAuthenticateNetworkForwardCallback( nodesUtils.nodesAuthenticateConnectionForwardBasicPublicFactory( @@ -950,6 +1083,13 @@ describe(`${NodeConnectionManager.name}`, () => { ), ); + // Disabling authentication function + const initiateForwardAuthenticationSpy = jest + .spyOn(ncmLocal.nodeConnectionManager, 'initiateForwardAuthenticate') + .mockImplementation(() => { + // Do nothing + }); + // Creating connection await ncmLocal.nodeConnectionManager.createConnection( [ncmPeer1.nodeId], @@ -966,6 +1106,9 @@ describe(`${NodeConnectionManager.name}`, () => { connection?.connection.rpcClient.unaryCaller('dummyMethod', {}), ).rejects.toThrow(nodesErrors.ErrorNodeConnectionManagerRPCDenied); + // Restore the original authentication functionality + initiateForwardAuthenticationSpy.mockRestore(); + const forwardAuthenticateP = ncmLocal.nodeConnectionManager.withConnF( ncmPeer1.nodeId, undefined, @@ -973,6 +1116,7 @@ describe(`${NodeConnectionManager.name}`, () => { // Do nothing }, ); + await expect(forwardAuthenticateP).toResolve(); const reverseAuthenticateP = ncmPeer1.nodeConnectionManager.withConnF( ncmLocal.nodeId, @@ -981,6 +1125,7 @@ describe(`${NodeConnectionManager.name}`, () => { // Do nothing }, ); + await expect(reverseAuthenticateP).toResolve(); // Checking RPC again @@ -998,7 +1143,7 @@ describe(`${NodeConnectionManager.name}`, () => { ); }); }); - describe('With 2 peers', () => { + describe('with 2 peers', () => { let ncmLocal: NCMState; let ncmPeer1: NCMState; let ncmPeer2: NCMState; diff --git a/tests/nodes/NodeGraph.test.ts b/tests/nodes/NodeGraph.test.ts index 43e24afeb..815ac55fc 100644 --- a/tests/nodes/NodeGraph.test.ts +++ b/tests/nodes/NodeGraph.test.ts @@ -1,3 +1,4 @@ +import type { ContextTimed } from '@matrixai/contexts'; import type { NodeContactAddress, NodeContact, @@ -308,7 +309,12 @@ describe(`${NodeGraph.name} test`, () => { await nodeGraph.setNodeContact(nodeId2, nodeContact2); const results: Array<[NodeId, NodeContact]> = []; - for await (const result of nodeGraph.getNodeContacts()) { + const abortController = new AbortController(); + for await (const result of nodeGraph.getNodeContacts( + undefined, + undefined, + { signal: abortController.signal } as ContextTimed, + )) { results.push(result); } expect(results.length).toBe(2); diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 95bfde9b9..9f7d28700 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -76,7 +76,7 @@ describe(`${NodeManager.name}`, () => { const timeoutTime = 1000; const dummyAgentService = { nodesAuthenticateConnection: new DummyNodesAuthenticateConnection({}), - } as AgentServerManifest; + } as unknown as AgentServerManifest; let dataDir: string;