Skip to content

Commit 1a86c0a

Browse files
shared-log: close churn rebalance gap with deterministic test
1 parent 083aa19 commit 1a86c0a

File tree

2 files changed

+147
-5
lines changed

2 files changed

+147
-5
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2332,7 +2332,7 @@ export class SharedLog<
23322332

23332333
private dispatchMaybeMissingEntries(
23342334
target: string,
2335-
entries: Map<string, EntryReplicated<any>>,
2335+
entries: Map<string, EntryReplicated<R>>,
23362336
options?: {
23372337
bypassRecentDedupe?: boolean;
23382338
retryScheduleMs?: number[];
@@ -2377,13 +2377,30 @@ export class SharedLog<
23772377
return;
23782378
}
23792379

2380-
const run = () =>
2381-
Promise.resolve(
2380+
const run = () => {
2381+
// For force-fresh churn repair we intentionally bypass rateless IBLT and
2382+
// use simple hash-based sync. This path is a directed "push these hashes
2383+
// to that peer" recovery flow; using simple sync here avoids occasional
2384+
// single-hash gaps seen with IBLT-oriented maybe-sync batches under churn.
2385+
if (
2386+
options?.forceFreshDelivery &&
2387+
this.syncronizer instanceof RatelessIBLTSynchronizer
2388+
) {
2389+
return Promise.resolve(
2390+
this.syncronizer.simple.onMaybeMissingEntries({
2391+
entries: filteredEntries,
2392+
targets: [target],
2393+
}),
2394+
).catch((error: any) => logger.error(error));
2395+
}
2396+
2397+
return Promise.resolve(
23822398
this.syncronizer.onMaybeMissingEntries({
23832399
entries: filteredEntries,
23842400
targets: [target],
23852401
}),
23862402
).catch((error: any) => logger.error(error));
2403+
};
23872404

23882405
const retrySchedule =
23892406
options?.retryScheduleMs && options.retryScheduleMs.length > 0

packages/programs/data/shared-log/test/sharding.spec.ts

Lines changed: 127 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,7 @@ testSetups.forEach((setup) => {
540540
await checkBounded(entryCount, 0.5, 0.9, db1, db2, db3);
541541
});
542542

543-
it("distributes to leaving peers", async function (this: Mocha.Context) {
544-
this.retries(2);
543+
it("distributes to leaving peers", async () => {
545544
const args = {
546545
timeUntilRoleMaturity: 0,
547546
waitForPruneDelay: 50,
@@ -622,6 +621,132 @@ testSetups.forEach((setup) => {
622621
await checkBounded(entryCount, 1, 1, db1, db2);
623622
});
624623

624+
it("repairs redistributed entry when maybe-sync misses one hash on peer leave", async function () {
625+
if (setup.name !== "u64-iblt") {
626+
this.skip();
627+
}
628+
629+
const args = {
630+
timeUntilRoleMaturity: 0,
631+
waitForPruneDelay: 50,
632+
setup,
633+
} as const;
634+
635+
db1 = await session.peers[0].open(new EventStore<string, any>(), {
636+
args: {
637+
replicate: {
638+
offset: 0,
639+
},
640+
...args,
641+
},
642+
});
643+
644+
db2 = await EventStore.open<EventStore<string, any>>(
645+
db1.address!,
646+
session.peers[1],
647+
{
648+
args: {
649+
replicate: {
650+
offset: 0.3333,
651+
},
652+
...args,
653+
},
654+
},
655+
);
656+
db3 = await EventStore.open<EventStore<string, any>>(
657+
db1.address!,
658+
session.peers[2],
659+
{
660+
args: {
661+
replicate: {
662+
offset: 0.6666,
663+
},
664+
...args,
665+
},
666+
},
667+
);
668+
669+
const entryCount = sampleSize * 3;
670+
const inserts: Promise<any>[] = [];
671+
for (let i = 0; i < entryCount; i++) {
672+
inserts.push(
673+
db1.add(toBase64(new Uint8Array([i])), {
674+
meta: { next: [] },
675+
}),
676+
);
677+
}
678+
await Promise.all(inserts);
679+
await checkBounded(entryCount, 0.5, 0.9, db1, db2, db3);
680+
681+
const db2Hash = db2.node.identity.publicKey.hashcode();
682+
let candidateHash: string | undefined;
683+
for (const entry of await db1.log.log.toArray()) {
684+
if (await db2.log.log.has(entry.hash)) {
685+
continue;
686+
}
687+
if (!(await db3.log.log.has(entry.hash))) {
688+
continue;
689+
}
690+
candidateHash = entry.hash;
691+
break;
692+
}
693+
expect(
694+
candidateHash,
695+
"expected entry that requires redistribution to surviving peer",
696+
).to.be.a("string");
697+
698+
const sync = db1.log.syncronizer as {
699+
onMaybeMissingEntries: (properties: {
700+
entries: Map<string, any>;
701+
targets: string[];
702+
}) => Promise<void>;
703+
};
704+
const originalOnMaybeMissingEntries =
705+
sync.onMaybeMissingEntries.bind(sync);
706+
707+
sync.onMaybeMissingEntries = async (properties) => {
708+
if (
709+
candidateHash &&
710+
properties.targets.includes(db2Hash) &&
711+
properties.entries.has(candidateHash)
712+
) {
713+
const filtered = new Map(properties.entries);
714+
filtered.delete(candidateHash);
715+
return originalOnMaybeMissingEntries({
716+
...properties,
717+
entries: filtered,
718+
});
719+
}
720+
return originalOnMaybeMissingEntries(properties);
721+
};
722+
723+
try {
724+
await db3.close();
725+
726+
await Promise.all([
727+
waitForResolved(async () =>
728+
expect(await db1.log.replicationIndex?.getSize()).equal(2),
729+
),
730+
waitForResolved(async () =>
731+
expect(await db2.log.replicationIndex?.getSize()).equal(2),
732+
),
733+
]);
734+
735+
await waitForResolved(
736+
async () =>
737+
expect(await db2.log.log.has(candidateHash!)).to.be.true,
738+
{
739+
timeout: 30_000,
740+
delayInterval: 500,
741+
},
742+
);
743+
744+
await checkBounded(entryCount, 1, 1, db1, db2);
745+
} finally {
746+
sync.onMaybeMissingEntries = originalOnMaybeMissingEntries;
747+
}
748+
});
749+
625750
it("handles peer joining and leaving multiple times", async () => {
626751
db1 = await session.peers[0].open(new EventStore<string, any>(), {
627752
args: {

0 commit comments

Comments
 (0)