Skip to content

Commit 4fc006e

Browse files
authored
Async repo contents iterator (#120819)
Reifies the iterator over each repository's contents so that we can (in follow-ups, and under certain combinations of request options) pre-process this iterator to avoid loading `SnapshotInfo` blobs that we can determine to be unnecessary up front. This is just a refactoring, it doesn't change behaviour, and this area is well-covered by tests such as `GetSnapshotsIT#testAllFeatures` so no need for any test changes here.
1 parent 8f70713 commit 4fc006e

File tree

1 file changed

+66
-43
lines changed

1 file changed

+66
-43
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

Lines changed: 66 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -276,55 +276,67 @@ void runOperation(ActionListener<GetSnapshotsResponse> listener) {
276276
*/
277277
private void populateResults(ActionListener<Void> listener) {
278278
try (var listeners = new RefCountingListener(listener)) {
279-
for (final RepositoryMetadata repository : repositories) {
280-
final String repositoryName = repository.name();
281-
if (skipRepository(repositoryName)) {
282-
continue;
283-
}
284279

285-
if (listeners.isFailing()) {
286-
return;
287-
}
280+
final BooleanSupplier failFastSupplier = () -> cancellableTask.isCancelled() || listeners.isFailing();
281+
282+
final Iterator<AsyncSnapshotInfoIterator> asyncSnapshotInfoIterators = Iterators.failFast(
283+
Iterators.map(
284+
Iterators.filter(
285+
Iterators.map(repositories.iterator(), RepositoryMetadata::name),
286+
repositoryName -> skipRepository(repositoryName) == false
287+
),
288+
repositoryName -> asyncRepositoryContentsListener -> SubscribableListener
289+
290+
.<RepositoryData>newForked(l -> maybeGetRepositoryData(repositoryName, l))
291+
.andThenApply(repositoryData -> {
292+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
293+
cancellableTask.ensureNotCancelled();
294+
ensureRequiredNamesPresent(repositoryName, repositoryData);
295+
return getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData);
296+
})
297+
.addListener(asyncRepositoryContentsListener)
298+
),
299+
failFastSupplier
300+
);
288301

289-
maybeGetRepositoryData(repositoryName, listeners.acquire(repositoryData -> {
290-
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
291-
cancellableTask.ensureNotCancelled();
292-
ensureRequiredNamesPresent(repositoryName, repositoryData);
293-
ThrottledIterator.run(
294-
Iterators.failFast(
295-
getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData),
296-
() -> cancellableTask.isCancelled() || listeners.isFailing()
297-
),
298-
(ref, asyncSnapshotInfo) -> ActionListener.run(
299-
ActionListener.runBefore(listeners.acquire(), ref::close),
300-
refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
301-
@Override
302-
public void onResponse(SnapshotInfo snapshotInfo) {
303-
if (matchesPredicates(snapshotInfo)) {
304-
totalCount.incrementAndGet();
305-
if (afterPredicate.test(snapshotInfo)) {
306-
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
302+
// TODO if the request parameters allow it, modify asyncSnapshotInfoIterators to skip unnecessary GET calls here
303+
304+
asyncSnapshotInfoIterators.forEachRemaining(
305+
asyncSnapshotInfoIteratorSupplier -> asyncSnapshotInfoIteratorSupplier.getAsyncSnapshotInfoIterator(
306+
listeners.acquire(
307+
asyncSnapshotInfoIterator -> ThrottledIterator.run(
308+
Iterators.failFast(asyncSnapshotInfoIterator, failFastSupplier),
309+
(ref, asyncSnapshotInfo) -> ActionListener.run(
310+
ActionListener.runBefore(listeners.acquire(), ref::close),
311+
refListener -> asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
312+
@Override
313+
public void onResponse(SnapshotInfo snapshotInfo) {
314+
if (matchesPredicates(snapshotInfo)) {
315+
totalCount.incrementAndGet();
316+
if (afterPredicate.test(snapshotInfo)) {
317+
allSnapshotInfos.add(snapshotInfo.maybeWithoutIndices(indices));
318+
}
307319
}
320+
refListener.onResponse(null);
308321
}
309-
refListener.onResponse(null);
310-
}
311322

312-
@Override
313-
public void onFailure(Exception e) {
314-
if (ignoreUnavailable) {
315-
logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
316-
refListener.onResponse(null);
317-
} else {
318-
refListener.onFailure(e);
323+
@Override
324+
public void onFailure(Exception e) {
325+
if (ignoreUnavailable) {
326+
logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
327+
refListener.onResponse(null);
328+
} else {
329+
refListener.onFailure(e);
330+
}
319331
}
320-
}
321-
})
322-
),
323-
getSnapshotInfoExecutor.getMaxRunningTasks(),
324-
() -> {}
325-
);
326-
}));
327-
}
332+
})
333+
),
334+
getSnapshotInfoExecutor.getMaxRunningTasks(),
335+
() -> {}
336+
)
337+
)
338+
)
339+
);
328340
}
329341
}
330342

@@ -383,6 +395,17 @@ private interface AsyncSnapshotInfo {
383395
void getSnapshotInfo(ActionListener<SnapshotInfo> listener);
384396
}
385397

398+
/**
399+
* An asynchronous supplier of the collection of snapshots contained in a repository, as an iterator over snapshots each represented
400+
* as an {@link AsyncSnapshotInfo}.
401+
*/
402+
private interface AsyncSnapshotInfoIterator {
403+
/**
404+
* @param listener completed, possibly asynchronously, with the appropriate iterator over {@link AsyncSnapshotInfo} instances.
405+
*/
406+
void getAsyncSnapshotInfoIterator(ActionListener<Iterator<AsyncSnapshotInfo>> listener);
407+
}
408+
386409
/**
387410
* @return an {@link AsyncSnapshotInfo} for the given in-progress snapshot entry.
388411
*/

0 commit comments

Comments
 (0)