diff --git a/docs/changelog/137675.yaml b/docs/changelog/137675.yaml new file mode 100644 index 0000000000000..ad551dfffc901 --- /dev/null +++ b/docs/changelog/137675.yaml @@ -0,0 +1,5 @@ +pr: 137675 +summary: Support relocation of point-in-time (PIT) search contexts to other shard copies +area: Search +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java index f68bd1ea52cb8..3962287dcc44e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java @@ -31,13 +31,13 @@ public final class SearchContextIdForNode implements Writeable { * @param node The target node where the search context ID is defined, or {@code null} if the shard is missing or unavailable. * @param searchContextId The {@link ShardSearchContextId}, or {@code null} if the shard is missing or unavailable. */ - SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { + public SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { this.node = node; this.clusterAlias = clusterAlias; this.searchContextId = searchContextId; } - SearchContextIdForNode(StreamInput in) throws IOException { + public SearchContextIdForNode(StreamInput in) throws IOException { boolean allowNull = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0); this.node = allowNull ? 
in.readOptionalString() : in.readString(); this.clusterAlias = in.readOptionalString(); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index a773340dac65f..60982752060d0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -2300,11 +2300,12 @@ static List getLocalShardsIteratorFromPointInTime( ShardSearchContextId shardSearchContextId = perNode.getSearchContextId(); if (shardSearchContextId.isRetryable()) { for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(perNode.getNode()) == false) { + if (shard.isSearchable() && shard.currentNodeId().equals(perNode.getNode()) == false) { targetNodes.add(shard.currentNodeId()); } } } + logger.trace("PIT retryable - adding shard copy on nodes [{}]", targetNodes); } catch (IndexNotFoundException | ShardNotFoundException e) { // We can hit these exceptions if the index was deleted after creating PIT or the cluster state on // this coordinating node is outdated. 
It's fine to ignore these extra "retry-able" target shards diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8d28250ce1919..10bd4ed9faf6b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -59,7 +59,9 @@ import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Assertions; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.CheckedSupplier; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; @@ -1015,6 +1017,17 @@ public SearcherSupplier acquireSearcherSupplier( Function wrapper, SearcherScope scope, SplitShardCountSummary splitShardCountSummary + ) throws EngineException { + ReferenceManager referenceManager = getReferenceManager(scope); + return acquireSearcherSupplier(wrapper, scope, splitShardCountSummary, referenceManager::acquire, referenceManager::release); + } + + public SearcherSupplier acquireSearcherSupplier( + Function wrapper, + SearcherScope scope, + SplitShardCountSummary splitShardCountSummary, + CheckedSupplier directorySupplier, + CheckedConsumer releaseAction ) throws EngineException { /* Acquire order here is store -> manager since we need * to make sure that the store is not closed before @@ -1024,8 +1037,7 @@ public SearcherSupplier acquireSearcherSupplier( } Releasable releasable = store::decRef; try { - ReferenceManager referenceManager = getReferenceManager(scope); - ElasticsearchDirectoryReader acquire = referenceManager.acquire(); + ElasticsearchDirectoryReader acquire = directorySupplier.get(); final DirectoryReader maybeWrappedDirectoryReader; if (scope == SearcherScope.EXTERNAL) { 
maybeWrappedDirectoryReader = wrapExternalDirectoryReader(acquire, splitShardCountSummary); @@ -1050,7 +1062,7 @@ public Searcher acquireSearcherInternal(String source) { @Override protected void doClose() { try { - referenceManager.release(acquire); + releaseAction.accept(acquire); } catch (IOException e) { throw new UncheckedIOException("failed to close", e); } catch (AlreadyClosedException e) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6730eaed01fd8..ca06e842b15d2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1751,7 +1751,7 @@ private void markSearcherAccessed() { lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis()); } - private Engine.Searcher wrapSearcher(Engine.Searcher searcher) { + public Engine.Searcher wrapSearcher(Engine.Searcher searcher) { assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader()) != null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader"; boolean success = false; diff --git a/server/src/main/java/org/elasticsearch/search/ActiveReaders.java b/server/src/main/java/org/elasticsearch/search/ActiveReaders.java index bfa0080b8281e..5fea1f759b5a6 100644 --- a/server/src/main/java/org/elasticsearch/search/ActiveReaders.java +++ b/server/src/main/java/org/elasticsearch/search/ActiveReaders.java @@ -86,5 +86,4 @@ int relocationMapSize() { Collection values() { return activeReaders.values(); } - } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 27a06c2f662de..98f8d1ce661a4 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -161,6 +161,7 @@ import java.util.function.Function; import 
java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.common.Strings.format; import static org.elasticsearch.core.TimeValue.timeValueHours; @@ -866,7 +867,7 @@ private IndexShard getShard(ShardSearchRequest request) { final ShardSearchContextId contextId = request.readerId(); if (contextId != null && sessionId.equals(contextId.getSessionId())) { final ReaderContext readerContext = activeReaders.get(contextId); - if (readerContext != null) { + if (readerContext != null && readerContext.isForcedExpired() == false) { return readerContext.indexShard(); } } @@ -1345,7 +1346,7 @@ final ReaderContext createAndPutReaderContext( } } - final ReaderContext createAndPutRelocatedPitContext( + public final ReaderContext createAndPutRelocatedPitContext( ShardSearchContextId contextId, IndexService indexService, IndexShard shard, @@ -1364,14 +1365,7 @@ final ReaderContext createAndPutRelocatedPitContext( final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); searchOperationListener.onNewReaderContext(finalReaderContext); readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); - activeReaders.putRelocatedReader(newKey, readerContext); - // ensure that if we race against afterIndexRemoved, we remove the context from the active list. - // this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout. 
- final Index index = readerContext.indexShard().shardId().getIndex(); - if (indicesService.hasIndex(index) == false) { - removeReaderContext(readerContext.id()); - throw new IndexNotFoundException(index); - } + putRelocatedReaderContext(newKey, readerContext); readerContext = null; return finalReaderContext; } else { @@ -1383,6 +1377,15 @@ final ReaderContext createAndPutRelocatedPitContext( } } + protected void putRelocatedReaderContext(Long mappingKey, ReaderContext context) { + activeReaders.putRelocatedReader(mappingKey, context); + final Index index = context.indexShard().shardId().getIndex(); + if (indicesService.hasIndex(index) == false) { + removeReaderContext(context.id()); + throw new IndexNotFoundException(index); + } + } + /** * Opens the reader context for given shardId. The newly opened reader context will be keep * until the {@code keepAlive} elapsed unless it is manually released. @@ -1407,6 +1410,11 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen searcherSupplier = null; // transfer ownership to reader context searchOperationListener.onNewReaderContext(readerContext); readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); + logger.debug( + "Opening new reader context [{}] on node [{}]", + readerContext.id(), + clusterService.state().nodes().getLocalNode() + ); putReaderContext(readerContext); readerContext = null; listener.onResponse(finalReaderContext.id()); @@ -1895,6 +1903,19 @@ public int getActiveContexts() { return this.activeReaders.size(); } + public long getActivePITContexts() { + return this.activeReaders.values().stream().filter(c -> c.singleSession() == false).filter(c -> c.scrollContext() == null).count(); + } + + public List getActivePITContexts(ShardId shardId) { + return this.activeReaders.values() + .stream() + .filter(c -> c.singleSession() == false) + .filter(c -> c.scrollContext() == null) + .filter(c -> c.indexShard().shardId().equals(shardId)) + 
.collect(Collectors.toList()); + } + /** * Returns the number of scroll contexts opened on the node */ @@ -1958,7 +1979,6 @@ protected void doRun() { freeReaderContext(context.id()); } } - } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java index c15b604b5b5fc..f29d922f6a4ac 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -53,6 +53,7 @@ public class ReaderContext implements Releasable { private final long startTimeInNano = System.nanoTime(); private Map context; + private boolean isForcedExpired = false; @SuppressWarnings("this-escape") public ReaderContext( @@ -116,6 +117,10 @@ private void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.accumulateAndGet(keepAlive, Math::max); } + public long keepAlive() { + return keepAlive.longValue(); + } + /** * Returns a releasable to indicate that the caller has stopped using this reader. 
* The time to live of the reader after usage can be extended using the provided @@ -134,10 +139,21 @@ public boolean isExpired() { if (refCounted.refCount() > 1) { return false; // being used by markAsUsed } + if (isForcedExpired) { + return true; + } final long elapsed = nowInMillis() - lastAccessTime.get(); return elapsed > keepAlive.get(); } + public boolean isForcedExpired() { + return isForcedExpired; + } + + public void forceExpired() { + isForcedExpired = true; + } + // BWC public ShardSearchRequest getShardSearchRequest(ShardSearchRequest other) { return Objects.requireNonNull(other, "ShardSearchRequest must be sent back in a fetch request"); diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index af43553c5b9ed..525e2a349c5cc 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -115,6 +115,13 @@ protected void putReaderContext(ReaderContext context) { super.putReaderContext(context); } + @Override + protected void putRelocatedReaderContext(Long mappingKey, ReaderContext context) { + onPutContext.accept(context); + addActiveContext(context); + super.putRelocatedReaderContext(mappingKey, context); + } + @Override protected ReaderContext removeReaderContext(ShardSearchContextId id) { final ReaderContext removed = super.removeReaderContext(id); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index 0ee6160149f81..0a6a81580de7b 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ 
b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.junit.annotations.TestIssueLogging; import java.util.ArrayList; import java.util.Collection; @@ -44,6 +45,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; import static org.hamcrest.Matchers.equalTo; +@TestIssueLogging( + issueUrl = "https://github.com/elastic/elasticsearch/issues/129445", + value = "org.elasticsearch.action.search:DEBUG," + "org.elasticsearch.search:TRACE" +) public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { @Override @@ -263,6 +268,7 @@ private void createTestIndex(String indexName, int docCount, int numShards) thro assertAcked(indicesAdmin().prepareDelete(indexName)); final int numberOfReplicas = between(0, 2); + logger.info("---> number of replicas: " + numberOfReplicas); final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);