Skip to content

Commit 000e3f1

Browse files
fix(document): retry missing shard groups before iterator close (#605)
* fix(document): retry missing shard groups in iterator
* Stabilize shard iteration assertion in document query test
* Strengthen document query economy and retry-budget tests
1 parent 0e16a6e commit 000e3f1

File tree

2 files changed

+312
-62
lines changed

2 files changed

+312
-62
lines changed

packages/programs/data/document/document/src/search.ts

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ type QueryDetailedOptions<
289289
response: types.AbstractSearchResult,
290290
from: PublicSignKey,
291291
) => void | Promise<void>;
292+
onMissingResponses?: (error: MissingResponsesError) => void | Promise<void>;
292293
remote?: {
293294
from?: string[]; // if specified, only query these peers
294295
};
@@ -2515,6 +2516,9 @@ export class DocumentIndex<
25152516
} catch (error) {
25162517
if (error instanceof MissingResponsesError) {
25172518
warn("Did not reciveve responses from all shard");
2519+
if (options?.onMissingResponses) {
2520+
await options.onMissingResponses(error);
2521+
}
25182522
if (remote?.throwOnMissing) {
25192523
throw error;
25202524
}
@@ -3074,6 +3078,7 @@ export class DocumentIndex<
30743078
): Promise<boolean> => {
30753079
await warmupPromise;
30763080
let hasMore = false;
3081+
let missingResponses = false;
30773082
const discoverTargets =
30783083
typeof options?.remote === "object"
30793084
? options.remote.reach?.discover
@@ -3090,14 +3095,14 @@ export class DocumentIndex<
30903095
queryRequestCoerced.fetch = n;
30913096
await this.queryCommence(
30923097
queryRequestCoerced,
3093-
{
3094-
local: fetchOptions?.from != null ? false : options?.local,
3095-
remote:
3096-
options?.remote !== false && !skipRemoteDueToDiscovery
3097-
? {
3098-
...(typeof options?.remote === "object"
3099-
? options.remote
3100-
: {}),
3098+
{
3099+
local: fetchOptions?.from != null ? false : options?.local,
3100+
remote:
3101+
options?.remote !== false && !skipRemoteDueToDiscovery
3102+
? {
3103+
...(typeof options?.remote === "object"
3104+
? options.remote
3105+
: {}),
31013106
from: fetchOptions?.from ?? initialRemoteTargets,
31023107
}
31033108
: false,
@@ -3209,10 +3214,38 @@ export class DocumentIndex<
32093214
);
32103215
}
32113216
},
3217+
onMissingResponses: (error) => {
3218+
missingResponses = true;
3219+
const missingGroups = (error as MissingResponsesError & {
3220+
missingGroups?: string[][];
3221+
}).missingGroups;
3222+
if (!missingGroups?.length) {
3223+
return;
3224+
}
3225+
3226+
const selfHash = this.node.identity.publicKey.hashcode();
3227+
for (const group of missingGroups) {
3228+
const target = group.find((hash) => {
3229+
if (!hash || hash === selfHash) return false;
3230+
const attempts = missingResponseRetryAttempts.get(hash) ?? 0;
3231+
return attempts < maxMissingResponseRetryAttempts;
3232+
});
3233+
if (!target) continue;
3234+
pendingMissingResponseRetryPeers.add(target);
3235+
missingResponseRetryAttempts.set(
3236+
target,
3237+
(missingResponseRetryAttempts.get(target) ?? 0) + 1,
3238+
);
3239+
}
3240+
},
32123241
},
32133242
fetchOptions?.fetchedFirstForRemote,
32143243
);
32153244

3245+
if (missingResponses) {
3246+
hasMore = true;
3247+
unsetDone();
3248+
}
32163249
if (!hasMore) {
32173250
maybeSetDone();
32183251
}
@@ -3239,6 +3272,17 @@ export class DocumentIndex<
32393272
return fetchPromise;
32403273
}
32413274

3275+
if (pendingMissingResponseRetryPeers.size > 0) {
3276+
const retryTargets = [...pendingMissingResponseRetryPeers];
3277+
pendingMissingResponseRetryPeers.clear();
3278+
fetchPromise = fetchFirst(n, {
3279+
from: retryTargets,
3280+
// retries for missing groups should not be suppressed by first-fetch dedupe
3281+
fetchedFirstForRemote: undefined,
3282+
});
3283+
return fetchPromise;
3284+
}
3285+
32423286
const promises: Promise<any>[] = [];
32433287
let resultsLeft = 0;
32443288

@@ -3676,6 +3720,9 @@ export class DocumentIndex<
36763720
let joinListener: (() => void) | undefined;
36773721

36783722
let fetchedFirstForRemote: Set<string> | undefined = undefined;
3723+
const pendingMissingResponseRetryPeers = new Set<string>();
3724+
const missingResponseRetryAttempts = new Map<string, number>();
3725+
const maxMissingResponseRetryAttempts = 2;
36793726

36803727
let updateDeferred: ReturnType<typeof pDefer> | undefined;
36813728
const onLateResultsQueue =

0 commit comments

Comments
 (0)