Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -252,6 +254,13 @@ public String toString() {
}
}

/**
* Creates a listener which releases the given resource on completion (whether success or failure)
*/
static <Response> ActionListener<Response> releasing(Releasable releasable) {
return wrap(runnableFromReleasable(releasable));
}

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.
Expand Down Expand Up @@ -362,6 +371,14 @@ static <Response> ActionListener<Response> runAfter(ActionListener<Response> del
return new RunAfterActionListener<>(delegate, runAfter);
}

/**
* Wraps a given listener and returns a new listener which releases the provided {@code releaseAfter}
* resource when the listener is notified via either {@code #onResponse} or {@code #onFailure}.
*/
static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable releaseAfter) {
return new RunAfterActionListener<>(delegate, runnableFromReleasable(releaseAfter));
}

final class RunAfterActionListener<T> extends Delegating<T, T> {

private final Runnable runAfter;
Expand Down Expand Up @@ -498,4 +515,18 @@ static <Response> void completeWith(ActionListener<Response> listener, CheckedSu
throw ex;
}
}

private static Runnable runnableFromReleasable(Releasable releasable) {
return new Runnable() {
@Override
public void run() {
Releasables.closeExpectNoException(releasable);
}

@Override
public String toString() {
return "release[" + releasable + "]";
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ public CountDownActionListener(int groupSize, ActionListener<Void> delegate) {
countDown = new AtomicInteger(groupSize);
}

/**
* Creates a new listener
* @param groupSize the group size
* @param runnable the runnable
*/
public CountDownActionListener(int groupSize, Runnable runnable) {
this(groupSize, ActionListener.wrap(Objects.requireNonNull(runnable)));
}

private boolean countDown() {
final var result = countDown.getAndUpdate(current -> Math.max(0, current - 1));
assert result > 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A mechanism to trigger an action on the completion of some (dynamic) collection of other actions. Basic usage is as follows:
*
* <pre>
* try (var refs = new RefCountingRunnable(finalRunnable)) {
* for (var item : collection) {
* runAsyncAction(item, refs.acquire()); // releases the acquired ref on completion
* }
* }
* </pre>
*
* The delegate action is completed when execution leaves the try-with-resources block and every acquired reference is released. Unlike a
* {@link CountDown} there is no need to declare the number of subsidiary actions up front (refs can be acquired dynamically as needed) nor
* does the caller need to check for completion each time a reference is released. Moreover even outside the try-with-resources block you
* can continue to acquire additional listeners, even in a separate thread, as long as there's at least one listener outstanding:
*
* <pre>
* try (var refs = new RefCountingRunnable(finalRunnable)) {
* for (var item : collection) {
* if (condition(item)) {
* runAsyncAction(item, refs.acquire());
* }
* }
* if (flag) {
* runOneOffAsyncAction(refs.acquire());
* return;
* }
* for (var item : otherCollection) {
* var itemRef = refs.acquire(); // delays completion while the background action is pending
* executorService.execute(() -> {
* try (var ignored = itemRef) {
* if (condition(item)) {
* runOtherAsyncAction(item, refs.acquire());
* }
* }
* });
* }
* }
* </pre>
*
* In particular (and also unlike a {@link CountDown}) this works even if you don't acquire any extra refs at all: in that case, the
* delegate action executes at the end of the try-with-resources block.
*/
public final class RefCountingRunnable implements Releasable {

private static final Logger logger = LogManager.getLogger(RefCountingRunnable.class);
static final String ALREADY_CLOSED_MESSAGE = "already closed, cannot acquire or release any further refs";

private final RefCounted refCounted;
private final AtomicBoolean originalRefReleased = new AtomicBoolean();

private class AcquiredRef implements Releasable {
private final AtomicBoolean released = new AtomicBoolean();

@Override
public void close() {
releaseRef(released);
}

@Override
public String toString() {
return RefCountingRunnable.this.toString();
}
}

/**
* Construct a {@link RefCountingRunnable} which executes {@code delegate} when all refs are released.
* @param delegate The action to execute when all refs are released. This action must not throw any exception.
*/
public RefCountingRunnable(Runnable delegate) {
this.refCounted = AbstractRefCounted.of(delegate);
}

/**
* Acquire a reference to this object and return an action which releases it. The delegate {@link Runnable} is called when all its
* references have been released.
*
* Callers must take care to close the returned resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
*/
public Releasable acquire() {
if (refCounted.tryIncRef()) {
return new AcquiredRef();
}
assert false : ALREADY_CLOSED_MESSAGE;
throw new IllegalStateException(ALREADY_CLOSED_MESSAGE);
}

/**
* Acquire a reference to this object and return a listener which releases it when notified. The delegate {@link Runnable} is called
* when all its references have been released.
*/
public ActionListener<Void> acquireListener() {
return ActionListener.releasing(acquire());
}

/**
* Release the original reference to this object, which executes the delegate {@link Runnable} if there are no other references.
*
* Callers must take care to close this resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
*/
@Override
public void close() {
releaseRef(originalRefReleased);
}

private void releaseRef(AtomicBoolean released) {
if (released.compareAndSet(false, true)) {
try {
refCounted.decRef();
} catch (Exception e) {
logger.error("exception in delegate", e);
assert false : e;
}
} else {
assert false : "already closed";
}
}

@Override
public String toString() {
return refCounted.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -32,7 +33,6 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreStats;
Expand Down Expand Up @@ -161,7 +161,7 @@ public void clusterChanged(ClusterChangedEvent event) {
private class AsyncRefresh {

private final List<ActionListener<ClusterInfo>> thisRefreshListeners;
private final CountDown countDown = new CountDown(2);
private final RefCountingRunnable fetchRefs = new RefCountingRunnable(this::callListeners);

AsyncRefresh(List<ActionListener<ClusterInfo>> thisRefreshListeners) {
this.thisRefreshListeners = thisRefreshListeners;
Expand All @@ -177,15 +177,15 @@ void execute() {
return;
}

assert countDown.isCountedDown() == false;
logger.trace("starting async refresh");

try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodeStats();
}

try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchIndicesStats();
try (var ignoredRefs = fetchRefs) {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodeStats();
}
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchIndicesStats();
}
}
}

Expand All @@ -203,7 +203,7 @@ private void fetchIndicesStats() {
logger,
threadPool,
ThreadPool.Names.MANAGEMENT,
ActionListener.runAfter(new ActionListener<>() {
ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
logger.trace("received indices stats response");
Expand Down Expand Up @@ -277,7 +277,7 @@ public void onFailure(Exception e) {
}
indicesStatsSummary = IndicesStatsSummary.EMPTY;
}
}, this::onStatsProcessed),
}, fetchRefs.acquire()),
false
)
);
Expand All @@ -288,7 +288,7 @@ private void fetchNodeStats() {
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.timeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() {
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(NodesStatsResponse nodesStatsResponse) {
logger.trace("received node stats response");
Expand Down Expand Up @@ -318,18 +318,12 @@ public void onFailure(Exception e) {
leastAvailableSpaceUsages = Map.of();
mostAvailableSpaceUsages = Map.of();
}
}, this::onStatsProcessed));
}

private void onStatsProcessed() {
if (countDown.countDown()) {
logger.trace("stats all received, computing cluster info and notifying listeners");
callListeners();
}
}, fetchRefs.acquire()));
}

private void callListeners() {
try {
logger.trace("stats all received, computing cluster info and notifying listeners");
final ClusterInfo clusterInfo = getClusterInfo();
boolean anyListeners = false;
for (final Consumer<ClusterInfo> listener : listeners) {
Expand Down
Loading