Skip to content

Commit ff2d436

Browse files
committed
feat: added nodes connections command to list active NodeConnections
1 parent ab8d759 commit ff2d436

File tree

13 files changed

+622
-16
lines changed

13 files changed

+622
-16
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import type PolykeyClient from '../../PolykeyClient';
2+
import type nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
3+
import CommandPolykey from '../CommandPolykey';
4+
import * as binUtils from '../utils/utils';
5+
import * as binProcessors from '../utils/processors';
6+
7+
class CommandAdd extends CommandPolykey {
8+
constructor(...args: ConstructorParameters<typeof CommandPolykey>) {
9+
super(...args);
10+
this.name('connections');
11+
this.description('list all active node connections');
12+
this.action(async (options) => {
13+
const { default: PolykeyClient } = await import('../../PolykeyClient');
14+
const utilsPB = await import('../../proto/js/polykey/v1/utils/utils_pb');
15+
const clientOptions = await binProcessors.processClientOptions(
16+
options.nodePath,
17+
options.nodeId,
18+
options.clientHost,
19+
options.clientPort,
20+
this.fs,
21+
this.logger.getChild(binProcessors.processClientOptions.name),
22+
);
23+
const meta = await binProcessors.processAuthentication(
24+
options.passwordFile,
25+
this.fs,
26+
);
27+
let pkClient: PolykeyClient;
28+
this.exitHandlers.handlers.push(async () => {
29+
if (pkClient != null) await pkClient.stop();
30+
});
31+
try {
32+
pkClient = await PolykeyClient.createPolykeyClient({
33+
nodePath: options.nodePath,
34+
nodeId: clientOptions.nodeId,
35+
host: clientOptions.clientHost,
36+
port: clientOptions.clientPort,
37+
logger: this.logger.getChild(PolykeyClient.name),
38+
});
39+
// DO things here...
40+
// Like create the message.
41+
const emptyMessage = new utilsPB.EmptyMessage();
42+
43+
const connections = await binUtils.retryAuthentication(async (auth) => {
44+
const connections = pkClient.grpcClient.nodesListConnections(
45+
emptyMessage,
46+
auth,
47+
);
48+
const connectionEntries: Array<nodesPB.NodeConnection.AsObject> = [];
49+
for await (const connection of connections) {
50+
connectionEntries.push(connection.toObject());
51+
}
52+
return connectionEntries;
53+
}, meta);
54+
if (options.format === 'human') {
55+
const output: Array<string> = [];
56+
for (const connection of connections) {
57+
const hostnameString =
58+
connection.hostname === '' ? '' : `(${connection.hostname})`;
59+
const hostString = `${connection.nodeId}@${connection.host}${hostnameString}:${connection.port}`;
60+
const usageCount = connection.usageCount;
61+
const timeout =
62+
connection.timeout === -1 ? 'NA' : `${connection.timeout}`;
63+
const outputLine = `${hostString}\t${usageCount}\t${timeout}`;
64+
output.push(outputLine);
65+
}
66+
process.stdout.write(
67+
binUtils.outputFormatter({
68+
type: 'list',
69+
data: output,
70+
}),
71+
);
72+
} else {
73+
process.stdout.write(
74+
binUtils.outputFormatter({
75+
type: 'json',
76+
data: connections,
77+
}),
78+
);
79+
}
80+
} finally {
81+
if (pkClient! != null) await pkClient.stop();
82+
}
83+
});
84+
}
85+
}
86+
87+
export default CommandAdd;

src/bin/nodes/CommandNodes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import CommandClaim from './CommandClaim';
33
import CommandFind from './CommandFind';
44
import CommandPing from './CommandPing';
55
import CommandGetAll from './CommandGetAll';
6+
import CommandConnections from './CommandConnections';
67
import CommandPolykey from '../CommandPolykey';
78

89
class CommandNodes extends CommandPolykey {
@@ -15,6 +16,7 @@ class CommandNodes extends CommandPolykey {
1516
this.addCommand(new CommandFind(...args));
1617
this.addCommand(new CommandPing(...args));
1718
this.addCommand(new CommandGetAll(...args));
19+
this.addCommand(new CommandConnections(...args));
1820
}
1921
}
2022

src/client/GRPCClientClient.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,25 @@ class GRPCClientClient extends GRPCClient<ClientServiceClient> {
915915
)(...args);
916916
}
917917

918+
@ready(new clientErrors.ErrorClientClientDestroyed())
919+
public nodesListConnections(
920+
...args
921+
): AsyncGeneratorReadableStreamClient<
922+
nodesPB.NodeConnection,
923+
ClientReadableStream<nodesPB.NodeConnection>
924+
> {
925+
return grpcUtils.promisifyReadableStreamCall<nodesPB.NodeConnection>(
926+
this.client,
927+
{
928+
nodeId: this.nodeId,
929+
host: this.host,
930+
port: this.port,
931+
command: this.nodesListConnections.name,
932+
},
933+
this.client.nodesListConnections,
934+
)(...args);
935+
}
936+
918937
@ready(new clientErrors.ErrorClientClientDestroyed())
919938
public identitiesAuthenticate(...args) {
920939
return grpcUtils.promisifyReadableStreamCall<identitiesPB.AuthenticationProcess>(

src/client/service/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import nodesClaim from './nodesClaim';
6060
import nodesFind from './nodesFind';
6161
import nodesPing from './nodesPing';
6262
import nodesGetAll from './nodesGetAll';
63+
import nodesListConnections from './nodesListConnections';
6364
import notificationsClear from './notificationsClear';
6465
import notificationsRead from './notificationsRead';
6566
import notificationsSend from './notificationsSend';
@@ -167,6 +168,7 @@ function createService({
167168
nodesFind: nodesFind(container),
168169
nodesPing: nodesPing(container),
169170
nodesGetAll: nodesGetAll(container),
171+
nodesListConnections: nodesListConnections(container),
170172
notificationsClear: notificationsClear(container),
171173
notificationsRead: notificationsRead(container),
172174
notificationsSend: notificationsSend(container),
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import type * as grpc from '@grpc/grpc-js';
2+
import type { Authenticate } from '../types';
3+
import type * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb';
4+
import type Logger from '@matrixai/logger';
5+
import type NodeConnectionManager from '../../nodes/NodeConnectionManager';
6+
import * as grpcUtils from '../../grpc/utils';
7+
import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
8+
import * as clientUtils from '../utils';
9+
import * as nodesUtils from '../../nodes/utils';
10+
11+
function nodesListConnections({
12+
authenticate,
13+
nodeConnectionManager,
14+
logger,
15+
}: {
16+
authenticate: Authenticate;
17+
nodeConnectionManager: NodeConnectionManager;
18+
logger: Logger;
19+
}) {
20+
return async (
21+
call: grpc.ServerWritableStream<
22+
utilsPB.EmptyMessage,
23+
nodesPB.NodeConnection
24+
>,
25+
): Promise<void> => {
26+
const genWritable = grpcUtils.generatorWritable(call, false);
27+
try {
28+
const metadata = await authenticate(call.metadata);
29+
call.sendMetadata(metadata);
30+
const connections = nodeConnectionManager.listConnections();
31+
for (const connection of connections) {
32+
const connectionMessage = new nodesPB.NodeConnection();
33+
connectionMessage.setNodeId(nodesUtils.encodeNodeId(connection.nodeId));
34+
connectionMessage.setHost(connection.address.host);
35+
connectionMessage.setHostname(connection.address.hostname ?? '');
36+
connectionMessage.setPort(connection.address.port);
37+
connectionMessage.setUsageCount(connection.usageCount);
38+
connectionMessage.setTimeout(connection.timeout ?? -1);
39+
await genWritable.next(connectionMessage);
40+
}
41+
await genWritable.next(null);
42+
return;
43+
} catch (e) {
44+
await genWritable.throw(e);
45+
!clientUtils.isClientClientError(e) &&
46+
logger.error(`${nodesListConnections.name}:${e}`);
47+
return;
48+
}
49+
};
50+
}
51+
52+
export default nodesListConnections;

src/nodes/NodeConnectionManager.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,10 +1028,44 @@ class NodeConnectionManager {
10281028
return establishedMap;
10291029
}
10301030

1031+
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
10311032
public hasConnection(nodeId: NodeId): boolean {
10321033
return this.connections.has(nodeId.toString() as NodeIdString);
10331034
}
10341035

1036+
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
1037+
public listConnections(): Array<{
1038+
nodeId: NodeId;
1039+
address: { host: Host; port: Port; hostname: Hostname | undefined };
1040+
usageCount: number;
1041+
timeout: number | undefined;
1042+
}> {
1043+
const results: Array<{
1044+
nodeId: NodeId;
1045+
address: { host: Host; port: Port; hostname: Hostname | undefined };
1046+
usageCount: number;
1047+
timeout: number | undefined;
1048+
}> = [];
1049+
for (const [
1050+
nodeIdString,
1051+
connectionAndTimer,
1052+
] of this.connections.entries()) {
1053+
const connection = connectionAndTimer.connection;
1054+
const nodeId = IdInternal.fromString<NodeId>(nodeIdString);
1055+
results.push({
1056+
nodeId,
1057+
address: {
1058+
host: connection.host,
1059+
port: connection.port,
1060+
hostname: connection.hostname,
1061+
},
1062+
usageCount: connectionAndTimer.usageCount,
1063+
timeout: connectionAndTimer.timer?.getTimeout(),
1064+
});
1065+
}
1066+
return results;
1067+
}
1068+
10351069
protected hasBackoff(nodeId: NodeId): boolean {
10361070
const backoff = this.nodesBackoffMap.get(nodeId.toString());
10371071
if (backoff == null) return false;

src/proto/js/polykey/v1/client_service_grpc_pb.d.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ interface IClientServiceService extends grpc.ServiceDefinition<grpc.UntypedServi
2828
nodesClaim: IClientServiceService_INodesClaim;
2929
nodesFind: IClientServiceService_INodesFind;
3030
nodesGetAll: IClientServiceService_INodesGetAll;
31+
nodesListConnections: IClientServiceService_INodesListConnections;
3132
keysKeyPairRoot: IClientServiceService_IKeysKeyPairRoot;
3233
keysKeyPairReset: IClientServiceService_IKeysKeyPairReset;
3334
keysKeyPairRenew: IClientServiceService_IKeysKeyPairRenew;
@@ -167,6 +168,15 @@ interface IClientServiceService_INodesGetAll extends grpc.MethodDefinition<polyk
167168
responseSerialize: grpc.serialize<polykey_v1_nodes_nodes_pb.NodeBuckets>;
168169
responseDeserialize: grpc.deserialize<polykey_v1_nodes_nodes_pb.NodeBuckets>;
169170
}
171+
interface IClientServiceService_INodesListConnections extends grpc.MethodDefinition<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_nodes_nodes_pb.NodeConnection> {
172+
path: "/polykey.v1.ClientService/NodesListConnections";
173+
requestStream: false;
174+
responseStream: true;
175+
requestSerialize: grpc.serialize<polykey_v1_utils_utils_pb.EmptyMessage>;
176+
requestDeserialize: grpc.deserialize<polykey_v1_utils_utils_pb.EmptyMessage>;
177+
responseSerialize: grpc.serialize<polykey_v1_nodes_nodes_pb.NodeConnection>;
178+
responseDeserialize: grpc.deserialize<polykey_v1_nodes_nodes_pb.NodeConnection>;
179+
}
170180
interface IClientServiceService_IKeysKeyPairRoot extends grpc.MethodDefinition<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_keys_keys_pb.KeyPair> {
171181
path: "/polykey.v1.ClientService/KeysKeyPairRoot";
172182
requestStream: false;
@@ -684,6 +694,7 @@ export interface IClientServiceServer extends grpc.UntypedServiceImplementation
684694
nodesClaim: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.Claim, polykey_v1_utils_utils_pb.StatusMessage>;
685695
nodesFind: grpc.handleUnaryCall<polykey_v1_nodes_nodes_pb.Node, polykey_v1_nodes_nodes_pb.NodeAddress>;
686696
nodesGetAll: grpc.handleUnaryCall<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_nodes_nodes_pb.NodeBuckets>;
697+
nodesListConnections: grpc.handleServerStreamingCall<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_nodes_nodes_pb.NodeConnection>;
687698
keysKeyPairRoot: grpc.handleUnaryCall<polykey_v1_utils_utils_pb.EmptyMessage, polykey_v1_keys_keys_pb.KeyPair>;
688699
keysKeyPairReset: grpc.handleUnaryCall<polykey_v1_keys_keys_pb.Key, polykey_v1_utils_utils_pb.EmptyMessage>;
689700
keysKeyPairRenew: grpc.handleUnaryCall<polykey_v1_keys_keys_pb.Key, polykey_v1_utils_utils_pb.EmptyMessage>;
@@ -770,6 +781,8 @@ export interface IClientServiceClient {
770781
nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall;
771782
nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall;
772783
nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall;
784+
nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<polykey_v1_nodes_nodes_pb.NodeConnection>;
785+
nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata?: grpc.Metadata, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<polykey_v1_nodes_nodes_pb.NodeConnection>;
773786
keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall;
774787
keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall;
775788
keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall;
@@ -958,6 +971,8 @@ export class ClientServiceClient extends grpc.Client implements IClientServiceCl
958971
public nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall;
959972
public nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall;
960973
public nodesGetAll(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_nodes_nodes_pb.NodeBuckets) => void): grpc.ClientUnaryCall;
974+
public nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<polykey_v1_nodes_nodes_pb.NodeConnection>;
975+
public nodesListConnections(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata?: grpc.Metadata, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<polykey_v1_nodes_nodes_pb.NodeConnection>;
961976
public keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall;
962977
public keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall;
963978
public keysKeyPairRoot(request: polykey_v1_utils_utils_pb.EmptyMessage, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: polykey_v1_keys_keys_pb.KeyPair) => void): grpc.ClientUnaryCall;

src/proto/js/polykey/v1/client_service_grpc_pb.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,17 @@ function deserialize_polykey_v1_nodes_NodeBuckets(buffer_arg) {
234234
return polykey_v1_nodes_nodes_pb.NodeBuckets.deserializeBinary(new Uint8Array(buffer_arg));
235235
}
236236

237+
function serialize_polykey_v1_nodes_NodeConnection(arg) {
238+
if (!(arg instanceof polykey_v1_nodes_nodes_pb.NodeConnection)) {
239+
throw new Error('Expected argument of type polykey.v1.nodes.NodeConnection');
240+
}
241+
return Buffer.from(arg.serializeBinary());
242+
}
243+
244+
function deserialize_polykey_v1_nodes_NodeConnection(buffer_arg) {
245+
return polykey_v1_nodes_nodes_pb.NodeConnection.deserializeBinary(new Uint8Array(buffer_arg));
246+
}
247+
237248
function serialize_polykey_v1_notifications_List(arg) {
238249
if (!(arg instanceof polykey_v1_notifications_notifications_pb.List)) {
239250
throw new Error('Expected argument of type polykey.v1.notifications.List');
@@ -590,6 +601,17 @@ nodesAdd: {
590601
responseSerialize: serialize_polykey_v1_nodes_NodeBuckets,
591602
responseDeserialize: deserialize_polykey_v1_nodes_NodeBuckets,
592603
},
604+
nodesListConnections: {
605+
path: '/polykey.v1.ClientService/NodesListConnections',
606+
requestStream: false,
607+
responseStream: true,
608+
requestType: polykey_v1_utils_utils_pb.EmptyMessage,
609+
responseType: polykey_v1_nodes_nodes_pb.NodeConnection,
610+
requestSerialize: serialize_polykey_v1_utils_EmptyMessage,
611+
requestDeserialize: deserialize_polykey_v1_utils_EmptyMessage,
612+
responseSerialize: serialize_polykey_v1_nodes_NodeConnection,
613+
responseDeserialize: deserialize_polykey_v1_nodes_NodeConnection,
614+
},
593615
// Keys
594616
keysKeyPairRoot: {
595617
path: '/polykey.v1.ClientService/KeysKeyPairRoot',

0 commit comments

Comments
 (0)