Merged

38 commits
73a174f
Improve PIT context relocation
cbuescher Sep 22, 2025
730efd4
Update docs/changelog/135231.yaml
cbuescher Sep 22, 2025
ad4219c
Add keepalive to second and third query
cbuescher Sep 25, 2025
fb526d4
Merge branch 'main' into improve-pit-relocation
cbuescher Sep 25, 2025
6bcb45f
Addressing review comments
cbuescher Sep 26, 2025
841c5b3
Rework failure usage in updating PIT id
cbuescher Oct 8, 2025
b76a959
[CI] Auto commit changes from spotless
Oct 8, 2025
237ea4c
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 9, 2025
9085194
Adding feature flag
cbuescher Oct 9, 2025
d7166cd
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 13, 2025
cbab660
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 16, 2025
df9d875
Addressing review comments
cbuescher Oct 17, 2025
613cf73
Fix AbstractSearchAsyncActionTests.testMaybeReEncode
cbuescher Oct 19, 2025
0528e1d
Add new ActiveReaders class encapsulating active readers and relocati…
cbuescher Oct 19, 2025
07fad27
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 21, 2025
76340ca
Minor change in node lookup
cbuescher Oct 21, 2025
bb0f265
Update PIT ids only when we saw SearchContextMissingException as reas…
cbuescher Oct 21, 2025
c3d1b65
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 21, 2025
033e445
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 22, 2025
5bf1853
minor logging change
cbuescher Oct 22, 2025
cc3c6c5
Modify updating PIT id conditions
cbuescher Oct 22, 2025
67872f6
Addressing review comments
cbuescher Oct 23, 2025
308c366
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 23, 2025
5011068
Correct condition for updating pit
cbuescher Oct 23, 2025
b64a366
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 27, 2025
8ea193d
Going back to always close contexts if nodeId changed
cbuescher Oct 27, 2025
df1b1c7
Close context when concurrently retrying same id twice
cbuescher Oct 27, 2025
9f5a1ad
small cleanup
cbuescher Oct 28, 2025
bee8400
more logger cleanups
cbuescher Oct 28, 2025
9fe9542
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 28, 2025
3dca6e0
Separate normal and relocated putContext code paths
cbuescher Oct 29, 2025
0a06acf
minor cleanups
cbuescher Oct 29, 2025
1a302ac
Exempt context from current session from relocation handling
cbuescher Oct 30, 2025
b9f6f3d
Add RetrySearchIntegTests test for removed PIT context
cbuescher Oct 30, 2025
7a7aeef
Addressing review comments
cbuescher Oct 31, 2025
0edfa87
Merge branch 'main' into improve-pit-relocation
cbuescher Oct 31, 2025
65747c4
Adapt changelog
cbuescher Nov 1, 2025
d2618d8
Merge branch 'main' into improve-pit-relocation
cbuescher Nov 1, 2025
5 changes: 5 additions & 0 deletions docs/changelog/135231.yaml
@@ -0,0 +1,5 @@
pr: 135231
summary: Improve retrying PIT contexts for read-only indices
area: Search
type: enhancement
issues: []
@@ -11,11 +11,12 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ParsedScrollId;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
@@ -28,6 +29,7 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
@@ -703,13 +705,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
} finally {
respFromProdIndex.decRef();
}
SearchPhaseExecutionException error = expectThrows(
SearchPhaseExecutionException.class,
client().prepareSearchScroll(respFromDemoIndexScrollId)
SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId);
SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder);
assertEquals(1, error.shardFailures().length);
ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId();
ShardSearchContextId shardSearchContextId = parsedScrollId.getContext()[0].getSearchContextId();
assertThat(
error.shardFailures()[0].getCause().getMessage(),
containsString("No search context found for id [" + shardSearchContextId + "]")
);
for (ShardSearchFailure shardSearchFailure : error.shardFailures()) {
assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]"));
}
client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef();
}

@@ -13,6 +13,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.OriginalIndices;
@@ -21,6 +22,7 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.Maps;
@@ -30,8 +32,10 @@
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -40,6 +44,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -53,6 +59,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.action.search.TransportClosePointInTimeAction.closeContexts;
import static org.elasticsearch.core.Strings.format;

/**
@@ -94,11 +101,13 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
private final AtomicBoolean requestCancelled = new AtomicBoolean();
private final int skippedCount;
private final TransportVersion mintransportVersion;
protected final SearchResponseMetrics searchResponseMetrics;
protected long phaseStartTimeInNanos;

// protected for tests
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
private final Supplier<DiscoveryNodes> discoveryNodes;

AbstractSearchAsyncAction(
String name,
@@ -153,6 +162,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.nodeIdToConnection = nodeIdToConnection;
this.concreteIndexBoosts = concreteIndexBoosts;
this.clusterStateVersion = clusterState.version();
this.mintransportVersion = clusterState.getMinTransportVersion();
this.discoveryNodes = clusterState::nodes;
this.aliasFilter = aliasFilter;
this.results = resultConsumer;
// register the release of the query consumer to free up the circuit breaker memory
@@ -422,6 +433,7 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar
onShardGroupFailure(shardIndex, shard, e);
}
if (lastShard == false) {
logger.debug("Retrying shard [{}] with target [{}]", shard.getShardId(), nextShard);
performPhaseOnShard(shardIndex, shardIt, nextShard);
} else {
// count down outstanding shards, we're done with this shard as there's no more copies to try
@@ -613,10 +625,87 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
}

protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
Member: This should be async so we wait for old contexts to close before responding. We've seen ML not wait for closing PIT and overload the cluster. But we can do this in a follow-up since this change is behind a feature flag.

Member Author: I considered waiting here, but if I understand correctly, that would potentially slow down the search that is triggering the id update. I assumed that the fire-and-forget approach should be okay. I'm still not 100% clear under which circumstances we'd need to close a PIT context that is now on a new node, because the reason we are opening a part of the PIT on a new node is likely that the context is missing in its original location. So nothing to clean up? Or what am I missing here?

> We've seen ML not wait for closing PIT and overload the cluster.

Do you remember the details of that scenario? Both for checking how it relates to the situation here and for my own learning, to avoid such scenarios in future work.

Member (@dnhatn, Oct 17, 2025): A shard-level request can fail for various reasons, such as a circuit breaking exception on a data node. In such cases, we try another node, and if successful, we replace the old context with the new one here. Alternatively, we could avoid replacing the search context, which means we do not need to close it. We can maintain a list of failure types (SearchContextMissingException or ...) that indicate the search contexts are no longer available and should be replaced. What do you think?

Member Author: Thanks for the explanation. I like the idea of conditioning the id rewrite on failure types. However, when looking at how to do this here in AbstractSearchAsyncAction, I noticed that we clear earlier shard failures when another shard responds with a result. I'm not sure yet how to make this possible here, or whether there are better locations in the code to do so. Need to do some digging.

Member Author: I added a different solution to this than we talked about, directly in AbstractSearchAsyncAction, without having to modify anything in SearchPhaseResult. Let me know if this looks okay to you; otherwise I'll change it.

var source = request.source();
return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
? source.pointInTimeBuilder().getEncodedId()
: null;
SearchSourceBuilder source = request.source();
// only (re-)build a search context id if we are running a long-lived point-in-time request
if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) {
if (SearchService.PIT_RELOCATION_FEATURE_FLAG.isEnabled()) {
// we want to change node ids in the PIT id if any shards and its PIT context have moved
return maybeReEncodeNodeIds(
source.pointInTimeBuilder(),
results.getAtomicArray().asList(),
namedWriteableRegistry,
mintransportVersion,
searchTransportService,
discoveryNodes.get(),
logger
);
} else {
return source.pointInTimeBuilder().getEncodedId();
}
} else {
return null;
}
}
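The failure-type gating discussed in the review thread above can be pictured with a minimal sketch. This is an illustration, not code from this PR; the helper name is hypothetical, while ExceptionsHelper and SearchContextMissingException are the classes already imported in this file:

// Hypothetical helper: treat a shard's PIT entry as replaceable only when the
// recorded failure shows the old context is gone for good. A
// SearchContextMissingException means the context no longer exists on the
// original node, so re-pointing the PIT id at its new location is safe.
private static boolean indicatesLostContext(Exception shardFailure) {
    return ExceptionsHelper.unwrapCause(shardFailure) instanceof SearchContextMissingException;
}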

Member: Question: I haven't looked into how we handle partial results with re-encoding. Have you considered this?

Member Author: I assume you are referring to the functionality added with #111516? If I understand that PR correctly, we will have SearchContextIdForNode entries in the PIT with a "null" node entry. I think in that case we won't add that shard to the shard iterators of any subsequent search, so we won't get a Result for that shard. That is one reason why I added copying PIT id entries for everything that has no Result from the old to the new encoded id without change. Is that what you mean?

Member Author: I looked again at how we handle partial results. The way I see it, when opening the PIT we can tolerate partial results, i.e. failures from certain shards when they are not available. The result is an entry in the PIT id that has a ShardId but a SearchContextIdForNode with an empty node/contextId. We should not change any of these entries, which is what already happens in this method. In addition, the re-encoding step here doesn't change any entries for shards that failed in the last search: they shouldn't be included in the "results" list and therefore don't update the related part of the updated PIT id. In cases where these failures are temporary, subsequent searches with the updated id will try to hit the "old" shard context locations; if any of those can be retried, we will update that part of the PIT in a later call.

static <Result extends SearchPhaseResult> BytesReference maybeReEncodeNodeIds(

PointInTimeBuilder originalPit,
List<Result> results,
NamedWriteableRegistry namedWriteableRegistry,
TransportVersion mintransportVersion,
SearchTransportService searchTransportService,
DiscoveryNodes nodes,
Logger logger
) {
SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry);
// only create the following two collections if we detect an id change
Map<ShardId, SearchContextIdForNode> updatedShardMap = null;
Collection<SearchContextIdForNode> contextsToClose = null;
logger.debug("checking search result shards to detect PIT node changes");
for (Result result : results) {
SearchShardTarget searchShardTarget = result.getSearchShardTarget();
ShardId shardId = searchShardTarget.getShardId();
SearchContextIdForNode originalShard = original.shards().get(shardId);
if (originalShard != null && originalShard.getSearchContextId() != null && originalShard.getSearchContextId().isRetryable()) {
// check if the node is different, if so we need to re-encode the PIT
String originalNode = originalShard.getNode();
if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) {
// the target node for this shard entry in the PIT has changed, we need to update it
if (updatedShardMap == null) {
// initialize the map with entries from the old map to keep ids for shards that have not responded in these results
updatedShardMap = new HashMap<>(original.shards());
contextsToClose = new ArrayList<>();
}
SearchContextIdForNode updatedId = new SearchContextIdForNode(
searchShardTarget.getClusterAlias(),
searchShardTarget.getNodeId(),
result.getContextId()
);

logger.debug("changing node for PIT shard id from [{}] to [{}]", originalShard, updatedId);
updatedShardMap.put(shardId, updatedId);
contextsToClose.add(original.shards().get(shardId));

}
}
}
if (updatedShardMap != null) {
// we free all old contexts that have moved, just in case we have re-tried them elsewhere
// but they still exist in the old location
closeContexts(nodes, searchTransportService, contextsToClose, new ActionListener<Integer>() {
@Override
public void onResponse(Integer integer) {
// ignore
}

@Override
public void onFailure(Exception e) {
logger.trace("Failure while freeing old point in time contexts", e);
}
});
return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, ShardSearchFailure.EMPTY_ARRAY);
} else {
return originalPit.getEncodedId();
}
}
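Reduced to its essentials, the per-shard rewrite condition in the loop above amounts to the following predicate. This is a paraphrase for illustration, using only accessors that appear in this diff, not additional PR code:

// Rewrite a PIT entry only if it is retryable and the shard responded from a
// different node than the one recorded in the original PIT id.
static boolean needsReEncoding(SearchContextIdForNode recorded, SearchShardTarget current) {
    return recorded.getSearchContextId() != null
        && recorded.getSearchContextId().isRetryable()
        && recorded.getNode() != null
        && recorded.getNode().equals(current.getNodeId()) == false;
}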

/**
@@ -13,20 +13,14 @@
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;

@@ -143,53 +137,4 @@ private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
private void finish() {
listener.onResponse(new ClearScrollResponse(hasFailed.get() == false, freedSearchContexts.get()));
}

/**
* Closes the given context id and reports the number of freed contexts via the listener
*/
public static void closeContexts(
DiscoveryNodes nodes,
SearchTransportService searchTransportService,
Collection<SearchContextIdForNode> contextIds,
ActionListener<Integer> listener
) {
final Set<String> clusters = contextIds.stream()
.map(SearchContextIdForNode::getClusterAlias)
.filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
.collect(Collectors.toSet());
final ListenableFuture<BiFunction<String, String, DiscoveryNode>> lookupListener = new ListenableFuture<>();
if (clusters.isEmpty()) {
lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId));
} else {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
}
lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> {
final var successes = new AtomicInteger();
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
for (SearchContextIdForNode contextId : contextIds) {
if (contextId.getNode() == null) {
// the shard was missing when creating the PIT, ignore.
continue;
}
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node != null) {
try {
searchTransportService.sendFreeContext(
searchTransportService.getConnection(contextId.getClusterAlias(), node),
contextId.getSearchContextId(),
refs.acquireListener().map(r -> {
if (r.isFreed()) {
successes.incrementAndGet();
}
return null;
})
);
} catch (Exception e) {
// ignored
}
}
}
}
}));
}
}
@@ -21,7 +21,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.transport.RemoteClusterAware;
@@ -30,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -62,6 +62,26 @@ public static BytesReference encode(
Map<String, AliasFilter> aliasFilter,
TransportVersion version,
ShardSearchFailure[] shardFailures
) {
Map<ShardId, SearchContextIdForNode> shards = searchPhaseResults.stream()
.collect(
Collectors.toMap(
r -> r.getSearchShardTarget().getShardId(),
r -> new SearchContextIdForNode(
r.getSearchShardTarget().getClusterAlias(),
r.getSearchShardTarget().getNodeId(),
r.getContextId()
)
)
);
return encode(shards, aliasFilter, version, shardFailures);
}

static BytesReference encode(
Map<ShardId, SearchContextIdForNode> shards,
Map<String, AliasFilter> aliasFilter,
TransportVersion version,
ShardSearchFailure[] shardFailures
) {
assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.V_8_16_0)
: "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version ["
@@ -71,12 +91,12 @@ public static BytesReference encode(
out.setTransportVersion(version);
TransportVersion.writeVersion(version, out);
boolean allowNullContextId = out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0);
int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0);
int shardSize = shards.size() + (allowNullContextId ? shardFailures.length : 0);
out.writeVInt(shardSize);
for (var searchResult : searchPhaseResults) {
final SearchShardTarget target = searchResult.getSearchShardTarget();
target.getShardId().writeTo(out);
new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out);
for (ShardId shardId : shards.keySet()) {
shardId.writeTo(out);
SearchContextIdForNode searchContextIdForNode = shards.get(shardId);
searchContextIdForNode.writeTo(out);
}
if (allowNullContextId) {
for (var failure : shardFailures) {
@@ -142,4 +162,23 @@ public String[] getActualIndices() {
}
return indices.toArray(String[]::new);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
SearchContextId that = (SearchContextId) o;
return Objects.equals(shards, that.shards)
&& Objects.equals(aliasFilter, that.aliasFilter)
&& Objects.equals(contextIds, that.contextIds);
}

@Override
public int hashCode() {
return Objects.hash(shards, aliasFilter, contextIds);
}

@Override
public String toString() {
return "SearchContextId{" + "shards=" + shards + ", aliasFilter=" + aliasFilter + '}';
}
}
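A plausible use of the new equals/hashCode is round-trip testing of encode and decode. The sketch below is an illustration, not part of the PR, and assumes a decode counterpart with the signature SearchContextId.decode(NamedWriteableRegistry, BytesReference):

// Assumed round-trip check: re-encoding a decoded id's shard map should yield
// an id that decodes back to an equal SearchContextId.
SearchContextId original = SearchContextId.decode(namedWriteableRegistry, encodedId);
BytesReference reEncoded = SearchContextId.encode(
    original.shards(),
    original.aliasFilter(),
    version,
    ShardSearchFailure.EMPTY_ARRAY
);
assert original.equals(SearchContextId.decode(namedWriteableRegistry, reEncoded));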