Skip to content

Commit 9b39434

Browse files
Merge pull request #618 from dao-xyz/codex/fix-sharding-distributes-flake-main
test(shared-log): stabilize sharding redistributes-to-leavers flake
2 parents 42e98ce + af60ab4 commit 9b39434

File tree

3 files changed

+148
-3
lines changed

3 files changed

+148
-3
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 20 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -2332,7 +2332,7 @@ export class SharedLog<
23322332

23332333
private dispatchMaybeMissingEntries(
23342334
target: string,
2335-
entries: Map<string, EntryReplicated<any>>,
2335+
entries: Map<string, EntryReplicated<R>>,
23362336
options?: {
23372337
bypassRecentDedupe?: boolean;
23382338
retryScheduleMs?: number[];
@@ -2377,13 +2377,30 @@ export class SharedLog<
23772377
return;
23782378
}
23792379

2380-
const run = () =>
2381-
Promise.resolve(
2380+
const run = () => {
2381+
// For force-fresh churn repair we intentionally bypass rateless IBLT and
2382+
// use simple hash-based sync. This path is a directed "push these hashes
2383+
// to that peer" recovery flow; using simple sync here avoids occasional
2384+
// single-hash gaps seen with IBLT-oriented maybe-sync batches under churn.
2385+
if (
2386+
options?.forceFreshDelivery &&
2387+
this.syncronizer instanceof RatelessIBLTSynchronizer
2388+
) {
2389+
return Promise.resolve(
2390+
this.syncronizer.simple.onMaybeMissingEntries({
2391+
entries: filteredEntries,
2392+
targets: [target],
2393+
}),
2394+
).catch((error: any) => logger.error(error));
2395+
}
2396+
2397+
return Promise.resolve(
23822398
this.syncronizer.onMaybeMissingEntries({
23832399
entries: filteredEntries,
23842400
targets: [target],
23852401
}),
23862402
).catch((error: any) => logger.error(error));
2403+
};
23872404

23882405
const retrySchedule =
23892406
options?.retryScheduleMs && options.retryScheduleMs.length > 0

packages/programs/data/shared-log/src/sync/rateless-iblt.ts

Lines changed: 2 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -540,6 +540,8 @@ export class RatelessIBLTSynchronizer<D extends "u32" | "u64">
540540
entries: Map<string, EntryReplicated<D>>;
541541
targets: string[];
542542
}): Promise<void> {
543+
// NOTE: this method is best-effort dispatch, not a per-hash convergence API.
544+
// It may require follow-up repair rounds under churn/loss to fully close all gaps.
543545
// Strategy:
544546
// - For small sets, prefer the simple synchronizer to reduce complexity and avoid
545547
// IBLT overhead on tiny batches.

packages/programs/data/shared-log/test/sharding.spec.ts

Lines changed: 126 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -621,6 +621,132 @@ testSetups.forEach((setup) => {
621621
await checkBounded(entryCount, 1, 1, db1, db2);
622622
});
623623

624+
it("repairs redistributed entry when maybe-sync misses one hash on peer leave", async function () {
625+
if (setup.name !== "u64-iblt") {
626+
this.skip();
627+
}
628+
629+
const args = {
630+
timeUntilRoleMaturity: 0,
631+
waitForPruneDelay: 50,
632+
setup,
633+
} as const;
634+
635+
db1 = await session.peers[0].open(new EventStore<string, any>(), {
636+
args: {
637+
replicate: {
638+
offset: 0,
639+
},
640+
...args,
641+
},
642+
});
643+
644+
db2 = await EventStore.open<EventStore<string, any>>(
645+
db1.address!,
646+
session.peers[1],
647+
{
648+
args: {
649+
replicate: {
650+
offset: 0.3333,
651+
},
652+
...args,
653+
},
654+
},
655+
);
656+
db3 = await EventStore.open<EventStore<string, any>>(
657+
db1.address!,
658+
session.peers[2],
659+
{
660+
args: {
661+
replicate: {
662+
offset: 0.6666,
663+
},
664+
...args,
665+
},
666+
},
667+
);
668+
669+
const entryCount = sampleSize * 3;
670+
const inserts: Promise<any>[] = [];
671+
for (let i = 0; i < entryCount; i++) {
672+
inserts.push(
673+
db1.add(toBase64(new Uint8Array([i])), {
674+
meta: { next: [] },
675+
}),
676+
);
677+
}
678+
await Promise.all(inserts);
679+
await checkBounded(entryCount, 0.5, 0.9, db1, db2, db3);
680+
681+
const db2Hash = db2.node.identity.publicKey.hashcode();
682+
let candidateHash: string | undefined;
683+
for (const entry of await db1.log.log.toArray()) {
684+
if (await db2.log.log.has(entry.hash)) {
685+
continue;
686+
}
687+
if (!(await db3.log.log.has(entry.hash))) {
688+
continue;
689+
}
690+
candidateHash = entry.hash;
691+
break;
692+
}
693+
expect(
694+
candidateHash,
695+
"expected entry that requires redistribution to surviving peer",
696+
).to.be.a("string");
697+
698+
const sync = db1.log.syncronizer as {
699+
onMaybeMissingEntries: (properties: {
700+
entries: Map<string, any>;
701+
targets: string[];
702+
}) => Promise<void>;
703+
};
704+
const originalOnMaybeMissingEntries =
705+
sync.onMaybeMissingEntries.bind(sync);
706+
707+
sync.onMaybeMissingEntries = async (properties) => {
708+
if (
709+
candidateHash &&
710+
properties.targets.includes(db2Hash) &&
711+
properties.entries.has(candidateHash)
712+
) {
713+
const filtered = new Map(properties.entries);
714+
filtered.delete(candidateHash);
715+
return originalOnMaybeMissingEntries({
716+
...properties,
717+
entries: filtered,
718+
});
719+
}
720+
return originalOnMaybeMissingEntries(properties);
721+
};
722+
723+
try {
724+
await db3.close();
725+
726+
await Promise.all([
727+
waitForResolved(async () =>
728+
expect(await db1.log.replicationIndex?.getSize()).equal(2),
729+
),
730+
waitForResolved(async () =>
731+
expect(await db2.log.replicationIndex?.getSize()).equal(2),
732+
),
733+
]);
734+
735+
await waitForResolved(
736+
async () =>
737+
expect(await db2.log.log.has(candidateHash!)).to.be.true,
738+
{
739+
timeout: 30_000,
740+
delayInterval: 500,
741+
},
742+
);
743+
744+
await checkBounded(entryCount, 1, 1, db1, db2);
745+
} finally {
746+
sync.onMaybeMissingEntries = originalOnMaybeMissingEntries;
747+
}
748+
});
749+
624750
it("handles peer joining and leaving multiple times", async () => {
625751
db1 = await session.peers[0].open(new EventStore<string, any>(), {
626752
args: {

0 commit comments

Comments
 (0)