diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java index cefb27376f9ea..a151e79ae5a88 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java @@ -49,21 +49,18 @@ public void refreshShard( ) { switch (policy) { case NONE -> listener.onResponse(false); - case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() { - @Override - public void onResponse(Boolean forced) { - if (location != null && indexShard.routingEntry().isSearchable() == false) { - refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout); - } else { - listener.onResponse(forced); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + case WAIT_UNTIL -> { + ActionListener wrapped; + if (location != null && indexShard.routingEntry().isSearchable() == false) { + var engineOrNull = indexShard.getEngineOrNull(); + wrapped = listener.delegateFailure( + (l, forced) -> refreshUnpromotables(indexShard, engineOrNull, location, listener, forced, postWriteRefreshTimeout) + ); + } else { + wrapped = listener; } - }); + waitUntil(indexShard, location, wrapped); + } case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> { if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) { sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout); @@ -103,17 +100,16 @@ private static void waitUntil(IndexShard indexShard, Translog.Location location, private void refreshUnpromotables( IndexShard indexShard, + Engine engineOrNull, // to avoid accessing it under the RefreshListener's refreshLock Translog.Location location, ActionListener listener, boolean forced, @Nullable TimeValue postWriteRefreshTimeout ) { - Engine engineOrNull = indexShard.getEngineOrNull(); if (engineOrNull == null) { listener.onFailure(new AlreadyClosedException("Engine closed during refresh.")); return; } - engineOrNull.addFlushListener(location, listener.delegateFailureAndWrap((l, generation) -> { try ( ThreadContext.StoredContext ignore = transportService.getThreadPool() diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d9bdbc2747045..5b332b0a9653a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -501,29 +501,31 @@ public void handleException(TransportException exp) { ); } else { setPhase(replicationTask, "primary"); + // Resolve the engine upfront to avoid an unsafe access if the responseListener is called by a refresh listener + final var engineOrNull = syncGlobalCheckpointAfterOperation ? primaryShardReference.indexShard.getEngineOrNull() : null; final ActionListener responseListener = ActionListener.wrap(response -> { adaptResponse(response, primaryShardReference.indexShard); - if (syncGlobalCheckpointAfterOperation) { + final var primary = primaryShardReference.indexShard; + if (engineOrNull != null) { try { - primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation"); + // TODO ensure engine is still open + var seqNoStats = engineOrNull.getSeqNoStats(primary.getLastKnownGlobalCheckpoint()); + primary.maybeSyncGlobalCheckpoint("post-operation", seqNoStats); } catch (final Exception e) { // only log non-closed exceptions if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { // intentionally swallow, a missed global checkpoint sync should not fail this operation logger.info( - () -> format( - "%s failed to execute post-operation global checkpoint sync", - primaryShardReference.indexShard.shardId() - ), + () -> format("%s failed to execute post-operation global checkpoint sync", primary.shardId()), e ); } } } - assert primaryShardReference.indexShard.isPrimaryMode(); + assert primary.isPrimaryMode(); primaryShardReference.close(); // release shard operation lock before responding to caller setPhase(replicationTask, "finished"); onCompletionListener.onResponse(response); diff --git a/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java b/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java new file mode 100644 index 0000000000000..86db0bd9eea5f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/AbstractReaderManager.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.core.Assertions; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.SuppressForbidden; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +@SuppressForbidden(reason = "reference counting is required here") +public abstract class AbstractReaderManager extends ReferenceManager { + + @Nullable // if assertions are disabled + private final Map assertingListeners = Assertions.ENABLED ? new ConcurrentHashMap<>() : null; + + @Override + protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { + return reference.tryIncRef(); + } + + @Override + protected int getRefCount(ElasticsearchDirectoryReader reference) { + return reference.getRefCount(); + } + + @Override + protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { + reference.decRef(); + } + + @Override + public final void addListener(RefreshListener listener) { + if (Assertions.ENABLED == false) { + super.addListener(listener); + return; + } + + final var assertingListener = new AssertingRefreshListener(listener); + var previous = assertingListeners.put(listener, assertingListener); + assert previous == null : "listener already added"; + super.addListener(assertingListener); + } + + @Override + public final void removeListener(RefreshListener listener) { + if (Assertions.ENABLED == false) { + super.removeListener(listener); + return; + } + + final var assertingListener = assertingListeners.remove(listener); + assert assertingListener != null : "listener already removed"; + super.removeListener(assertingListener); + } + + /** + * A delegating {@link RefreshListener} used to assert that refresh listeners are not accessing the engine within before/after refresh + * methods. + */ + private static class AssertingRefreshListener implements RefreshListener { + + private final RefreshListener delegate; + + private AssertingRefreshListener(RefreshListener delegate) { + this.delegate = Objects.requireNonNull(delegate); + if (Assertions.ENABLED == false) { + throw new AssertionError("Only use this when assertions are enabled"); + } + } + + @Override + public void beforeRefresh() throws IOException { + SafeEngineAccessThreadLocal.accessStart(); + try { + delegate.beforeRefresh(); + } finally { + SafeEngineAccessThreadLocal.accessEnd(); + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + SafeEngineAccessThreadLocal.accessStart(); + try { + delegate.afterRefresh(didRefresh); + } finally { + SafeEngineAccessThreadLocal.accessEnd(); + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java index 62adc49471c69..1ff26d9023b54 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchReaderManager.java @@ -10,10 +10,8 @@ package org.elasticsearch.index.engine; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; -import org.elasticsearch.core.SuppressForbidden; import java.io.IOException; @@ -25,8 +23,7 @@ * @see SearcherManager * */ -@SuppressForbidden(reason = "reference counting is required here") -public class ElasticsearchReaderManager extends ReferenceManager { +public class ElasticsearchReaderManager extends AbstractReaderManager { /** * Creates and returns a new ElasticsearchReaderManager from the given @@ -39,23 +36,8 @@ public ElasticsearchReaderManager(ElasticsearchDirectoryReader reader) { this.current = reader; } - @Override - protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { - reference.decRef(); - } - @Override protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException { return (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh); } - - @Override - protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { - return reference.tryIncRef(); - } - - @Override - protected int getRefCount(ElasticsearchDirectoryReader reference) { - return reference.getRefCount(); - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java b/server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java new file mode 100644 index 0000000000000..18efe7f007a67 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineAwareRefreshListener.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.ReferenceManager; + +/** + * A type of {@link ReferenceManager.RefreshListener} that is called back when a new {@link Engine} is instanciated. + */ +public interface EngineAwareRefreshListener extends ReferenceManager.RefreshListener { + + void onNewEngine(Engine engine); +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fc7f6eab0856c..d7936ba6fd63b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -71,7 +71,6 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexMode; @@ -305,9 +304,15 @@ public InternalEngine(EngineConfig engineConfig) { // don't allow commits until we are done with recovering pendingTranslogRecovery.set(true); for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(this); + } this.externalReaderManager.addListener(listener); } for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(this); + } this.internalReaderManager.addListener(listener); } this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); @@ -453,8 +458,7 @@ public CompletionStats completionStats(String... fieldNamePatterns) { * this specialized implementation an external refresh will immediately be reflected on the internal reader * and old segments can be released in the same way previous version did this (as a side-effect of _refresh) */ - @SuppressForbidden(reason = "reference counting is required here") - private static final class ExternalReaderManager extends ReferenceManager { + private static final class ExternalReaderManager extends AbstractReaderManager { private final BiConsumer refreshListener; private final ElasticsearchReaderManager internalReaderManager; private boolean isWarmedUp; // guarded by refreshLock @@ -495,21 +499,6 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea return newReader; // steal the reference } } - - @Override - protected boolean tryIncRef(ElasticsearchDirectoryReader reference) { - return reference.tryIncRef(); - } - - @Override - protected int getRefCount(ElasticsearchDirectoryReader reference) { - return reference.getRefCount(); - } - - @Override - protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { - reference.decRef(); - } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java new file mode 100644 index 0000000000000..784ce40782b26 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/SafeEngineAccessThreadLocal.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.core.Assertions; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.shard.IndexShard; + +/** + * This class is used to assert that a thread does not access the current engine reference using the method + * {@link IndexShard#getEngineOrNull()} when it is executing some protected code blocks. A protected code block is a + * block of code which starts when {@link #accessStart()} is called and ends when {@link #accessEnd()} is called. + */ +public final class SafeEngineAccessThreadLocal { + + @Nullable // if assertions are disabled + private static final ThreadLocal threadLocalAccessor; + static { + threadLocalAccessor = Assertions.ENABLED ? new ThreadLocal<>() : null; + } + + private static class Accessor { + + private final Thread thread; + private final SetOnce failure; + + private Accessor(Thread thread) { + this.thread = thread; + this.failure = new SetOnce<>(); + } + + private void setFailure(AssertionError error) { + failure.set(error); + } + + private boolean isFailed() { + return failure.get() != null; + } + + private AssertionError getFailure() { + return failure.get(); + } + + @Override + public String toString() { + return thread.toString(); + } + } + + private SafeEngineAccessThreadLocal() {} + + private static Accessor getAccessorSafe() { + final var accessor = threadLocalAccessor.get(); + if (accessor != null && accessor.isFailed()) { + throw new AssertionError("Current thread has made an unsafe access to the engine", accessor.getFailure()); + } + return accessor; + } + + public static void accessStart() { + ensureAssertionsEnabled(); + final var accessor = getAccessorSafe(); + assert accessor == null : "current accessor already set"; + threadLocalAccessor.set(new Accessor(Thread.currentThread())); + } + + public static void accessEnd() { + ensureAssertionsEnabled(); + final var accessor = getAccessorSafe(); + assert accessor != null : "current accessor not set"; + var thread = Thread.currentThread(); + assert accessor.thread == thread : "current accessor [" + accessor + "] was set by a different thread [" + thread + ']'; + threadLocalAccessor.remove(); + } + + /** + * Use this method to assert that the current thread has not entered a protected execution code block. + */ + public static boolean assertSafeAccess() { + ensureAssertionsEnabled(); + final var accessor = getAccessorSafe(); + if (accessor != null) { + var message = "thread [" + accessor + "] should not access the engine using the getEngineOrNull() method"; + accessor.setFailure(new AssertionError(message)); // to be thrown later + assert false : message; + return false; + } + return true; + } + + private static void ensureAssertionsEnabled() { + if (Assertions.ENABLED == false) { + throw new AssertionError("Only use this method when assertions are enabled"); + } + assert threadLocalAccessor != null; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7fecc53826ff1..ca25c5614c68f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -88,12 +88,14 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.GetResult; +import org.elasticsearch.index.engine.EngineAwareRefreshListener; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; +import org.elasticsearch.index.engine.SafeEngineAccessThreadLocal; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; @@ -2240,11 +2242,25 @@ private boolean assertLastestCommitUserData() throws IOException { return true; } - private void onNewEngine(Engine newEngine) { + private void onNewEngine(Engine engine) { assert Thread.holdsLock(engineMutex); - refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); - refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); - refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); + assert engine != null; + + var engineConfig = engine.config(); + if (engineConfig.getExternalRefreshListener() != null) { + for (var listener : engineConfig.getExternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(engine); + } + } + } + if (engineConfig.getInternalRefreshListener() != null) { + for (var listener : engineConfig.getInternalRefreshListener()) { + if (listener instanceof EngineAwareRefreshListener engineListener) { + engineListener.onNewEngine(engine); + } + } + } } /** @@ -2996,8 +3012,22 @@ public void maybeSyncGlobalCheckpoint(final String reason) { return; } assert assertPrimaryMode(); - // only sync if there are no operations in flight, or when using async durability final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); + doMaybeSyncGlobalCheckpoint(reason, stats); + } + + public void maybeSyncGlobalCheckpoint(final String reason, final SeqNoStats stats) { + verifyNotClosed(); + assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard"; + if (replicationTracker.isPrimaryMode() == false) { + return; + } + assert assertPrimaryMode(); + doMaybeSyncGlobalCheckpoint(reason, stats); + } + + private void doMaybeSyncGlobalCheckpoint(final String reason, final SeqNoStats stats) { + // only sync if there are no operations in flight, or when using async durability final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC; if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) { final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync(); @@ -3298,6 +3328,7 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { + assert SafeEngineAccessThreadLocal.assertSafeAccess(); return this.currentEngineReference.get(); } @@ -4082,13 +4113,21 @@ private void setRefreshPending(Engine engine) { }); } - private class RefreshPendingLocationListener implements ReferenceManager.RefreshListener { + private class RefreshPendingLocationListener implements EngineAwareRefreshListener { + + private Supplier lastWriteLocationSupplier; Translog.Location lastWriteLocation; + @Override + public void onNewEngine(Engine engine) { + this.lastWriteLocationSupplier = () -> engine.getTranslogLastWriteLocation(); + } + @Override public void beforeRefresh() { + assert lastWriteLocationSupplier != null; try { - lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + lastWriteLocation = lastWriteLocationSupplier.get(); } catch (AlreadyClosedException exc) { // shard is closed - no location is fine lastWriteLocation = null; @@ -4097,6 +4136,7 @@ public void beforeRefresh() { @Override public void afterRefresh(boolean didRefresh) { + assert lastWriteLocationSupplier != null; if (didRefresh && lastWriteLocation != null) { pendingRefreshLocation.updateAndGet(pendingLocation -> { if (pendingLocation == null || pendingLocation.compareTo(lastWriteLocation) <= 0) { @@ -4109,14 +4149,25 @@ public void afterRefresh(boolean didRefresh) { } } - private class RefreshFieldHasValueListener implements ReferenceManager.RefreshListener { + private class RefreshFieldHasValueListener implements EngineAwareRefreshListener { + + private Supplier engineSearcherSupplier; + + @Override + public void onNewEngine(Engine engine) { + this.engineSearcherSupplier = () -> engine.acquireSearcher("field_has_value"); + } + @Override - public void beforeRefresh() {} + public void beforeRefresh() { + assert engineSearcherSupplier != null; + } @Override public void afterRefresh(boolean didRefresh) { + assert engineSearcherSupplier != null; if (enableFieldHasValue && (didRefresh || fieldInfos == FieldInfos.EMPTY)) { - try (Engine.Searcher hasValueSearcher = getEngine().acquireSearcher("field_has_value")) { + try (Engine.Searcher hasValueSearcher = engineSearcherSupplier.get()) { setFieldInfos(FieldInfos.getMergedFieldInfos(hasValueSearcher.getIndexReader())); } catch (AlreadyClosedException ignore) { // engine is closed - no updated FieldInfos is fine @@ -4133,16 +4184,25 @@ public ShardFieldStats getShardFieldStats() { return shardFieldStats; } - private class RefreshShardFieldStatsListener implements ReferenceManager.RefreshListener { + private class RefreshShardFieldStatsListener implements EngineAwareRefreshListener { + + private Supplier engineSearcherSupplier; + @Override - public void beforeRefresh() { + public void onNewEngine(Engine engine) { + this.engineSearcherSupplier = () -> engine.acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL); + } + @Override + public void beforeRefresh() { + assert engineSearcherSupplier != null; } @Override public void afterRefresh(boolean didRefresh) { + assert engineSearcherSupplier != null; if (shardFieldStats == null || didRefresh) { - try (var searcher = getEngine().acquireSearcher("shard_field_stats", Engine.SearcherScope.INTERNAL)) { + try (var searcher = engineSearcherSupplier.get()) { int numSegments = 0; int totalFields = 0; long usages = 0; diff --git a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java index 8a2fc396c6737..a60e812d4968f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java @@ -10,7 +10,6 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; -import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -19,6 +18,8 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineAwareRefreshListener; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; @@ -39,7 +40,7 @@ * * When {@link Closeable#close()}d it will no longer accept listeners and flush any existing listeners. */ -public final class RefreshListeners implements ReferenceManager.RefreshListener, Closeable { +public final class RefreshListeners implements EngineAwareRefreshListener, Closeable { private final IntSupplier getMaxRefreshListeners; private final Runnable forceRefresh; private final Logger logger; @@ -264,21 +265,21 @@ public synchronized int pendingCount() { /** * Setup the translog used to find the last refreshed location. */ - public void setCurrentRefreshLocationSupplier(Supplier currentRefreshLocationSupplier) { + private void setCurrentRefreshLocationSupplier(Supplier currentRefreshLocationSupplier) { this.currentRefreshLocationSupplier = currentRefreshLocationSupplier; } /** * Setup the engine used to find the last processed sequence number checkpoint. */ - public void setCurrentProcessedCheckpointSupplier(LongSupplier processedCheckpointSupplier) { + private void setCurrentProcessedCheckpointSupplier(LongSupplier processedCheckpointSupplier) { this.processedCheckpointSupplier = processedCheckpointSupplier; } /** * Setup the engine used to find the max issued seqNo. */ - public void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) { + private void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) { this.maxIssuedSeqNoSupplier = maxIssuedSeqNoSupplier; } @@ -297,6 +298,13 @@ public void setMaxIssuedSeqNoSupplier(LongSupplier maxIssuedSeqNoSupplier) { private LongSupplier processedCheckpointSupplier; private LongSupplier maxIssuedSeqNoSupplier; + @Override + public void onNewEngine(Engine engine) { + setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); + setCurrentProcessedCheckpointSupplier(engine::getProcessedLocalCheckpoint); + setMaxIssuedSeqNoSupplier(engine::getMaxSeqNo); + } + @Override public void beforeRefresh() throws IOException { currentRefreshLocation = currentRefreshLocationSupplier.get(); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1c60803c55242..03d386acb7e6a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1158,7 +1158,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; try { - searcherSupplier = shard.acquireSearcherSupplier(); + searcherSupplier = shard.acquireSearcherSupplier(); // TODO I think we need to fork here final ShardSearchContextId id = new ShardSearchContextId( sessionId, idGenerator.incrementAndGet(), @@ -1949,9 +1949,14 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re Rewriteable.rewriteAndFetch( request.getRewriteable(), indicesService.getDataRewriteContext(request::nowInMillis), - request.readerId() == null - ? listener.delegateFailureAndWrap((l, r) -> shard.ensureShardSearchActive(b -> l.onResponse(request))) - : listener.safeMap(r -> request) + request.readerId() == null ? listener.delegateFailureAndWrap((l, r) -> shard.ensureShardSearchActive(waitedForRefresh -> { + if (waitedForRefresh) { + // for to generic in case the listener accesses the engine + threadPool.generic().execute(ActionRunnable.supply(l, () -> request)); + } else { + l.onResponse(request); + } + })) : listener.safeMap(r -> request) ); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index ca616dc619ec9..2aa01f95ca4a7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -162,9 +162,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); - listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); - listeners.setCurrentProcessedCheckpointSupplier(engine::getProcessedLocalCheckpoint); - listeners.setMaxIssuedSeqNoSupplier(engine::getMaxSeqNo); + listeners.onNewEngine(engine); } @After