diff --git a/server/src/main/java/org/elasticsearch/action/RelaxedSingleResultDeduplicator.java b/server/src/main/java/org/elasticsearch/action/RelaxedSingleResultDeduplicator.java new file mode 100644 index 0000000000000..c2668aa468089 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/RelaxedSingleResultDeduplicator.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action; + +import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.util.concurrent.ThreadContext; + +import java.util.function.Consumer; + +/** + * Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can + * share the result from a single call to the wrapped action. This implementation is similar to {@link StrictSingleResultDeduplicator} but + * relaxed in the sense that it allows the result of a currently running computation to be used for listeners that queue up during that + * computation. + * + * @param Result type + */ +public class RelaxedSingleResultDeduplicator extends SingleResultDeduplicator { + + private SubscribableListener waitingListeners; + + public RelaxedSingleResultDeduplicator(ThreadContext threadContext, Consumer> executeAction) { + super(threadContext, executeAction); + } + + @Override + public void execute(ActionListener listener) { + final var wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext); + synchronized (this) { + if (waitingListeners != null) { + waitingListeners.addListener(wrappedListener); + return; + } + waitingListeners = new SubscribableListener<>(); + waitingListeners.addListener(ActionListener.runBefore(wrappedListener, () -> { + synchronized (this) { + waitingListeners = null; + } + })); + } + ActionListener.run(waitingListeners, executeAction::accept); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/SingleResultDeduplicator.java b/server/src/main/java/org/elasticsearch/action/SingleResultDeduplicator.java index d6280ef5bef79..21ed01de43f22 100644 --- a/server/src/main/java/org/elasticsearch/action/SingleResultDeduplicator.java +++ b/server/src/main/java/org/elasticsearch/action/SingleResultDeduplicator.java @@ -9,41 +9,20 @@ package org.elasticsearch.action; -import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.Nullable; -import java.util.ArrayList; -import java.util.List; import java.util.function.Consumer; /** - * * Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can - * share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers - * stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to - * be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to - * be used to deduplicate results from actions that produce results that change over time transparently. + * share the result from a single call to the wrapped action. * * @param Result type */ -public final class SingleResultDeduplicator { - - private final ThreadContext threadContext; - - /** - * List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in - * progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued - * up here once done. - */ - private List> waitingListeners; - /** - * The threadContext associated with the first listener in the waitingListeners. This context will be restored right before - * we perform the {@code executeAction}. - */ - private ThreadContext.StoredContext waitingStoredContext; +public abstract class SingleResultDeduplicator { - private final Consumer> executeAction; + protected final ThreadContext threadContext; + protected final Consumer> executeAction; public SingleResultDeduplicator(ThreadContext threadContext, Consumer> executeAction) { this.threadContext = threadContext; @@ -51,75 +30,10 @@ public SingleResultDeduplicator(ThreadContext threadContext, Consumer listener) { - synchronized (this) { - if (waitingListeners == null) { - // no queued up listeners, just execute this one directly without deduplication and instantiate the list so that - // subsequent executions will wait - waitingListeners = new ArrayList<>(); - waitingStoredContext = null; - } else { - // already running an execution, queue this one up - if (waitingListeners.isEmpty()) { - // Only the first listener in queue needs the stored context which is used for running executeAction - assert waitingStoredContext == null; - waitingStoredContext = threadContext.newStoredContext(); - } - waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)); - return; - } - } - doExecute(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), null); - } - - private void doExecute(ActionListener listener, @Nullable ThreadContext.StoredContext storedContext) { - final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final List> listeners; - final ThreadContext.StoredContext thisStoredContext; - synchronized (this) { - if (waitingListeners.isEmpty()) { - // no listeners were queued up while this execution ran, so we just reset the state to not having a running execution - waitingListeners = null; - waitingStoredContext = null; - return; - } else { - // we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the - // listeners currently queued up - listeners = waitingListeners; - thisStoredContext = waitingStoredContext; - // This batch of listeners will use the context of the first listener in this batch for the work execution - assert thisStoredContext != null : "stored context must not be null for the first listener in a batch"; - waitingListeners = new ArrayList<>(); - waitingStoredContext = null; - } - } - - // Create a child threadContext so that the parent context remains unchanged when the child execution ends. - // This ensures the parent does not see response headers from the child execution. - try (var ignore = threadContext.newStoredContext()) { - doExecute(new ActionListener<>() { - @Override - public void onResponse(T response) { - ActionListener.onResponse(listeners, response); - } - - @Override - public void onFailure(Exception e) { - ActionListener.onFailure(listeners, e); - } - }, thisStoredContext); - } - }); - // Restore the given threadContext before proceed with the work execution. - // This ensures all executions begin execution with their own context. - if (storedContext != null) { - storedContext.restore(); - } - ActionListener.run(wrappedListener, executeAction::accept); - } + public abstract void execute(ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/action/StrictSingleResultDeduplicator.java b/server/src/main/java/org/elasticsearch/action/StrictSingleResultDeduplicator.java new file mode 100644 index 0000000000000..6e22b6c1f02e9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/StrictSingleResultDeduplicator.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action; + +import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * + * Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can + * share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers + * stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to + * be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to + * be used to deduplicate results from actions that produce results that change over time transparently. + * + * @param Result type + */ +public final class StrictSingleResultDeduplicator extends SingleResultDeduplicator { + + /** + * List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in + * progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued + * up here once done. + */ + private List> waitingListeners; + /** + * The threadContext associated with the first listener in the waitingListeners. This context will be restored right before + * we perform the {@code executeAction}. + */ + private ThreadContext.StoredContext waitingStoredContext; + + public StrictSingleResultDeduplicator(ThreadContext threadContext, Consumer> executeAction) { + super(threadContext, executeAction); + } + + @Override + public void execute(ActionListener listener) { + synchronized (this) { + if (waitingListeners == null) { + // no queued up listeners, just execute this one directly without deduplication and instantiate the list so that + // subsequent executions will wait + waitingListeners = new ArrayList<>(); + waitingStoredContext = null; + } else { + // already running an execution, queue this one up + if (waitingListeners.isEmpty()) { + // Only the first listener in queue needs the stored context which is used for running executeAction + assert waitingStoredContext == null; + waitingStoredContext = threadContext.newStoredContext(); + } + waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)); + return; + } + } + doExecute(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), null); + } + + private void doExecute(ActionListener listener, @Nullable ThreadContext.StoredContext storedContext) { + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final List> listeners; + final ThreadContext.StoredContext thisStoredContext; + synchronized (this) { + if (waitingListeners.isEmpty()) { + // no listeners were queued up while this execution ran, so we just reset the state to not having a running execution + waitingListeners = null; + waitingStoredContext = null; + return; + } else { + // we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the + // listeners currently queued up + listeners = waitingListeners; + thisStoredContext = waitingStoredContext; + // This batch of listeners will use the context of the first listener in this batch for the work execution + assert thisStoredContext != null : "stored context must not be null for the first listener in a batch"; + waitingListeners = new ArrayList<>(); + waitingStoredContext = null; + } + } + + // Create a child threadContext so that the parent context remains unchanged when the child execution ends. + // This ensures the parent does not see response headers from the child execution. + try (var ignore = threadContext.newStoredContext()) { + doExecute(new ActionListener<>() { + @Override + public void onResponse(T response) { + ActionListener.onResponse(listeners, response); + } + + @Override + public void onFailure(Exception e) { + ActionListener.onFailure(listeners, e); + } + }, thisStoredContext); + } + }); + // Restore the given threadContext before proceed with the work execution. + // This ensures all executions begin execution with their own context. + if (storedContext != null) { + storedContext.restore(); + } + ActionListener.run(wrappedListener, executeAction::accept); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java index eecbb3525bda9..d47f23a58d255 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.RelaxedSingleResultDeduplicator; import org.elasticsearch.action.SingleResultDeduplicator; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric; import org.elasticsearch.action.support.ActionFilters; @@ -87,7 +88,7 @@ public TransportGetAllocationStatsAction( ); final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL); - this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> { + this.allocationStatsSupplier = new RelaxedSingleResultDeduplicator<>(threadPool.getThreadContext(), l -> { final var cachedStats = allocationStatsCache.get(); if (cachedStats != null) { l.onResponse(cachedStats); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 85039f1b61792..41867915219ea 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.SingleResultDeduplicator; +import org.elasticsearch.action.StrictSingleResultDeduplicator; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; @@ -506,7 +507,7 @@ protected BlobStoreRepository( this.namedXContentRegistry = namedXContentRegistry; this.basePath = basePath; this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings()); - this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>( + this.repoDataLoadDeduplicator = new StrictSingleResultDeduplicator<>( threadPool.getThreadContext(), listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META) .execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)) diff --git a/server/src/test/java/org/elasticsearch/transport/RelaxedSingleResultDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/RelaxedSingleResultDeduplicatorTests.java new file mode 100644 index 0000000000000..a2f3faad1a07d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RelaxedSingleResultDeduplicatorTests.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.RelaxedSingleResultDeduplicator; +import org.elasticsearch.action.SingleResultDeduplicator; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public class RelaxedSingleResultDeduplicatorTests extends SingleResultDeduplicatorTests { + + @Override + protected SingleResultDeduplicator makeSingleResultDeduplicator( + ThreadContext threadContext, + Consumer> executeAction + ) { + return new RelaxedSingleResultDeduplicator(threadContext, executeAction); + } + + public void testDeduplicate() { + final int numThreads = between(2, 10); + final var actionExecutionCount = new AtomicInteger(0); + final var responses = new Object[numThreads]; + final var resultObjectRef = new AtomicReference(); + final var countDownLatch = new CountDownLatch(numThreads - 1); + + final Consumer> computation = l -> { + final var count = actionExecutionCount.incrementAndGet(); + // The first thread will block until all the other callers have added a waiting listener. + safeAwait(countDownLatch); + final var resultObject = new Object(); + resultObjectRef.set(resultObject); + l.onResponse(resultObject); + }; + final var deduplicator = makeSingleResultDeduplicator(new ThreadContext(Settings.EMPTY), computation); + + ESTestCase.startInParallel(numThreads, threadNumber -> safeAwait(l -> { + deduplicator.execute(ActionTestUtils.assertNoFailureListener(response -> { + assertNull(responses[threadNumber]); + responses[threadNumber] = response; + l.onResponse(null); + })); + // If another thread has already gone down into the computation action, this thread will have a waiting listener added to + // the list and the call to deduplicator.execute() will return immediately. + countDownLatch.countDown(); + })); + + assertEquals("expected the action computation to run once", 1, actionExecutionCount.get()); + for (int i = 0; i < numThreads; i++) { + assertSame("unexpected result response for thread " + i, resultObjectRef.get(), responses[i]); + } + } + + public void testDeduplicateWithActionFailure() { + final int numThreads = between(2, 10); + final var actionExecutionCount = new AtomicInteger(0); + final var failures = new Exception[numThreads]; + final var exceptionRef = new AtomicReference(); + final var countDownLatch = new CountDownLatch(numThreads - 1); + + final Consumer> computation = l -> { + final var count = actionExecutionCount.incrementAndGet(); + // The first thread will block until all the other callers have added a waiting listener. + safeAwait(countDownLatch); + exceptionRef.set(new RuntimeException("failure")); + l.onFailure(exceptionRef.get()); + }; + final var deduplicator = makeSingleResultDeduplicator(new ThreadContext(Settings.EMPTY), computation); + + ESTestCase.startInParallel(numThreads, threadNumber -> safeAwait(l -> { + deduplicator.execute(ActionTestUtils.assertNoSuccessListener(e -> { + assertNull(failures[threadNumber]); + failures[threadNumber] = e; + l.onResponse(null); + })); + // If another thread has already gone down into the computation action, this thread will have a waiting listener added to + // the list and the call to deduplicator.execute() will return immediately. + countDownLatch.countDown(); + })); + + // Verify that the both the listener for the thread that ran the action and all waiting listeners all get the same failure. + assertEquals("expected the action computation to run once", 1, actionExecutionCount.get()); + for (int i = 0; i < numThreads; i++) { + assertSame("unexpected failure exception for thread " + i, exceptionRef.get(), failures[i]); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/SingleResultDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/SingleResultDeduplicatorTests.java index ed30da78ef725..64db4c2b1628e 100644 --- a/server/src/test/java/org/elasticsearch/transport/SingleResultDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SingleResultDeduplicatorTests.java @@ -9,11 +9,9 @@ package org.elasticsearch.transport; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.SingleResultDeduplicator; -import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.common.settings.Settings; @@ -31,6 +29,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -39,48 +38,12 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; -public class SingleResultDeduplicatorTests extends ESTestCase { +public abstract class SingleResultDeduplicatorTests extends ESTestCase { - public void testDeduplicatesWithoutShowingStaleData() { - final SetOnce> firstListenerRef = new SetOnce<>(); - final SetOnce> secondListenerRef = new SetOnce<>(); - final var deduplicator = new SingleResultDeduplicator<>(new ThreadContext(Settings.EMPTY), l -> { - if (firstListenerRef.trySet(l) == false) { - secondListenerRef.set(l); - } - }); - final Object result1 = new Object(); - final Object result2 = new Object(); - - final int totalListeners = randomIntBetween(2, 10); - final boolean[] called = new boolean[totalListeners]; - deduplicator.execute(ActionTestUtils.assertNoFailureListener(response -> { - assertFalse(called[0]); - called[0] = true; - assertEquals(result1, response); - })); - - for (int i = 1; i < totalListeners; i++) { - final int index = i; - deduplicator.execute(ActionTestUtils.assertNoFailureListener(response -> { - assertFalse(called[index]); - called[index] = true; - assertEquals(result2, response); - })); - } - for (int i = 0; i < totalListeners; i++) { - assertFalse(called[i]); - } - firstListenerRef.get().onResponse(result1); - assertTrue(called[0]); - for (int i = 1; i < totalListeners; i++) { - assertFalse(called[i]); - } - secondListenerRef.get().onResponse(result2); - for (int i = 0; i < totalListeners; i++) { - assertTrue(called[i]); - } - } + protected abstract SingleResultDeduplicator makeSingleResultDeduplicator( + ThreadContext threadContext, + Consumer> executeAction + ); public void testThreadContextPreservation() { final var resources = new Releasable[1]; @@ -97,7 +60,7 @@ public void testThreadContextPreservation() { final var workerResponseHeaderName = "worker-response-header"; final var threadHeaderName = "test-header"; final var threadContext = new ThreadContext(Settings.EMPTY); - final var deduplicator = new SingleResultDeduplicator(threadContext, l -> { + final var deduplicator = this.makeSingleResultDeduplicator(threadContext, l -> { threadContext.putHeader(workerRequestHeaderName, randomAlphaOfLength(5)); threadContext.addResponseHeader( workerResponseHeaderName, diff --git a/server/src/test/java/org/elasticsearch/transport/StrictSingleResultDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/StrictSingleResultDeduplicatorTests.java new file mode 100644 index 0000000000000..f09189f4f3df2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/StrictSingleResultDeduplicatorTests.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.transport; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.SingleResultDeduplicator; +import org.elasticsearch.action.StrictSingleResultDeduplicator; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; + +import java.util.function.Consumer; + +public class StrictSingleResultDeduplicatorTests extends SingleResultDeduplicatorTests { + + @Override + protected SingleResultDeduplicator makeSingleResultDeduplicator( + ThreadContext threadContext, + Consumer> executeAction + ) { + return new StrictSingleResultDeduplicator(threadContext, executeAction); + } + + public void testDeduplicatesWithoutShowingStaleData() { + final SetOnce> firstListenerRef = new SetOnce<>(); + final SetOnce> secondListenerRef = new SetOnce<>(); + final Object result1 = new Object(); + final Object result2 = new Object(); + final var deduplicator = makeSingleResultDeduplicator(new ThreadContext(Settings.EMPTY), l -> { + if (firstListenerRef.trySet(l) == false) { + secondListenerRef.set(l); + } + }); + + final int totalListeners = randomIntBetween(2, 10); + final boolean[] called = new boolean[totalListeners]; + deduplicator.execute(ActionTestUtils.assertNoFailureListener(response -> { + assertFalse(called[0]); + called[0] = true; + assertEquals(result1, response); + })); + + for (int i = 1; i < totalListeners; i++) { + final int index = i; + deduplicator.execute(ActionTestUtils.assertNoFailureListener(response -> { + assertFalse(called[index]); + called[index] = true; + assertEquals(result2, response); + })); + } + for (int i = 0; i < totalListeners; i++) { + assertFalse(called[i]); + } + firstListenerRef.get().onResponse(result1); + assertTrue(called[0]); + for (int i = 1; i < totalListeners; i++) { + assertFalse(called[i]); + } + secondListenerRef.get().onResponse(result2); + for (int i = 0; i < totalListeners; i++) { + assertTrue(called[i]); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0bc72c8f78217..fb64e65690cb0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.SingleResultDeduplicator; +import org.elasticsearch.action.StrictSingleResultDeduplicator; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -161,7 +162,7 @@ public CcrRepository(RepositoryMetadata metadata, Client client, Settings settin this.threadPool = threadPool; this.remoteClientResponseExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); this.chunkResponseExecutor = threadPool.generic(); - csDeduplicator = new SingleResultDeduplicator<>( + csDeduplicator = new StrictSingleResultDeduplicator<>( threadPool.getThreadContext(), l -> getRemoteClusterClient().execute( ClusterStateAction.REMOTE_TYPE,