Skip to content

Commit 5b63c6f

Browse files
Refactor lambda passed to deduplicator to check cache in transport thread
1 parent 53f307c commit 5b63c6f

File tree

1 file changed

+12
-21
lines changed

1 file changed

+12
-21
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,21 +87,19 @@ public TransportGetAllocationStatsAction(
8787
);
8888
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
8989
this.allocationStatsCache = new AllocationStatsCache(CACHE_MAX_AGE_SETTING.get(settings).millis(), threadPool);
90-
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
91-
threadPool.getThreadContext(),
92-
l -> managementExecutor.execute(ActionRunnable.supply(l, () -> {
93-
// Check the cache again here to prevent duplicate work when a thread has a cache miss and is just about to fork just as
94-
// other threads are coming off a deduplicator call that is about to finish.
95-
final var cachedStats = allocationStatsCache.get();
96-
if (cachedStats != null) {
97-
return cachedStats;
98-
}
90+
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
91+
final var cachedStats = allocationStatsCache.get();
92+
if (cachedStats != null) {
93+
l.onResponse(cachedStats);
94+
return;
95+
}
9996

97+
managementExecutor.execute(ActionRunnable.supply(l, () -> {
10098
final var stats = allocationStatsService.stats();
10199
allocationStatsCache.put(stats);
102100
return stats;
103-
}))
104-
);
101+
}));
102+
});
105103
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
106104
}
107105

@@ -119,16 +117,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
119117
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
120118
// NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
121119

122-
SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep;
123-
124-
if (request.metrics().contains(Metric.ALLOCATIONS)) {
125-
final var cachedStats = allocationStatsCache.get();
126-
allocationStatsStep = cachedStats != null
127-
? SubscribableListener.newSucceeded(cachedStats)
128-
: SubscribableListener.newForked(allocationStatsSupplier::execute);
129-
} else {
130-
allocationStatsStep = SubscribableListener.newSucceeded(Map.of());
131-
}
120+
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
121+
? SubscribableListener.newForked(allocationStatsSupplier::execute)
122+
: SubscribableListener.newSucceeded(Map.of());
132123

133124
allocationStatsStep.andThenApply(
134125
allocationStats -> new Response(allocationStats, request.metrics().contains(Metric.FS) ? diskThresholdSettings : null)

0 commit comments

Comments
 (0)