Skip to content

Commit 471fbfe

Browse files
Use RelaxedSingleResultDeduplicator in TransportGetAllocationStatsAction
The current SingleResultDeduplicator will run the computation action again, in the same thread, if other threads call execute() while the current computation is running. For a potentially expensive action this can delay the response to the original caller by an additional action execution time. Since we are caching in TransportGetAllocationStatsAction it also is inconsistent with the strict requirements in the current SingleResultDeduplicator, since we can potentially return a response that was calculated before the call to execute(). Note also that due to the recursive nature of the current SingleResultDeduplicator implementation it is possible to continuously delay the original thread if additional threads call execute() while these computations run. This change refactors SingleResultDeduplicator into an interface with two implementations, a strict form which is the same as the original SingleResultDeduplicator, and a relaxed version that completes all waiting listeners, along with the original call's listener, with the single computation result.
1 parent e840334 commit 471fbfe

File tree

9 files changed

+381
-139
lines changed

9 files changed

+381
-139
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action;
11+
12+
import org.elasticsearch.action.support.ContextPreservingActionListener;
13+
import org.elasticsearch.action.support.SubscribableListener;
14+
import org.elasticsearch.common.util.concurrent.ThreadContext;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.function.Consumer;
19+
20+
/**
21+
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
22+
* share the result from a single call to the wrapped action. This implementation is similar to {@link StrictSingleResultDeduplicator} but
23+
* relaxed in the sense that it allows the result of a currently running computation to be used for listeners that queue up during that
24+
* computation.
25+
*
26+
* @param <T> Result type
27+
*/
28+
public class RelaxedSingleResultDeduplicator<T> extends SingleResultDeduplicator<T> {
29+
30+
private ActionListenerList<T> waitingListeners;
31+
32+
public RelaxedSingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
33+
super(threadContext, executeAction);
34+
}
35+
36+
@Override
37+
public void execute(ActionListener<T> listener) {
38+
final var wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
39+
synchronized (this) {
40+
if (waitingListeners != null) {
41+
waitingListeners.add(wrappedListener);
42+
return;
43+
}
44+
waitingListeners = new ActionListenerList<>();
45+
waitingListeners.add(wrappedListener);
46+
}
47+
final var currentWaitingListeners = waitingListeners;
48+
SubscribableListener.newForked(executeAction::accept).addListener(ActionListener.runBefore(currentWaitingListeners, () -> {
49+
synchronized (this) {
50+
waitingListeners = null;
51+
}
52+
}));
53+
}
54+
55+
private static class ActionListenerList<T> implements ActionListener<T> {
56+
private final List<ActionListener<T>> listeners = new ArrayList<>();
57+
58+
void add(ActionListener<T> listener) {
59+
listeners.add(listener);
60+
}
61+
62+
@Override
63+
public void onResponse(T response) {
64+
ActionListener.onResponse(listeners, response);
65+
}
66+
67+
@Override
68+
public void onFailure(Exception e) {
69+
ActionListener.onFailure(listeners, e);
70+
}
71+
}
72+
}

server/src/main/java/org/elasticsearch/action/SingleResultDeduplicator.java

Lines changed: 6 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -9,117 +9,31 @@
99

1010
package org.elasticsearch.action;
1111

12-
import org.elasticsearch.action.support.ContextPreservingActionListener;
1312
import org.elasticsearch.common.util.concurrent.ThreadContext;
14-
import org.elasticsearch.core.Nullable;
1513

16-
import java.util.ArrayList;
17-
import java.util.List;
1814
import java.util.function.Consumer;
1915

2016
/**
21-
*
2217
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
23-
* share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
24-
* stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to
25-
* be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to
26-
* be used to deduplicate results from actions that produce results that change over time transparently.
18+
* share the result from a single call to the wrapped action.
2719
*
2820
* @param <T> Result type
2921
*/
30-
public final class SingleResultDeduplicator<T> {
31-
32-
private final ThreadContext threadContext;
33-
34-
/**
35-
* List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
36-
* progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
37-
* up here once done.
38-
*/
39-
private List<ActionListener<T>> waitingListeners;
40-
/**
41-
* The threadContext associated with the first listener in the waitingListeners. This context will be restored right before
42-
* we perform the {@code executeAction}.
43-
*/
44-
private ThreadContext.StoredContext waitingStoredContext;
22+
public abstract class SingleResultDeduplicator<T> {
4523

46-
private final Consumer<ActionListener<T>> executeAction;
24+
protected final ThreadContext threadContext;
25+
protected final Consumer<ActionListener<T>> executeAction;
4726

4827
public SingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
4928
this.threadContext = threadContext;
5029
this.executeAction = executeAction;
5130
}
5231

5332
/**
54-
* Execute the action for the given {@code listener}.
33+
* Execute the action for the given {@link ActionListener}.
5534
* @param listener listener to resolve with execution result. The listener always has its threadContext preserved, i.e.
5635
* when the listener is invoked, it will see its original threadContext plus any response headers generated
5736
* by performing the {@code executeAction}.
5837
*/
59-
public void execute(ActionListener<T> listener) {
60-
synchronized (this) {
61-
if (waitingListeners == null) {
62-
// no queued up listeners, just execute this one directly without deduplication and instantiate the list so that
63-
// subsequent executions will wait
64-
waitingListeners = new ArrayList<>();
65-
waitingStoredContext = null;
66-
} else {
67-
// already running an execution, queue this one up
68-
if (waitingListeners.isEmpty()) {
69-
// Only the first listener in queue needs the stored context which is used for running executeAction
70-
assert waitingStoredContext == null;
71-
waitingStoredContext = threadContext.newStoredContext();
72-
}
73-
waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
74-
return;
75-
}
76-
}
77-
doExecute(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), null);
78-
}
79-
80-
private void doExecute(ActionListener<T> listener, @Nullable ThreadContext.StoredContext storedContext) {
81-
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
82-
final List<ActionListener<T>> listeners;
83-
final ThreadContext.StoredContext thisStoredContext;
84-
synchronized (this) {
85-
if (waitingListeners.isEmpty()) {
86-
// no listeners were queued up while this execution ran, so we just reset the state to not having a running execution
87-
waitingListeners = null;
88-
waitingStoredContext = null;
89-
return;
90-
} else {
91-
// we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the
92-
// listeners currently queued up
93-
listeners = waitingListeners;
94-
thisStoredContext = waitingStoredContext;
95-
// This batch of listeners will use the context of the first listener in this batch for the work execution
96-
assert thisStoredContext != null : "stored context must not be null for the first listener in a batch";
97-
waitingListeners = new ArrayList<>();
98-
waitingStoredContext = null;
99-
}
100-
}
101-
102-
// Create a child threadContext so that the parent context remains unchanged when the child execution ends.
103-
// This ensures the parent does not see response headers from the child execution.
104-
try (var ignore = threadContext.newStoredContext()) {
105-
doExecute(new ActionListener<>() {
106-
@Override
107-
public void onResponse(T response) {
108-
ActionListener.onResponse(listeners, response);
109-
}
110-
111-
@Override
112-
public void onFailure(Exception e) {
113-
ActionListener.onFailure(listeners, e);
114-
}
115-
}, thisStoredContext);
116-
}
117-
});
118-
// Restore the given threadContext before proceed with the work execution.
119-
// This ensures all executions begin execution with their own context.
120-
if (storedContext != null) {
121-
storedContext.restore();
122-
}
123-
ActionListener.run(wrappedListener, executeAction::accept);
124-
}
38+
public abstract void execute(ActionListener<T> listener);
12539
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action;
11+
12+
import org.elasticsearch.action.support.ContextPreservingActionListener;
13+
import org.elasticsearch.common.util.concurrent.ThreadContext;
14+
import org.elasticsearch.core.Nullable;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.function.Consumer;
19+
20+
/**
21+
*
22+
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
23+
* share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
24+
* stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to
25+
* be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to
26+
* be used to deduplicate results from actions that produce results that change over time transparently.
27+
*
28+
* @param <T> Result type
29+
*/
30+
public final class StrictSingleResultDeduplicator<T> extends SingleResultDeduplicator<T> {
31+
32+
/**
33+
* List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
34+
* progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
35+
* up here once done.
36+
*/
37+
private List<ActionListener<T>> waitingListeners;
38+
/**
39+
* The threadContext associated with the first listener in the waitingListeners. This context will be restored right before
40+
* we perform the {@code executeAction}.
41+
*/
42+
private ThreadContext.StoredContext waitingStoredContext;
43+
44+
45+
public StrictSingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
46+
super(threadContext, executeAction);
47+
}
48+
49+
@Override
50+
public void execute(ActionListener<T> listener) {
51+
synchronized (this) {
52+
if (waitingListeners == null) {
53+
// no queued up listeners, just execute this one directly without deduplication and instantiate the list so that
54+
// subsequent executions will wait
55+
waitingListeners = new ArrayList<>();
56+
waitingStoredContext = null;
57+
} else {
58+
// already running an execution, queue this one up
59+
if (waitingListeners.isEmpty()) {
60+
// Only the first listener in queue needs the stored context which is used for running executeAction
61+
assert waitingStoredContext == null;
62+
waitingStoredContext = threadContext.newStoredContext();
63+
}
64+
waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
65+
return;
66+
}
67+
}
68+
doExecute(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), null);
69+
}
70+
71+
private void doExecute(ActionListener<T> listener, @Nullable ThreadContext.StoredContext storedContext) {
72+
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
73+
final List<ActionListener<T>> listeners;
74+
final ThreadContext.StoredContext thisStoredContext;
75+
synchronized (this) {
76+
if (waitingListeners.isEmpty()) {
77+
// no listeners were queued up while this execution ran, so we just reset the state to not having a running execution
78+
waitingListeners = null;
79+
waitingStoredContext = null;
80+
return;
81+
} else {
82+
// we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the
83+
// listeners currently queued up
84+
listeners = waitingListeners;
85+
thisStoredContext = waitingStoredContext;
86+
// This batch of listeners will use the context of the first listener in this batch for the work execution
87+
assert thisStoredContext != null : "stored context must not be null for the first listener in a batch";
88+
waitingListeners = new ArrayList<>();
89+
waitingStoredContext = null;
90+
}
91+
}
92+
93+
// Create a child threadContext so that the parent context remains unchanged when the child execution ends.
94+
// This ensures the parent does not see response headers from the child execution.
95+
try (var ignore = threadContext.newStoredContext()) {
96+
doExecute(new ActionListener<>() {
97+
@Override
98+
public void onResponse(T response) {
99+
ActionListener.onResponse(listeners, response);
100+
}
101+
102+
@Override
103+
public void onFailure(Exception e) {
104+
ActionListener.onFailure(listeners, e);
105+
}
106+
}, thisStoredContext);
107+
}
108+
});
109+
// Restore the given threadContext before proceed with the work execution.
110+
// This ensures all executions begin execution with their own context.
111+
if (storedContext != null) {
112+
storedContext.restore();
113+
}
114+
ActionListener.run(wrappedListener, executeAction::accept);
115+
}
116+
}

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.ActionResponse;
1616
import org.elasticsearch.action.ActionRunnable;
1717
import org.elasticsearch.action.ActionType;
18+
import org.elasticsearch.action.RelaxedSingleResultDeduplicator;
1819
import org.elasticsearch.action.SingleResultDeduplicator;
1920
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
2021
import org.elasticsearch.action.support.ActionFilters;
@@ -87,7 +88,7 @@ public TransportGetAllocationStatsAction(
8788
);
8889
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
8990
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
90-
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
91+
this.allocationStatsSupplier = new RelaxedSingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
9192
final var cachedStats = allocationStatsCache.get();
9293
if (cachedStats != null) {
9394
l.onResponse(cachedStats);

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.ActionListener;
2828
import org.elasticsearch.action.ActionRunnable;
2929
import org.elasticsearch.action.SingleResultDeduplicator;
30+
import org.elasticsearch.action.StrictSingleResultDeduplicator;
3031
import org.elasticsearch.action.support.GroupedActionListener;
3132
import org.elasticsearch.action.support.PlainActionFuture;
3233
import org.elasticsearch.action.support.RefCountingListener;
@@ -506,7 +507,7 @@ protected BlobStoreRepository(
506507
this.namedXContentRegistry = namedXContentRegistry;
507508
this.basePath = basePath;
508509
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
509-
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
510+
this.repoDataLoadDeduplicator = new StrictSingleResultDeduplicator<>(
510511
threadPool.getThreadContext(),
511512
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
512513
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))

0 commit comments

Comments
 (0)