Skip to content

Commit 3885fc9

Browse files
Merge pull request #559 from dao-xyz/fix/waitforreplicator-request-config
shared-log: make waitForReplicator request loop configurable
2 parents 6aaa5dd + e0f68cd commit 3885fc9

File tree

3 files changed

+90
-5
lines changed

3 files changed

+90
-5
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,8 @@ export type SharedLogOptions<
366366
syncronizer?: SynchronizerConstructor<R>;
367367
timeUntilRoleMaturity?: number;
368368
waitForReplicatorTimeout?: number;
369+
waitForReplicatorRequestIntervalMs?: number;
370+
waitForReplicatorRequestMaxAttempts?: number;
369371
waitForPruneDelay?: number;
370372
distributionDebounceTime?: number;
371373
compatibility?: number;
@@ -376,6 +378,8 @@ export type SharedLogOptions<
376378
export const DEFAULT_MIN_REPLICAS = 2;
377379
export const WAIT_FOR_REPLICATOR_TIMEOUT = 9000;
378380
export const WAIT_FOR_ROLE_MATURITY = 5000;
381+
export const WAIT_FOR_REPLICATOR_REQUEST_INTERVAL = 1000;
382+
export const WAIT_FOR_REPLICATOR_REQUEST_MIN_ATTEMPTS = 3;
379383
// TODO(prune): Investigate if/when a non-zero prune delay is required for correctness
380384
// (e.g. responsibility/replication-info message reordering in multi-peer scenarios).
381385
// Prefer making pruning robust without timing-based heuristics.
@@ -564,6 +568,8 @@ export class SharedLog<
564568

565569
timeUntilRoleMaturity!: number;
566570
waitForReplicatorTimeout!: number;
571+
waitForReplicatorRequestIntervalMs!: number;
572+
waitForReplicatorRequestMaxAttempts?: number;
567573
waitForPruneDelay!: number;
568574
distributionDebounceTime!: number;
569575

@@ -1903,12 +1909,31 @@ export class SharedLog<
19031909
options?.timeUntilRoleMaturity ?? WAIT_FOR_ROLE_MATURITY;
19041910
this.waitForReplicatorTimeout =
19051911
options?.waitForReplicatorTimeout ?? WAIT_FOR_REPLICATOR_TIMEOUT;
1912+
this.waitForReplicatorRequestIntervalMs =
1913+
options?.waitForReplicatorRequestIntervalMs ??
1914+
WAIT_FOR_REPLICATOR_REQUEST_INTERVAL;
1915+
this.waitForReplicatorRequestMaxAttempts =
1916+
options?.waitForReplicatorRequestMaxAttempts;
19061917
this.waitForPruneDelay = options?.waitForPruneDelay ?? WAIT_FOR_PRUNE_DELAY;
19071918

19081919
if (this.waitForReplicatorTimeout < this.timeUntilRoleMaturity) {
19091920
this.waitForReplicatorTimeout = this.timeUntilRoleMaturity; // does not makes sense to expect a replicator to mature faster than it is reachable
19101921
}
19111922

1923+
if (this.waitForReplicatorRequestIntervalMs <= 0) {
1924+
throw new Error(
1925+
"waitForReplicatorRequestIntervalMs must be a positive number",
1926+
);
1927+
}
1928+
if (
1929+
this.waitForReplicatorRequestMaxAttempts != null &&
1930+
this.waitForReplicatorRequestMaxAttempts <= 0
1931+
) {
1932+
throw new Error(
1933+
"waitForReplicatorRequestMaxAttempts must be a positive number",
1934+
);
1935+
}
1936+
19121937
this._closeController = new AbortController();
19131938
this._isTrustedReplicator = options?.canReplicate;
19141939
this.keep = options?.keep;
@@ -3360,11 +3385,13 @@ export class SharedLog<
33603385
}, timeoutMs);
33613386

33623387
let requestAttempts = 0;
3363-
const requestIntervalMs = 1000;
3364-
const maxRequestAttempts = Math.max(
3365-
3,
3366-
Math.ceil(timeoutMs / requestIntervalMs),
3367-
);
3388+
const requestIntervalMs = this.waitForReplicatorRequestIntervalMs;
3389+
const maxRequestAttempts =
3390+
this.waitForReplicatorRequestMaxAttempts ??
3391+
Math.max(
3392+
WAIT_FOR_REPLICATOR_REQUEST_MIN_ATTEMPTS,
3393+
Math.ceil(timeoutMs / requestIntervalMs),
3394+
);
33683395

33693396
const requestReplicationInfo = () => {
33703397
if (settled || this.closed) {

packages/programs/data/shared-log/test/utils/stores/event-store.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ export type Args<
6262
timeUntilRoleMaturity?: number;
6363
waitForPruneDelay?: number;
6464
waitForReplicatorTimeout?: number;
65+
waitForReplicatorRequestIntervalMs?: number;
66+
waitForReplicatorRequestMaxAttempts?: number;
6567
keep?: (
6668
entry: Entry<Operation<T>> | ShallowEntry | EntryReplicated<R>,
6769
) => boolean;
@@ -125,6 +127,10 @@ export class EventStore<
125127
trim: properties?.trim,
126128
replicas: properties?.replicas,
127129
waitForReplicatorTimeout: properties?.waitForReplicatorTimeout,
130+
waitForReplicatorRequestIntervalMs:
131+
properties?.waitForReplicatorRequestIntervalMs,
132+
waitForReplicatorRequestMaxAttempts:
133+
properties?.waitForReplicatorRequestMaxAttempts,
128134
encoding: JSON_ENCODING,
129135
timeUntilRoleMaturity: properties?.timeUntilRoleMaturity ?? 3000,
130136
waitForPruneDelay: properties?.waitForPruneDelay ?? 0,
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { TestSession } from "@peerbit/test-utils";
2+
import { TimeoutError } from "@peerbit/time";
3+
import { expect } from "chai";
4+
import { RequestReplicationInfoMessage } from "../src/replication.js";
5+
import { EventStore } from "./utils/stores/index.js";
6+
7+
describe("waitForReplicator", () => {
8+
let session: TestSession;
9+
let db: EventStore<string, any>;
10+
11+
afterEach(async () => {
12+
if (db && db.closed === false) {
13+
await db.drop();
14+
}
15+
await session?.stop();
16+
});
17+
18+
it("respects configured request retry limits", async () => {
19+
session = await TestSession.connected(2);
20+
db = await session.peers[0].open(new EventStore<string, any>(), {
21+
args: {
22+
timeUntilRoleMaturity: 0,
23+
waitForReplicatorRequestIntervalMs: 50,
24+
waitForReplicatorRequestMaxAttempts: 2,
25+
},
26+
});
27+
28+
const originalSend = db.log.rpc.send.bind(db.log.rpc);
29+
let requestCount = 0;
30+
db.log.rpc.send = async (message: any, options: any) => {
31+
if (message instanceof RequestReplicationInfoMessage) {
32+
requestCount++;
33+
return;
34+
}
35+
return originalSend(message, options);
36+
};
37+
38+
try {
39+
await db.log.waitForReplicator(session.peers[1].identity.publicKey, {
40+
timeout: 300,
41+
eager: true,
42+
});
43+
throw new Error("Expected waitForReplicator() to time out");
44+
} catch (error) {
45+
expect(error).to.be.instanceOf(TimeoutError);
46+
} finally {
47+
db.log.rpc.send = originalSend;
48+
}
49+
50+
expect(requestCount).to.equal(2);
51+
});
52+
});

0 commit comments

Comments
 (0)