Skip to content

Commit 01731ae

Browse files
authored
Stateless real-time mget (#96763)
The mget counterpart of #93976. Relates to ES-5677.
1 parent 4c6f7a4 commit 01731ae

File tree

5 files changed

+142
-35
lines changed

5 files changed

+142
-35
lines changed

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/40_routing.yml

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ routing:
88
index:
99
number_of_shards: 5
1010
number_of_routing_shards: 5
11-
number_of_replicas: 0
11+
auto_expand_replicas: 0-1
1212

1313
- do:
1414
cluster.health:
@@ -51,11 +51,15 @@ requires routing:
5151
settings:
5252
index:
5353
number_of_shards: 5
54-
number_of_replicas: 0
54+
auto_expand_replicas: 0-1
5555
mappings:
5656
_routing:
5757
required: true
5858

59+
- do:
60+
cluster.health:
61+
wait_for_status: green
62+
5963
- do:
6064
index:
6165
index: test_1

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/mget/60_realtime_refresh.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@
99
settings:
1010
index:
1111
refresh_interval: -1
12-
number_of_replicas: 0
12+
auto_expand_replicas: 0-1
1313

1414
- do:
1515
cluster.health:

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -180,7 +180,7 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
180180
private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
181181
throws IOException {
182182
ShardId shardId = indexShard.shardId();
183-
DiscoveryNode node = getCurrentNodeOfPrimary(shardId);
183+
var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
184184
if (request.refresh()) {
185185
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
186186
var refreshRequest = new BasicReplicationRequest(shardId);
@@ -226,8 +226,7 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexSh
226226
}
227227
}
228228

229-
private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
230-
var clusterState = clusterService.state();
229+
static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) {
231230
var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
232231
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
233232
throw new NoShardAvailableActionException(shardId, "primary shard is not active");

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 130 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -9,14 +9,21 @@
99
package org.elasticsearch.action.get;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.ActionListenerResponseHandler;
1213
import org.elasticsearch.action.ActionRunnable;
1314
import org.elasticsearch.action.ActionType;
15+
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
1416
import org.elasticsearch.action.support.ActionFilters;
1517
import org.elasticsearch.action.support.TransportActions;
18+
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
1619
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
20+
import org.elasticsearch.client.internal.node.NodeClient;
1721
import org.elasticsearch.cluster.ClusterState;
1822
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
24+
import org.elasticsearch.cluster.routing.PlainShardIterator;
1925
import org.elasticsearch.cluster.routing.ShardIterator;
26+
import org.elasticsearch.cluster.routing.ShardRouting;
2027
import org.elasticsearch.cluster.service.ClusterService;
2128
import org.elasticsearch.common.inject.Inject;
2229
import org.elasticsearch.common.io.stream.Writeable;
@@ -26,20 +33,25 @@
2633
import org.elasticsearch.index.shard.ShardId;
2734
import org.elasticsearch.indices.ExecutorSelector;
2835
import org.elasticsearch.indices.IndicesService;
36+
import org.elasticsearch.logging.LogManager;
37+
import org.elasticsearch.logging.Logger;
2938
import org.elasticsearch.threadpool.ThreadPool;
3039
import org.elasticsearch.transport.TransportService;
3140

3241
import java.io.IOException;
3342

43+
import static org.elasticsearch.action.get.TransportGetAction.getCurrentNodeOfPrimary;
3444
import static org.elasticsearch.core.Strings.format;
3545

3646
public class TransportShardMultiGetAction extends TransportSingleShardAction<MultiGetShardRequest, MultiGetShardResponse> {
3747

3848
private static final String ACTION_NAME = MultiGetAction.NAME + "[shard]";
3949
public static final ActionType<MultiGetShardResponse> TYPE = new ActionType<>(ACTION_NAME, MultiGetShardResponse::new);
50+
private static final Logger logger = LogManager.getLogger(TransportShardMultiGetAction.class);
4051

4152
private final IndicesService indicesService;
4253
private final ExecutorSelector executorSelector;
54+
private final NodeClient client;
4355

4456
@Inject
4557
public TransportShardMultiGetAction(
@@ -49,7 +61,8 @@ public TransportShardMultiGetAction(
4961
ThreadPool threadPool,
5062
ActionFilters actionFilters,
5163
IndexNameExpressionResolver indexNameExpressionResolver,
52-
ExecutorSelector executorSelector
64+
ExecutorSelector executorSelector,
65+
NodeClient client
5366
) {
5467
super(
5568
ACTION_NAME,
@@ -63,6 +76,7 @@ public TransportShardMultiGetAction(
6376
);
6477
this.indicesService = indicesService;
6578
this.executorSelector = executorSelector;
79+
this.client = client;
6680
}
6781

6882
@Override
@@ -84,14 +98,23 @@ protected boolean resolveIndex(MultiGetShardRequest request) {
8498
protected ShardIterator shards(ClusterState state, InternalRequest request) {
8599
ShardIterator iterator = clusterService.operationRouting()
86100
.getShards(state, request.request().index(), request.request().shardId(), request.request().preference());
87-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator);
101+
if (iterator == null) {
102+
return null;
103+
}
104+
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
88105
}
89106

90107
@Override
91108
protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener)
92109
throws IOException {
93110
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
94111
IndexShard indexShard = indexService.getShard(shardId.id());
112+
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
113+
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
114+
return;
115+
}
116+
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
117+
: "A TransportShardMultiGetAction should always be handled by a search shard in Stateless";
95118
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
96119
asyncShardMultiGet(request, shardId, listener);
97120
} else {
@@ -107,35 +130,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
107130

108131
@Override
109132
protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) {
110-
var indexShard = getIndexShard(shardId);
111133
MultiGetShardResponse response = new MultiGetShardResponse();
112134
for (int i = 0; i < request.locations.size(); i++) {
113-
MultiGetRequest.Item item = request.items.get(i);
114-
try {
115-
GetResult getResult = indexShard.getService()
116-
.get(
117-
item.id(),
118-
item.storedFields(),
119-
request.realtime(),
120-
item.version(),
121-
item.versionType(),
122-
item.fetchSourceContext(),
123-
request.isForceSyntheticSource()
124-
);
125-
response.add(request.locations.get(i), new GetResponse(getResult));
126-
} catch (RuntimeException e) {
127-
if (TransportActions.isShardNotAvailableException(e)) {
128-
throw e;
129-
} else {
130-
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
131-
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e));
132-
}
133-
} catch (IOException e) {
134-
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
135-
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e));
136-
}
135+
getAndAddToResponse(shardId, i, request, response);
137136
}
138-
139137
return response;
140138
}
141139

@@ -151,6 +149,110 @@ protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
151149
}
152150
}
153151

152+
private void handleMultiGetOnUnpromotableShard(
153+
MultiGetShardRequest request,
154+
IndexShard indexShard,
155+
ActionListener<MultiGetShardResponse> listener
156+
) throws IOException {
157+
ShardId shardId = indexShard.shardId();
158+
var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
159+
if (request.refresh()) {
160+
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
161+
var refreshRequest = new BasicReplicationRequest(shardId);
162+
refreshRequest.setParentTask(request.getParentTask());
163+
client.executeLocally(
164+
TransportShardRefreshAction.TYPE,
165+
refreshRequest,
166+
listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l))
167+
);
168+
} else if (request.realtime()) {
169+
TransportShardMultiGetFomTranslogAction.Request mgetFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request(
170+
request,
171+
shardId
172+
);
173+
mgetFromTranslogRequest.setParentTask(request.getParentTask());
174+
transportService.sendRequest(
175+
node,
176+
TransportShardMultiGetFomTranslogAction.NAME,
177+
mgetFromTranslogRequest,
178+
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
179+
var responseHasMissingLocations = false;
180+
for (int i = 0; i < r.multiGetShardResponse().locations.size(); i++) {
181+
if (r.multiGetShardResponse().responses.get(i) == null && r.multiGetShardResponse().failures.get(i) == null) {
182+
responseHasMissingLocations = true;
183+
break;
184+
}
185+
}
186+
if (responseHasMissingLocations == false) {
187+
logger.debug("received result of all ids in real-time mget[shard] from the promotable shard.");
188+
l.onResponse(r.multiGetShardResponse());
189+
} else {
190+
logger.debug(
191+
"no result for some ids from the promotable shard (segment generation to wait for: {})",
192+
r.segmentGeneration()
193+
);
194+
if (r.segmentGeneration() == -1) {
195+
// Nothing to wait for (no previous unsafe generation), just handle the rest locally.
196+
ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run();
197+
} else {
198+
assert r.segmentGeneration() > -1L;
199+
indexShard.waitForSegmentGeneration(
200+
r.segmentGeneration(),
201+
listener.delegateFailureAndWrap(
202+
(ll, aLong) -> threadPool.executor(getExecutor(request, shardId))
203+
.execute(
204+
ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId))
205+
)
206+
)
207+
);
208+
}
209+
}
210+
}), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId))
211+
);
212+
} else {
213+
// A non-real-time mget with no explicit refresh requested.
214+
super.asyncShardOperation(request, shardId, listener);
215+
}
216+
}
217+
218+
private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {
219+
logger.trace("handling local gets for missing locations");
220+
for (int i = 0; i < response.locations.size(); i++) {
221+
if (response.responses.get(i) == null && response.failures.get(i) == null) {
222+
getAndAddToResponse(shardId, i, request, response);
223+
}
224+
}
225+
return response;
226+
}
227+
228+
private void getAndAddToResponse(ShardId shardId, int location, MultiGetShardRequest request, MultiGetShardResponse response) {
229+
var indexShard = getIndexShard(shardId);
230+
MultiGetRequest.Item item = request.items.get(location);
231+
try {
232+
GetResult getResult = indexShard.getService()
233+
.get(
234+
item.id(),
235+
item.storedFields(),
236+
request.realtime(),
237+
item.version(),
238+
item.versionType(),
239+
item.fetchSourceContext(),
240+
request.isForceSyntheticSource()
241+
);
242+
response.add(request.locations.get(location), new GetResponse(getResult));
243+
} catch (RuntimeException e) {
244+
if (TransportActions.isShardNotAvailableException(e)) {
245+
throw e;
246+
} else {
247+
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
248+
response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e));
249+
}
250+
} catch (IOException e) {
251+
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
252+
response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e));
253+
}
254+
}
255+
154256
private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener)
155257
throws IOException {
156258
if (request.refresh() && request.realtime() == false) {

server/src/test/java/org/elasticsearch/action/get/TransportMultiGetActionTests.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -165,6 +165,7 @@ public TaskManager getTaskManager() {
165165
when(clusterService.localNode()).thenReturn(transportService.getLocalNode());
166166
when(clusterService.state()).thenReturn(clusterState);
167167
when(clusterService.operationRouting()).thenReturn(operationRouting);
168+
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
168169

169170
shardAction = new TransportShardMultiGetAction(
170171
clusterService,
@@ -173,7 +174,8 @@ public TaskManager getTaskManager() {
173174
threadPool,
174175
new ActionFilters(emptySet()),
175176
new Resolver(),
176-
EmptySystemIndices.INSTANCE.getExecutorSelector()
177+
EmptySystemIndices.INSTANCE.getExecutorSelector(),
178+
client
177179
) {
178180
@Override
179181
protected void doExecute(Task task, MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {}

0 commit comments

Comments
 (0)