Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123246.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123246
summary: Deduplicate allocation stats calls
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.SingleResultDeduplicator;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -46,7 +48,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc

public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");

private final AllocationStatsService allocationStatsService;
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
private final DiskThresholdSettings diskThresholdSettings;

@Inject
Expand All @@ -68,7 +70,10 @@ public TransportGetAllocationStatsAction(
TransportGetAllocationStatsAction.Response::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You wouldn't want to fork anymore with the deduplicator would you? Only fork in case you actually do the computation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically yes that's right of course. I'm always in two minds about adding more non-forking actions: they increase the risk of serious pain in future (for nontechnical reasons) at the cost of a little more latency right now. I'll let you have this one without prejudice tho 😉

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd think forking lowers the risk of pain the future, but under load already are at the other end of this.
Forking is far more expensive than just the fork in the real world. You also need to account for the fact that you'll allocate a buffer off of the channel's thread and release that buffer which comes with contention very quickly unfortunately. Just to illustrate this a little :)

image

);
this.allocationStatsService = allocationStatsService;
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
threadPool.getThreadContext(),
l -> ActionListener.completeWith(l, allocationStatsService::stats)
);
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
}

Expand All @@ -84,12 +89,13 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
listener.onResponse(
new Response(
request.metrics().contains(Metric.ALLOCATIONS) ? allocationStatsService.stats() : Map.of(),
request.metrics().contains(Metric.FS) ? diskThresholdSettings : null
)
);
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
? SubscribableListener.newForked(allocationStatsSupplier::execute)
: SubscribableListener.newSucceeded(Map.of());

allocationStatsStep.andThenApply(
allocationStats -> new Response(allocationStats, request.metrics().contains(Metric.FS) ? diskThresholdSettings : null)
).addListener(listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -112,4 +116,49 @@ public void testReturnsOnlyRequestedStats() throws Exception {
assertNull(response.getDiskThresholdSettings());
}
}

public void testDeduplicatesStatsComputations() throws InterruptedException {
final var requestCounter = new AtomicInteger();
final var isExecuting = new AtomicBoolean();
when(allocationStatsService.stats()).thenAnswer(invocation -> {
try {
assertTrue(isExecuting.compareAndSet(false, true));
return Map.of(Integer.toString(requestCounter.incrementAndGet()), NodeAllocationStatsTests.randomNodeAllocationStats());
} finally {
Thread.yield();
assertTrue(isExecuting.compareAndSet(true, false));
}
});

final var threads = new Thread[between(1, 5)];
final var startBarrier = new CyclicBarrier(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
safeAwait(startBarrier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: could use org.elasticsearch.test.ESTestCase#startInParallel here ? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cute TIL


final var minRequestIndex = requestCounter.get();

final TransportGetAllocationStatsAction.Response response = safeAwait(
l -> action.masterOperation(
mock(Task.class),
new TransportGetAllocationStatsAction.Request(
TEST_REQUEST_TIMEOUT,
TaskId.EMPTY_TASK_ID,
EnumSet.of(Metric.ALLOCATIONS)
),
ClusterState.EMPTY_STATE,
l
)
);

final var requestIndex = Integer.valueOf(response.getNodeAllocationStats().keySet().iterator().next());
assertThat(requestIndex, greaterThanOrEqualTo(minRequestIndex)); // did not get a stale result
}, "thread-" + i);
threads[i].start();
}

for (final var thread : threads) {
thread.join();
}
}
}