Skip to content

Commit 5266913

Browse files
committed
fix: fixing cancellability for nodes domain
[ci skip]
1 parent 98b3fc9 commit 5266913

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

src/nodes/NodeGraph.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { DB, DBTransaction, LevelPath } from '@matrixai/db';
2+
import type { ContextTimed } from '@matrixai/contexts';
23
import type {
34
NodeId,
45
NodeAddress,
@@ -221,15 +222,20 @@ class NodeGraph {
221222

222223
/**
223224
* Locks the bucket index for exclusive operations.
224-
* This allows you sequence operations for any bucket.
225+
* This allows you to sequence operations for any bucket.
225226
*/
226227
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
227-
public async lockBucket(bucketIndex: number, tran: DBTransaction) {
228+
public async lockBucket(
229+
bucketIndex: number,
230+
tran: DBTransaction,
231+
ctx?: ContextTimed,
232+
) {
228233
const keyPath = [
229234
...this.nodeGraphMetaDbPath,
230235
nodesUtils.bucketKey(bucketIndex),
231236
];
232-
return await tran.lock(keyPath.join(''));
237+
if (ctx != null) return await tran.lock(keyPath.join(''), ctx);
238+
else return await tran.lock(keyPath.join(''));
233239
}
234240

235241
/**

src/nodes/NodeManager.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,7 @@ class NodeManager {
10561056
await this.nodeConnectionManager.createConnectionMultiple(
10571057
[nodeId],
10581058
addresses,
1059+
ctx,
10591060
);
10601061
return [
10611062
[nodeConnection.host, nodeConnection.port],
@@ -1109,9 +1110,12 @@ class NodeManager {
11091110
})();
11101111
const closestNodesRequestP = (async () => {
11111112
const resultStream =
1112-
await conn.rpcClient.methods.nodesClosestLocalNodesGet({
1113-
nodeIdEncoded: nodeIdEncoded,
1114-
});
1113+
await conn.rpcClient.methods.nodesClosestLocalNodesGet(
1114+
{
1115+
nodeIdEncoded: nodeIdEncoded,
1116+
},
1117+
ctx,
1118+
);
11151119
for await (const { nodeIdEncoded, nodeContact } of resultStream) {
11161120
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
11171121
if (nodeId == null) {
@@ -1676,7 +1680,7 @@ class NodeManager {
16761680
// fails we delete the old node and add in the new one.
16771681
const [bucketIndex] = this.nodeGraph.bucketIndex(nodeId);
16781682
// To avoid conflict we want to lock on the bucket index
1679-
await this.nodeGraph.lockBucket(bucketIndex, tran);
1683+
await this.nodeGraph.lockBucket(bucketIndex, tran, ctx);
16801684

16811685
const nodeContact = await this.nodeGraph.getNodeContact(nodeId, tran);
16821686
// If this is a new entry, check the bucket limit
@@ -1816,7 +1820,7 @@ class NodeManager {
18161820
if (pendingNodes == null || pendingNodes.size === 0) return;
18171821
this.pendingNodes.set(bucketIndex, new Map());
18181822
// Locking on bucket
1819-
await this.nodeGraph.lockBucket(bucketIndex, tran);
1823+
await this.nodeGraph.lockBucket(bucketIndex, tran, ctx);
18201824
const semaphore = new Semaphore(this.concurrencyLimit);
18211825
// Iterating over existing nodes
18221826
const bucket = await this.nodeGraph.getBucket(
@@ -1832,9 +1836,9 @@ class NodeManager {
18321836
const pendingPromises: Array<Promise<void>> = [];
18331837
for (const [nodeId] of bucket) {
18341838
if (removedNodes >= pendingNodes.size) break;
1835-
await semaphore.waitForUnlock();
1839+
await semaphore.waitForUnlock(ctx);
18361840
if (ctx.signal?.aborted === true) break;
1837-
const [semaphoreReleaser] = await semaphore.lock()();
1841+
const [semaphoreReleaser] = await semaphore.lock(ctx)();
18381842
pendingPromises.push(
18391843
(async () => {
18401844
// Ping and remove or update node in bucket
@@ -1853,15 +1857,15 @@ class NodeManager {
18531857
false,
18541858
false,
18551859
undefined,
1856-
undefined,
1860+
ctx,
18571861
tran,
18581862
);
18591863
} else {
18601864
// We don't remove node the ping was aborted
18611865
if (ctx.signal.aborted) return;
18621866
// We need to lock this since it's concurrent
18631867
// and shares the transaction
1864-
await unsetLock.withF(async () => {
1868+
await unsetLock.withF(ctx, async () => {
18651869
await this.unsetNode(nodeId, tran);
18661870
removedNodes += 1;
18671871
});
@@ -1887,7 +1891,7 @@ class NodeManager {
18871891
false,
18881892
false,
18891893
undefined,
1890-
undefined,
1894+
ctx,
18911895
tran,
18921896
);
18931897
removedNodes -= 1;

0 commit comments

Comments
 (0)