Skip to content

Commit a4f1a7e

Browse files
tegefaulkesaryanjassal
authored andcommitted
wip: applying cancellability to handlers
1 parent f771659 commit a4f1a7e

File tree

5 files changed

+33
-15
lines changed

5 files changed

+33
-15
lines changed

src/git/http.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { Buffer } from 'buffer';
1010
import git from 'isomorphic-git';
1111
import * as gitUtils from './utils';
1212
import * as utils from '../utils';
13+
import {ContextCancellable} from "@matrixai/contexts";
1314

1415
/**
1516
* Reference discovery
@@ -118,7 +119,7 @@ async function* advertiseRefGenerator({
118119
efs: EncryptedFS;
119120
dir: string;
120121
gitDir: string;
121-
}): AsyncGenerator<Buffer, void, void> {
122+
}, ctx: ContextCancellable): AsyncGenerator<Buffer, void, void> {
122123
// Providing side-band-64, symref for the HEAD and agent name capabilities
123124
const capabilityList = [
124125
gitUtils.SIDE_BAND_64_CAPABILITY,
@@ -134,14 +135,14 @@ async function* advertiseRefGenerator({
134135
efs,
135136
dir,
136137
gitDir,
137-
});
138+
}, ctx );
138139

139140
// PKT-LINE("# service=$servicename" LF)
140141
yield packetLineBuffer(gitUtils.REFERENCE_DISCOVERY_HEADER);
141142
// "0000"
142143
yield gitUtils.FLUSH_PACKET_BUFFER;
143144
// Ref_list
144-
yield* referenceListGenerator(objectGenerator, capabilityList);
145+
yield* referenceListGenerator(objectGenerator, capabilityList, ctx);
145146
// "0000"
146147
yield gitUtils.FLUSH_PACKET_BUFFER;
147148
}
@@ -165,6 +166,7 @@ async function* advertiseRefGenerator({
165166
async function* referenceListGenerator(
166167
objectGenerator: AsyncGenerator<[Reference, ObjectId], void, void>,
167168
capabilities: CapabilityList,
169+
ctx: ContextCancellable,
168170
): AsyncGenerator<Buffer, void, void> {
169171
// Cap-list = capability *(SP capability)
170172
const capabilitiesListBuffer = Buffer.from(
@@ -175,6 +177,7 @@ async function* referenceListGenerator(
175177
// *ref_record
176178
let first = true;
177179
for await (const [name, objectId] of objectGenerator) {
180+
ctx.signal.throwIfAborted();
178181
if (first) {
179182
// PKT-LINE(obj-id SP name NUL cap_list LF)
180183
yield packetLineBuffer(
@@ -351,15 +354,15 @@ async function* generatePackRequest({
351354
dir: string;
352355
gitDir: string;
353356
body: Array<Buffer>;
354-
}): AsyncGenerator<Buffer, void, void> {
357+
}, ctx: ContextCancellable): AsyncGenerator<Buffer, void, void> {
355358
const [wants, haves, _capabilities] = await parsePackRequest(body);
356359
const objectIds = await gitUtils.listObjects({
357360
efs: efs,
358361
dir,
359362
gitDir: gitDir,
360363
wants,
361364
haves,
362-
});
365+
}, ctx);
363366
// Reply that we have no common history and that we need to send everything
364367
yield packetLineBuffer(gitUtils.NAK_BUFFER);
365368
// Send everything over in pack format
@@ -368,7 +371,7 @@ async function* generatePackRequest({
368371
dir,
369372
gitDir,
370373
objectIds,
371-
});
374+
}, ctx);
372375
// Send dummy progress data
373376
yield packetLineBuffer(
374377
gitUtils.DUMMY_PROGRESS_BUFFER,
@@ -396,7 +399,7 @@ async function* generatePackData({
396399
gitDir: string;
397400
objectIds: Array<ObjectId>;
398401
chunkSize?: number;
399-
}): AsyncGenerator<Buffer, void, void> {
402+
}, ctx: ContextCancellable): AsyncGenerator<Buffer, void, void> {
400403
let packFile: PackObjectsResult;
401404
// In case of errors we don't want to throw them. This will result in the error being thrown into `isometric-git`
402405
// when it consumes the response. It handles this by logging out the error which we don't want to happen.
@@ -423,6 +426,7 @@ async function* generatePackData({
423426
// Streaming the packFile as chunks of the length specified by the `chunkSize`.
424427
// Each line is formatted as a `PKT-LINE`
425428
do {
429+
ctx.signal.throwIfAborted();
426430
const subBuffer = packFileBuffer.subarray(0, chunkSize);
427431
packFileBuffer = packFileBuffer.subarray(chunkSize);
428432
yield packetLineBuffer(subBuffer, gitUtils.CHANNEL_DATA);

src/git/utils.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import git from 'isomorphic-git';
1313
import { requestTypes } from './types';
1414
import * as utils from '../utils';
1515
import * as validationErrors from '../validation/errors';
16+
import {ContextCancellable} from "@matrixai/contexts";
1617

1718
// Constants
1819
// Total number of bytes per pack line minus the 4 size bytes and 1 channel byte
@@ -75,7 +76,7 @@ async function* listReferencesGenerator({
7576
efs: EncryptedFS;
7677
dir: string;
7778
gitDir: string;
78-
}): AsyncGenerator<[Reference, ObjectId], void, void> {
79+
}, ctx: ContextCancellable): AsyncGenerator<[Reference, ObjectId], void, void> {
7980
const refs: Array<[string, Promise<string>]> = await git
8081
.listBranches({
8182
fs: efs,
@@ -84,6 +85,7 @@ async function* listReferencesGenerator({
8485
})
8586
.then((refs) => {
8687
return refs.map((ref) => {
88+
ctx.signal.throwIfAborted();
8789
return [
8890
`${REFERENCES_STRING}${ref}`,
8991
git.resolveRef({ fs: efs, dir, gitdir: gitDir, ref: ref }),
@@ -99,6 +101,7 @@ async function* listReferencesGenerator({
99101
});
100102
yield [HEAD_REFERENCE, resolvedHead];
101103
for (const [key, refP] of refs) {
104+
ctx.signal.throwIfAborted();
102105
yield [key, await refP];
103106
}
104107
}
@@ -155,14 +158,15 @@ async function listObjects({
155158
gitDir: string;
156159
wants: ObjectIdList;
157160
haves: ObjectIdList;
158-
}): Promise<ObjectIdList> {
161+
}, ctx: ContextCancellable): Promise<ObjectIdList> {
159162
const commits = new Set<string>();
160163
const trees = new Set<string>();
161164
const blobs = new Set<string>();
162165
const tags = new Set<string>();
163166
const havesSet: Set<string> = new Set(haves);
164167

165168
async function walk(objectId: ObjectId, type: ObjectType): Promise<void> {
169+
ctx.signal.throwIfAborted();
166170
// If object was listed as a have then we don't need to walk over it
167171
if (havesSet.has(objectId)) return;
168172
switch (type) {
@@ -243,7 +247,7 @@ async function listObjectsAll({
243247
}: {
244248
fs: EncryptedFS;
245249
gitDir: string;
246-
}) {
250+
}): Promise<Array<string>> {
247251
const objectsDirPath = path.join(gitDir, objectsDirName);
248252
const objectSet: Set<string> = new Set();
249253
const objectDirs = await fs.promises.readdir(objectsDirPath);

src/nodes/agent/handlers/VaultsGitPackGet.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { DB } from '@matrixai/db';
22
import type { JSONObject, JSONRPCRequest } from '@matrixai/rpc';
3+
import type {ContextTimed} from '@matrixai/contexts';
34
import type { VaultName } from '../../../vaults/types';
45
import type ACL from '../../../acl/ACL';
56
import type VaultManager from '../../../vaults/VaultManager';
@@ -24,6 +25,7 @@ class VaultsGitPackGet extends RawHandler<{
2425
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
2526
_cancel,
2627
meta,
28+
ctx: ContextTimed,
2729
): Promise<[JSONObject, ReadableStream<Uint8Array>]> => {
2830
const { vaultManager, acl, db } = this.container;
2931
const [headerMessage, inputStream] = input;

src/nodes/agent/handlers/VaultsScan.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { DB } from '@matrixai/db';
2+
import type {ContextTimed} from '@matrixai/contexts';
23
import type {
34
AgentRPCRequestParams,
45
AgentRPCResponseResult,
@@ -25,6 +26,7 @@ class VaultsScan extends ServerHandler<
2526
input: AgentRPCRequestParams,
2627
_cancel,
2728
meta,
29+
ctx: ContextTimed,
2830
): AsyncGenerator<AgentRPCResponseResult<VaultsScanMessage>> {
2931
const { vaultManager, db } = this.container;
3032
const requestingNodeId = agentUtils.nodeIdFromMeta(meta);
@@ -36,13 +38,15 @@ class VaultsScan extends ServerHandler<
3638
> {
3739
const listResponse = vaultManager.handleScanVaults(
3840
requestingNodeId,
41+
ctx,
3942
tran,
4043
);
4144
for await (const {
4245
vaultId,
4346
vaultName,
4447
vaultPermissions,
4548
} of listResponse) {
49+
ctx.signal.throwIfAborted();
4650
yield {
4751
vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId),
4852
vaultName,

src/vaults/VaultManager.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import * as nodesUtils from '../nodes/utils';
4040
import * as keysUtils from '../keys/utils';
4141
import config from '../config';
4242
import { mkdirExists } from '../utils/utils';
43+
import {ContextCancellable} from "@matrixai/contexts";
4344

4445
/**
4546
* Object map pattern for each vault
@@ -838,12 +839,13 @@ class VaultManager {
838839
public async *handlePackRequest(
839840
vaultId: VaultId,
840841
body: Array<Buffer>,
842+
ctx: ContextCancellable,
841843
tran?: DBTransaction,
842844
): AsyncGenerator<Buffer, void, void> {
843845
if (tran == null) {
844846
// Lambda to maintain `this` context
845847
const handlePackRequest = (tran: DBTransaction) =>
846-
this.handlePackRequest(vaultId, body, tran);
848+
this.handlePackRequest(vaultId, body, ctx, tran);
847849
return yield* this.db.withTransactionG(async function* (tran) {
848850
return yield* handlePackRequest(tran);
849851
});
@@ -853,16 +855,16 @@ class VaultManager {
853855
const efs = this.efs;
854856
yield* withG(
855857
[
856-
this.vaultLocks.lock([vaultId.toString(), RWLockWriter, 'read']),
857-
vault.getLock().read(),
858+
this.vaultLocks.lock([vaultId.toString(), RWLockWriter, 'read'], ctx),
859+
vault.getLock().read(ctx),
858860
],
859861
async function* (): AsyncGenerator<Buffer, void, void> {
860862
yield* gitHttp.generatePackRequest({
861863
efs,
862864
dir: path.join(vaultsUtils.encodeVaultId(vaultId), 'contents'),
863865
gitDir: path.join(vaultsUtils.encodeVaultId(vaultId), '.git'),
864866
body: body,
865-
});
867+
}, ctx);
866868
},
867869
);
868870
}
@@ -900,6 +902,7 @@ class VaultManager {
900902
*/
901903
public async *handleScanVaults(
902904
nodeId: NodeId,
905+
ctx: ContextCancellable,
903906
tran?: DBTransaction,
904907
): AsyncGenerator<{
905908
vaultId: VaultId;
@@ -909,7 +912,7 @@ class VaultManager {
909912
if (tran == null) {
910913
// Lambda to maintain `this` context
911914
const handleScanVaults = (tran: DBTransaction) =>
912-
this.handleScanVaults(nodeId, tran);
915+
this.handleScanVaults(nodeId, ctx, tran);
913916
return yield* this.db.withTransactionG(async function* (tran) {
914917
return yield* handleScanVaults(tran);
915918
});
@@ -932,6 +935,7 @@ class VaultManager {
932935
// Getting the list of vaults
933936
const vaults = permissions.vaults;
934937
for (const vaultIdString of Object.keys(vaults)) {
938+
ctx.signal.throwIfAborted();
935939
// Getting vault permissions
936940
const vaultId = IdInternal.fromString<VaultId>(vaultIdString);
937941
const vaultPermissions = Object.keys(

0 commit comments

Comments
 (0)