Skip to content

Commit 68280cf

Browse files
committed
Adding re-writing of PIT id
1 parent cc60f50 commit 68280cf

File tree

7 files changed

+42
-10
lines changed

7 files changed

+42
-10
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.util.SetOnce;
1414
import org.elasticsearch.ElasticsearchException;
1515
import org.elasticsearch.ExceptionsHelper;
16+
import org.elasticsearch.TransportVersion;
1617
import org.elasticsearch.action.ActionListener;
1718
import org.elasticsearch.action.NoShardAvailableActionException;
1819
import org.elasticsearch.action.OriginalIndices;
@@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9394
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
9495
private final AtomicBoolean requestCancelled = new AtomicBoolean();
9596
private final int skippedCount;
97+
private final TransportVersion clusterMinTransportVersion;
9698

9799
// protected for tests
98100
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
@@ -149,6 +151,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
149151
this.nodeIdToConnection = nodeIdToConnection;
150152
this.concreteIndexBoosts = concreteIndexBoosts;
151153
this.clusterStateVersion = clusterState.version();
154+
this.clusterMinTransportVersion = clusterState.getMinTransportVersion();
152155
this.aliasFilter = aliasFilter;
153156
this.results = resultConsumer;
154157
// register the release of the query consumer to free up the circuit breaker memory
@@ -609,7 +612,7 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
609612
protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
610613
var source = request.source();
611614
return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
612-
? source.pointInTimeBuilder().getEncodedId()
615+
? SearchContextId.encode(results.getAtomicArray().asList(), aliasFilter, clusterMinTransportVersion, failures)
613616
: null;
614617
}
615618

server/src/main/java/org/elasticsearch/action/search/PITHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public static SearchContextId decodePITId(String id) throws IOException {
2626
return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id)));
2727
}
2828

29-
public static SearchContextId decodePITId(BytesReference id) throws IOException {
29+
public static SearchContextId decodePITId(BytesReference id) {
3030
try (var in = id.streamInput()) {
3131
final TransportVersion version = TransportVersion.readVersion(in);
3232
in.setTransportVersion(version);

server/src/main/java/org/elasticsearch/action/search/SearchContextId.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Collections;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.Objects;
3334
import java.util.Set;
3435
import java.util.TreeSet;
3536
import java.util.stream.Collectors;
@@ -39,7 +40,7 @@ public final class SearchContextId {
3940
private final Map<String, AliasFilter> aliasFilter;
4041
private final transient Set<ShardSearchContextId> contextIds;
4142

42-
SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
43+
public SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
4344
this.shards = shards;
4445
this.aliasFilter = aliasFilter;
4546
this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet());
@@ -57,8 +58,8 @@ public boolean contains(ShardSearchContextId contextId) {
5758
return contextIds.contains(contextId);
5859
}
5960

60-
public static BytesReference encode(
61-
List<SearchPhaseResult> searchPhaseResults,
61+
public static <SPR extends SearchPhaseResult> BytesReference encode(
62+
List<SPR> searchPhaseResults,
6263
Map<String, AliasFilter> aliasFilter,
6364
TransportVersion version,
6465
ShardSearchFailure[] shardFailures
@@ -142,4 +143,17 @@ public String[] getActualIndices() {
142143
}
143144
return indices.toArray(String[]::new);
144145
}
146+
147+
@Override
148+
public boolean equals(Object o) {
149+
if (o == null || getClass() != o.getClass()) return false;
150+
SearchContextId that = (SearchContextId) o;
151+
return Objects.equals(shards, that.shards) && Objects.equals(aliasFilter,
152+
that.aliasFilter) && Objects.equals(contextIds, that.contextIds);
153+
}
154+
155+
@Override
156+
public int hashCode() {
157+
return Objects.hash(shards, aliasFilter, contextIds);
158+
}
145159
}

server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.search.internal.ShardSearchContextId;
1818

1919
import java.io.IOException;
20+
import java.util.Objects;
2021

2122
public final class SearchContextIdForNode implements Writeable {
2223
private final String node;
@@ -30,7 +31,7 @@ public final class SearchContextIdForNode implements Writeable {
3031
* @param node The target node where the search context ID is defined, or {@code null} if the shard is missing or unavailable.
3132
* @param searchContextId The {@link ShardSearchContextId}, or {@code null} if the shard is missing or unavailable.
3233
*/
33-
SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) {
34+
public SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) {
3435
this.node = node;
3536
this.clusterAlias = clusterAlias;
3637
this.searchContextId = searchContextId;
@@ -103,4 +104,18 @@ public String toString() {
103104
+ '\''
104105
+ '}';
105106
}
107+
108+
@Override
109+
public boolean equals(Object o) {
110+
if (o == null || getClass() != o.getClass()) return false;
111+
SearchContextIdForNode that = (SearchContextIdForNode) o;
112+
return Objects.equals(node, that.node)
113+
&& Objects.equals(searchContextId, that.searchContextId)
114+
&& Objects.equals(clusterAlias, that.clusterAlias);
115+
}
116+
117+
@Override
118+
public int hashCode() {
119+
return Objects.hash(node, searchContextId, clusterAlias);
120+
}
106121
}

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ protected void innerOnResponse(SearchPhaseResult searchPhaseResult) {
709709
} catch (Exception e) {
710710
setFailure(state, dataNodeLocalIdx, e);
711711
} finally {
712-
doneFuture.onResponse(null);
712+
doneFuture.onResponse(null);
713713
}
714714
}
715715

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,7 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(
12331233
// Otherwise, we add the shard iterator without a target node, allowing a partial search failure to
12341234
// be thrown when a search phase attempts to access it.
12351235
targetNodes.add(perNode.getNode());
1236+
// TODO we will need to adapt something here as well I think
12361237
if (perNode.getSearchContextId().getSearcherId() != null) {
12371238
for (String node : group.allocatedNodes()) {
12381239
if (node.equals(perNode.getNode()) == false) {

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,6 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings
541541

542542
protected void putReaderContext(ReaderContext context) {
543543
final ReaderContext previous = activeReaders.put(context.readerContextId(), context);
544-
logger.info("--> adding reader context [{}], current [{}]", context.readerContextId(), activeReaders.size());
545544
assert previous == null;
546545
// ensure that if we race against afterIndexRemoved, we remove the context from the active list.
547546
// this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
@@ -556,8 +555,7 @@ protected ReaderContext removeReaderContext(ReaderContextId id) {
556555
if (logger.isTraceEnabled()) {
557556
logger.trace("removing reader context [{}]", id);
558557
}
559-
ReaderContext remove = activeReaders.remove(id);
560-
return remove;
558+
return activeReaders.remove(id);
561559
}
562560

563561
@Override
@@ -1261,6 +1259,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
12611259
if (searcherId == null) {
12621260
throw e;
12631261
}
1262+
// TODO this retries on context with same searcher id, currently offered in FrozenEngine and ReadonlyEngine
12641263
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
12651264
final IndexShard shard = indexService.getShard(request.shardId().id());
12661265
final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();

0 commit comments

Comments
 (0)