Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/137675.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137675
summary: PIT context relocation work on main repo
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public final class SearchContextIdForNode implements Writeable {
* @param node The target node where the search context ID is defined, or {@code null} if the shard is missing or unavailable.
* @param searchContextId The {@link ShardSearchContextId}, or {@code null} if the shard is missing or unavailable.
*/
SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) {
public SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) {
this.node = node;
this.clusterAlias = clusterAlias;
this.searchContextId = searchContextId;
}

SearchContextIdForNode(StreamInput in) throws IOException {
public SearchContextIdForNode(StreamInput in) throws IOException {
boolean allowNull = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0);
this.node = allowNull ? in.readOptionalString() : in.readString();
this.clusterAlias = in.readOptionalString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2300,11 +2300,12 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
ShardSearchContextId shardSearchContextId = perNode.getSearchContextId();
if (shardSearchContextId.isRetryable()) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
if (shard.isSearchable() && shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
logger.trace("PIT retryable - adding shard copy on nodes [{}]", targetNodes);
} catch (IndexNotFoundException | ShardNotFoundException e) {
// We can hit these exceptions if the index was deleted after creating PIT or the cluster state on
// this coordinating node is outdated. It's fine to ignore these extra "retry-able" target shards
Expand Down
18 changes: 15 additions & 3 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.CheckedSupplier;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -1015,6 +1017,17 @@ public SearcherSupplier acquireSearcherSupplier(
Function<Searcher, Searcher> wrapper,
SearcherScope scope,
SplitShardCountSummary splitShardCountSummary
) throws EngineException {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
return acquireSearcherSupplier(wrapper, scope, splitShardCountSummary, referenceManager::acquire, referenceManager::release);
}

public SearcherSupplier acquireSearcherSupplier(
Function<Searcher, Searcher> wrapper,
SearcherScope scope,
SplitShardCountSummary splitShardCountSummary,
CheckedSupplier<ElasticsearchDirectoryReader, IOException> directorySupplier,
CheckedConsumer<ElasticsearchDirectoryReader, IOException> releaseAction
) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
Expand All @@ -1024,8 +1037,7 @@ public SearcherSupplier acquireSearcherSupplier(
}
Releasable releasable = store::decRef;
try {
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
ElasticsearchDirectoryReader acquire = directorySupplier.get();
final DirectoryReader maybeWrappedDirectoryReader;
if (scope == SearcherScope.EXTERNAL) {
maybeWrappedDirectoryReader = wrapExternalDirectoryReader(acquire, splitShardCountSummary);
Expand All @@ -1050,7 +1062,7 @@ public Searcher acquireSearcherInternal(String source) {
@Override
protected void doClose() {
try {
referenceManager.release(acquire);
releaseAction.accept(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ private void markSearcherAccessed() {
lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
}

private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
public Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader()) != null
: "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
boolean success = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,4 @@ int relocationMapSize() {
Collection<ReaderContext> values() {
return activeReaders.values();
}

}
42 changes: 31 additions & 11 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.common.Strings.format;
import static org.elasticsearch.core.TimeValue.timeValueHours;
Expand Down Expand Up @@ -866,7 +867,7 @@ private IndexShard getShard(ShardSearchRequest request) {
final ShardSearchContextId contextId = request.readerId();
if (contextId != null && sessionId.equals(contextId.getSessionId())) {
final ReaderContext readerContext = activeReaders.get(contextId);
if (readerContext != null) {
if (readerContext != null && readerContext.isForcedExpired() == false) {
return readerContext.indexShard();
}
}
Expand Down Expand Up @@ -1345,7 +1346,7 @@ final ReaderContext createAndPutReaderContext(
}
}

final ReaderContext createAndPutRelocatedPitContext(
public final ReaderContext createAndPutRelocatedPitContext(
ShardSearchContextId contextId,
IndexService indexService,
IndexShard shard,
Expand All @@ -1364,14 +1365,7 @@ final ReaderContext createAndPutRelocatedPitContext(
final SearchOperationListener searchOperationListener = shard.getSearchOperationListener();
searchOperationListener.onNewReaderContext(finalReaderContext);
readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext));
activeReaders.putRelocatedReader(newKey, readerContext);
// ensure that if we race against afterIndexRemoved, we remove the context from the active list.
// this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
final Index index = readerContext.indexShard().shardId().getIndex();
if (indicesService.hasIndex(index) == false) {
removeReaderContext(readerContext.id());
throw new IndexNotFoundException(index);
}
putRelocatedReaderContext(newKey, readerContext);
readerContext = null;
return finalReaderContext;
} else {
Expand All @@ -1383,6 +1377,15 @@ final ReaderContext createAndPutRelocatedPitContext(
}
}

protected void putRelocatedReaderContext(Long mappingKey, ReaderContext context) {
activeReaders.putRelocatedReader(mappingKey, context);
final Index index = context.indexShard().shardId().getIndex();
if (indicesService.hasIndex(index) == false) {
removeReaderContext(context.id());
throw new IndexNotFoundException(index);
}
}

/**
* Opens the reader context for given shardId. The newly opened reader context will be keep
* until the {@code keepAlive} elapsed unless it is manually released.
Expand All @@ -1407,6 +1410,11 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen
searcherSupplier = null; // transfer ownership to reader context
searchOperationListener.onNewReaderContext(readerContext);
readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext));
logger.debug(
"Opening new reader context [{}] on node [{}]",
readerContext.id(),
clusterService.state().nodes().getLocalNode()
);
putReaderContext(readerContext);
readerContext = null;
listener.onResponse(finalReaderContext.id());
Expand Down Expand Up @@ -1895,6 +1903,19 @@ public int getActiveContexts() {
return this.activeReaders.size();
}

public long getActivePITContexts() {
return this.activeReaders.values().stream().filter(c -> c.singleSession() == false).filter(c -> c.scrollContext() == null).count();
}

public List<ReaderContext> getActivePITContexts(ShardId shardId) {
return this.activeReaders.values()
.stream()
.filter(c -> c.singleSession() == false)
.filter(c -> c.scrollContext() == null)
.filter(c -> c.indexShard().shardId().equals(shardId))
.collect(Collectors.toList());
}

/**
* Returns the number of scroll contexts opened on the node
*/
Expand Down Expand Up @@ -1958,7 +1979,6 @@ protected void doRun() {
freeReaderContext(context.id());
}
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ReaderContext implements Releasable {
private final long startTimeInNano = System.nanoTime();

private Map<String, Object> context;
private boolean isForcedExpired = false;

@SuppressWarnings("this-escape")
public ReaderContext(
Expand Down Expand Up @@ -116,6 +117,10 @@ private void tryUpdateKeepAlive(long keepAlive) {
this.keepAlive.accumulateAndGet(keepAlive, Math::max);
}

public long keepAlive() {
return keepAlive.longValue();
}

/**
* Returns a releasable to indicate that the caller has stopped using this reader.
* The time to live of the reader after usage can be extended using the provided
Expand All @@ -134,10 +139,21 @@ public boolean isExpired() {
if (refCounted.refCount() > 1) {
return false; // being used by markAsUsed
}
if (isForcedExpired) {
return true;
}
final long elapsed = nowInMillis() - lastAccessTime.get();
return elapsed > keepAlive.get();
}

public boolean isForcedExpired() {
return isForcedExpired;
}

public void forceExpired() {
isForcedExpired = true;
}

// BWC
public ShardSearchRequest getShardSearchRequest(ShardSearchRequest other) {
return Objects.requireNonNull(other, "ShardSearchRequest must be sent back in a fetch request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ protected void putReaderContext(ReaderContext context) {
super.putReaderContext(context);
}

@Override
protected void putRelocatedReaderContext(Long mappingKey, ReaderContext context) {
onPutContext.accept(context);
addActiveContext(context);
super.putRelocatedReaderContext(mappingKey, context);
}

@Override
protected ReaderContext removeReaderContext(ShardSearchContextId id) {
final ReaderContext removed = super.removeReaderContext(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -44,6 +45,10 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.hamcrest.Matchers.equalTo;

@TestIssueLogging(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change related?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only here for improved visibility during testing, will remove before merge

issueUrl = "https://github.com/elastic/elasticsearch/issues/129445",
value = "org.elasticsearch.action.search:DEBUG," + "org.elasticsearch.search:TRACE"
)
public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
Expand Down Expand Up @@ -263,6 +268,7 @@ private void createTestIndex(String indexName, int docCount, int numShards) thro
assertAcked(indicesAdmin().prepareDelete(indexName));

final int numberOfReplicas = between(0, 2);
logger.info("---> number of replicas: " + numberOfReplicas);
final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build();
internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);

Expand Down