Skip to content

Commit b4f49a9

Browse files
committed
feat: vaults agent handlers are using raw streams now
1 parent a4443c7 commit b4f49a9

File tree

7 files changed

+104
-130
lines changed

7 files changed

+104
-130
lines changed

src/agent/handlers/clientManifest.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types';
22
import type {
33
AgentClaimMessage,
44
ClaimIdMessage,
5-
GitPackMessage,
65
HolePunchRelayMessage,
76
NodeAddressMessage,
87
NodeIdMessage,
98
SignedNotificationEncoded,
10-
VaultsGitPackGetMessage,
119
VaultsScanMessage,
1210
} from './types';
1311
import {
@@ -44,10 +42,7 @@ const notificationsSend = new UnaryCaller<
4442

4543
const vaultsGitInfoGet = new RawCaller();
4644

47-
const vaultsGitPackGet = new ServerCaller<
48-
AgentRPCRequestParams<VaultsGitPackGetMessage>,
49-
AgentRPCResponseResult<GitPackMessage>
50-
>();
45+
const vaultsGitPackGet = new RawCaller();
5146

5247
const vaultsScan = new ServerCaller<
5348
AgentRPCRequestParams,

src/agent/handlers/types.ts

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,3 @@ export type VaultInfo = {
4040
export type VaultsScanMessage = VaultInfo & {
4141
vaultPermissions: Array<VaultAction>;
4242
};
43-
44-
export type VaultsGitInfoGetMessage = {
45-
vaultNameOrId: VaultIdEncoded | VaultName;
46-
action: VaultAction;
47-
};
48-
49-
export type GitPackMessage = {
50-
/**
51-
* Chunk of data in binary form;
52-
*/
53-
chunk: string;
54-
};
55-
56-
export type VaultsGitPackGetMessage = {
57-
body: string;
58-
nameOrId: VaultIdEncoded | VaultName;
59-
vaultAction: VaultAction;
60-
};

src/agent/handlers/vaultsGitInfoGet.ts

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,7 @@ class VaultsGitInfoGetHandler extends RawHandler<{
3030
): Promise<[JSONValue, ReadableStream<Uint8Array>]> {
3131
const { db, vaultManager, acl } = this.container;
3232
const [headerMessage, inputStream] = input;
33-
const readableProm = (async () => {
34-
for await (const _ of inputStream) {
35-
// Input stream is not used here, wait for finish.
36-
// It should be closed by the caller immediately
37-
}
38-
})();
33+
await inputStream.cancel();
3934
const params = headerMessage.params;
4035
if (params == null || !utils.isObject(params)) never();
4136
if (
@@ -82,29 +77,30 @@ class VaultsGitInfoGetHandler extends RawHandler<{
8277
)}`,
8378
);
8479
}
85-
8680
return {
8781
vaultId,
8882
vaultName,
8983
};
9084
});
9185

92-
// TODO: Needs to handle cancellation
86+
let handleInfoRequestGen: AsyncGenerator<Buffer>;
9387
const stream = new ReadableStream({
94-
start: async (controller) => {
95-
for await (const buffer of vaultManager.handleInfoRequest(
96-
data.vaultId,
97-
)) {
98-
if (buffer != null) {
99-
controller.enqueue(buffer);
100-
} else {
101-
break;
102-
}
88+
start: async () => {
89+
handleInfoRequestGen = vaultManager.handleInfoRequest(data.vaultId);
90+
},
91+
pull: async (controller) => {
92+
const result = await handleInfoRequestGen.next();
93+
if (result.done) {
94+
controller.close();
95+
return;
96+
} else {
97+
controller.enqueue(result.value);
10398
}
104-
controller.close();
99+
},
100+
cancel: async (reason) => {
101+
await handleInfoRequestGen.throw(reason).catch(() => {});
105102
},
106103
});
107-
await readableProm;
108104
return [
109105
{
110106
vaultName: data.vaultName,

src/agent/handlers/vaultsGitPackGet.ts

Lines changed: 77 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,106 +1,108 @@
11
import type { DB } from '@matrixai/db';
2-
import type { GitPackMessage, VaultsGitPackGetMessage } from './types';
3-
import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types';
4-
import type { VaultAction, VaultName } from '../../vaults/types';
2+
import type { VaultName } from '../../vaults/types';
53
import type VaultManager from '../../vaults/VaultManager';
64
import type ACL from '../../acl/ACL';
5+
import type { JSONValue } from '../../types';
6+
import type { PassThrough } from 'readable-stream';
7+
import type { JSONRPCRequest } from '../../rpc/types';
8+
import { ReadableStream } from 'stream/web';
9+
import * as utils from '../../utils';
710
import * as agentErrors from '../errors';
811
import * as agentUtils from '../utils';
912
import * as nodesUtils from '../../nodes/utils';
1013
import * as vaultsUtils from '../../vaults/utils';
1114
import * as vaultsErrors from '../../vaults/errors';
12-
import { validateSync } from '../../validation';
13-
import { matchSync } from '../../utils';
15+
import { never } from '../../utils';
1416
import * as validationUtils from '../../validation/utils';
15-
import { ServerHandler } from '../../rpc/handlers';
17+
import { RawHandler } from '../../rpc/handlers';
1618

17-
// TODO: This needs to be a raw handler
18-
class VaultsGitPackGetHandler extends ServerHandler<
19-
{
20-
vaultManager: VaultManager;
21-
acl: ACL;
22-
db: DB;
23-
},
24-
AgentRPCRequestParams<VaultsGitPackGetMessage>,
25-
AgentRPCResponseResult<GitPackMessage>
26-
> {
27-
public async *handle(
28-
input: AgentRPCRequestParams<VaultsGitPackGetMessage>,
19+
class VaultsGitPackGetHandler extends RawHandler<{
20+
vaultManager: VaultManager;
21+
acl: ACL;
22+
db: DB;
23+
}> {
24+
public async handle(
25+
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
2926
_cancel,
3027
meta,
31-
): AsyncGenerator<AgentRPCResponseResult<GitPackMessage>> {
28+
): Promise<[JSONValue, ReadableStream<Uint8Array>]> {
3229
const { vaultManager, acl, db } = this.container;
30+
const [headerMessage, inputStream] = input;
3331
const requestingNodeId = agentUtils.nodeIdFromMeta(meta);
3432
if (requestingNodeId == null) {
3533
throw new agentErrors.ErrorAgentNodeIdMissing();
3634
}
3735
const nodeIdEncoded = nodesUtils.encodeNodeId(requestingNodeId);
38-
const nameOrId = input.nameOrId;
39-
yield* db.withTransactionG(async function* (
40-
tran,
41-
): AsyncGenerator<AgentRPCResponseResult<GitPackMessage>> {
42-
const vaultIdFromName = await vaultManager.getVaultId(
43-
nameOrId as VaultName,
44-
tran,
45-
);
46-
const vaultId = vaultIdFromName ?? vaultsUtils.decodeVaultId(nameOrId);
47-
if (vaultId == null) {
48-
throw new vaultsErrors.ErrorVaultsVaultUndefined();
49-
}
50-
const {
51-
actionType,
52-
}: {
53-
actionType: VaultAction;
54-
} = validateSync(
55-
(keyPath, value) => {
56-
return matchSync(keyPath)(
57-
[['actionType'], () => validationUtils.parseVaultAction(value)],
58-
() => value,
59-
);
60-
},
61-
{
62-
actionType: input.vaultAction,
63-
},
64-
);
65-
// Checking permissions
66-
const permissions = await acl.getNodePerm(requestingNodeId, tran);
67-
const vaultPerms = permissions?.vaults[vaultId];
68-
if (vaultPerms?.[actionType] !== null) {
69-
throw new vaultsErrors.ErrorVaultsPermissionDenied(
70-
`${nodeIdEncoded} does not have permission to ${actionType} from vault ${vaultsUtils.encodeVaultId(
71-
vaultId,
72-
)}`,
36+
const params = headerMessage.params;
37+
if (params == null || !utils.isObject(params)) never();
38+
if (!('nameOrId' in params) || typeof params.nameOrId != 'string') {
39+
never();
40+
}
41+
if (!('vaultAction' in params) || typeof params.vaultAction != 'string') {
42+
never();
43+
}
44+
const nameOrId = params.nameOrId;
45+
const actionType = validationUtils.parseVaultAction(params.vaultAction);
46+
const [vaultIdFromName, permissions] = await db.withTransactionF(
47+
async (tran) => {
48+
const vaultIdFromName = await vaultManager.getVaultId(
49+
nameOrId as VaultName,
50+
tran,
7351
);
74-
}
75-
const [sideBand, progressStream] = await vaultManager.handlePackRequest(
76-
vaultId,
77-
Buffer.from(input.body, 'utf-8'),
78-
tran,
52+
const permissions = await acl.getNodePerm(requestingNodeId, tran);
53+
54+
return [vaultIdFromName, permissions];
55+
},
56+
);
57+
const vaultId = vaultIdFromName ?? vaultsUtils.decodeVaultId(nameOrId);
58+
if (vaultId == null) {
59+
throw new vaultsErrors.ErrorVaultsVaultUndefined();
60+
}
61+
// Checking permissions
62+
const vaultPerms = permissions?.vaults[vaultId];
63+
if (vaultPerms?.[actionType] !== null) {
64+
throw new vaultsErrors.ErrorVaultsPermissionDenied(
65+
`${nodeIdEncoded} does not have permission to ${actionType} from vault ${vaultsUtils.encodeVaultId(
66+
vaultId,
67+
)}`,
7968
);
80-
yield {
81-
chunk: Buffer.from('0008NAK\n').toString('binary'),
82-
};
83-
const responseBuffers: Uint8Array[] = [];
84-
// FIXME: this WHOLE thing needs to change, why are we streaming when we send monolithic messages?
85-
const result = await new Promise<string>((resolve, reject) => {
69+
}
70+
71+
// Getting data
72+
let sideBand: PassThrough;
73+
let progressStream: PassThrough;
74+
const outputStream = new ReadableStream({
75+
start: async (controller) => {
76+
const body = new Array<Uint8Array>();
77+
for await (const message of inputStream) {
78+
body.push(message);
79+
}
80+
[sideBand, progressStream] = await vaultManager.handlePackRequest(
81+
vaultId,
82+
Buffer.concat(body),
83+
);
84+
controller.enqueue(Buffer.from('0008NAK\n'));
8685
sideBand.on('data', async (data: Uint8Array) => {
87-
responseBuffers.push(data);
86+
controller.enqueue(data);
87+
sideBand.pause();
8888
});
8989
sideBand.on('end', async () => {
90-
const result = Buffer.concat(responseBuffers).toString('binary');
91-
resolve(result);
90+
controller.close();
9291
});
93-
sideBand.on('error', (err) => {
94-
reject(err);
92+
sideBand.on('error', (e) => {
93+
controller.error(e);
9594
});
9695
progressStream.write(Buffer.from('0014progress is at 50%\n'));
9796
progressStream.end();
98-
});
99-
yield {
100-
chunk: result,
101-
};
97+
},
98+
pull: () => {
99+
sideBand.resume();
100+
},
101+
cancel: (e) => {
102+
sideBand.destroy(e);
103+
},
102104
});
103-
return;
105+
return [null, outputStream];
104106
}
105107
}
106108

src/rpc/RPCServer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ class RPCServer extends EventTarget {
586586
id: null,
587587
};
588588
await headerWriter.write(Buffer.from(JSON.stringify(rpcErrorMessage)));
589+
await headerWriter.close();
589590
// Clean up and return
590591
timer.cancel(cleanupReason);
591592
abortController.signal.removeEventListener('abort', handleAbort);

src/vaults/VaultInternal.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -767,13 +767,11 @@ class VaultInternal {
767767
typeof vaultNameOrId === 'string'
768768
? vaultNameOrId
769769
: vaultsUtils.encodeVaultId(vaultNameOrId);
770-
const response = await client.methods.vaultsGitInfoGet({
770+
const vaultsGitInfoGetStream = await client.methods.vaultsGitInfoGet({
771771
vaultNameOrId: vaultNameOrId_,
772772
action: vaultAction,
773773
});
774-
await response.writable.close();
775-
776-
const result = response.meta?.result;
774+
const result = vaultsGitInfoGetStream.meta?.result;
777775
if (result == null || !utils.isObject(result)) never();
778776
if (!('vaultName' in result) || typeof result.vaultName != 'string') {
779777
never();
@@ -789,7 +787,7 @@ class VaultInternal {
789787

790788
// Collect the response buffers from the GET request
791789
const infoResponse: Uint8Array[] = [];
792-
for await (const chunk of response.readable) {
790+
for await (const chunk of vaultsGitInfoGetStream.readable) {
793791
infoResponse.push(chunk);
794792
}
795793
return [
@@ -816,13 +814,15 @@ class VaultInternal {
816814
};
817815
} else if (method === 'POST') {
818816
const responseBuffers: Array<Uint8Array> = [];
819-
const stream = await client.methods.vaultsGitPackGet({
820-
body: body[0].toString('binary'),
817+
const vaultsGitPackGetStream = await client.methods.vaultsGitPackGet({
821818
nameOrId: result.vaultIdEncoded as string,
822819
vaultAction,
823820
});
824-
for await (const value of stream) {
825-
responseBuffers.push(Buffer.from(value.chunk, 'binary'));
821+
const writer = vaultsGitPackGetStream.writable.getWriter();
822+
await writer.write(body[0]);
823+
await writer.close();
824+
for await (const value of vaultsGitPackGetStream.readable) {
825+
responseBuffers.push(value);
826826
}
827827
return {
828828
url: url,

tests/vaults/VaultManager.test.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,7 @@ describe('VaultManager', () => {
469469
await vaultManager?.destroy();
470470
}
471471
});
472-
// TODO: disabled until feature is addressed in agent migration stage 2
473-
describe.skip('with remote agents', () => {
472+
describe('with remote agents', () => {
474473
let allDataDir: string;
475474
let keyRing: KeyRing;
476475
let nodeGraph: NodeGraph;
@@ -653,7 +652,6 @@ describe('VaultManager', () => {
653652
localNodeId,
654653
'pull',
655654
);
656-
657655
await vaultManager.cloneVault(remoteKeynode1Id, vaultName);
658656
const vaultId = await vaultManager.getVaultId(vaultName);
659657
if (vaultId === undefined) fail('VaultId is not found.');

0 commit comments

Comments
 (0)