diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index a158669d936fe..d50973f3801b6 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -28,6 +28,7 @@ import java.util.function.Consumer; import java.util.function.Function; +import static org.elasticsearch.action.ActionListenerImplementations.checkedRunnableFromReleasable; import static org.elasticsearch.action.ActionListenerImplementations.runnableFromReleasable; import static org.elasticsearch.action.ActionListenerImplementations.safeAcceptException; import static org.elasticsearch.action.ActionListenerImplementations.safeOnFailure; @@ -335,6 +336,16 @@ static ActionListener runBefore(ActionListener de return assertOnce(new ActionListenerImplementations.RunBeforeActionListener<>(delegate, runBefore)); } + /** + * Wraps a given listener and returns a new listener which releases the provided {@code releaseBefore} + * resource before the listener is notified via either {@code #onResponse} or {@code #onFailure}. + */ + static ActionListener releaseBefore(Releasable releaseBefore, ActionListener delegate) { + return assertOnce( + new ActionListenerImplementations.RunBeforeActionListener<>(delegate, checkedRunnableFromReleasable(releaseBefore)) + ); + } + /** * Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)} * and {@link #onFailure(Exception)} of the provided listener will be called at most once. diff --git a/server/src/main/java/org/elasticsearch/action/ActionListenerImplementations.java b/server/src/main/java/org/elasticsearch/action/ActionListenerImplementations.java index 00cbe475f162e..c00f22f9e5f36 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListenerImplementations.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListenerImplementations.java @@ -58,6 +58,20 @@ public String toString() { }; } + static CheckedRunnable checkedRunnableFromReleasable(Releasable releasable) { + return new CheckedRunnable<>() { + @Override + public void run() { + Releasables.closeExpectNoException(releasable); + } + + @Override + public String toString() { + return "release[" + releasable + "]"; + } + }; + } + static void safeAcceptException(Consumer consumer, Exception e) { assert e != null; try { diff --git a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java index bb785b2b6093a..50b8c860109d6 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; @@ -610,16 +611,26 @@ public String toString() { ); } + public void testReleaseBefore() { + runReleaseListenerTest(true, false, (delegate, releasable) -> ActionListener.releaseBefore(releasable, delegate)); + runReleaseListenerTest(true, true, (delegate, releasable) -> ActionListener.releaseBefore(releasable, delegate)); + runReleaseListenerTest(false, false, (delegate, releasable) -> ActionListener.releaseBefore(releasable, delegate)); + } + public void testReleaseAfter() { - runReleaseAfterTest(true, false); - runReleaseAfterTest(true, true); - runReleaseAfterTest(false, false); + runReleaseListenerTest(true, false, ActionListener::releaseAfter); + runReleaseListenerTest(true, true, ActionListener::releaseAfter); + runReleaseListenerTest(false, false, ActionListener::releaseAfter); } - private static void runReleaseAfterTest(boolean successResponse, final boolean throwFromOnResponse) { + private static void runReleaseListenerTest( + boolean successResponse, + final boolean throwFromOnResponse, + BiFunction, Releasable, ActionListener> releaseListenerProvider + ) { final AtomicBoolean released = new AtomicBoolean(); final String description = randomAlphaOfLength(10); - final ActionListener l = ActionListener.releaseAfter(new ActionListener<>() { + final ActionListener l = releaseListenerProvider.apply(new ActionListener<>() { @Override public void onResponse(Void unused) { if (throwFromOnResponse) {