Skip to content

Commit 5e642f8

Browse files
fix(shared-log): harden churn rebalance repair
1 parent 1929680 commit 5e642f8

File tree

3 files changed

+153
-6
lines changed

3 files changed

+153
-6
lines changed

packages/programs/data/shared-log/src/index.ts

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@ export class SharedLog<
606606
{ attempts: number; timer?: ReturnType<typeof setTimeout> }
607607
>;
608608
private _replicationInfoApplyQueueByPeer!: Map<string, Promise<void>>;
609+
private _forceFreshRepairRetryTimers!: Set<ReturnType<typeof setTimeout>>;
609610

610611
private remoteBlocks!: RemoteBlocks;
611612

@@ -2430,6 +2431,7 @@ export class SharedLog<
24302431
this._replicationInfoBlockedPeers = new Set();
24312432
this._replicationInfoRequestByPeer = new Map();
24322433
this._replicationInfoApplyQueueByPeer = new Map();
2434+
this._forceFreshRepairRetryTimers = new Set();
24332435
this.coordinateToHash = new Cache<string>({ max: 1e6, ttl: 1e4 });
24342436
this.recentlyRebalanced = new Cache<string>({ max: 1e4, ttl: 1e5 });
24352437

@@ -3208,11 +3210,15 @@ export class SharedLog<
32083210
for (const [_k, v] of this._checkedPruneRetries) {
32093211
if (v.timer) clearTimeout(v.timer);
32103212
}
3213+
for (const timer of this._forceFreshRepairRetryTimers) {
3214+
clearTimeout(timer);
3215+
}
32113216

32123217
await this.remoteBlocks.stop();
32133218
this._pendingDeletes.clear();
32143219
this._pendingIHave.clear();
32153220
this._checkedPruneRetries.clear();
3221+
this._forceFreshRepairRetryTimers.clear();
32163222
this.latestReplicationInfoMessage.clear();
32173223
this._gidPeersHistory.clear();
32183224
this._requestIPruneSent.clear();
@@ -4825,9 +4831,20 @@ export class SharedLog<
48254831
}
48264832

48274833
if (!subscribed) {
4834+
const wasReplicator = this.uniqueReplicators.has(peerHash);
4835+
try {
4836+
// Unsubscribe can race with the peer's final replication reset message.
4837+
// Proactively evict its ranges so leader selection doesn't keep stale owners.
4838+
await this.removeReplicator(publicKey, { noEvent: true });
4839+
} catch (error) {
4840+
if (!isNotStartedError(error as Error)) {
4841+
throw error;
4842+
}
4843+
}
4844+
48284845
// Emit replicator:leave at most once per (join -> leave) transition, even if we
48294846
// concurrently process unsubscribe + replication reset messages for the same peer.
4830-
const stoppedTransition = this.uniqueReplicators.delete(peerHash);
4847+
const stoppedTransition = wasReplicator;
48314848
this._replicatorJoinEmitted.delete(peerHash);
48324849

48334850
this.cancelReplicationInfoRequests(peerHash);
@@ -5333,17 +5350,53 @@ export class SharedLog<
53335350
const gidPeersHistorySnapshot = new Map<string, Set<string> | undefined>();
53345351

53355352
const changed = false;
5353+
const selfHash = this.node.identity.publicKey.hashcode();
53365354

53375355
try {
53385356
const uncheckedDeliver: Map<
53395357
string,
53405358
Map<string, EntryReplicated<any>>
53415359
> = new Map();
5360+
const forceFreshRepairBatchSize = 256;
5361+
const dispatchMaybeMissingEntries = (
5362+
target: string,
5363+
entries: Map<string, EntryReplicated<any>>,
5364+
) => {
5365+
// During leave/rejoin churn we prefer bounded simple-sync batches. Large
5366+
// one-shot sets can go through the IBLT path and occasionally leave
5367+
// single-entry gaps behind under heavy timing pressure.
5368+
if (!forceFreshDelivery || entries.size <= forceFreshRepairBatchSize) {
5369+
this.syncronizer.onMaybeMissingEntries({
5370+
entries,
5371+
targets: [target],
5372+
});
5373+
return;
5374+
}
5375+
5376+
let batch = new Map<string, EntryReplicated<any>>();
5377+
for (const [hash, entry] of entries) {
5378+
batch.set(hash, entry);
5379+
if (batch.size >= forceFreshRepairBatchSize) {
5380+
this.syncronizer.onMaybeMissingEntries({
5381+
entries: batch,
5382+
targets: [target],
5383+
});
5384+
batch = new Map();
5385+
}
5386+
}
5387+
if (batch.size > 0) {
5388+
this.syncronizer.onMaybeMissingEntries({
5389+
entries: batch,
5390+
targets: [target],
5391+
});
5392+
}
5393+
};
53425394

53435395
for await (const entryReplicated of toRebalance<R>(
53445396
changes,
53455397
this.entryCoordinatesIndex,
53465398
this.recentlyRebalanced,
5399+
{ forceFresh: forceFreshDelivery },
53475400
)) {
53485401
if (this.closed) {
53495402
break;
@@ -5418,12 +5471,74 @@ export class SharedLog<
54185471
?.reject(new Error("Failed to delete, is leader again"));
54195472
this.removePruneRequestSent(entryReplicated.hash);
54205473
}
5474+
}
5475+
5476+
if (forceFreshDelivery) {
5477+
// Churn edge-case hardening:
5478+
// a remove+rejoin sequence can leave isolated under-replicated entries with no
5479+
// active sync in flight. Do a one-shot fresh sweep over indexed entries and
5480+
// seed repair deliveries for current leaders.
5481+
const iterator = this.entryCoordinatesIndex.iterate({});
5482+
while (iterator.done() !== true) {
5483+
const entries = await iterator.all();
5484+
for (const entry of entries) {
5485+
const entryReplicated = entry.value;
5486+
const currentPeers = await this.findLeaders(
5487+
entryReplicated.coordinates,
5488+
entryReplicated,
5489+
{
5490+
roleAge: 0,
5491+
},
5492+
);
5493+
5494+
for (const [currentPeer] of currentPeers) {
5495+
if (currentPeer === selfHash) {
5496+
continue;
5497+
}
5498+
5499+
let set = uncheckedDeliver.get(currentPeer);
5500+
if (!set) {
5501+
set = new Map();
5502+
uncheckedDeliver.set(currentPeer, set);
5503+
}
5504+
if (!set.has(entryReplicated.hash)) {
5505+
set.set(entryReplicated.hash, entryReplicated);
5506+
}
5507+
}
5508+
}
5509+
}
5510+
await iterator.close();
54215511
}
5512+
54225513
for (const [target, entries] of uncheckedDeliver) {
5423-
this.syncronizer.onMaybeMissingEntries({
5424-
entries,
5425-
targets: [target],
5426-
});
5514+
dispatchMaybeMissingEntries(target, entries);
5515+
}
5516+
5517+
if (forceFreshDelivery && uncheckedDeliver.size > 0) {
5518+
// Repair requests can race with transient disconnect/rejoin windows and get
5519+
// dropped. Re-seed the same requests shortly after to improve eventual
5520+
// convergence under churn.
5521+
const retryPayload = [...uncheckedDeliver.entries()].map(
5522+
([target, entries]) => ({
5523+
target,
5524+
entries: new Map(entries),
5525+
}),
5526+
);
5527+
const retry = (delayMs: number) => {
5528+
const timer = setTimeout(() => {
5529+
this._forceFreshRepairRetryTimers.delete(timer);
5530+
if (this.closed) {
5531+
return;
5532+
}
5533+
for (const { target, entries } of retryPayload) {
5534+
dispatchMaybeMissingEntries(target, entries);
5535+
}
5536+
}, delayMs);
5537+
timer.unref?.();
5538+
this._forceFreshRepairRetryTimers.add(timer);
5539+
};
5540+
retry(2_000);
5541+
retry(8_000);
54275542
}
54285543

54295544
return changed;

packages/programs/data/shared-log/src/ranges.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2707,8 +2707,14 @@ export const toRebalance = <R extends "u32" | "u64">(
27072707
| ReplicationChanges<ReplicationRangeIndexable<R>>[],
27082708
index: Index<EntryReplicated<R>>,
27092709
rebalanceHistory: Cache<string>,
2710+
options?: { forceFresh?: boolean },
27102711
): AsyncIterable<EntryReplicated<R>> => {
2711-
const change = mergeReplicationChanges(changeOrChanges, rebalanceHistory);
2712+
const change = options?.forceFresh
2713+
? (Array.isArray(changeOrChanges[0])
2714+
? (changeOrChanges as ReplicationChanges<ReplicationRangeIndexable<R>>[])
2715+
.flat()
2716+
: (changeOrChanges as ReplicationChanges<ReplicationRangeIndexable<R>>))
2717+
: mergeReplicationChanges(changeOrChanges, rebalanceHistory);
27122718
return {
27132719
[Symbol.asyncIterator]: async function* () {
27142720
const iterator = index.iterate({

packages/programs/data/shared-log/test/ranges.spec.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3809,6 +3809,32 @@ resolutions.forEach((resolution) => {
38093809
).map((x) => x.gid),
38103810
).to.deep.eq([]);
38113811

3812+
// Removal-driven churn should be able to force a fresh pass even if
3813+
// rebalance history would normally suppress this non-matured add.
3814+
expect(
3815+
(
3816+
await consumeAllFromAsyncIterator(
3817+
toRebalance(
3818+
[
3819+
{
3820+
range,
3821+
type: "removed",
3822+
timestamp: 0n,
3823+
},
3824+
{
3825+
range,
3826+
type: "added",
3827+
timestamp: 1n,
3828+
},
3829+
],
3830+
index,
3831+
cache,
3832+
{ forceFresh: true },
3833+
),
3834+
)
3835+
).map((x) => x.gid),
3836+
).to.deep.eq(["a"]);
3837+
38123838
expect(
38133839
(
38143840
await consumeAllFromAsyncIterator(

0 commit comments

Comments
 (0)