Skip to content

Commit 1c1086c

Browse files
fix(shared-log): harden waitForReplicators error handling
1 parent 14881a0 commit 1c1086c

File tree

2 files changed

+109
-18
lines changed

2 files changed

+109
-18
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4382,6 +4382,7 @@ export class SharedLog<
43824382
const timeout = options.timeout ?? this.waitForReplicatorTimeout;
43834383

43844384
return new Promise((resolve, reject) => {
4385+
let settled = false;
43854386
const removeListeners = () => {
43864387
this.events.removeEventListener("replication:change", roleListener);
43874388
this.events.removeEventListener("replicator:mature", roleListener); // TODO replication:change event ?
@@ -4390,15 +4391,26 @@ export class SharedLog<
43904391
abortListener,
43914392
);
43924393
};
4393-
const abortListener = () => {
4394+
const settleResolve = (value: Map<string, { intersecting: boolean }> | false) => {
4395+
if (settled) return;
4396+
settled = true;
4397+
removeListeners();
4398+
clearTimeout(timer);
4399+
resolve(value);
4400+
};
4401+
const settleReject = (error: unknown) => {
4402+
if (settled) return;
4403+
settled = true;
43944404
removeListeners();
43954405
clearTimeout(timer);
4396-
resolve(false);
4406+
reject(error);
4407+
};
4408+
const abortListener = () => {
4409+
settleResolve(false);
43974410
};
43984411

43994412
const timer = setTimeout(async () => {
4400-
removeListeners();
4401-
resolve(false);
4413+
settleResolve(false);
44024414
}, timeout);
44034415

44044416
const check = async () => {
@@ -4422,19 +4434,22 @@ export class SharedLog<
44224434
}
44234435
options?.onLeader && leaderKeys.forEach(options.onLeader);
44244436

4425-
removeListeners();
4426-
clearTimeout(timer);
4427-
resolve(leaders);
4437+
settleResolve(leaders);
4438+
};
4439+
const runCheck = () => {
4440+
void check().catch((error) => {
4441+
settleReject(error);
4442+
});
44284443
};
44294444

44304445
const roleListener = () => {
4431-
check();
4446+
runCheck();
44324447
};
44334448

44344449
this.events.addEventListener("replication:change", roleListener); // TODO replication:change event ?
44354450
this.events.addEventListener("replicator:mature", roleListener); // TODO replication:change event ?
44364451
this._closeController.signal.addEventListener("abort", abortListener);
4437-
check();
4452+
runCheck();
44384453
});
44394454
}
44404455

@@ -4483,7 +4498,27 @@ export class SharedLog<
44834498
return; // no change
44844499
}
44854500

4486-
const cidObject = cidifyString(properties.entry.hash);
4501+
const entryHash = (properties.entry as any)?.hash;
4502+
if (typeof entryHash !== "string" || entryHash.length === 0) {
4503+
warn("Skipping persistCoordinate for entry without hash");
4504+
return;
4505+
}
4506+
4507+
let cidObject: ReturnType<typeof cidifyString>;
4508+
try {
4509+
cidObject = cidifyString(entryHash);
4510+
} catch (error) {
4511+
warn(
4512+
`Skipping persistCoordinate for invalid hash '${entryHash}': ${
4513+
(error as any)?.message ?? error
4514+
}`,
4515+
);
4516+
return;
4517+
}
4518+
if (!cidObject?.multihash?.digest) {
4519+
warn(`Skipping persistCoordinate for entry '${entryHash}' without digest`);
4520+
return;
4521+
}
44874522
const hashNumber = this.indexableDomain.numbers.bytesToNumber(
44884523
cidObject.multihash.digest,
44894524
);
@@ -4493,22 +4528,19 @@ export class SharedLog<
44934528
assignedToRangeBoundary,
44944529
coordinates: properties.coordinates,
44954530
meta: properties.entry.meta,
4496-
hash: properties.entry.hash,
4531+
hash: entryHash,
44974532
hashNumber,
44984533
}),
44994534
);
45004535

45014536
for (const coordinate of properties.coordinates) {
4502-
this.coordinateToHash.add(coordinate, properties.entry.hash);
4537+
this.coordinateToHash.add(coordinate, entryHash);
45034538
}
45044539

4505-
if (properties.entry.meta.next.length > 0) {
4540+
const next = properties.entry.meta?.next ?? [];
4541+
if (next.length > 0) {
45064542
await this.entryCoordinatesIndex.del({
4507-
query: new Or(
4508-
properties.entry.meta.next.map(
4509-
(x) => new StringMatch({ key: "hash", value: x }),
4510-
),
4511-
),
4543+
query: new Or(next.map((x) => new StringMatch({ key: "hash", value: x }))),
45124544
});
45134545
}
45144546
}

packages/programs/data/shared-log/test/wait-for-replicator.spec.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,63 @@ describe("waitForReplicator", () => {
5454

5555
expect(requestCount - baseline).to.equal(2);
5656
});
57+
58+
it("rejects waitForReplicators when internal leader check throws", async () => {
59+
session = await TestSession.connected(1);
60+
db = await session.peers[0].open(new EventStore<string, any>(), {
61+
args: {
62+
timeUntilRoleMaturity: 0,
63+
},
64+
});
65+
66+
const originalFindLeaders = db.log.findLeaders.bind(db.log);
67+
(db.log as any).findLeaders = async () => {
68+
throw new Error("forced-findLeaders-error");
69+
};
70+
71+
try {
72+
await expect(
73+
(db.log as any)._waitForReplicators(
74+
[0n],
75+
{
76+
hash: "bafkreif4wi7jfhqqlvgyj7a5z2fi6zt2fx5b5h3h3rfwjz2wco6n2w2k7u",
77+
meta: { next: [] },
78+
},
79+
[],
80+
{ timeout: 200 },
81+
),
82+
).to.be.rejectedWith("forced-findLeaders-error");
83+
} finally {
84+
(db.log as any).findLeaders = originalFindLeaders;
85+
}
86+
});
87+
88+
it("ignores persistCoordinate for missing or invalid hashes", async () => {
89+
session = await TestSession.connected(1);
90+
db = await session.peers[0].open(new EventStore<string, any>(), {
91+
args: {
92+
timeUntilRoleMaturity: 0,
93+
},
94+
});
95+
96+
const persistCoordinate = (db.log as any).persistCoordinate.bind(db.log);
97+
98+
await expect(
99+
persistCoordinate({
100+
coordinates: [0n],
101+
entry: { hash: undefined, meta: { next: [] } },
102+
leaders: new Map(),
103+
replicas: 1,
104+
}),
105+
).to.not.be.rejected;
106+
107+
await expect(
108+
persistCoordinate({
109+
coordinates: [0n],
110+
entry: { hash: "not-a-cid", meta: { next: [] } },
111+
leaders: new Map(),
112+
replicas: 1,
113+
}),
114+
).to.not.be.rejected;
115+
});
57116
});

0 commit comments

Comments
 (0)