Skip to content

Commit cc6e723

Browse files
shared-log: chunk repair sync and preserve rateless sweep batches
1 parent f44918f commit cc6e723

File tree

9 files changed

+654
-222
lines changed

9 files changed

+654
-222
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 257 additions & 172 deletions
Large diffs are not rendered by default.

packages/programs/data/shared-log/src/sync/index.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,23 @@ export type SyncOptions<R extends "u32" | "u64"> = {
2424
* high-priority entries using the simple synchronizer.
2525
*/
2626
maxSimpleEntries?: number;
27+
28+
/**
29+
* Maximum number of hash strings in one simple sync message.
30+
*/
31+
maxSimpleHashesPerMessage?: number;
32+
33+
/**
34+
* Maximum number of coordinates in one simple sync coordinate message.
35+
*/
36+
maxSimpleCoordinatesPerMessage?: number;
37+
38+
/**
39+
* Maximum number of hashes tracked per convergent repair session target.
40+
* Large sessions still dispatch all entries, but only this many are tracked
41+
* for deterministic completion metadata.
42+
*/
43+
maxConvergentTrackedHashes?: number;
2744
};
2845

2946
export type SynchronizerComponents<R extends "u32" | "u64"> = {
@@ -51,6 +68,8 @@ export type RepairSessionResult = {
5168
attempts: number;
5269
durationMs: number;
5370
completed: boolean;
71+
requestedTotal?: number;
72+
truncated?: boolean;
5473
};
5574

5675
export type RepairSession = {

packages/programs/data/shared-log/src/sync/rateless-iblt.ts

Lines changed: 133 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ const getSyncIdString = (message: { syncId: Uint8Array }) => {
5858
return toBase64(message.syncId);
5959
};
6060

61+
const DEFAULT_CONVERGENT_REPAIR_TIMEOUT_MS = 30_000;
62+
const DEFAULT_CONVERGENT_RETRY_INTERVALS_MS = [0, 1_000, 3_000, 7_000];
63+
const DEFAULT_MAX_CONVERGENT_TRACKED_HASHES = 4_096;
64+
6165
@variant([3, 0])
6266
export class StartSync extends TransportMessage {
6367
@field({ type: Uint8Array })
@@ -280,6 +284,46 @@ export class RatelessIBLTSynchronizer<D extends "u32" | "u64">
280284
this.startedOrCompletedSynchronizations = new Cache({ max: 1e4 });
281285
}
282286

287+
private get maxConvergentTrackedHashes() {
288+
const value = this.properties.sync?.maxConvergentTrackedHashes;
289+
return value && Number.isFinite(value) && value > 0
290+
? Math.floor(value)
291+
: DEFAULT_MAX_CONVERGENT_TRACKED_HASHES;
292+
}
293+
294+
private normalizeRetryIntervals(retryIntervalsMs?: number[]): number[] {
295+
if (!retryIntervalsMs || retryIntervalsMs.length === 0) {
296+
return [...DEFAULT_CONVERGENT_RETRY_INTERVALS_MS];
297+
}
298+
299+
return [...retryIntervalsMs]
300+
.map((x) => Math.max(0, Math.floor(x)))
301+
.filter((x, i, arr) => arr.indexOf(x) === i)
302+
.sort((a, b) => a - b);
303+
}
304+
305+
private getPrioritizedEntries(entries: Map<string, EntryReplicated<D>>) {
306+
const priorityFn = this.properties.sync?.priority;
307+
if (!priorityFn) {
308+
return [...entries.values()];
309+
}
310+
311+
let index = 0;
312+
const scored: { entry: EntryReplicated<D>; index: number; priority: number }[] =
313+
[];
314+
for (const entry of entries.values()) {
315+
const priorityValue = priorityFn(entry);
316+
scored.push({
317+
entry,
318+
index,
319+
priority: Number.isFinite(priorityValue) ? priorityValue : 0,
320+
});
321+
index += 1;
322+
}
323+
scored.sort((a, b) => b.priority - a.priority || a.index - b.index);
324+
return scored.map((x) => x.entry);
325+
}
326+
283327
startRepairSession(properties: {
284328
entries: Map<string, EntryReplicated<D>>;
285329
targets: string[];
@@ -288,17 +332,98 @@ export class RatelessIBLTSynchronizer<D extends "u32" | "u64">
288332
retryIntervalsMs?: number[];
289333
}): RepairSession {
290334
const mode = properties.mode ?? "best-effort";
335+
const targets = [...new Set(properties.targets)];
336+
const timeoutMs = Math.max(
337+
1,
338+
Math.floor(properties.timeoutMs ?? DEFAULT_CONVERGENT_REPAIR_TIMEOUT_MS),
339+
);
340+
const retryIntervalsMs = this.normalizeRetryIntervals(properties.retryIntervalsMs);
341+
const trackedLimit = this.maxConvergentTrackedHashes;
342+
const requestedHashes = [...properties.entries.keys()];
343+
const requestedHashesTracked = requestedHashes.slice(0, trackedLimit);
344+
const truncated = requestedHashesTracked.length < requestedHashes.length;
345+
291346
if (mode === "convergent") {
292-
return this.simple.startRepairSession({
293-
...properties,
347+
if (properties.entries.size <= trackedLimit) {
348+
return this.simple.startRepairSession({
349+
...properties,
350+
mode: "convergent",
351+
timeoutMs,
352+
retryIntervalsMs,
353+
});
354+
}
355+
356+
const id = `rateless-repair-${++this.repairSessionCounter}`;
357+
const startedAt = Date.now();
358+
const prioritized = this.getPrioritizedEntries(properties.entries);
359+
const trackedEntries = new Map<string, EntryReplicated<D>>();
360+
for (const entry of prioritized.slice(0, trackedLimit)) {
361+
trackedEntries.set(entry.hash, entry);
362+
}
363+
364+
let cancelled = false;
365+
const trackedSession = this.simple.startRepairSession({
366+
entries: trackedEntries,
367+
targets,
294368
mode: "convergent",
369+
timeoutMs,
370+
retryIntervalsMs,
295371
});
372+
373+
const runDispatchSchedule = async () => {
374+
let previousDelay = 0;
375+
for (const delayMs of retryIntervalsMs) {
376+
if (cancelled) {
377+
return;
378+
}
379+
const elapsed = Date.now() - startedAt;
380+
if (elapsed >= timeoutMs) {
381+
return;
382+
}
383+
const waitMs = Math.max(0, delayMs - previousDelay);
384+
previousDelay = delayMs;
385+
if (waitMs > 0) {
386+
await new Promise<void>((resolve) => {
387+
const timer = setTimeout(resolve, waitMs);
388+
timer.unref?.();
389+
});
390+
}
391+
if (cancelled) {
392+
return;
393+
}
394+
try {
395+
await this.onMaybeMissingEntries({
396+
entries: properties.entries,
397+
targets,
398+
});
399+
} catch {
400+
// Best-effort schedule: tracked session timeout/result decides completion.
401+
}
402+
}
403+
};
404+
405+
const done = (async (): Promise<RepairSessionResult[]> => {
406+
await runDispatchSchedule();
407+
const trackedResults = await trackedSession.done;
408+
return trackedResults.map((result) => ({
409+
...result,
410+
requestedTotal: requestedHashes.length,
411+
truncated: true,
412+
}));
413+
})();
414+
415+
return {
416+
id,
417+
done,
418+
cancel: () => {
419+
cancelled = true;
420+
trackedSession.cancel();
421+
},
422+
};
296423
}
297424

298425
const id = `rateless-repair-${++this.repairSessionCounter}`;
299426
const startedAt = Date.now();
300-
const requestedHashes = [...properties.entries.keys()];
301-
const targets = [...new Set(properties.targets)];
302427
const done = (async (): Promise<RepairSessionResult[]> => {
303428
await this.onMaybeMissingEntries({
304429
entries: properties.entries,
@@ -307,12 +432,14 @@ export class RatelessIBLTSynchronizer<D extends "u32" | "u64">
307432
const durationMs = Date.now() - startedAt;
308433
return targets.map((target) => ({
309434
target,
310-
requested: requestedHashes.length,
435+
requested: requestedHashesTracked.length,
311436
resolved: 0,
312-
unresolved: [...requestedHashes],
437+
unresolved: [...requestedHashesTracked],
313438
attempts: 1,
314439
durationMs,
315440
completed: false,
441+
requestedTotal: requestedHashes.length,
442+
truncated,
316443
}));
317444
})();
318445
return {

0 commit comments

Comments
 (0)