Skip to content

Commit cba1bcc

Browse files
fix: stabilize file-share refresh catch-up under churn (#647)
* fix(sync): soften transient entry fetch failures during refresh * test(blocks): satisfy strict callback typing * test(blocks): use remote block handler in retry regression
1 parent e273b99 commit cba1bcc

File tree

4 files changed

+131
-10
lines changed

4 files changed

+131
-10
lines changed

packages/log/src/log.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ import { Trim, type TrimOptions } from "./trim.js";
3737

3838
const { LastWriteWins } = Sorting;
3939

40+
const isRecoverableJoinResolveError = (error: unknown): boolean => {
41+
const message =
42+
error instanceof Error ? error.message : typeof error === "string" ? error : "";
43+
return (
44+
message.includes("Failed to resolve block") ||
45+
(error instanceof Error && error.name === "AbortError")
46+
);
47+
};
48+
4049
type CreateSqliteIndexer = typeof import("@peerbit/indexer-sqlite3").create;
4150
let sqliteCreate: CreateSqliteIndexer | undefined;
4251
const createDefaultIndexer = async (): Promise<Indices> => {
@@ -846,15 +855,23 @@ export class Log<T> {
846855
}
847856

848857
const from = await options.resolveRemoteFrom?.(a, remote?.signal);
849-
const nested =
850-
options.references?.get(a) ??
851-
(await Entry.fromMultihash<T>(this._storage, a, {
852-
remote: {
853-
timeout: remote?.timeout,
854-
signal: remote?.signal,
855-
...(from && from.length > 0 ? { from } : {}),
856-
},
857-
}));
858+
let nested: Entry<T>;
859+
try {
860+
nested =
861+
options.references?.get(a) ??
862+
(await Entry.fromMultihash<T>(this._storage, a, {
863+
remote: {
864+
timeout: remote?.timeout,
865+
signal: remote?.signal,
866+
...(from && from.length > 0 ? { from } : {}),
867+
},
868+
}));
869+
} catch (error) {
870+
if (isRecoverableJoinResolveError(error)) {
871+
return false;
872+
}
873+
throw error;
874+
}
858875

859876
const p = this.joinRecursively(
860877
nested,

packages/log/test/join.spec.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,30 @@ describe("join", function () {
10331033
expect(fetchCounter).equal(0); // no fetches since all entries where passed
10341034
expect(joinEntryCounter).equal(2);
10351035
});
1036+
1037+
it("does not reject join when a nested entry block is temporarily missing", async () => {
1038+
const { entry: a1 } = await log1.append(new Uint8Array([0, 1]));
1039+
const { entry: a2 } = await log1.append(new Uint8Array([0, 2]), {
1040+
meta: { next: [a1] },
1041+
});
1042+
1043+
const fromMultihashStub = sinon
1044+
.stub(Entry, "fromMultihash")
1045+
.callsFake(async (store, hash, options) => {
1046+
if (hash === a1.hash) {
1047+
throw new Error("Failed to resolve block: " + hash);
1048+
}
1049+
return fromMultihashOrg(store, hash, options);
1050+
});
1051+
1052+
try {
1053+
await log2.join([a2]);
1054+
} finally {
1055+
fromMultihashStub.restore();
1056+
}
1057+
1058+
expect(log2.length).equal(0);
1059+
});
10361060
});
10371061

10381062
// TODO move this into the prune test file

packages/transport/blocks/src/remote.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,19 @@ export class RemoteBlocks implements IBlocks {
251251
this._providerCache.add(cidString, normalized);
252252
}
253253

254+
private pickRequestBatch(providers: string[], attempt: number): string[] {
255+
if (providers.length <= 1) {
256+
return providers;
257+
}
258+
const batchSize = Math.min(2, providers.length);
259+
const start = (attempt * batchSize) % providers.length;
260+
const batch: string[] = [];
261+
for (let i = 0; i < batchSize; i++) {
262+
batch.push(providers[(start + i) % providers.length]);
263+
}
264+
return this.normalizeProviderHints(batch, batchSize);
265+
}
266+
254267
private async resolveRemoteProviders(
255268
cidString: string,
256269
options?: { signal?: AbortSignal; refresh?: boolean },
@@ -541,13 +554,20 @@ export class RemoteBlocks implements IBlocks {
541554
if (providers.length === 0) return;
542555
try {
543556
const expiresAt = Date.now() + (options.timeout ?? 30_000);
557+
const requestProviders = this.pickRequestBatch(
558+
providers,
559+
requeryCount,
560+
);
544561
await this.options.publish(
545562
new BlockRequest(cidString),
546563
{
547564
priority: options.priority,
548565
responsePriority: options.priority,
549566
expiresAt,
550-
mode: new SilentDelivery({ to: providers, redundancy: 1 }),
567+
mode: new SilentDelivery({
568+
to: requestProviders,
569+
redundancy: requestProviders.length,
570+
}),
551571
},
552572
);
553573
requeryCount += 1;

packages/transport/blocks/test/libp2p.spec.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,66 @@ describe("transport", function () {
233233
expect(new Uint8Array(read!)).to.deep.equal(data);
234234
});
235235

236+
it("probes additional explicit providers when the first candidate does not answer", async () => {
237+
session = await TestSession.disconnected(3, {
238+
services: { blocks: (c) => new DirectBlock(c) },
239+
});
240+
241+
await store(session, 0).start();
242+
await store(session, 1).start();
243+
await store(session, 2).start();
244+
245+
await session.connect([[session.peers[0], session.peers[1]]]);
246+
await waitForNeighbour(store(session, 0), store(session, 1));
247+
248+
const data = new Uint8Array([5, 4, 3]);
249+
const cid = await store(session, 0).put(data);
250+
expect(cid).equal("zb2rhbnwihVzMMEGAPf9EwTZBsQz9fszCnM4Y8mJmBFgiyN7J");
251+
252+
const requesterRemoteBlocks = (store(session, 1) as any)[
253+
"remoteBlocks"
254+
] as RemoteBlocks;
255+
const originalPublish = requesterRemoteBlocks.options.publish;
256+
let forwardedToSource = 0;
257+
258+
requesterRemoteBlocks.options.publish = (message: any, options: any) => {
259+
if (message instanceof BlockRequest) {
260+
const to = (options?.mode as any)?.to ?? [];
261+
const redundancy = Math.max(1, (options?.mode as any)?.redundancy ?? 1);
262+
const selected = to.slice(0, redundancy);
263+
return (async (): Promise<void> => {
264+
const sourceRemoteBlocks = (store(session, 0) as any)[
265+
"remoteBlocks"
266+
] as RemoteBlocks;
267+
await Promise.all(
268+
selected.map(async (target: string): Promise<void> => {
269+
if (target === store(session, 0).publicKeyHash) {
270+
forwardedToSource++;
271+
await sourceRemoteBlocks.onMessage(message, {
272+
from: store(session, 1).publicKeyHash,
273+
});
274+
}
275+
}),
276+
);
277+
})();
278+
}
279+
return originalPublish(message, options);
280+
};
281+
282+
try {
283+
const read = await store(session, 1).get(cid, {
284+
remote: {
285+
timeout: 5_000,
286+
from: [store(session, 2).publicKeyHash, store(session, 0).publicKeyHash],
287+
},
288+
});
289+
expect(new Uint8Array(read!)).to.deep.equal(data);
290+
expect(forwardedToSource).greaterThan(0);
291+
} finally {
292+
requesterRemoteBlocks.options.publish = originalPublish;
293+
}
294+
});
295+
236296
it("only responds to peer that needs block", async () => {
237297
session = await TestSession.disconnected(3, {
238298
services: { blocks: (c) => new DirectBlock(c) },

0 commit comments

Comments
 (0)