From efebd528a75b90696cad6672f8eb36c1101e534a Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 7 Mar 2025 14:35:28 +0100 Subject: [PATCH 01/20] Ensure that RefreshListener do not access engine under refresh lock --- .../support/replication/PostWriteRefresh.java | 28 ++--- .../index/engine/AbstractReaderManager.java | 105 ++++++++++++++++++ .../engine/ElasticsearchReaderManager.java | 20 +--- .../engine/EngineAwareRefreshListener.java | 20 ++++ .../index/engine/InternalEngine.java | 19 +--- .../engine/SafeEngineAccessThreadLocal.java | 102 +++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 70 ++++++++++-- .../index/shard/RefreshListeners.java | 18 ++- .../elasticsearch/search/SearchService.java | 2 +- .../index/shard/RefreshListenersTests.java | 4 +- 10 files changed, 314 insertions(+), 74 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java create mode 100644 server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java create mode 100644 server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java index cefb27376f9ea..a151e79ae5a88 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java @@ -49,21 +49,18 @@ public void refreshShard( ) { switch (policy) { case NONE -> listener.onResponse(false); - case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() { - @Override - public void onResponse(Boolean forced) { - if (location != null && indexShard.routingEntry().isSearchable() == false) { - refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout); - } else { - 
listener.onResponse(forced); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + case WAIT_UNTIL -> { + ActionListener wrapped; + if (location != null && indexShard.routingEntry().isSearchable() == false) { + var engineOrNull = indexShard.getEngineOrNull(); + wrapped = listener.delegateFailure( + (l, forced) -> refreshUnpromotables(indexShard, engineOrNull, location, listener, forced, postWriteRefreshTimeout) + ); + } else { + wrapped = listener; } - }); + waitUntil(indexShard, location, wrapped); + } case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> { if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) { sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout); @@ -103,17 +100,16 @@ private static void waitUntil(IndexShard indexShard, Translog.Location location, private void refreshUnpromotables( IndexShard indexShard, + Engine engineOrNull, // to avoid accessing it under the RefreshListener's refreshLock Translog.Location location, ActionListener listener, boolean forced, @Nullable TimeValue postWriteRefreshTimeout ) { - Engine engineOrNull = indexShard.getEngineOrNull(); if (engineOrNull == null) { listener.onFailure(new AlreadyClosedException("Engine closed during refresh.")); return; } - engineOrNull.addFlushListener(location, listener.delegateFailureAndWrap((l, generation) -> { try ( ThreadContext.StoredContext ignore = transportService.getThreadPool() diff --git a/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java b/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java new file mode 100644 index 0000000000000..81e4349854dd2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.core.Assertions; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.SuppressForbidden; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +@SuppressForbidden(reason = "reference counting is required here") +public abstract class AbstractReaderManager extends ReferenceManager { + + @Nullable // if assertions are disabled + private final Map assertingListeners = Assertions.ENABLED ? 
new ConcurrentHashMap<>() : null; + + + @Override + protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { + return reference.tryIncRef(); + } + + @Override + protected int getRefCount(ElasticsearchDirectoryReader reference) { + return reference.getRefCount(); + } + + @Override + protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { + reference.decRef(); + } + + @Override + public final void addListener(RefreshListener listener) { + if (Assertions.ENABLED == false) { + super.addListener(listener); + return; + } + + final var assertingListener = new AssertingRefreshListener(listener); + var previous = assertingListeners.put(listener, assertingListener); + assert previous == null : "listener already added"; + super.addListener(assertingListener); + } + + @Override + public final void removeListener(RefreshListener listener) { + if (Assertions.ENABLED == false) { + super.removeListener(listener); + return; + } + + final var assertingListener = assertingListeners.remove(listener); + assert assertingListener != null : "listener already removed"; + super.removeListener(assertingListener); + } + + /** + * A delegating {@link RefreshListener} used to assert that refresh listeners are not accessing the engine within before/after refresh + * methods. 
+ */ + private static class AssertingRefreshListener implements RefreshListener { + + private final RefreshListener delegate; + + private AssertingRefreshListener(RefreshListener delegate) { + this.delegate = Objects.requireNonNull(delegate); + if (Assertions.ENABLED == false) { + throw new AssertionError("Only use this when assertions are enabled"); + } + } + + @Override + public void beforeRefresh() throws IOException { + SafeEngineAccessThreadLocal.accessStart(); + try { + delegate.beforeRefresh(); + } finally { + SafeEngineAccessThreadLocal.accessEnd(); + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + SafeEngineAccessThreadLocal.accessStart(); + try { + delegate.afterRefresh(didRefresh); + } finally { + SafeEngineAccessThreadLocal.accessEnd(); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java index 62adc49471c69..1ff26d9023b54 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java @@ -10,10 +10,8 @@ package org.elasticsearch.index.engine; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; -import org.elasticsearch.core.SuppressForbidden; import java.io.IOException; @@ -25,8 +23,7 @@ * @see SearcherManager * */ -@SuppressForbidden(reason = "reference counting is required here") -public class ElasticsearchReaderManager extends ReferenceManager { +public class ElasticsearchReaderManager extends AbstractReaderManager { /** * Creates and returns a new ElasticsearchReaderManager from the given @@ -39,23 +36,8 @@ public ElasticsearchReaderManager(ElasticsearchDirectoryReader reader) 
{ this.current = reader; } - @Override - protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { - reference.decRef(); - } - @Override protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException { return (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); } - - @Override - protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { - return reference.tryIncRef(); - } - - @Override - protected int getRefCount(ElasticsearchDirectoryReader reference) { - return reference.getRefCount(); - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java new file mode 100644 index 0000000000000..18efe7f007a67 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; + +/** + * A type of {@link ReferenceManager.RefreshListener} that is called back when a new {@link Engine} is instantiated. 
+ */ +public interface EngineAwareRefreshListener extends ReferenceManager.RefreshListener { + + void onNewEngine(Engine engine); +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fc7f6eab0856c..7ff71f5b78784 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -71,7 +71,6 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexMode; @@ -453,8 +452,7 @@ public CompletionStats completionStats(String... fieldNamePatterns) { * this specialized implementation an external refresh will immediately be reflected on the internal reader * and old segments can be released in the same way previous version did this (as a side-effect of _refresh) */ - @SuppressForbidden(reason = "reference counting is required here") - private static final class ExternalReaderManager extends ReferenceManager { + private static final class ExternalReaderManager extends AbstractReaderManager { private final BiConsumer refreshListener; private final ElasticsearchReaderManager internalReaderManager; private boolean isWarmedUp; // guarded by refreshLock @@ -495,21 +493,6 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea return newReader; // steal the reference } } - - @Override - protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { - return reference.tryIncRef(); - } - - @Override - protected int getRefCount(ElasticsearchDirectoryReader reference) { - return reference.getRefCount(); - } - - @Override - protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { - 
reference.decRef(); - } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java new file mode 100644 index 0000000000000..19b8f07ccbb9c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.core.Assertions; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.shard.IndexShard; + +/** + * This class is used to assert that a thread does not access the current engine reference using the method + * {@link IndexShard#getEngineOrNull()} when it is executing some protected code blocks. A protected code block is a + * block of code which starts when {@link #accessStart()} is called and ends when {@link #accessEnd()} is called. + */ +public final class SafeEngineAccessThreadLocal { + + @Nullable // if assertions are disabled + private static final ThreadLocal threadLocalAccessor; + static { + threadLocalAccessor = Assertions.ENABLED ? 
new ThreadLocal<>() : null; + } + + private static class Accessor { + + private final Thread thread; + private final SetOnce failure; + + private Accessor(Thread thread) { + this.thread = thread; + this.failure = new SetOnce<>(); + } + + private void setFailure(AssertionError error) { + failure.set(error); + } + + private boolean isFailed() { + return failure.get() != null; + } + + private AssertionError getFailure() { + return failure.get(); + } + + @Override + public String toString() { + return thread.toString(); + } + } + + private SafeEngineAccessThreadLocal() {} + + private static Accessor getAccessorSafe() { + final var accessor = threadLocalAccessor.get(); + if (accessor != null && accessor.isFailed()) { + throw new AssertionError("Current thread has made an unsafe access to the engine", accessor.getFailure()); + } + return accessor; + } + + public static void accessStart() { + ensureAssertionsEnabled(); + final var accessor = getAccessorSafe(); + assert accessor == null : "current accessor already set"; + threadLocalAccessor.set(new Accessor(Thread.currentThread())); + } + + public static void accessEnd() { + ensureAssertionsEnabled(); + final var accessor = getAccessorSafe(); + assert accessor != null : "current accessor not set"; + threadLocalAccessor.remove(); + } + + /** + * Use this method to assert that the current thread has not entered a protected execution code block. 
+ */ + public static boolean assertNoAccessByCurrentThread() { + ensureAssertionsEnabled(); + final var accessor = getAccessorSafe(); + if (accessor != null) { + var message = "thread [" + accessor + "] should not access the engine using the getEngineOrNull() method"; + accessor.setFailure(new AssertionError(message)); // to be thrown later + assert false : message; + } + return true; + } + + private static void ensureAssertionsEnabled() { + if (Assertions.ENABLED == false) { + throw new AssertionError("Only use this method when assertions are enabled"); + } + assert threadLocalAccessor != null; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7fecc53826ff1..a4308c23cf7bd 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -88,12 +88,14 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.GetResult; +import org.elasticsearch.index.engine.EngineAwareRefreshListener; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; +import org.elasticsearch.index.engine.SafeEngineAccessThreadLocal; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; @@ -2240,11 +2242,25 @@ private boolean assertLastestCommitUserData() throws IOException { return true; } - private void onNewEngine(Engine newEngine) { + private void onNewEngine(Engine engine) { assert Thread.holdsLock(engineMutex); - 
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); - refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); - refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); + assert engine != null; + + var engineConfig = engine.config(); + if (engineConfig.getExternalRefreshListener() != null) { + for (var listener : engineConfig.getExternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(engine); + } + } + } + if (engineConfig.getInternalRefreshListener() != null) { + for (var listener : engineConfig.getInternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(engine); + } + } + } } /** @@ -3298,6 +3314,7 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { + assert SafeEngineAccessThreadLocal.assertNoAccessByCurrentThread(); return this.currentEngineReference.get(); } @@ -4082,13 +4099,21 @@ private void setRefreshPending(Engine engine) { }); } - private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener { + private class RefreshPendingLocationListener implements EngineAwareRefreshListener { + + private Supplier lastWriteLocationSupplier; Translog.Location lastWriteLocation; + @Override + public void onNewEngine(Engine engine) { + this.lastWriteLocationSupplier = () -> engine.getTranslogLastWriteLocation(); + } + @Override public void beforeRefresh() { + assert lastWriteLocationSupplier != null; try { - lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + lastWriteLocation = lastWriteLocationSupplier.get(); } catch (AlreadyClosedException exc) { // shard is closed - no location is fine lastWriteLocation = null; @@ -4097,6 +4122,7 @@ public void beforeRefresh() { @Override public void afterRefresh(boolean didRefresh) { + assert lastWriteLocationSupplier != null; if (didRefresh 
&& lastWriteLocation != null) { pendingRefreshLocation.updateAndGet(pendingLocation -> { if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) { @@ -4109,14 +4135,25 @@ public void afterRefresh(boolean didRefresh) { } } - private class RefreshFieldHasValueListener implements ReferenceManager.RefreshListener { + private class RefreshFieldHasValueListener implements EngineAwareRefreshListener { + + private Supplier engineSearcherSupplier; + + @Override + public void onNewEngine(Engine engine) { + this.engineSearcherSupplier = () -> engine.acquireSearcher("field_has_value"); + } + @Override - public void beforeRefresh() {} + public void beforeRefresh() { + assert engineSearcherSupplier != null; + } @Override public void afterRefresh(boolean didRefresh) { + assert engineSearcherSupplier != null; if (enableFieldHasValue && (didRefresh || fieldInfos == FieldInfos.EMPTY)) { - try (Engine.Searcher hasValueSearcher = getEngine().acquireSearcher("field_has_value")) { + try (Engine.Searcher hasValueSearcher = engineSearcherSupplier.get()) { setFieldInfos(FieldInfos.getMergedFieldInfos(hasValueSearcher.getIndexReader())); } catch (AlreadyClosedException ignore) { // engine is closed - no updated FieldInfos is fine @@ -4133,16 +4170,25 @@ public ShardFieldStats getShardFieldStats() { return shardFieldStats; } - private class RefreshShardFieldStatsListener implements ReferenceManager.RefreshListener { + private class RefreshShardFieldStatsListener implements EngineAwareRefreshListener { + + private Supplier engineSearcherSupplier; + @Override - public void beforeRefresh() { + public void onNewEngine(Engine engine) { + this.engineSearcherSupplier = () -> engine.acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL); + } + @Override + public void beforeRefresh() { + assert engineSearcherSupplier != null; } @Override public void afterRefresh(boolean didRefresh) { + assert engineSearcherSupplier != null; if (shardFieldStats == null || 
didRefresh) { - try (var searcher = getEngine().acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL)) { + try (var searcher = engineSearcherSupplier.get()) { int numSegments = 0; int totalFields = 0; long usages = 0; diff --git a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java index 8a2fc396c6737..a60e812d4968f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java @@ -10,7 +10,6 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; -import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -19,6 +18,8 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineAwareRefreshListener; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; @@ -39,7 +40,7 @@ * * When {@link Closeable#close()}d it will no longer accept listeners and flush any existing listeners. */ -public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable { +public final class RefreshListeners implements EngineAwareRefreshListener, Closeable { private final IntSupplier getMaxRefreshListeners; private final Runnable forceRefresh; private final Logger logger; @@ -264,21 +265,21 @@ public synchronized int pendingCount() { /** * Setup the translog used to find the last refreshed location. 
*/ - public void setCurrentRefreshLocationSupplier(Supplier currentRefreshLocationSupplier) { + private void setCurrentRefreshLocationSupplier(Supplier currentRefreshLocationSupplier) { this.currentRefreshLocationSupplier = currentRefreshLocationSupplier; } /** * Setup the engine used to find the last processed sequence number checkpoint. */ - public void setCurrentProcessedCheckpointSupplier(LongSupplier processedCheckpointSupplier) { + private void setCurrentProcessedCheckpointSupplier(LongSupplier processedCheckpointSupplier) { this.processedCheckpointSupplier = processedCheckpointSupplier; } /** * Setup the engine used to find the max issued seqNo. */ - public void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) { + private void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) { this.maxIssuedSeqNoSupplier = maxIssuedSeqNoSupplier; } @@ -297,6 +298,13 @@ public void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) { private LongSupplier processedCheckpointSupplier; private LongSupplier maxIssuedSeqNoSupplier; + @Override + public void onNewEngine(Engine engine) { + setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); + setCurrentProcessedCheckpointSupplier(engine::getProcessedLocalCheckpoint); + setMaxIssuedSeqNoSupplier(engine::getMaxSeqNo); + } + @Override public void beforeRefresh() throws IOException { currentRefreshLocation = currentRefreshLocationSupplier.get(); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1156cd4b7bdf0..5364ca65988d3 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1120,7 +1120,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; try { - searcherSupplier 
= shard.acquireSearcherSupplier(); + searcherSupplier = shard.acquireSearcherSupplier(); // TODO I think we need to fork here final ShardSearchContextId id = new ShardSearchContextId( sessionId, idGenerator.incrementAndGet(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index ca616dc619ec9..2aa01f95ca4a7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -162,9 +162,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); - listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); - listeners.setCurrentProcessedCheckpointSupplier(engine::getProcessedLocalCheckpoint); - listeners.setMaxIssuedSeqNoSupplier(engine::getMaxSeqNo); + listeners.onNewEngine(engine); } @After From 94632dab8d8deb81f7bfdd4b393261594389d766 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 7 Mar 2025 14:30:15 +0000 Subject: [PATCH 02/20] [CI] Auto commit changes from spotless --- .../org/elasticsearch/index/engine/AbstractReaderManager.java | 1 - .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java b/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java index 81e4349854dd2..86db0bd9eea5f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java +++ b/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java @@ -26,7 +26,6 @@ public abstract class AbstractReaderManager extends ReferenceManager assertingListeners = Assertions.ENABLED ? 
new ConcurrentHashMap<>() : null; - @Override protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { return reference.tryIncRef(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7ff71f5b78784..a5d9fe7fda3e8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -452,7 +452,7 @@ public CompletionStats completionStats(String... fieldNamePatterns) { * this specialized implementation an external refresh will immediately be reflected on the internal reader * and old segments can be released in the same way previous version did this (as a side-effect of _refresh) */ - private static final class ExternalReaderManager extends AbstractReaderManager { + private static final class ExternalReaderManager extends AbstractReaderManager { private final BiConsumer refreshListener; private final ElasticsearchReaderManager internalReaderManager; private boolean isWarmedUp; // guarded by refreshLock From 6bcf4a86578970a1e29b09bd48bb2d3dcaf9a1f0 Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 7 Mar 2025 16:31:34 +0100 Subject: [PATCH 03/20] more registration --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a5d9fe7fda3e8..d7936ba6fd63b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -304,9 +304,15 @@ public InternalEngine(EngineConfig engineConfig) { // don't allow commits until we are done with recovering pendingTranslogRecovery.set(true); for (ReferenceManager.RefreshListener listener : 
engineConfig.getExternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(this); + } this.externalReaderManager.addListener(listener); } for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(this); + } this.internalReaderManager.addListener(listener); } this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); From de3f41667948ca9212988a0868223265cd06b77a Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 10:57:06 +0100 Subject: [PATCH 04/20] defer engine --- .../TransportReplicationAction.java | 15 ++++-- .../elasticsearch/index/shard/IndexShard.java | 52 ++++++++++++++++++- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d9bdbc2747045..d80a831d197d2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -78,6 +79,7 @@ import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static org.elasticsearch.core.Strings.format; @@ -507,7 +509,8 @@ 
public void handleException(TransportException exp) { if (syncGlobalCheckpointAfterOperation) { try { - primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation"); + var seqNoStats = primaryShardReference.seqNoStatsSupplier.get(); + primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation", seqNoStats); } catch (final Exception e) { // only log non-closed exceptions if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { @@ -1136,10 +1139,16 @@ class PrimaryShardReference protected final IndexShard indexShard; private final Releasable operationLock; + private final Supplier seqNoStatsSupplier; + private final Supplier localCheckpointSupplier; + private final Supplier globalCheckpointSupplier; PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; + this.seqNoStatsSupplier = indexShard.getSeqNoStatsSupplier(); + this.localCheckpointSupplier = indexShard.getLocalCheckpointSupplier(); + this.globalCheckpointSupplier = indexShard.getLastSyncedGlobalCheckpointSupplier(); } @Override @@ -1189,12 +1198,12 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long @Override public long localCheckpoint() { - return indexShard.getLocalCheckpoint(); + return localCheckpointSupplier.get(); } @Override public long globalCheckpoint() { - return indexShard.getLastSyncedGlobalCheckpoint(); + return globalCheckpointSupplier.get(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a4308c23cf7bd..19f9e0e0f58c4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1351,6 +1351,18 @@ public SeqNoStats seqNoStats() { return 
getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); } + /** + * Returns a supplier that supplies the {@link SeqNoStats} of the engine that is referenced at the time this method is called. + * Uses this method in place where the current engine reference cannot be resolved directly. + * + * @return a supplier of {@link SeqNoStats} + * @throws AlreadyClosedException if shard is closed + */ + public Supplier getSeqNoStatsSupplier() { + var engine = getEngine(); + return () -> engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint()); + } + public IndexingStats indexingStats() { Engine engine = getEngineOrNull(); final boolean throttled; @@ -2974,6 +2986,18 @@ public long getLocalCheckpoint() { return getEngine().getPersistedLocalCheckpoint(); } + /** + * Returns a supplier that supplies the local checkpoint of the engine that is referenced at the time this method is called. + * Uses this method in place where the current engine reference cannot be resolved directly. + * + * @return a supplier of the local checkpoint + * @throws AlreadyClosedException if shard is closed + */ + public Supplier getLocalCheckpointSupplier() { + var engine = getEngine(); + return () -> engine.getPersistedLocalCheckpoint(); + } + /** * Returns the global checkpoint for the shard. * @@ -2990,6 +3014,18 @@ public long getLastSyncedGlobalCheckpoint() { return getEngine().getLastSyncedGlobalCheckpoint(); } + /** + * Returns a supplier that supplies the latest global checkpoint of the engine that is referenced at the time this method is called. + * Uses this method in place where the current engine reference cannot be resolved directly. 
+ * + * @return a supplier of the latest global checkpoint value that has been persisted in the underlying storage + * @throws AlreadyClosedException if shard is closed + */ + public Supplier getLastSyncedGlobalCheckpointSupplier() { + var engine = getEngine(); + return () -> engine.getLastSyncedGlobalCheckpoint(); + } + /** * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. * @@ -3012,8 +3048,22 @@ public void maybeSyncGlobalCheckpoint(final String reason) { return; } assert assertPrimaryMode(); - // only sync if there are no operations in flight, or when using async durability final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); + doMaybeSyncGlobalCheckpoint(reason, stats); + } + + public void maybeSyncGlobalCheckpoint(final String reason, final SeqNoStats stats) { + verifyNotClosed(); + assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard"; + if (replicationTracker.isPrimaryMode() == false) { + return; + } + assert assertPrimaryMode(); + doMaybeSyncGlobalCheckpoint(reason, stats); + } + + private void doMaybeSyncGlobalCheckpoint(final String reason, final SeqNoStats stats) { + // only sync if there are no operations in flight, or when using async durability final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC; if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) { final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync(); From af744e9561e024a5f192e326555bcdb38a986a5c Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 11:39:50 +0100 Subject: [PATCH 05/20] fix TransportReplicationActionTests --- .../replication/TransportReplicationActionTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java 
b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 233138c8eedbb..df8b76f40239c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardState; @@ -931,6 +932,8 @@ public void testSeqNoIsSetOnPrimary() { when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm); when(shard.routingEntry()).thenReturn(routingEntry); when(shard.isRelocatedPrimary()).thenReturn(false); + when(shard.getLocalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); + when(shard.getLastSyncedGlobalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId); Set inSyncIds = randomBoolean() ? 
singleton(routingEntry.allocationId().getId()) @@ -1631,7 +1634,8 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService when(indexShard.getPendingPrimaryTerm()).thenAnswer( i -> clusterService.state().metadata().getProject().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()) ); - + when(indexShard.getLocalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); + when(indexShard.getLastSyncedGlobalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); ReplicationGroup replicationGroup = mock(ReplicationGroup.class); when(indexShard.getReplicationGroup()).thenReturn(replicationGroup); return indexShard; From 20d5dfc62dc51883dc4121baa8ee12d33c2ecad8 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 12:32:49 +0100 Subject: [PATCH 06/20] also replica --- .../replication/TransportReplicationAction.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d80a831d197d2..0c456d7cd6701 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -663,6 +663,8 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); private final ConcreteReplicaRequest replicaRequest; + private final Supplier localCheckpointSupplier; + private final Supplier globalCheckpointSupplier; AsyncReplicaAction( ConcreteReplicaRequest replicaRequest, @@ -675,6 +677,9 @@ private final class AsyncReplicaAction extends AbstractRunnable 
implements Actio final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); + this.localCheckpointSupplier = replica.getLocalCheckpointSupplier(); + this.globalCheckpointSupplier = replica.getLastSyncedGlobalCheckpointSupplier(); + } @Override @@ -685,10 +690,7 @@ public void onResponse(Releasable releasable) { replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(ActionListener.wrap(r -> { - final ReplicaResponse response = new ReplicaResponse( - replica.getLocalCheckpoint(), - replica.getLastSyncedGlobalCheckpoint() - ); + final ReplicaResponse response = new ReplicaResponse(localCheckpointSupplier.get(), globalCheckpointSupplier.get()); releasable.close(); // release shard operation lock before responding to caller if (logger.isTraceEnabled()) { logger.trace( From 1f31512a2e611fd1e25e2c71f67404572130c741 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 13:26:38 +0100 Subject: [PATCH 07/20] fix TransportReplicationActionTests --- .../support/replication/TransportReplicationActionTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index df8b76f40239c..3ae27b3072df5 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1634,8 +1634,8 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService when(indexShard.getPendingPrimaryTerm()).thenAnswer( i -> clusterService.state().metadata().getProject().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()) ); - 
when(indexShard.getLocalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); - when(indexShard.getLastSyncedGlobalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); + when(indexShard.getLocalCheckpointSupplier()).thenReturn(() -> 0L); + when(indexShard.getLastSyncedGlobalCheckpointSupplier()).thenReturn(() -> 0L); ReplicationGroup replicationGroup = mock(ReplicationGroup.class); when(indexShard.getReplicationGroup()).thenReturn(replicationGroup); return indexShard; From f7c193ffaf4d119015a6c5406056114cb293b361 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 14:58:59 +0100 Subject: [PATCH 08/20] fork in rewriteAndFetchShardRequest --- .../main/java/org/elasticsearch/search/SearchService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 5364ca65988d3..f3096800081f8 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1912,7 +1912,11 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re request.getRewriteable(), indicesService.getDataRewriteContext(request::nowInMillis), request.readerId() == null - ? listener.delegateFailureAndWrap((l, r) -> shard.ensureShardSearchActive(b -> l.onResponse(request))) + ? 
listener.delegateFailureAndWrap( + (l, r) -> shard.ensureShardSearchActive( + ignored -> threadPool.generic().execute(ActionRunnable.supply(l, () -> request)) + ) + ) : listener.safeMap(r -> request) ); } From f5c1ea02cfc3327c88fa0b87dc6ddf764ca12efa Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 15:40:15 +0100 Subject: [PATCH 09/20] fork in rewriteAndFetchShardRequest --- .../java/org/elasticsearch/search/SearchService.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f3096800081f8..7ee8348a84d16 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1914,7 +1914,14 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re request.readerId() == null ? listener.delegateFailureAndWrap( (l, r) -> shard.ensureShardSearchActive( - ignored -> threadPool.generic().execute(ActionRunnable.supply(l, () -> request)) + waitedForRefresh -> { + if (waitedForRefresh) { + // for to generic in case the listener accesses the engine + threadPool.generic().execute(ActionRunnable.supply(l, () -> request)); + } else { + l.onResponse(request); + } + } ) ) : listener.safeMap(r -> request) From 37c340931c4b9785c9b67010601170efd3ee3e4f Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 10 Mar 2025 14:49:36 +0000 Subject: [PATCH 10/20] [CI] Auto commit changes from spotless --- .../elasticsearch/search/SearchService.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 7ee8348a84d16..0cbec7187e975 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ 
b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1911,20 +1911,14 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re Rewriteable.rewriteAndFetch( request.getRewriteable(), indicesService.getDataRewriteContext(request::nowInMillis), - request.readerId() == null - ? listener.delegateFailureAndWrap( - (l, r) -> shard.ensureShardSearchActive( - waitedForRefresh -> { - if (waitedForRefresh) { - // for to generic in case the listener accesses the engine - threadPool.generic().execute(ActionRunnable.supply(l, () -> request)); - } else { - l.onResponse(request); - } - } - ) - ) - : listener.safeMap(r -> request) + request.readerId() == null ? listener.delegateFailureAndWrap((l, r) -> shard.ensureShardSearchActive(waitedForRefresh -> { + if (waitedForRefresh) { + // for to generic in case the listener accesses the engine + threadPool.generic().execute(ActionRunnable.supply(l, () -> request)); + } else { + l.onResponse(request); + } + })) : listener.safeMap(r -> request) ); } From b1e673dc4932a611c89f2d144fdd477e58329e07 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 17:37:55 +0100 Subject: [PATCH 11/20] remove replica --- .../replication/TransportReplicationAction.java | 10 ++++------ .../index/engine/SafeEngineAccessThreadLocal.java | 3 +-- .../java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0c456d7cd6701..d80a831d197d2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -663,8 +663,6 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio // 
something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); private final ConcreteReplicaRequest replicaRequest; - private final Supplier localCheckpointSupplier; - private final Supplier globalCheckpointSupplier; AsyncReplicaAction( ConcreteReplicaRequest replicaRequest, @@ -677,9 +675,6 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio final ShardId shardId = replicaRequest.getRequest().shardId(); assert shardId != null : "request shardId must be set"; this.replica = getIndexShard(shardId); - this.localCheckpointSupplier = replica.getLocalCheckpointSupplier(); - this.globalCheckpointSupplier = replica.getLastSyncedGlobalCheckpointSupplier(); - } @Override @@ -690,7 +685,10 @@ public void onResponse(Releasable releasable) { replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(ActionListener.wrap(r -> { - final ReplicaResponse response = new ReplicaResponse(localCheckpointSupplier.get(), globalCheckpointSupplier.get()); + final ReplicaResponse response = new ReplicaResponse( + replica.getLocalCheckpoint(), + replica.getLastSyncedGlobalCheckpoint() + ); releasable.close(); // release shard operation lock before responding to caller if (logger.isTraceEnabled()) { logger.trace( diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java index 19b8f07ccbb9c..052e39a071224 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java @@ -82,7 +82,7 @@ public static void accessEnd() { /** * Use this method to assert that the current thread has not entered a protected execution code block. 
*/ - public static boolean assertNoAccessByCurrentThread() { + public static void checkAccess() { ensureAssertionsEnabled(); final var accessor = getAccessorSafe(); if (accessor != null) { @@ -90,7 +90,6 @@ public static boolean assertNoAccessByCurrentThread() { accessor.setFailure(new AssertionError(message)); // to be thrown later assert false : message; } - return true; } private static void ensureAssertionsEnabled() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 19f9e0e0f58c4..413257173c7d4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3364,7 +3364,7 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - assert SafeEngineAccessThreadLocal.assertNoAccessByCurrentThread(); + SafeEngineAccessThreadLocal.checkAccess(); return this.currentEngineReference.get(); } From e33744c088777581d5b13d60b724f0b058ecf9b3 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 10 Mar 2025 18:00:49 +0100 Subject: [PATCH 12/20] remove replica --- .../elasticsearch/index/engine/SafeEngineAccessThreadLocal.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java index 052e39a071224..df4f3892b6a97 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java @@ -83,7 +83,6 @@ public static void accessEnd() { * Use this method to assert that the current thread has not entered a protected execution code block. 
*/ public static void checkAccess() { - ensureAssertionsEnabled(); final var accessor = getAccessorSafe(); if (accessor != null) { var message = "thread [" + accessor + "] should not access the engine using the getEngineOrNull() method"; From d88b476ca13e8a7e8c2f4004fc21262c33315019 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 09:07:23 +0100 Subject: [PATCH 13/20] replica again --- .../TransportReplicationAction.java | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d80a831d197d2..0b4f0285ec6c1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -684,26 +684,25 @@ public void onResponse(Releasable releasable) { shardOperationOnReplica( replicaRequest.getRequest(), replica, - ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(ActionListener.wrap(r -> { - final ReplicaResponse response = new ReplicaResponse( - replica.getLocalCheckpoint(), - replica.getLastSyncedGlobalCheckpoint() - ); - releasable.close(); // release shard operation lock before responding to caller - if (logger.isTraceEnabled()) { - logger.trace( - "action [{}] completed on shard [{}] for request [{}]", - transportReplicaAction, - replicaRequest.getRequest().shardId(), - replicaRequest.getRequest() - ); - } - setPhase(task, "finished"); - onCompletionListener.onResponse(response); + ActionListener.wrap((replicaResult) -> { + final var response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint()); + replicaResult.runPostReplicaActions(ActionListener.wrap(r -> { + releasable.close(); // release shard operation lock before 
responding to caller + if (logger.isTraceEnabled()) { + logger.trace( + "action [{}] completed on shard [{}] for request [{}]", + transportReplicaAction, + replicaRequest.getRequest().shardId(), + replicaRequest.getRequest() + ); + } + setPhase(task, "finished"); + onCompletionListener.onResponse(response); + }, e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + responseWithFailure(e); + })); }, e -> { - Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller - responseWithFailure(e); - })), e -> { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); }) From 0b34aa8ce802dc2dc3819391e5166c6ba300376b Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 13:24:55 +0100 Subject: [PATCH 14/20] replication action --- ...ActionBypassCircuitBreakerOnReplicaIT.java | 4 +- ...tReplicationActionRetryOnClosedNodeIT.java | 4 +- ...TransportVerifyShardBeforeCloseAction.java | 4 +- .../flush/TransportShardFlushAction.java | 4 +- .../TransportVerifyShardIndexBlockAction.java | 4 +- .../refresh/TransportShardRefreshAction.java | 4 +- .../replication/ReplicationOperation.java | 23 +++-- .../TransportReplicationAction.java | 68 ++++++++------- .../replication/TransportWriteAction.java | 54 +++++++----- .../seqno/GlobalCheckpointSyncAction.java | 6 +- .../RetentionLeaseBackgroundSyncAction.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 24 ------ ...portVerifyShardBeforeCloseActionTests.java | 4 +- .../ReplicationOperationTests.java | 12 ++- .../TransportReplicationActionTests.java | 15 ++-- ...ReplicationAllPermitsAcquisitionTests.java | 4 +- .../TransportWriteActionTests.java | 84 +++++++++++++++++-- ...tentionLeaseBackgroundSyncActionTests.java | 12 ++- .../seqno/RetentionLeaseSyncActionTests.java | 12 ++- 
.../TransportWriteActionTestHelper.java | 5 +- .../ESIndexLevelReplicationTestCase.java | 19 +++-- .../ShardFollowTaskReplicationTests.java | 2 +- 22 files changed, 235 insertions(+), 137 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java index e18586117953f..ff48d1c559254 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java @@ -114,12 +114,12 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); + listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response())); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - listener.onResponse(new ReplicaResult()); + listener.onResponse(new ReplicaResult(replica)); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index 60cf86031759d..b6e42c51606f2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -124,12 +124,12 @@ protected void shardOperationOnPrimary( 
IndexShard primary, ActionListener> listener ) { - listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); + listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response())); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - listener.onResponse(new ReplicaResult()); + listener.onResponse(new ReplicaResult(replica)); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index e8ab4750854cb..a2f88ae999031 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -110,7 +110,7 @@ protected void shardOperationOnPrimary( ) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse()); }); } @@ -118,7 +118,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, replica); - return new ReplicaResult(); + return new ReplicaResult(replica); }); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 5d184f56dd748..e47bc8fe82b6f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -85,7 +85,7 @@ protected void shardOperationOnPrimary( ) { primary.flush(shardRequest.getRequest(), listener.map(flushed -> { logger.trace("{} flush request executed on primary", primary.shardId()); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse()); })); } @@ -93,7 +93,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { replica.flush(request.getRequest(), listener.map(flushed -> { logger.trace("{} flush request executed on replica", replica.shardId()); - return new ReplicaResult(); + return new ReplicaResult(replica); })); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java index e1563de750813..1934e05a89931 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java @@ -114,7 +114,7 @@ protected void shardOperationOnPrimary( ) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse()); }); } @@ -122,7 +122,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, replica); - return new ReplicaResult(); + return new ReplicaResult(replica); 
}); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 2286f64648185..35122ce2c7849 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -101,7 +101,7 @@ protected void shardOperationOnPrimary( ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult); replicaRequest.setParentTask(shardRequest.getParentTask()); logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult<>(replicaRequest, new ReplicationResponse()); + return new PrimaryResult<>(primary, replicaRequest, new ReplicationResponse()); })); } @@ -109,7 +109,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener listener) { replica.externalRefresh(SOURCE_API, listener.safeMap(refreshResult -> { logger.trace("{} refresh request executed on replica", replica.shardId()); - return new ReplicaResult(); + return new ReplicaResult(replica); })); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 257073f6ceb11..655041b43ddc7 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -194,21 +194,20 @@ private void handlePrimaryResult( pendingActionsListener ); } - primaryResult.runPostReplicationActions(new ActionListener<>() { - + primaryResult.runPostReplicationActions(new 
PrimaryPostReplicationActionsListener() { @Override - public void onResponse(Void aVoid) { + public void onResponse(long globalCheckpoint, long localCheckpoint) { successfulShards.incrementAndGet(); updateCheckPoints( primary.routingEntry(), - primary::localCheckpoint, - primary::globalCheckpoint, + () -> localCheckpoint, + () -> globalCheckpoint, () -> primaryCoordinationPendingActionListener.onResponse(null) ); } @Override - public void onFailure(Exception e) { + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception e) { logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request); // TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas // go out of sync with the primary @@ -217,8 +216,8 @@ public void onFailure(Exception e) { // is appended into the translog. updateCheckPoints( primary.routingEntry(), - primary::localCheckpoint, - primary::globalCheckpoint, + () -> localCheckpoint, + () -> globalCheckpoint, () -> primaryCoordinationPendingActionListener.onFailure(e) ); } @@ -718,7 +717,13 @@ public interface PrimaryResult> { * Run actions to be triggered post replication * @param listener callback that is invoked after post replication actions have completed * */ - void runPostReplicationActions(ActionListener listener); + void runPostReplicationActions(PrimaryPostReplicationActionsListener listener); } + public interface PrimaryPostReplicationActionsListener { + + void onResponse(long globalCheckpoint, long localCheckpoint); + + void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex); + } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0b4f0285ec6c1..75fe056e1f2e4 100644 --- 
a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -571,6 +571,7 @@ protected void adaptResponse(Response response, IndexShard indexShard) { public static class PrimaryResult, Response extends ReplicationResponse> implements ReplicationOperation.PrimaryResult { + private final IndexShard primary; private final ReplicaRequest replicaRequest; public final Response replicationResponse; @@ -578,7 +579,8 @@ public static class PrimaryResultreplicationResponse to be not-null */ - public PrimaryResult(ReplicaRequest replicaRequest, Response replicationResponse) { + public PrimaryResult(IndexShard primary, ReplicaRequest replicaRequest, Response replicationResponse) { + this.primary = Objects.requireNonNull(primary); assert replicaRequest != null : "request is required"; assert replicationResponse != null : "response is required"; this.replicaRequest = replicaRequest; @@ -596,31 +598,40 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ActionListener listener) { - listener.onResponse(null); + public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { + listener.onResponse(primary.getLastSyncedGlobalCheckpoint(), primary.getLocalCheckpoint()); } } public static class ReplicaResult { - final Exception finalFailure; + protected final IndexShard replica; + protected final Exception finalFailure; - public ReplicaResult(Exception finalFailure) { + public ReplicaResult(IndexShard replica, Exception finalFailure) { + this.replica = Objects.requireNonNull(replica); this.finalFailure = finalFailure; } - public ReplicaResult() { - this(null); + public ReplicaResult(IndexShard replica) { + this(replica, null); } - public void runPostReplicaActions(ActionListener listener) { + public void 
runPostReplicaActions(ReplicaPostReplicationActionsListener listener) { if (finalFailure != null) { listener.onFailure(finalFailure); } else { - listener.onResponse(null); + listener.onResponse(replica.getLastSyncedGlobalCheckpoint(), replica.getLocalCheckpoint()); } } } + public interface ReplicaPostReplicationActionsListener { + + void onResponse(long globalCheckpoint, long localCheckpoint); + + void onFailure(Exception ex); + } + protected void handleReplicaRequest( final ConcreteReplicaRequest replicaRequest, final TransportChannel channel, @@ -681,12 +692,11 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio public void onResponse(Releasable releasable) { assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; try { - shardOperationOnReplica( - replicaRequest.getRequest(), - replica, - ActionListener.wrap((replicaResult) -> { - final var response = new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint()); - replicaResult.runPostReplicaActions(ActionListener.wrap(r -> { + shardOperationOnReplica(replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> { + replicaResult.runPostReplicaActions(new ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + final ReplicaResponse response = new ReplicaResponse(localCheckpoint, globalCheckpoint); releasable.close(); // release shard operation lock before responding to caller if (logger.isTraceEnabled()) { logger.trace( @@ -698,15 +708,19 @@ public void onResponse(Releasable releasable) { } setPhase(task, "finished"); onCompletionListener.onResponse(response); - }, e -> { - Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + } + + @Override + public void onFailure(Exception e) { + // release shard operation lock before responding to caller +
Releasables.closeWhileHandlingException(releasable); responseWithFailure(e); - })); - }, e -> { - Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller - AsyncReplicaAction.this.onFailure(e); - }) - ); + } + }); + }, e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + AsyncReplicaAction.this.onFailure(e); + })); // TODO: Evaluate if we still need to catch this exception } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller @@ -1139,15 +1153,11 @@ class PrimaryShardReference protected final IndexShard indexShard; private final Releasable operationLock; private final Supplier seqNoStatsSupplier; - private final Supplier localCheckpointSupplier; - private final Supplier globalCheckpointSupplier; PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; this.seqNoStatsSupplier = indexShard.getSeqNoStatsSupplier(); - this.localCheckpointSupplier = indexShard.getLocalCheckpointSupplier(); - this.globalCheckpointSupplier = indexShard.getLastSyncedGlobalCheckpointSupplier(); } @Override @@ -1197,12 +1207,12 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long @Override public long localCheckpoint() { - return localCheckpointSupplier.get(); + return indexShard.getLocalCheckpoint(); } @Override public long globalCheckpoint() { - return globalCheckpointSupplier.get(); + return indexShard.getLastSyncedGlobalCheckpoint(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index cda56966f7a3a..34cbb2c854eb2 100644 --- 
a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -44,6 +44,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -320,7 +321,7 @@ public WritePrimaryResult( PostWriteRefresh postWriteRefresh, @Nullable Consumer postWriteAction ) { - super(request, finalResponse); + super(primary, request, finalResponse); this.location = location; this.primary = primary; this.logger = logger; @@ -329,21 +330,21 @@ public WritePrimaryResult( } @Override - public void runPostReplicationActions(ActionListener listener) { + public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { /* * We call this after replication because this might wait for a refresh and that can take a while. * This way we wait for the refresh in parallel on the primary and on the replica. 
*/ new AsyncAfterWriteAction(primary, replicaRequest(), location, new RespondingWriteResult() { @Override - public void onSuccess(boolean forcedRefresh) { + public void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh) { replicationResponse.setForcedRefresh(forcedRefresh); - listener.onResponse(null); + listener.onResponse(globalCheckpoint, localCheckpoint); } @Override - public void onFailure(Exception ex) { - listener.onFailure(ex); + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex) { + listener.onFailure(globalCheckpoint, localCheckpoint, ex); } }, logger, postWriteRefresh, postWriteAction).run(); } @@ -355,7 +356,6 @@ public void onFailure(Exception ex) { public static class WriteReplicaResult> extends ReplicaResult { public final Location location; private final ReplicaRequest request; - private final IndexShard replica; private final Logger logger; private final Consumer postWriteAction; @@ -377,27 +377,26 @@ public WriteReplicaResult( Logger logger, Consumer postWriteAction ) { - super(operationFailure); + super(replica, operationFailure); this.location = location; this.request = request; - this.replica = replica; this.logger = logger; this.postWriteAction = postWriteAction; } @Override - public void runPostReplicaActions(ActionListener listener) { + public void runPostReplicaActions(ReplicaPostReplicationActionsListener listener) { if (finalFailure != null) { listener.onFailure(finalFailure); } else { new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() { @Override - public void onSuccess(boolean forcedRefresh) { - listener.onResponse(null); + public void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh) { + listener.onResponse(globalCheckpoint, localCheckpoint); } @Override - public void onFailure(Exception ex) { + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex) { listener.onFailure(ex); } }, logger, null, 
postWriteAction).run(); @@ -424,12 +423,12 @@ interface RespondingWriteResult { * Called on successful processing of all post write actions * @param forcedRefresh true iff this write has caused a refresh */ - void onSuccess(boolean forcedRefresh); + void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh); /** * Called on failure if a post action failed. */ - void onFailure(Exception ex); + void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex); } /** @@ -453,6 +452,10 @@ static final class AsyncAfterWriteAction { private final PostWriteRefresh postWriteRefresh; private final Consumer postWriteAction; private final TimeValue postWriteRefreshTimeout; + // Capture the values of the local & global checkpoint to be passed to after-write-action listeners, so that they don't have to + // read the values back from the engine as it could deadlock. + private final AtomicLong globalCheckpoint; + private final AtomicLong localCheckpoint; AsyncAfterWriteAction( final IndexShard indexShard, @@ -482,26 +485,36 @@ static final class AsyncAfterWriteAction { this.logger = logger; this.postWriteRefreshTimeout = request.timeout(); assert pendingOps.get() >= 0 && pendingOps.get() <= 3 : "pendingOps was: " + pendingOps.get(); + this.globalCheckpoint = new AtomicLong(indexShard.getLastSyncedGlobalCheckpoint()); + this.localCheckpoint = new AtomicLong(indexShard.getLocalCheckpoint()); } /** calls the response listener if all pending operations have returned otherwise it just decrements the pending opts counter.*/ private void maybeFinish() { final int numPending = pendingOps.decrementAndGet(); if (numPending == 0) { + final long globalCheckpoint = this.globalCheckpoint.get(); + final long localCheckpoint = this.localCheckpoint.get(); + if (syncFailure.get() != null) { - respond.onFailure(syncFailure.get()); + respond.onFailure(globalCheckpoint, localCheckpoint, syncFailure.get()); } else { // TODO: Temporary until we fail unpromotable shard if 
(refreshFailure.get() != null) { - respond.onFailure(refreshFailure.get()); + respond.onFailure(globalCheckpoint, localCheckpoint, refreshFailure.get()); } else { - respond.onSuccess(refreshed.get()); + respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get()); } } } assert numPending >= 0 && numPending <= 2 : "numPending must either 2, 1 or 0 but was " + numPending; } + private void updateCheckpoints() { + this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); + this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); + } + void run() { /* * We either respond immediately (i.e., if we do not fsync per request or wait for @@ -509,8 +522,8 @@ void run() { * respond. */ indexShard.afterWriteOperation(); - // decrement pending by one, if there is nothing else to do we just respond with success - maybeFinish(); + updateCheckpoints(); + maybeFinish(); // decrement pending by one, if there is nothing else to do we just respond with success if (needsRefreshAction) { assert pendingOps.get() > 0; ActionListener refreshListener = new ActionListener<>() { @@ -549,6 +562,7 @@ public void onFailure(Exception e) { syncFailure.set(e); maybeFinish(); }); + updateCheckpoints(); } if (postWriteAction != null) { postWriteAction.accept(this::maybeFinish); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 54904323a25ad..1b356cd9360a8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -78,15 +78,15 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep @Override protected void shardOperationOnPrimary( Request request, - IndexShard indexShard, + IndexShard primary, ActionListener> listener ) { - 
maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse()))); + maybeSyncTranslog(primary, listener.map(v -> new PrimaryResult<>(primary, request, new ReplicationResponse()))); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult())); + maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult(replica))); } private static void maybeSyncTranslog(IndexShard indexShard, ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index cdeaa177dfd5e..73d9a680190f8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -146,7 +146,7 @@ protected void shardOperationOnPrimary( Objects.requireNonNull(request); Objects.requireNonNull(primary); primary.persistRetentionLeases(); - return new PrimaryResult<>(request, new ReplicationResponse()); + return new PrimaryResult<>(primary, request, new ReplicationResponse()); }); } @@ -157,7 +157,7 @@ protected void shardOperationOnReplica(Request request, IndexShard replica, Acti Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); replica.persistRetentionLeases(); - return new ReplicaResult(); + return new ReplicaResult(replica); }); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 413257173c7d4..72aa4d8ae452a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2986,18 +2986,6 @@ public 
long getLocalCheckpoint() { return getEngine().getPersistedLocalCheckpoint(); } - /** - * Returns a supplier that supplies the local checkpoint of the engine that is referenced at the time this method is called. - * Uses this method in place where the current engine reference cannot be resolved directly. - * - * @return a supplier of the local checkpoint - * @throws AlreadyClosedException if shard is closed - */ - public Supplier getLocalCheckpointSupplier() { - var engine = getEngine(); - return () -> engine.getPersistedLocalCheckpoint(); - } - /** * Returns the global checkpoint for the shard. * @@ -3014,18 +3002,6 @@ public long getLastSyncedGlobalCheckpoint() { return getEngine().getLastSyncedGlobalCheckpoint(); } - /** - * Returns a supplier that supplies the latest global checkpoint of the engine that is referenced at the time this method is called. - * Uses this method in place where the current engine reference cannot be resolved directly. - * - * @return a supplier of the latest global checkpoint value that has been persisted in the underlying storage - * @throws AlreadyClosedException if shard is closed - */ - public Supplier getLastSyncedGlobalCheckpointSupplier() { - var engine = getEngine(); - return () -> engine.getLastSyncedGlobalCheckpoint(); - } - /** * Get the local knowledge of the global checkpoints for all in-sync allocation IDs. 
* diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 72d56dd21aac8..c39675e654d95 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -423,8 +423,8 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ActionListener listener) { - listener.onResponse(null); + public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { + listener.onResponse(0L, 0L); } public ReplicationResponse.ShardInfo getShardInfo() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index a7abceddac58c..f7b5bb3981e0a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -669,15 +669,19 @@ public void perform(Request request, ActionListener listener) { if (request.processedOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - listener.onResponse(new Result(request)); + listener.onResponse(new Result(request, localCheckpoint, globalCheckpoint)); } static class Result implements ReplicationOperation.PrimaryResult { private final Request request; private ShardInfo shardInfo; + final long localCheckpoint; + final long globalCheckpoint; - Result(Request request) { + Result(Request request, long localCheckpoint, long globalCheckpoint) { 
this.request = request; + this.localCheckpoint = localCheckpoint; + this.globalCheckpoint = globalCheckpoint; } @Override @@ -691,11 +695,11 @@ public void setShardInfo(ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ActionListener listener) { + public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { if (request.runPostReplicationActionsOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - listener.onResponse(null); + listener.onResponse(globalCheckpoint, localCheckpoint); } public ShardInfo getShardInfo() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 3ae27b3072df5..2709b69eb82c8 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -54,7 +54,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardState; @@ -932,8 +931,6 @@ public void testSeqNoIsSetOnPrimary() { when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm); when(shard.routingEntry()).thenReturn(routingEntry); when(shard.isRelocatedPrimary()).thenReturn(false); - when(shard.getLocalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); - when(shard.getLastSyncedGlobalCheckpointSupplier()).thenReturn(() -> SequenceNumbers.UNASSIGNED_SEQ_NO); IndexShardRoutingTable shardRoutingTable = 
clusterService.state().routingTable().shardRoutingTable(shardId); Set inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) @@ -1069,7 +1066,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, if (throwException) { throw new ElasticsearchException("simulated"); } - return new ReplicaResult(); + return new ReplicaResult(replica); }); } }; @@ -1239,7 +1236,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, if (throwException.get()) { throw new RetryOnReplicaException(shardId, "simulation"); } - return new ReplicaResult(); + return new ReplicaResult(replica); }); } }; @@ -1341,7 +1338,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, throw new RetryOnReplicaException(shardId, "simulation"); } calledSuccessfully.set(true); - return new ReplicaResult(); + return new ReplicaResult(replica); }); } }; @@ -1540,13 +1537,13 @@ protected void shardOperationOnPrimary( ) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - listener.onResponse(new PrimaryResult<>(shardRequest, new TestResponse())); + listener.onResponse(new PrimaryResult<>(primary, shardRequest, new TestResponse())); } @Override protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener listener) { request.processedOnReplicas.incrementAndGet(); - listener.onResponse(new ReplicaResult()); + listener.onResponse(new ReplicaResult(replica)); } } @@ -1634,8 +1631,6 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService when(indexShard.getPendingPrimaryTerm()).thenAnswer( i -> clusterService.state().metadata().getProject().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()) ); - when(indexShard.getLocalCheckpointSupplier()).thenReturn(() -> 0L); - when(indexShard.getLastSyncedGlobalCheckpointSupplier()).thenReturn(() 
-> 0L); ReplicationGroup replicationGroup = mock(ReplicationGroup.class); when(indexShard.getReplicationGroup()).thenReturn(replicationGroup); return indexShard; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 7e0d7063c417e..7f5f1bc5da010 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -489,7 +489,7 @@ protected void shardOperationOnPrimary( // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the primary shard assertSame(primary, shard); - listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); + listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response())); } @Override @@ -503,7 +503,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the replica shard assertSame(replica, replica); - listener.onResponse(new ReplicaResult()); + listener.onResponse(new ReplicaResult(replica)); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index c911b0836c127..ab1640093380b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ 
-141,7 +141,17 @@ public void testPrimaryNoRefreshCall() throws Exception { TestAction testAction = new TestAction(); testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicationActions(listener.map(ignore -> result.replicationResponse)); + result.runPostReplicationActions(new ReplicationOperation.PrimaryPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + listener.onResponse(result.replicationResponse); + } + + @Override + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception failure) { + listener.onFailure(failure); + } + }); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -157,7 +167,17 @@ public void testReplicaNoRefreshCall() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void onFailure(Exception failure) { + listener.onFailure(failure); + } + }); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -170,7 +190,17 @@ public void testPrimaryImmediateRefresh() throws Exception { TestAction testAction = new TestAction(); testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener 
listener = new CapturingActionListener<>(); - result.runPostReplicationActions(listener.map(ignore -> result.replicationResponse)); + result.runPostReplicationActions(new ReplicationOperation.PrimaryPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + listener.onResponse(result.replicationResponse); + } + + @Override + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception failure) { + listener.onFailure(failure); + } + }); @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) ActionListener.class); @@ -196,7 +226,17 @@ public void testReplicaImmediateRefresh() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void onFailure(Exception failure) { + listener.onFailure(failure); + } + }); assertNull(listener.response); // Haven't responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) ActionListener.class); @@ -215,7 +255,17 @@ public void testPrimaryWaitForRefresh() throws Exception { TestAction testAction = new TestAction(); testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicationActions(listener.map(ignore -> result.replicationResponse)); + 
result.runPostReplicationActions(new ReplicationOperation.PrimaryPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + listener.onResponse(result.replicationResponse); + } + + @Override + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception failure) { + listener.onFailure(failure); + } + }); assertNull(listener.response); // Haven't really responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -243,7 +293,17 @@ public void testReplicaWaitForRefresh() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void onFailure(Exception failure) { + listener.onFailure(failure); + } + }); assertNull(listener.response); // Haven't responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); @@ -276,7 +336,17 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long 
localCheckpoint) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public void onFailure(Exception failure) { + listener.onFailure(failure); + } + }); assertNull(listener.response); assertNotNull(listener.failure); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6c1363c06a4d0..77ef0b336e6eb 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -154,7 +154,17 @@ public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateE verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); - result.runPostReplicaActions(ActionTestUtils.assertNoFailureListener(r -> success.set(true))); + result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + success.set(true); + } + + @Override + public void onFailure(Exception ex) { + fail(ex); + } + }); assertTrue(success.get()); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 93d56b45bd959..d4f1e751774dd 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -163,7 +163,17 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); 
- result.runPostReplicaActions(ActionTestUtils.assertNoFailureListener(r -> success.set(true))); + result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { + @Override + public void onResponse(long globalCheckpoint, long localCheckpoint) { + success.set(true); + } + + @Override + public void onFailure(Exception ex) { + fail(ex); + } + }); assertTrue(success.get()); } diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java index f73725e834d61..fcc9d38531b49 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java @@ -31,13 +31,14 @@ public static void performPostWriteActions( ) { final CountDownLatch latch = new CountDownLatch(1); TransportWriteAction.RespondingWriteResult writerResult = new TransportWriteAction.RespondingWriteResult() { + @Override - public void onSuccess(boolean forcedRefresh) { + public void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh) { latch.countDown(); } @Override - public void onFailure(Exception ex) { + public void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex) { throw new AssertionError(ex); } }; diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 68976155a519a..00eeba990c75a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -87,6 +87,7 @@ 
import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; @@ -810,12 +811,14 @@ public void markShardCopyAsStaleIfNeeded( } protected class PrimaryResult implements ReplicationOperation.PrimaryResult { + final IndexShard primary; final ReplicaRequest replicaRequest; final Response finalResponse; - public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { + public PrimaryResult(IndexShard primary, ReplicaRequest replicaRequest, Response replicationResponse) { + this.primary = Objects.requireNonNull(primary); this.replicaRequest = replicaRequest; - this.finalResponse = finalResponse; + this.finalResponse = replicationResponse; } @Override @@ -829,8 +832,8 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ActionListener actionListener) { - actionListener.onResponse(null); + public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { + listener.onResponse(primary.getLastSyncedGlobalCheckpoint(), primary.getLocalCheckpoint()); } } @@ -847,7 +850,7 @@ protected void performOnPrimary(IndexShard primary, BulkShardRequest request, Ac executeShardBulkOnPrimary( primary, request, - listener.map(result -> new PrimaryResult(result.replicaRequest(), result.replicationResponse)) + listener.map(result -> new PrimaryResult(primary, result.replicaRequest(), result.replicationResponse)) ); } @@ -1003,7 +1006,7 @@ protected void performOnPrimary( ) { ActionListener.completeWith(listener, () -> { primary.sync(); - return new PrimaryResult(request, new ReplicationResponse()); + return new PrimaryResult(primary, request, new ReplicationResponse()); }); } @@ -1024,7 +1027,7 @@ protected void performOnPrimary(IndexShard primary, ResyncReplicationRequest req ActionListener.completeWith(listener, () -> 
{ final TransportWriteAction.WritePrimaryResult result = executeResyncOnPrimary(primary, request); - return new PrimaryResult(result.replicaRequest(), result.replicationResponse); + return new PrimaryResult(primary, result.replicaRequest(), result.replicationResponse); }); } @@ -1105,7 +1108,7 @@ protected void performOnPrimary( ) { ActionListener.completeWith(listener, () -> { primary.persistRetentionLeases(); - return new PrimaryResult(request, new RetentionLeaseSyncAction.Response()); + return new PrimaryResult(primary, request, new RetentionLeaseSyncAction.Response()); }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 34ce43e583086..4ccbf2a574fd8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -832,7 +832,7 @@ protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest r } catch (InterruptedException | ExecutionException | IOException e) { throw new RuntimeException(e); } - listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.replicationResponse)); + listener.onResponse(new PrimaryResult(primary, ccrResult.replicaRequest(), ccrResult.replicationResponse)); } @Override From e053aa0f3886495ffae390df5e4723af51647390 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 13:42:03 +0100 Subject: [PATCH 15/20] replication action --- .../index/engine/SafeEngineAccessThreadLocal.java | 5 ++++- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java 
b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java index df4f3892b6a97..89937960b37bc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java @@ -82,13 +82,16 @@ public static void accessEnd() { /** * Use this method to assert that the current thread has not entered a protected execution code block. */ - public static void checkAccess() { + public static boolean assertSafeAccess() { + ensureAssertionsEnabled(); final var accessor = getAccessorSafe(); if (accessor != null) { var message = "thread [" + accessor + "] should not access the engine using the getEngineOrNull() method"; accessor.setFailure(new AssertionError(message)); // to be thrown later assert false : message; + return false; } + return true; } private static void ensureAssertionsEnabled() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 72aa4d8ae452a..f7d0d44e3513b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3340,7 +3340,7 @@ Engine getEngine() { * closed. 
*/ public Engine getEngineOrNull() { - SafeEngineAccessThreadLocal.checkAccess(); + assert SafeEngineAccessThreadLocal.assertSafeAccess(); return this.currentEngineReference.get(); } From cd2333ca7ca04e47366cd6ee7606e94325884837 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 14:17:40 +0100 Subject: [PATCH 16/20] bug --- .../action/support/replication/TransportReplicationAction.java | 2 +- .../action/support/replication/TransportWriteAction.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 75fe056e1f2e4..b0c7e65296658 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -696,7 +696,7 @@ public void onResponse(Releasable releasable) { replicaResult.runPostReplicaActions(new ReplicaPostReplicationActionsListener() { @Override public void onResponse(long globalCheckpoint, long localCheckpoint) { - final ReplicaResponse response = new ReplicaResponse(globalCheckpoint, localCheckpoint); + final ReplicaResponse response = new ReplicaResponse(localCheckpoint, globalCheckpoint); releasable.close(); // release shard operation lock before responding to caller if (logger.isTraceEnabled()) { logger.trace( diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 34cbb2c854eb2..9075c29f21050 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -559,6 +559,7 @@ public void 
onFailure(Exception e) { if (sync) { assert pendingOps.get() > 0; indexShard.syncAfterWrite(location, e -> { + updateCheckpoints(); syncFailure.set(e); maybeFinish(); }); From c5153c16fedba6afd17f8191d3b3039c6b1def50 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 15:43:20 +0100 Subject: [PATCH 17/20] bug --- .../replication/TransportReplicationAction.java | 16 +++++++++------- .../replication/TransportWriteAction.java | 16 ++++++++++++---- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index b0c7e65296658..e4218d4ec7699 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -692,8 +692,10 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio public void onResponse(Releasable releasable) { assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; try { - shardOperationOnReplica(replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> { - replicaResult.runPostReplicaActions(new ReplicaPostReplicationActionsListener() { + shardOperationOnReplica( + replicaRequest.getRequest(), + replica, + ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(new ReplicaPostReplicationActionsListener() { @Override public void onResponse(long globalCheckpoint, long localCheckpoint) { final ReplicaResponse response = new ReplicaResponse(localCheckpoint, globalCheckpoint); @@ -716,11 +718,11 @@ public void onFailure(Exception e) { Releasables.closeWhileHandlingException(releasable); responseWithFailure(e); } - }); - }, e -> { - Releasables.closeWhileHandlingException(releasable); // release shard 
operation lock before responding to caller - AsyncReplicaAction.this.onFailure(e); - })); + }), e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + AsyncReplicaAction.this.onFailure(e); + }) + ); // TODO: Evaluate if we still need to catch this exception } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 9075c29f21050..d28a234dd6a59 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -511,8 +511,13 @@ private void maybeFinish() { } private void updateCheckpoints() { - this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); - this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); + try { + this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); + this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); + } catch (Exception e) { + logger.warn("Failed to retrieve checkpoints", e); + assert false : e; + } } void run() { @@ -559,8 +564,11 @@ public void onFailure(Exception e) { if (sync) { assert pendingOps.get() > 0; indexShard.syncAfterWrite(location, e -> { - updateCheckpoints(); - syncFailure.set(e); + if (e != null) { + syncFailure.set(e); + } else { + updateCheckpoints(); + } maybeFinish(); }); updateCheckpoints(); From a422586cb1cb5c536e9aca138a9b9d9e523fb72b Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 16:20:07 +0100 Subject: [PATCH 18/20] bug --- .../replication/TransportWriteAction.java | 19 
++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index d28a234dd6a59..d23edd144cd41 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -10,6 +10,7 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; @@ -456,6 +457,7 @@ static final class AsyncAfterWriteAction { // read the values back from the engine as it could deadlock. private final AtomicLong globalCheckpoint; private final AtomicLong localCheckpoint; + private final AtomicReference checkpointFailure = new AtomicReference<>(null); AsyncAfterWriteAction( final IndexShard indexShard, @@ -503,7 +505,11 @@ private void maybeFinish() { if (refreshFailure.get() != null) { respond.onFailure(globalCheckpoint, localCheckpoint, refreshFailure.get()); } else { - respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get()); + if (checkpointFailure.get() != null) { + respond.onFailure(globalCheckpoint, localCheckpoint, checkpointFailure.get()); + } else { + respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get()); + } } } } @@ -512,11 +518,14 @@ private void maybeFinish() { private void updateCheckpoints() { try { - this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); - this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); + if (checkpointFailure.get() != null) { + 
this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); + this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); + } + } catch (AlreadyClosedException e) { + // the index was deleted or this shard was never activated after a relocation; fall through and finish normally } catch (Exception e) { - logger.warn("Failed to retrieve checkpoints", e); - assert false : e; + checkpointFailure.set(e); } } From 110426f19648f64e97db70b65cb6cb8b5ef6be24 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 11 Mar 2025 16:43:52 +0100 Subject: [PATCH 19/20] bug --- .../replication/TransportWriteAction.java | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index d23edd144cd41..04e5088075689 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -457,7 +457,6 @@ static final class AsyncAfterWriteAction { // read the values back from the engine as it could deadlock. 
private final AtomicLong globalCheckpoint; private final AtomicLong localCheckpoint; - private final AtomicReference checkpointFailure = new AtomicReference<>(null); AsyncAfterWriteAction( final IndexShard indexShard, @@ -505,11 +504,7 @@ private void maybeFinish() { if (refreshFailure.get() != null) { respond.onFailure(globalCheckpoint, localCheckpoint, refreshFailure.get()); } else { - if (checkpointFailure.get() != null) { - respond.onFailure(globalCheckpoint, localCheckpoint, checkpointFailure.get()); - } else { - respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get()); - } + respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get()); } } } @@ -518,14 +513,13 @@ private void maybeFinish() { private void updateCheckpoints() { try { - if (checkpointFailure.get() != null) { - this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); - this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); - } + this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); + this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); } catch (AlreadyClosedException e) { // the index was deleted or this shard was never activated after a relocation; fall through and finish normally } catch (Exception e) { - checkpointFailure.set(e); + syncFailure.set(e); + assert false : e; } } From b9e646c0aa16bbe42fe47334b0621b11a47d6a6b Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 12 Mar 2025 11:25:27 +0100 Subject: [PATCH 20/20] revert replication changes --- ...ActionBypassCircuitBreakerOnReplicaIT.java | 4 +- ...tReplicationActionRetryOnClosedNodeIT.java | 4 +- ...TransportVerifyShardBeforeCloseAction.java | 4 +- .../flush/TransportShardFlushAction.java | 4 +- .../TransportVerifyShardIndexBlockAction.java | 4 +- .../refresh/TransportShardRefreshAction.java | 4 +- .../replication/ReplicationOperation.java | 23 ++--- 
.../TransportReplicationAction.java | 92 ++++++++----------- .../replication/TransportWriteAction.java | 68 +++++--------- .../engine/SafeEngineAccessThreadLocal.java | 2 + .../seqno/GlobalCheckpointSyncAction.java | 6 +- .../RetentionLeaseBackgroundSyncAction.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 12 --- ...portVerifyShardBeforeCloseActionTests.java | 4 +- .../ReplicationOperationTests.java | 12 +-- .../TransportReplicationActionTests.java | 11 ++- ...ReplicationAllPermitsAcquisitionTests.java | 4 +- .../TransportWriteActionTests.java | 84 ++--------------- ...tentionLeaseBackgroundSyncActionTests.java | 12 +-- .../seqno/RetentionLeaseSyncActionTests.java | 12 +-- .../TransportWriteActionTestHelper.java | 5 +- .../ESIndexLevelReplicationTestCase.java | 19 ++-- .../ShardFollowTaskReplicationTests.java | 2 +- 23 files changed, 120 insertions(+), 276 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java index ff48d1c559254..e18586117953f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionBypassCircuitBreakerOnReplicaIT.java @@ -114,12 +114,12 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response())); + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - listener.onResponse(new ReplicaResult(replica)); + 
listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index b6e42c51606f2..60cf86031759d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -124,12 +124,12 @@ protected void shardOperationOnPrimary( IndexShard primary, ActionListener> listener ) { - listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response())); + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - listener.onResponse(new ReplicaResult(replica)); + listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index a2f88ae999031..e8ab4750854cb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -110,7 +110,7 @@ protected void shardOperationOnPrimary( ) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); }); } @@ -118,7 +118,7 @@ protected void 
shardOperationOnPrimary( protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, replica); - return new ReplicaResult(replica); + return new ReplicaResult(); }); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index e47bc8fe82b6f..5d184f56dd748 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -85,7 +85,7 @@ protected void shardOperationOnPrimary( ) { primary.flush(shardRequest.getRequest(), listener.map(flushed -> { logger.trace("{} flush request executed on primary", primary.shardId()); - return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); })); } @@ -93,7 +93,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { replica.flush(request.getRequest(), listener.map(flushed -> { logger.trace("{} flush request executed on replica", replica.shardId()); - return new ReplicaResult(replica); + return new ReplicaResult(); })); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java index 1934e05a89931..e1563de750813 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/admin/indices/readonly/TransportVerifyShardIndexBlockAction.java @@ -114,7 +114,7 @@ protected void shardOperationOnPrimary( ) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(primary, shardRequest, new ReplicationResponse()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); }); } @@ -122,7 +122,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { executeShardOperation(shardRequest, replica); - return new ReplicaResult(replica); + return new ReplicaResult(); }); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 35122ce2c7849..2286f64648185 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -101,7 +101,7 @@ protected void shardOperationOnPrimary( ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult); replicaRequest.setParentTask(shardRequest.getParentTask()); logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult<>(primary, replicaRequest, new ReplicationResponse()); + return new PrimaryResult<>(replicaRequest, new ReplicationResponse()); })); } @@ -109,7 +109,7 @@ protected void shardOperationOnPrimary( protected void shardOperationOnReplica(ShardRefreshReplicaRequest request, IndexShard replica, ActionListener listener) { replica.externalRefresh(SOURCE_API, listener.safeMap(refreshResult -> { logger.trace("{} refresh request 
executed on replica", replica.shardId()); - return new ReplicaResult(replica); + return new ReplicaResult(); })); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 655041b43ddc7..257073f6ceb11 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -194,20 +194,21 @@ private void handlePrimaryResult( pendingActionsListener ); } - primaryResult.runPostReplicationActions(new PrimaryPostReplicationActionsListener() { + primaryResult.runPostReplicationActions(new ActionListener<>() { + @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { + public void onResponse(Void aVoid) { successfulShards.incrementAndGet(); updateCheckPoints( primary.routingEntry(), - () -> localCheckpoint, - () -> globalCheckpoint, + primary::localCheckpoint, + primary::globalCheckpoint, () -> primaryCoordinationPendingActionListener.onResponse(null) ); } @Override - public void onFailure(long globalCheckpoint, long localCheckpoint, Exception e) { + public void onFailure(Exception e) { logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request); // TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas // go out of sync with the primary @@ -216,8 +217,8 @@ public void onFailure(long globalCheckpoint, long localCheckpoint, Exception e) // is appended into the translog. 
updateCheckPoints( primary.routingEntry(), - () -> localCheckpoint, - () -> globalCheckpoint, + primary::localCheckpoint, + primary::globalCheckpoint, () -> primaryCoordinationPendingActionListener.onFailure(e) ); } @@ -717,13 +718,7 @@ public interface PrimaryResult> { * Run actions to be triggered post replication * @param listener callback that is invoked after post replication actions have completed * */ - void runPostReplicationActions(PrimaryPostReplicationActionsListener listener); + void runPostReplicationActions(ActionListener listener); } - public interface PrimaryPostReplicationActionsListener { - - void onResponse(long globalCheckpoint, long localCheckpoint); - - void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex); - } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e4218d4ec7699..5b332b0a9653a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -79,7 +78,6 @@ import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import static org.elasticsearch.core.Strings.format; @@ -503,30 +501,31 @@ public void handleException(TransportException exp) { ); } else { setPhase(replicationTask, "primary"); + // Resolve the engine upfront to avoid an unsafe 
access if the responseListener is called by a refresh listener + final var engineOrNull = syncGlobalCheckpointAfterOperation ? primaryShardReference.indexShard.getEngineOrNull() : null; final ActionListener responseListener = ActionListener.wrap(response -> { adaptResponse(response, primaryShardReference.indexShard); - if (syncGlobalCheckpointAfterOperation) { + final var primary = primaryShardReference.indexShard; + if (engineOrNull != null) { try { - var seqNoStats = primaryShardReference.seqNoStatsSupplier.get(); - primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation", seqNoStats); + // TODO ensure engine is still open + var seqNoStats = engineOrNull.getSeqNoStats(primary.getLastKnownGlobalCheckpoint()); + primary.maybeSyncGlobalCheckpoint("post-operation", seqNoStats); } catch (final Exception e) { // only log non-closed exceptions if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { // intentionally swallow, a missed global checkpoint sync should not fail this operation logger.info( - () -> format( - "%s failed to execute post-operation global checkpoint sync", - primaryShardReference.indexShard.shardId() - ), + () -> format("%s failed to execute post-operation global checkpoint sync", primary.shardId()), e ); } } } - assert primaryShardReference.indexShard.isPrimaryMode(); + assert primary.isPrimaryMode(); primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); onCompletionListener.onResponse(response); @@ -571,7 +570,6 @@ protected void adaptResponse(Response response, IndexShard indexShard) { public static class PrimaryResult, Response extends ReplicationResponse> implements ReplicationOperation.PrimaryResult { - private final IndexShard primary; private final ReplicaRequest replicaRequest; public final Response replicationResponse; @@ -579,8 +577,7 @@ public static class PrimaryResultreplicationResponse to be 
not-null */ - public PrimaryResult(IndexShard primary, ReplicaRequest replicaRequest, Response replicationResponse) { - this.primary = Objects.requireNonNull(primary); + public PrimaryResult(ReplicaRequest replicaRequest, Response replicationResponse) { assert replicaRequest != null : "request is required"; assert replicationResponse != null : "response is required"; this.replicaRequest = replicaRequest; @@ -598,40 +595,31 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { - listener.onResponse(primary.getLastSyncedGlobalCheckpoint(), primary.getLocalCheckpoint()); + public void runPostReplicationActions(ActionListener listener) { + listener.onResponse(null); } } public static class ReplicaResult { - protected final IndexShard replica; - protected final Exception finalFailure; + final Exception finalFailure; - public ReplicaResult(IndexShard replica, Exception finalFailure) { - this.replica = Objects.requireNonNull(replica); + public ReplicaResult(Exception finalFailure) { this.finalFailure = finalFailure; } - public ReplicaResult(IndexShard replica) { - this(replica, null); + public ReplicaResult() { + this(null); } - public void runPostReplicaActions(ReplicaPostReplicationActionsListener listener) { + public void runPostReplicaActions(ActionListener listener) { if (finalFailure != null) { listener.onFailure(finalFailure); } else { - listener.onResponse(replica.getLastSyncedGlobalCheckpoint(), replica.getLocalCheckpoint()); + listener.onResponse(null); } } } - public interface ReplicaPostReplicationActionsListener { - - void onResponse(long globalCheckpoint, long localCheckpoint); - - void onFailure(Exception ex); - } - protected void handleReplicaRequest( final ConcreteReplicaRequest replicaRequest, final TransportChannel channel, @@ -695,30 +683,26 @@ public void onResponse(Releasable releasable) { 
shardOperationOnReplica( replicaRequest.getRequest(), replica, - ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(new ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - final ReplicaResponse response = new ReplicaResponse(localCheckpoint, globalCheckpoint); - releasable.close(); // release shard operation lock before responding to caller - if (logger.isTraceEnabled()) { - logger.trace( - "action [{}] completed on shard [{}] for request [{}]", - transportReplicaAction, - replicaRequest.getRequest().shardId(), - replicaRequest.getRequest() - ); - } - setPhase(task, "finished"); - onCompletionListener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - // release shard operation lock before responding to caller - Releasables.closeWhileHandlingException(releasable); - responseWithFailure(e); + ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(ActionListener.wrap(r -> { + final ReplicaResponse response = new ReplicaResponse( + replica.getLocalCheckpoint(), + replica.getLastSyncedGlobalCheckpoint() + ); + releasable.close(); // release shard operation lock before responding to caller + if (logger.isTraceEnabled()) { + logger.trace( + "action [{}] completed on shard [{}] for request [{}]", + transportReplicaAction, + replicaRequest.getRequest().shardId(), + replicaRequest.getRequest() + ); } - }), e -> { + setPhase(task, "finished"); + onCompletionListener.onResponse(response); + }, e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + responseWithFailure(e); + })), e -> { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); }) @@ -1154,12 +1138,10 @@ class PrimaryShardReference protected final IndexShard indexShard; private final Releasable 
operationLock; - private final Supplier seqNoStatsSupplier; PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; this.operationLock = operationLock; - this.seqNoStatsSupplier = indexShard.getSeqNoStatsSupplier(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 04e5088075689..cda56966f7a3a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -10,7 +10,6 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; @@ -45,7 +44,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -322,7 +320,7 @@ public WritePrimaryResult( PostWriteRefresh postWriteRefresh, @Nullable Consumer postWriteAction ) { - super(primary, request, finalResponse); + super(request, finalResponse); this.location = location; this.primary = primary; this.logger = logger; @@ -331,21 +329,21 @@ public WritePrimaryResult( } @Override - public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { + public void runPostReplicationActions(ActionListener listener) { /* * We call this after replication because this might wait for a refresh and that can take a while. 
* This way we wait for the refresh in parallel on the primary and on the replica. */ new AsyncAfterWriteAction(primary, replicaRequest(), location, new RespondingWriteResult() { @Override - public void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh) { + public void onSuccess(boolean forcedRefresh) { replicationResponse.setForcedRefresh(forcedRefresh); - listener.onResponse(globalCheckpoint, localCheckpoint); + listener.onResponse(null); } @Override - public void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex) { - listener.onFailure(globalCheckpoint, localCheckpoint, ex); + public void onFailure(Exception ex) { + listener.onFailure(ex); } }, logger, postWriteRefresh, postWriteAction).run(); } @@ -357,6 +355,7 @@ public void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex) public static class WriteReplicaResult> extends ReplicaResult { public final Location location; private final ReplicaRequest request; + private final IndexShard replica; private final Logger logger; private final Consumer postWriteAction; @@ -378,26 +377,27 @@ public WriteReplicaResult( Logger logger, Consumer postWriteAction ) { - super(replica, operationFailure); + super(operationFailure); this.location = location; this.request = request; + this.replica = replica; this.logger = logger; this.postWriteAction = postWriteAction; } @Override - public void runPostReplicaActions(ReplicaPostReplicationActionsListener listener) { + public void runPostReplicaActions(ActionListener listener) { if (finalFailure != null) { listener.onFailure(finalFailure); } else { new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() { @Override - public void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh) { - listener.onResponse(globalCheckpoint, localCheckpoint); + public void onSuccess(boolean forcedRefresh) { + listener.onResponse(null); } @Override - public void onFailure(long 
globalCheckpoint, long localCheckpoint, Exception ex) { + public void onFailure(Exception ex) { listener.onFailure(ex); } }, logger, null, postWriteAction).run(); @@ -424,12 +424,12 @@ interface RespondingWriteResult { * Called on successful processing of all post write actions * @param forcedRefresh true iff this write has caused a refresh */ - void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh); + void onSuccess(boolean forcedRefresh); /** * Called on failure if a post action failed. */ - void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex); + void onFailure(Exception ex); } /** @@ -453,10 +453,6 @@ static final class AsyncAfterWriteAction { private final PostWriteRefresh postWriteRefresh; private final Consumer postWriteAction; private final TimeValue postWriteRefreshTimeout; - // Capture the values of the local & global checkpoint to be passed to after-write-action listeners, so that they don't have to - // read the values back from the engine as it could deadlock. 
- private final AtomicLong globalCheckpoint; - private final AtomicLong localCheckpoint; AsyncAfterWriteAction( final IndexShard indexShard, @@ -486,43 +482,26 @@ static final class AsyncAfterWriteAction { this.logger = logger; this.postWriteRefreshTimeout = request.timeout(); assert pendingOps.get() >= 0 && pendingOps.get() <= 3 : "pendingOps was: " + pendingOps.get(); - this.globalCheckpoint = new AtomicLong(indexShard.getLastSyncedGlobalCheckpoint()); - this.localCheckpoint = new AtomicLong(indexShard.getLocalCheckpoint()); } /** calls the response listener if all pending operations have returned otherwise it just decrements the pending opts counter.*/ private void maybeFinish() { final int numPending = pendingOps.decrementAndGet(); if (numPending == 0) { - final long globalCheckpoint = this.globalCheckpoint.get(); - final long localCheckpoint = this.localCheckpoint.get(); - if (syncFailure.get() != null) { - respond.onFailure(globalCheckpoint, localCheckpoint, syncFailure.get()); + respond.onFailure(syncFailure.get()); } else { // TODO: Temporary until we fail unpromotable shard if (refreshFailure.get() != null) { - respond.onFailure(globalCheckpoint, localCheckpoint, refreshFailure.get()); + respond.onFailure(refreshFailure.get()); } else { - respond.onSuccess(globalCheckpoint, localCheckpoint, refreshed.get()); + respond.onSuccess(refreshed.get()); } } } assert numPending >= 0 && numPending <= 2 : "numPending must either 2, 1 or 0 but was " + numPending; } - private void updateCheckpoints() { - try { - this.globalCheckpoint.accumulateAndGet(indexShard.getLastSyncedGlobalCheckpoint(), Math::max); - this.localCheckpoint.accumulateAndGet(indexShard.getLocalCheckpoint(), Math::max); - } catch (AlreadyClosedException e) { - // the index was deleted or this shard was never activated after a relocation; fall through and finish normally - } catch (Exception e) { - syncFailure.set(e); - assert false : e; - } - } - void run() { /* * We either respond immediately (i.e., 
if we do not fsync per request or wait for @@ -530,8 +509,8 @@ void run() { * respond. */ indexShard.afterWriteOperation(); - updateCheckpoints(); - maybeFinish(); // decrement pending by one, if there is nothing else to do we just respond with success + // decrement pending by one, if there is nothing else to do we just respond with success + maybeFinish(); if (needsRefreshAction) { assert pendingOps.get() > 0; ActionListener refreshListener = new ActionListener<>() { @@ -567,14 +546,9 @@ public void onFailure(Exception e) { if (sync) { assert pendingOps.get() > 0; indexShard.syncAfterWrite(location, e -> { - if (e != null) { - syncFailure.set(e); - } else { - updateCheckpoints(); - } + syncFailure.set(e); maybeFinish(); }); - updateCheckpoints(); } if (postWriteAction != null) { postWriteAction.accept(this::maybeFinish); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java index 89937960b37bc..784ce40782b26 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java @@ -76,6 +76,8 @@ public static void accessEnd() { ensureAssertionsEnabled(); final var accessor = getAccessorSafe(); assert accessor != null : "current accessor not set"; + var thread = Thread.currentThread(); + assert accessor.thread == thread : "current accessor [" + accessor + "] was set by a different thread [" + thread + ']'; threadLocalAccessor.remove(); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 1b356cd9360a8..54904323a25ad 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ 
b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -78,15 +78,15 @@ protected ReplicationResponse newResponseInstance(StreamInput in) throws IOExcep @Override protected void shardOperationOnPrimary( Request request, - IndexShard primary, + IndexShard indexShard, ActionListener> listener ) { - maybeSyncTranslog(primary, listener.map(v -> new PrimaryResult<>(primary, request, new ReplicationResponse()))); + maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse()))); } @Override protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { - maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult(replica))); + maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult())); } private static void maybeSyncTranslog(IndexShard indexShard, ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 73d9a680190f8..cdeaa177dfd5e 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -146,7 +146,7 @@ protected void shardOperationOnPrimary( Objects.requireNonNull(request); Objects.requireNonNull(primary); primary.persistRetentionLeases(); - return new PrimaryResult<>(primary, request, new ReplicationResponse()); + return new PrimaryResult<>(request, new ReplicationResponse()); }); } @@ -157,7 +157,7 @@ protected void shardOperationOnReplica(Request request, IndexShard replica, Acti Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); replica.persistRetentionLeases(); - return new ReplicaResult(replica); + return new ReplicaResult(); }); } diff --git 
a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f7d0d44e3513b..ca25c5614c68f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1351,18 +1351,6 @@ public SeqNoStats seqNoStats() { return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); } - /** - * Returns a supplier that supplies the {@link SeqNoStats} of the engine that is referenced at the time this method is called. - * Uses this method in place where the current engine reference cannot be resolved directly. - * - * @return a supplier of {@link SeqNoStats} - * @throws AlreadyClosedException if shard is closed - */ - public Supplier getSeqNoStatsSupplier() { - var engine = getEngine(); - return () -> engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint()); - } - public IndexingStats indexingStats() { Engine engine = getEngineOrNull(); final boolean throttled; diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index c39675e654d95..72d56dd21aac8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -423,8 +423,8 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { - listener.onResponse(0L, 0L); + public void runPostReplicationActions(ActionListener listener) { + listener.onResponse(null); } public ReplicationResponse.ShardInfo getShardInfo() { diff --git 
a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index f7b5bb3981e0a..a7abceddac58c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -669,19 +669,15 @@ public void perform(Request request, ActionListener listener) { if (request.processedOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - listener.onResponse(new Result(request, localCheckpoint, globalCheckpoint)); + listener.onResponse(new Result(request)); } static class Result implements ReplicationOperation.PrimaryResult { private final Request request; private ShardInfo shardInfo; - final long localCheckpoint; - final long globalCheckpoint; - Result(Request request, long localCheckpoint, long globalCheckpoint) { + Result(Request request) { this.request = request; - this.localCheckpoint = localCheckpoint; - this.globalCheckpoint = globalCheckpoint; } @Override @@ -695,11 +691,11 @@ public void setShardInfo(ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { + public void runPostReplicationActions(ActionListener listener) { if (request.runPostReplicationActionsOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - listener.onResponse(globalCheckpoint, localCheckpoint); + listener.onResponse(null); } public ShardInfo getShardInfo() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2709b69eb82c8..233138c8eedbb 100644 --- 
a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1066,7 +1066,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, if (throwException) { throw new ElasticsearchException("simulated"); } - return new ReplicaResult(replica); + return new ReplicaResult(); }); } }; @@ -1236,7 +1236,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, if (throwException.get()) { throw new RetryOnReplicaException(shardId, "simulation"); } - return new ReplicaResult(replica); + return new ReplicaResult(); }); } }; @@ -1338,7 +1338,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, throw new RetryOnReplicaException(shardId, "simulation"); } calledSuccessfully.set(true); - return new ReplicaResult(replica); + return new ReplicaResult(); }); } }; @@ -1537,13 +1537,13 @@ protected void shardOperationOnPrimary( ) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - listener.onResponse(new PrimaryResult<>(primary, shardRequest, new TestResponse())); + listener.onResponse(new PrimaryResult<>(shardRequest, new TestResponse())); } @Override protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener listener) { request.processedOnReplicas.incrementAndGet(); - listener.onResponse(new ReplicaResult(replica)); + listener.onResponse(new ReplicaResult()); } } @@ -1631,6 +1631,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService when(indexShard.getPendingPrimaryTerm()).thenAnswer( i -> clusterService.state().metadata().getProject().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()) ); + ReplicationGroup replicationGroup = mock(ReplicationGroup.class); 
when(indexShard.getReplicationGroup()).thenReturn(replicationGroup); return indexShard; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 7f5f1bc5da010..7e0d7063c417e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -489,7 +489,7 @@ protected void shardOperationOnPrimary( // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the primary shard assertSame(primary, shard); - listener.onResponse(new PrimaryResult<>(primary, shardRequest, new Response())); + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); } @Override @@ -503,7 +503,7 @@ protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the replica shard assertSame(replica, replica); - listener.onResponse(new ReplicaResult(replica)); + listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index ab1640093380b..c911b0836c127 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -141,17 +141,7 @@ public void testPrimaryNoRefreshCall() throws Exception { 
TestAction testAction = new TestAction(); testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicationActions(new ReplicationOperation.PrimaryPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - listener.onResponse(result.replicationResponse); - } - - @Override - public void onFailure(long globalCheckpoint, long localCheckpoint, Exception failure) { - listener.onFailure(failure); - } - }); + result.runPostReplicationActions(listener.map(ignore -> result.replicationResponse)); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -167,17 +157,7 @@ public void testReplicaNoRefreshCall() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - listener.onResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public void onFailure(Exception failure) { - listener.onFailure(failure); - } - }); + result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -190,17 +170,7 @@ public void testPrimaryImmediateRefresh() throws Exception { TestAction testAction = new TestAction(); testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); - 
result.runPostReplicationActions(new ReplicationOperation.PrimaryPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - listener.onResponse(result.replicationResponse); - } - - @Override - public void onFailure(long globalCheckpoint, long localCheckpoint, Exception failure) { - listener.onFailure(failure); - } - }); + result.runPostReplicationActions(listener.map(ignore -> result.replicationResponse)); @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) ActionListener.class); @@ -226,17 +196,7 @@ public void testReplicaImmediateRefresh() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - listener.onResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public void onFailure(Exception failure) { - listener.onFailure(failure); - } - }); + result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); // Haven't responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) ActionListener.class); @@ -255,17 +215,7 @@ public void testPrimaryWaitForRefresh() throws Exception { TestAction testAction = new TestAction(); testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicationActions(new ReplicationOperation.PrimaryPostReplicationActionsListener() { - @Override - public void onResponse(long 
globalCheckpoint, long localCheckpoint) { - listener.onResponse(result.replicationResponse); - } - - @Override - public void onFailure(long globalCheckpoint, long localCheckpoint, Exception failure) { - listener.onFailure(failure); - } - }); + result.runPostReplicationActions(listener.map(ignore -> result.replicationResponse)); assertNull(listener.response); // Haven't really responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -293,17 +243,7 @@ public void testReplicaWaitForRefresh() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - listener.onResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public void onFailure(Exception failure) { - listener.onFailure(failure); - } - }); + result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); // Haven't responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); @@ -336,17 +276,7 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - listener.onResponse(TransportResponse.Empty.INSTANCE); - } - - @Override - public void onFailure(Exception failure) { - 
listener.onFailure(failure); - } - }); + result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); assertNotNull(listener.failure); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 77ef0b336e6eb..6c1363c06a4d0 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -154,17 +154,7 @@ public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateE verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); - result.runPostReplicaActions(new TransportReplicationAction.ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - success.set(true); - } - - @Override - public void onFailure(Exception ex) { - fail(ex); - } - }); + result.runPostReplicaActions(ActionTestUtils.assertNoFailureListener(r -> success.set(true))); assertTrue(success.get()); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index d4f1e751774dd..93d56b45bd959 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -163,17 +163,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); - result.runPostReplicaActions(new 
TransportReplicationAction.ReplicaPostReplicationActionsListener() { - @Override - public void onResponse(long globalCheckpoint, long localCheckpoint) { - success.set(true); - } - - @Override - public void onFailure(Exception ex) { - fail(ex); - } - }); + result.runPostReplicaActions(ActionTestUtils.assertNoFailureListener(r -> success.set(true))); assertTrue(success.get()); } diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java index fcc9d38531b49..f73725e834d61 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/TransportWriteActionTestHelper.java @@ -31,14 +31,13 @@ public static void performPostWriteActions( ) { final CountDownLatch latch = new CountDownLatch(1); TransportWriteAction.RespondingWriteResult writerResult = new TransportWriteAction.RespondingWriteResult() { - @Override - public void onSuccess(long globalCheckpoint, long localCheckpoint, boolean forcedRefresh) { + public void onSuccess(boolean forcedRefresh) { latch.countDown(); } @Override - public void onFailure(long globalCheckpoint, long localCheckpoint, Exception ex) { + public void onFailure(Exception ex) { throw new AssertionError(ex); } }; diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 00eeba990c75a..68976155a519a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -87,7 +87,6 @@ import java.util.HashSet; import 
java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; @@ -811,14 +810,12 @@ public void markShardCopyAsStaleIfNeeded( } protected class PrimaryResult implements ReplicationOperation.PrimaryResult { - final IndexShard primary; final ReplicaRequest replicaRequest; final Response finalResponse; - public PrimaryResult(IndexShard primary, ReplicaRequest replicaRequest, Response replicationResponse) { - this.primary = Objects.requireNonNull(primary); + public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) { this.replicaRequest = replicaRequest; - this.finalResponse = replicationResponse; + this.finalResponse = finalResponse; } @Override @@ -832,8 +829,8 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) { } @Override - public void runPostReplicationActions(ReplicationOperation.PrimaryPostReplicationActionsListener listener) { - listener.onResponse(primary.getLastSyncedGlobalCheckpoint(), primary.getLocalCheckpoint()); + public void runPostReplicationActions(ActionListener actionListener) { + actionListener.onResponse(null); } } @@ -850,7 +847,7 @@ protected void performOnPrimary(IndexShard primary, BulkShardRequest request, Ac executeShardBulkOnPrimary( primary, request, - listener.map(result -> new PrimaryResult(primary, result.replicaRequest(), result.replicationResponse)) + listener.map(result -> new PrimaryResult(result.replicaRequest(), result.replicationResponse)) ); } @@ -1006,7 +1003,7 @@ protected void performOnPrimary( ) { ActionListener.completeWith(listener, () -> { primary.sync(); - return new PrimaryResult(primary, request, new ReplicationResponse()); + return new PrimaryResult(request, new ReplicationResponse()); }); } @@ -1027,7 +1024,7 @@ protected void performOnPrimary(IndexShard primary, ResyncReplicationRequest req ActionListener.completeWith(listener, () -> { final 
TransportWriteAction.WritePrimaryResult result = executeResyncOnPrimary(primary, request); - return new PrimaryResult(primary, result.replicaRequest(), result.replicationResponse); + return new PrimaryResult(result.replicaRequest(), result.replicationResponse); }); } @@ -1108,7 +1105,7 @@ protected void performOnPrimary( ) { ActionListener.completeWith(listener, () -> { primary.persistRetentionLeases(); - return new PrimaryResult(primary, request, new RetentionLeaseSyncAction.Response()); + return new PrimaryResult(request, new RetentionLeaseSyncAction.Response()); }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 4ccbf2a574fd8..34ce43e583086 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -832,7 +832,7 @@ protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest r } catch (InterruptedException | ExecutionException | IOException e) { throw new RuntimeException(e); } - listener.onResponse(new PrimaryResult(primary, ccrResult.replicaRequest(), ccrResult.replicationResponse)); + listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.replicationResponse)); } @Override