Skip to content

Commit 1fb78e7

Browse files
committed
Additional changes on this PR
1 parent 2d519b3 commit 1fb78e7

File tree

7 files changed

+104
-20
lines changed

7 files changed

+104
-20
lines changed

server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,27 @@
88
*/
99
package org.elasticsearch.action.search;
1010

11+
import org.apache.logging.log4j.LogManager;
1112
import org.apache.logging.log4j.Logger;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.support.RefCountingRunnable;
1415
import org.elasticsearch.cluster.node.DiscoveryNode;
1516
import org.elasticsearch.cluster.node.DiscoveryNodes;
17+
import org.elasticsearch.cluster.project.ProjectResolver;
18+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
19+
import org.elasticsearch.cluster.routing.ShardRouting;
20+
import org.elasticsearch.cluster.service.ClusterService;
1621
import org.elasticsearch.common.Strings;
1722
import org.elasticsearch.common.util.concurrent.ListenableFuture;
23+
import org.elasticsearch.index.shard.ShardId;
1824
import org.elasticsearch.transport.Transport;
1925
import org.elasticsearch.transport.TransportResponse;
2026

2127
import java.util.ArrayList;
22-
import java.util.Collection;
2328
import java.util.Collections;
2429
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Map.Entry;
2532
import java.util.Set;
2633
import java.util.concurrent.atomic.AtomicBoolean;
2734
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +45,7 @@ public final class ClearScrollController implements Runnable {
3845
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
3946
private final AtomicInteger freedSearchContexts = new AtomicInteger(0);
4047
private final Logger logger;
48+
private static final Logger staticLogger = LogManager.getLogger(ClearScrollController.class);
4149
private final Runnable runner;
4250

4351
ClearScrollController(
@@ -148,12 +156,15 @@ private void finish() {
148156
* Closes the given context id and reports the number of freed contexts via the listener
149157
*/
150158
public static void closeContexts(
151-
DiscoveryNodes nodes,
159+
ClusterService clusterService,
160+
ProjectResolver projectResolver,
152161
SearchTransportService searchTransportService,
153-
Collection<SearchContextIdForNode> contextIds,
162+
Map<ShardId, SearchContextIdForNode> shards,
154163
ActionListener<Integer> listener
155164
) {
156-
final Set<String> clusters = contextIds.stream()
165+
DiscoveryNodes nodes = clusterService.state().nodes();
166+
final Set<String> clusters = shards.values()
167+
.stream()
157168
.map(SearchContextIdForNode::getClusterAlias)
158169
.filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
159170
.collect(Collectors.toSet());
@@ -166,16 +177,34 @@ public static void closeContexts(
166177
lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> {
167178
final var successes = new AtomicInteger();
168179
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
169-
for (SearchContextIdForNode contextId : contextIds) {
180+
for (Entry<ShardId, SearchContextIdForNode> entry : shards.entrySet()) {
181+
var contextId = entry.getValue();
170182
if (contextId.getNode() == null) {
171183
// the shard was missing when creating the PIT, ignore.
172184
continue;
173185
}
174186
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
187+
188+
Set<DiscoveryNode> targetNodes;
175189
if (node != null) {
190+
targetNodes = Collections.singleton(node);
191+
} else {
192+
staticLogger.info("---> missing node when closing context: " + contextId.getNode());
193+
// TODO we won't be able to use this with remote clusters
194+
IndexShardRoutingTable indexShardRoutingTable = clusterService.state()
195+
.routingTable(projectResolver.getProjectId())
196+
.shardRoutingTable(entry.getKey());
197+
targetNodes = indexShardRoutingTable.assignedUnpromotableShards()
198+
.stream()
199+
.map(ShardRouting::currentNodeId)
200+
.map(nodeId -> nodeLookup.apply(contextId.getClusterAlias(), nodeId))
201+
.collect(Collectors.toSet());
202+
staticLogger.info("---> trying alternative nodes to close context: " + targetNodes);
203+
}
204+
for (DiscoveryNode targetNode : targetNodes) {
176205
try {
177206
searchTransportService.sendFreeContext(
178-
searchTransportService.getConnection(contextId.getClusterAlias(), node),
207+
searchTransportService.getConnection(contextId.getClusterAlias(), targetNode),
179208
contextId.getSearchContextId(),
180209
refs.acquireListener().map(r -> {
181210
if (r.isFreed()) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.common.bytes.BytesArray;
14+
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.util.Maps;
16+
import org.elasticsearch.index.shard.ShardId;
17+
18+
import java.io.IOException;
19+
import java.util.Base64;
20+
import java.util.Collections;
21+
import java.util.Map;
22+
23+
public class PITHelper {
24+
25+
public static SearchContextId decodePITId(String id) {
26+
return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id)));
27+
}
28+
29+
public static SearchContextId decodePITId(BytesReference id) {
30+
try (var in = id.streamInput()) {
31+
final TransportVersion version = TransportVersion.readVersion(in);
32+
in.setTransportVersion(version);
33+
final Map<ShardId, SearchContextIdForNode> shards = Collections.unmodifiableMap(
34+
in.readCollection(Maps::newHashMapWithExpectedSize, (i, map) -> map.put(new ShardId(in), new SearchContextIdForNode(in)))
35+
);
36+
return new SearchContextId(shards, Collections.emptyMap());
37+
} catch (IOException e) {
38+
assert false : e;
39+
throw new IllegalArgumentException(e);
40+
}
41+
}
42+
}

server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,45 +13,51 @@
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.action.support.HandledTransportAction;
16+
import org.elasticsearch.cluster.project.ProjectResolver;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1819
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.index.shard.ShardId;
1921
import org.elasticsearch.injection.guice.Inject;
2022
import org.elasticsearch.tasks.Task;
2123
import org.elasticsearch.transport.TransportService;
2224

23-
import java.util.Collection;
25+
import java.util.Map;
2426

2527
public class TransportClosePointInTimeAction extends HandledTransportAction<ClosePointInTimeRequest, ClosePointInTimeResponse> {
2628

2729
public static final ActionType<ClosePointInTimeResponse> TYPE = new ActionType<>("indices:data/read/close_point_in_time");
2830
private final ClusterService clusterService;
2931
private final SearchTransportService searchTransportService;
3032
private final NamedWriteableRegistry namedWriteableRegistry;
33+
private final ProjectResolver projectResolver;
3134

3235
@Inject
3336
public TransportClosePointInTimeAction(
3437
TransportService transportService,
3538
ClusterService clusterService,
3639
ActionFilters actionFilters,
3740
SearchTransportService searchTransportService,
38-
NamedWriteableRegistry namedWriteableRegistry
41+
NamedWriteableRegistry namedWriteableRegistry,
42+
ProjectResolver projectResolver
3943
) {
4044
super(TYPE.name(), transportService, actionFilters, ClosePointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
4145
this.clusterService = clusterService;
4246
this.searchTransportService = searchTransportService;
4347
this.namedWriteableRegistry = namedWriteableRegistry;
48+
this.projectResolver = projectResolver;
4449
}
4550

4651
@Override
4752
protected void doExecute(Task task, ClosePointInTimeRequest request, ActionListener<ClosePointInTimeResponse> listener) {
4853
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId());
49-
final Collection<SearchContextIdForNode> contextIds = searchContextId.shards().values();
54+
Map<ShardId, SearchContextIdForNode> shards = searchContextId.shards();
5055
ClearScrollController.closeContexts(
51-
clusterService.state().nodes(),
56+
clusterService,
57+
projectResolver,
5258
searchTransportService,
53-
contextIds,
54-
listener.map(freed -> new ClosePointInTimeResponse(freed == contextIds.size(), freed))
59+
shards,
60+
listener.map(freed -> new ClosePointInTimeResponse(freed == shards.size(), freed))
5561
);
5662
}
5763
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ private void executeRequest(
381381
);
382382
frozenIndexCheck(resolvedIndices);
383383
}
384+
logger.info("Executing search request on node [{}] with indices [{}]", clusterService.getNodeName(), resolvedIndices);
384385

385386
final SearchSourceBuilder source = original.source();
386387
if (shouldOpenPIT(source)) {
@@ -1271,6 +1272,8 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(
12711272
// Otherwise, we add the shard iterator without a target node, allowing a partial search failure to
12721273
// be thrown when a search phase attempts to access it.
12731274
targetNodes.add(perNode.getNode());
1275+
// TODO this looks like it's on the cross-cluster search path; we will need to adapt the retry mechanism here as well, I
1276+
// think
12741277
if (perNode.getSearchContextId().getSearcherId() != null) {
12751278
for (String node : group.allocatedNodes()) {
12761279
if (node.equals(perNode.getNode()) == false) {
@@ -1355,6 +1358,7 @@ private void executeSearch(
13551358
SearchResponse.Clusters clusters,
13561359
SearchPhaseProvider searchPhaseProvider
13571360
) {
1361+
logger.info("Executing search locally.");
13581362
if (searchRequest.allowPartialSearchResults() == null) {
13591363
// No user preference defined in search request - apply cluster service default
13601364
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
@@ -1944,7 +1948,8 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
19441948
try {
19451949
final ShardIterator shards = OperationRouting.getShards(projectState.routingTable(), shardId);
19461950
// Prefer executing shard requests on nodes that are part of PIT first.
1947-
if (projectState.cluster().nodes().nodeExists(perNode.getNode())) {
1951+
boolean nodeExists = projectState.cluster().nodes().nodeExists(perNode.getNode());
1952+
if (nodeExists) {
19481953
targetNodes.add(perNode.getNode());
19491954
} else {
19501955
logger.debug(
@@ -1953,7 +1958,7 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
19531958
perNode.getSearchContextId()
19541959
);
19551960
}
1956-
if (perNode.getSearchContextId().getSearcherId() != null) {
1961+
if (perNode.getSearchContextId().getSearcherId() != null || nodeExists == false) {
19571962
for (ShardRouting shard : shards) {
19581963
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
19591964
targetNodes.add(shard.currentNodeId());

server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ private void tryUpdateKeepAlive(long keepAlive) {
116116
this.keepAlive.accumulateAndGet(keepAlive, Math::max);
117117
}
118118

119+
public long keepAlive() {
120+
return keepAlive.longValue();
121+
}
122+
119123
/**
120124
* Returns a releasable to indicate that the caller has stopped using this reader.
121125
* The time to live of the reader after usage can be extended using the provided

server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,7 +1258,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException {
12581258
DiscoveryNode node = DiscoveryNodeUtils.create("node_g");
12591259
failureCause = new NodeClosedException(node);
12601260
failureCause = new NoShardAvailableActionException(new ShardId("_index_g", "_uuid_g", 6), "node_g", failureCause);
1261-
ShardSearchContextId shardSearchContextId = new ShardSearchContextId(UUIDs.randomBase64UUID(), 0, null);
1261+
String sessionId = UUIDs.randomBase64UUID();
12621262
ShardSearchFailure[] shardFailures = new ShardSearchFailure[] {
12631263
new ShardSearchFailure(
12641264
new ParsingException(0, 0, "Parsing g", null),
@@ -1268,7 +1268,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException {
12681268
new RepositoryException("repository_g", "Repo"),
12691269
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null)
12701270
),
1271-
new ShardSearchFailure(new SearchContextMissingException(shardSearchContextId), null) };
1271+
new ShardSearchFailure(new SearchContextMissingException(new ShardSearchContextId(sessionId, 0L)), null) };
12721272
failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures);
12731273
expectedCause = new ElasticsearchException(
12741274
"Elasticsearch exception [type=node_closed_exception, " + "reason=node closed " + node + "]"
@@ -1291,10 +1291,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException {
12911291
);
12921292
expected.addSuppressed(
12931293
new ElasticsearchException(
1294-
"Elasticsearch exception [type=search_context_missing_exception, "
1295-
+ "reason=No search context found for id ["
1296-
+ shardSearchContextId
1297-
+ "]]"
1294+
"Elasticsearch exception [type=search_context_missing_exception, " + "reason=No search context found for id [0]]"
12981295
)
12991296
);
13001297
}

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1710,6 +1710,7 @@ public void testLocalShardIteratorFromPointInTime() {
17101710
if (context.getSearchContextId().getSearcherId() == null) {
17111711
assertThat(shardIterator.getTargetNodeIds(), hasSize(1));
17121712
} else {
1713+
// TODO this branch never seems to be executed by this test. Needs investigation.
17131714
final List<String> targetNodes = clusterState.routingTable(project)
17141715
.index(indexMetadata.getIndex())
17151716
.shard(id)

0 commit comments

Comments
 (0)