Skip to content

Commit e3b6a4b

Browse files
committed
Term vector API on stateless search nodes
Up to now, real-time (m)term vector API requests were executed on the indexing nodes in serverless. However, we would like to execute them on the search nodes, similar to real-time (m)GETs. This PR does that by introducing an intermediate action that brings the search nodes up to date with the indexing node with respect to the term vector API request, before the request is executed locally on the search node. The new intermediate action looks up the requested document IDs in the shard's LiveVersionMap; if any of them is found there, the search nodes need to be refreshed in order to capture the new documents before searching for them. Relates ES-12112
1 parent 1e6473a commit e3b6a4b

File tree

11 files changed

+315
-45
lines changed

11 files changed

+315
-45
lines changed

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mtermvectors/30_routing.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@ routing:
77
settings:
88
index:
99
number_of_shards: 5
10-
number_of_replicas: 0
11-
12-
- do:
13-
cluster.health:
14-
wait_for_status: green
1510

1611
- do:
1712
index:
@@ -52,7 +47,6 @@ requires routing:
5247
settings:
5348
index:
5449
number_of_shards: 5
55-
number_of_replicas: 0
5650
mappings:
5751
_routing:
5852
required: true

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/20_issue7121.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,13 @@
77
index:
88
translog.flush_threshold_size: "512MB"
99
number_of_shards: 1
10-
number_of_replicas: 0
1110
refresh_interval: -1
1211
mappings:
1312
properties:
1413
text:
1514
type : "text"
1615
term_vector : "with_positions_offsets"
1716

18-
- do:
19-
cluster.health:
20-
wait_for_status: green
21-
2217
- do:
2318
index:
2419
index: testidx

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/termvectors/30_realtime.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@
77
settings:
88
index:
99
refresh_interval: -1
10-
number_of_replicas: 0
11-
12-
- do:
13-
cluster.health:
14-
wait_for_status: green
1510

1611
- do:
1712
index:

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@
207207
import org.elasticsearch.action.synonyms.TransportPutSynonymsAction;
208208
import org.elasticsearch.action.termvectors.MultiTermVectorsAction;
209209
import org.elasticsearch.action.termvectors.TermVectorsAction;
210+
import org.elasticsearch.action.termvectors.TransportEnsureDocsSearchableAction;
210211
import org.elasticsearch.action.termvectors.TransportMultiTermVectorsAction;
211212
import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction;
212213
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
@@ -720,6 +721,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
720721

721722
actions.register(TransportIndexAction.TYPE, TransportIndexAction.class);
722723
actions.register(TransportGetAction.TYPE, TransportGetAction.class);
724+
actions.register(TransportEnsureDocsSearchableAction.TYPE, TransportEnsureDocsSearchableAction.class);
723725
actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class);
724726
actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class);
725727
actions.register(TransportShardMultiTermsVectorAction.TYPE, TransportShardMultiTermsVectorAction.class);
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
10+
*/
11+
12+
package org.elasticsearch.action.termvectors;
13+
14+
import org.apache.logging.log4j.LogManager;
15+
import org.apache.logging.log4j.Logger;
16+
import org.elasticsearch.ElasticsearchException;
17+
import org.elasticsearch.action.ActionListener;
18+
import org.elasticsearch.action.ActionRequestValidationException;
19+
import org.elasticsearch.action.ActionResponse;
20+
import org.elasticsearch.action.ActionType;
21+
import org.elasticsearch.action.NoShardAvailableActionException;
22+
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
23+
import org.elasticsearch.action.support.ActionFilters;
24+
import org.elasticsearch.action.support.ActiveShardCount;
25+
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
26+
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
27+
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
28+
import org.elasticsearch.client.internal.node.NodeClient;
29+
import org.elasticsearch.cluster.ProjectState;
30+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
31+
import org.elasticsearch.cluster.node.DiscoveryNode;
32+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
33+
import org.elasticsearch.cluster.project.ProjectResolver;
34+
import org.elasticsearch.cluster.routing.ShardIterator;
35+
import org.elasticsearch.cluster.service.ClusterService;
36+
import org.elasticsearch.common.io.stream.StreamInput;
37+
import org.elasticsearch.common.io.stream.StreamOutput;
38+
import org.elasticsearch.common.io.stream.Writeable;
39+
import org.elasticsearch.index.IndexService;
40+
import org.elasticsearch.index.mapper.Uid;
41+
import org.elasticsearch.index.shard.IndexShard;
42+
import org.elasticsearch.index.shard.ShardId;
43+
import org.elasticsearch.indices.IndicesService;
44+
import org.elasticsearch.injection.guice.Inject;
45+
import org.elasticsearch.threadpool.ThreadPool;
46+
import org.elasticsearch.transport.TransportService;
47+
48+
import java.io.IOException;
49+
import java.util.List;
50+
51+
/**
52+
* This action is used in serverless to ensure that documents are searchable on the search tier before processing
53+
* term vector requests. It is an intermediate action that is executed on the indexing node and responds
54+
* with a no-op (the search node can proceed to process the term vector request). The action may trigger an external refresh
55+
* to ensure the search shards are up to date before returning the no-op.
56+
*/
57+
public class TransportEnsureDocsSearchableAction extends TransportSingleShardAction<
58+
TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest,
59+
ActionResponse.Empty> {
60+
61+
private static final Logger logger = LogManager.getLogger(TransportEnsureDocsSearchableAction.class);
62+
private final NodeClient client;
63+
private final IndicesService indicesService;
64+
65+
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "/eds";
66+
public static final ActionType<ActionResponse.Empty> TYPE = new ActionType<>(ACTION_NAME);
67+
68+
@Inject
69+
public TransportEnsureDocsSearchableAction(
70+
ClusterService clusterService,
71+
NodeClient client,
72+
TransportService transportService,
73+
IndicesService indicesService,
74+
ThreadPool threadPool,
75+
ActionFilters actionFilters,
76+
ProjectResolver projectResolver,
77+
IndexNameExpressionResolver indexNameExpressionResolver
78+
) {
79+
super(
80+
ACTION_NAME,
81+
threadPool,
82+
clusterService,
83+
transportService,
84+
actionFilters,
85+
projectResolver,
86+
indexNameExpressionResolver,
87+
TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest::new,
88+
threadPool.executor(ThreadPool.Names.GET)
89+
);
90+
this.client = client;
91+
this.indicesService = indicesService;
92+
}
93+
94+
@Override
95+
protected boolean isSubAction() {
96+
return true;
97+
}
98+
99+
@Override
100+
protected Writeable.Reader<ActionResponse.Empty> getResponseReader() {
101+
return in -> ActionResponse.Empty.INSTANCE;
102+
}
103+
104+
@Override
105+
protected boolean resolveIndex(TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest request) {
106+
return false;
107+
}
108+
109+
@Override
110+
protected ShardIterator shards(ProjectState state, InternalRequest request) {
111+
assert DiscoveryNode.isStateless(clusterService.getSettings()) : ACTION_NAME + " should only be used in stateless";
112+
final var primaryShard = state.routingTable()
113+
.shardRoutingTable(request.concreteIndex(), request.request().shardId())
114+
.primaryShard();
115+
if (primaryShard.active() == false) {
116+
throw new NoShardAvailableActionException(primaryShard.shardId(), "primary shard is not active");
117+
}
118+
DiscoveryNode node = state.cluster().nodes().get(primaryShard.currentNodeId());
119+
assert node != null;
120+
return new ShardIterator(primaryShard.shardId(), List.of(primaryShard));
121+
}
122+
123+
@Override
124+
protected void asyncShardOperation(
125+
TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest request,
126+
ShardId shardId,
127+
ActionListener<ActionResponse.Empty> listener
128+
) throws IOException {
129+
assert DiscoveryNode.isStateless(clusterService.getSettings()) : ACTION_NAME + " should only be used in stateless";
130+
assert DiscoveryNode.hasRole(clusterService.getSettings(), DiscoveryNodeRole.INDEX_ROLE)
131+
: ACTION_NAME + " should only be executed on a stateless indexing node";
132+
logger.debug("received request with {} docs", request.docIds.length);
133+
getExecutor(shardId).execute(() -> ActionListener.run(listener, l -> {
134+
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
135+
final IndexShard indexShard = indexService.getShard(shardId.id());
136+
boolean docsFoundInLiveVersionMap = false;
137+
for (String docId : request.docIds()) {
138+
final var docUid = Uid.encodeId(docId);
139+
// There are a couple of limited cases where we may unnecessarily trigger an additional external refresh:
140+
// 1. Asking whether a document is in the live version map may incur a stateless refresh in itself.
141+
// 2. The document may be in the live version map archive, even though it has been refreshed to the search shards. The
142+
// document will be removed from the archive in a subsequent stateless refresh.
143+
// We prefer simplicity to complexity (trying to avoid the unnecessary stateless refresh) for the above limited cases.
144+
boolean docInLiveVersionMap = indexShard.withEngine(engine -> engine.isDocumentInLiveVersionMap(docUid));
145+
if (docInLiveVersionMap) {
146+
logger.debug("doc id [{}] (uid [{}]) found in live version map of index shard [{}]", docId, docUid, shardId);
147+
docsFoundInLiveVersionMap = true;
148+
break;
149+
}
150+
}
151+
152+
if (docsFoundInLiveVersionMap) {
153+
logger.debug("refreshing index shard [{}] due to mtv_eds", shardId);
154+
BasicReplicationRequest refreshRequest = new BasicReplicationRequest(shardId);
155+
refreshRequest.waitForActiveShards(ActiveShardCount.NONE);
156+
client.executeLocally(TransportShardRefreshAction.TYPE, refreshRequest, l.delegateFailureAndWrap((ll, r) -> {
157+
// TransportShardRefreshAction.UnpromotableReplicasRefreshProxy.onPrimaryOperationComplete() returns a
158+
// single shard failure if unpromotable(s) failed, with a combined list of (suppressed) exceptions.
159+
if (r.getShardInfo().getFailed() > 0) {
160+
assert r.getShardInfo().getFailed() == 1
161+
: "expected a single shard failure, got " + r.getShardInfo().getFailed() + " failures";
162+
throw new ElasticsearchException("failed to refresh [{}]", r.getShardInfo().getFailures()[0].getCause(), shardId);
163+
}
164+
logger.debug("refreshed index shard [{}] due to mtv_eds", shardId);
165+
ll.onResponse(ActionResponse.Empty.INSTANCE);
166+
}));
167+
} else {
168+
// Notice that there cannot be a race between the document(s) being evicted from the live version map due to an
169+
// ongoing refresh and before the search shards being updated with the new commit, because the documents are
170+
// guaranteed to be in the live version map archive until search shards are updated with the new commit.
171+
// Thus, we can safely respond immediately as a no-op.
172+
logger.debug("mts_eds does not require refresh of index shard [{}]", shardId);
173+
l.onResponse(ActionResponse.Empty.INSTANCE);
174+
}
175+
}));
176+
}
177+
178+
@Override
179+
protected ActionResponse.Empty shardOperation(
180+
TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest request,
181+
ShardId shardId
182+
) {
183+
throw new UnsupportedOperationException();
184+
}
185+
186+
public static final class EnsureDocsSearchableRequest extends SingleShardRequest<EnsureDocsSearchableRequest> {
187+
188+
private int shardId; // this is not transmitted over the wire
189+
private String[] docIds;
190+
191+
public EnsureDocsSearchableRequest() {}
192+
193+
EnsureDocsSearchableRequest(StreamInput in) throws IOException {
194+
super(in);
195+
docIds = in.readStringArray();
196+
}
197+
198+
@Override
199+
public ActionRequestValidationException validate() {
200+
return super.validateNonNullIndex();
201+
}
202+
203+
public EnsureDocsSearchableRequest(String index, int shardId, String[] docIds) {
204+
super(index);
205+
this.shardId = shardId;
206+
this.docIds = docIds;
207+
}
208+
209+
public int shardId() {
210+
return this.shardId;
211+
}
212+
213+
public String[] docIds() {
214+
return docIds;
215+
}
216+
217+
@Override
218+
public void writeTo(StreamOutput out) throws IOException {
219+
super.writeTo(out);
220+
out.writeStringArray(docIds);
221+
}
222+
}
223+
224+
}

server/src/main/java/org/elasticsearch/action/termvectors/TransportShardMultiTermsVectorAction.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,21 @@
55
* Public License v 1"; you may not use this file except in compliance with, at
66
* your election, the "Elastic License 2.0", the "GNU Affero General Public
77
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*
9+
* This file was contributed to by generative AI
810
*/
911

1012
package org.elasticsearch.action.termvectors;
1113

14+
import org.elasticsearch.action.ActionListener;
1215
import org.elasticsearch.action.ActionType;
1316
import org.elasticsearch.action.support.ActionFilters;
1417
import org.elasticsearch.action.support.TransportActions;
1518
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
19+
import org.elasticsearch.client.internal.node.NodeClient;
1620
import org.elasticsearch.cluster.ProjectState;
1721
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
1823
import org.elasticsearch.cluster.project.ProjectResolver;
1924
import org.elasticsearch.cluster.routing.ShardIterator;
2025
import org.elasticsearch.cluster.service.ClusterService;
@@ -28,12 +33,15 @@
2833
import org.elasticsearch.threadpool.ThreadPool;
2934
import org.elasticsearch.transport.TransportService;
3035

36+
import java.io.IOException;
37+
3138
import static org.elasticsearch.core.Strings.format;
3239

3340
public class TransportShardMultiTermsVectorAction extends TransportSingleShardAction<
3441
MultiTermVectorsShardRequest,
3542
MultiTermVectorsShardResponse> {
3643

44+
private final NodeClient client;
3745
private final IndicesService indicesService;
3846

3947
private static final String ACTION_NAME = MultiTermVectorsAction.NAME + "[shard]";
@@ -42,6 +50,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
4250
@Inject
4351
public TransportShardMultiTermsVectorAction(
4452
ClusterService clusterService,
53+
NodeClient client,
4554
TransportService transportService,
4655
IndicesService indicesService,
4756
ThreadPool threadPool,
@@ -60,6 +69,7 @@ public TransportShardMultiTermsVectorAction(
6069
MultiTermVectorsShardRequest::new,
6170
threadPool.executor(ThreadPool.Names.GET)
6271
);
72+
this.client = client;
6373
this.indicesService = indicesService;
6474
}
6575

@@ -80,9 +90,43 @@ protected boolean resolveIndex(MultiTermVectorsShardRequest request) {
8090

8191
@Override
8292
protected ShardIterator shards(ProjectState project, InternalRequest request) {
83-
ShardIterator shards = clusterService.operationRouting()
93+
ShardIterator iterator = clusterService.operationRouting()
8494
.getShards(project, request.concreteIndex(), request.request().shardId(), request.request().preference());
85-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(shards);
95+
if (iterator == null) {
96+
return null;
97+
}
98+
return ShardIterator.allSearchableShards(iterator);
99+
}
100+
101+
@Override
102+
protected void asyncShardOperation(
103+
MultiTermVectorsShardRequest request,
104+
ShardId shardId,
105+
ActionListener<MultiTermVectorsShardResponse> listener
106+
) throws IOException {
107+
if (DiscoveryNode.isStateless(clusterService.getSettings())) {
108+
final String[] realTimeIds = request.requests.stream()
109+
.filter(r -> r.realtime())
110+
.map(TermVectorsRequest::id)
111+
.toArray(String[]::new);
112+
if (realTimeIds.length > 0) {
113+
final var ensureDocsSearchableRequest = new TransportEnsureDocsSearchableAction.EnsureDocsSearchableRequest(
114+
request.index(),
115+
shardId.id(),
116+
realTimeIds
117+
);
118+
ensureDocsSearchableRequest.setParentTask(clusterService.localNode().getId(), request.getParentTask().getId());
119+
client.executeLocally(
120+
TransportEnsureDocsSearchableAction.TYPE,
121+
ensureDocsSearchableRequest,
122+
listener.delegateFailureAndWrap((l, r) -> super.asyncShardOperation(request, shardId, l))
123+
);
124+
} else {
125+
super.asyncShardOperation(request, shardId, listener);
126+
}
127+
} else {
128+
super.asyncShardOperation(request, shardId, listener);
129+
}
86130
}
87131

88132
@Override

0 commit comments

Comments
 (0)