Skip to content

Commit ad0f88c

Browse files
fix(shared-log): make waitForReplicators robust
1 parent b8115b0 commit ad0f88c

File tree

2 files changed

+45
-20
lines changed

2 files changed

+45
-20
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3473,48 +3473,75 @@ export class SharedLog<
34733473

34743474
let coverageThreshold = options?.coverageThreshold ?? 1;
34753475
let deferred = pDefer<void>();
3476+
let settled = false;
34763477

34773478
const roleAge = options?.roleAge ?? (await this.getDefaultMinRoleAge());
34783479
const providedCustomRoleAge = options?.roleAge != null;
34793480

3480-
let checkCoverage = async () => {
3481+
const resolve = () => {
3482+
if (settled) return;
3483+
settled = true;
3484+
deferred.resolve();
3485+
};
3486+
3487+
const reject = (error: unknown) => {
3488+
if (settled) return;
3489+
settled = true;
3490+
deferred.reject(error);
3491+
};
3492+
3493+
let checkInFlight: Promise<void> | undefined;
3494+
const checkCoverage = async () => {
34813495
const coverage = await this.calculateCoverage({
34823496
roleAge,
34833497
});
34843498

34853499
if (coverage >= coverageThreshold) {
3486-
deferred.resolve();
3500+
resolve();
34873501
return true;
34883502
}
34893503
return false;
34903504
};
3505+
3506+
const scheduleCheckCoverage = () => {
3507+
if (settled || checkInFlight) {
3508+
return;
3509+
}
3510+
3511+
checkInFlight = checkCoverage()
3512+
.then(() => {})
3513+
.catch(reject)
3514+
.finally(() => {
3515+
checkInFlight = undefined;
3516+
});
3517+
};
34913518
const onReplicatorMature = () => {
3492-
checkCoverage();
3519+
scheduleCheckCoverage();
34933520
};
34943521
const onReplicationChange = () => {
3495-
checkCoverage();
3522+
scheduleCheckCoverage();
34963523
};
34973524
this.events.addEventListener("replicator:mature", onReplicatorMature);
34983525
this.events.addEventListener("replication:change", onReplicationChange);
3499-
await checkCoverage();
3526+
await checkCoverage().catch(reject);
35003527

3501-
let interval = providedCustomRoleAge
3502-
? setInterval(() => {
3503-
checkCoverage();
3504-
}, 100)
3505-
: undefined;
3528+
let intervalMs = providedCustomRoleAge ? 100 : 250;
3529+
let interval =
3530+
roleAge > 0
3531+
? setInterval(() => {
3532+
scheduleCheckCoverage();
3533+
}, intervalMs)
3534+
: undefined;
35063535

35073536
let timeout = options?.timeout ?? this.waitForReplicatorTimeout;
35083537
const timer = setTimeout(() => {
35093538
clear();
3510-
deferred.reject(
3511-
new TimeoutError(`Timeout waiting for mature replicators`),
3512-
);
3539+
reject(new TimeoutError(`Timeout waiting for mature replicators`));
35133540
}, timeout);
35143541

35153542
const abortListener = () => {
35163543
clear();
3517-
deferred.reject(new AbortError());
3544+
reject(new AbortError());
35183545
};
35193546

35203547
if (options?.signal) {

packages/programs/data/shared-log/test/events.spec.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ describe("events", () => {
290290
await waitPromise;
291291
});
292292

293-
it("times out if maturity signal never fires", async () => {
293+
it("resolves even if maturity timers are cleared", async () => {
294294
session = await TestSession.connected(1);
295295
const store = new EventStore();
296296
const timeUntilRoleMaturity = 3e3;
@@ -314,11 +314,9 @@ describe("events", () => {
314314
pending.clear();
315315
}
316316

317-
await expect(
318-
store1.log.waitForReplicators({
319-
timeout: 500,
320-
}),
321-
).to.be.eventually.rejectedWith("Timeout");
317+
await store1.log.waitForReplicators({
318+
timeout: 10e3,
319+
});
322320
});
323321

324322
it("times out after timeout if online", async () => {

0 commit comments

Comments
 (0)