Skip to content

Commit 7585f02

Browse files
authored
Tweak data node request index handling (#118542)
Small tweak around how data node requests handle no indices w.r.t. shards.
1 parent 1fb677f commit 7585f02

File tree

4 files changed

+326
-258
lines changed

4 files changed

+326
-258
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.elasticsearch.compute.data.BlockStreamInput;
2020
import org.elasticsearch.index.Index;
2121
import org.elasticsearch.index.shard.ShardId;
22+
import org.elasticsearch.logging.LogManager;
23+
import org.elasticsearch.logging.Logger;
2224
import org.elasticsearch.search.internal.AliasFilter;
2325
import org.elasticsearch.tasks.CancellableTask;
2426
import org.elasticsearch.tasks.Task;
@@ -32,17 +34,24 @@
3234

3335
import java.io.IOException;
3436
import java.util.Arrays;
37+
import java.util.Collections;
3538
import java.util.List;
3639
import java.util.Map;
3740
import java.util.Objects;
3841

42+
import static org.elasticsearch.core.Strings.format;
43+
import static org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField.NO_INDEX_PLACEHOLDER;
44+
import static org.elasticsearch.xpack.core.security.authz.IndicesAndAliasesResolverField.NO_INDICES_OR_ALIASES_ARRAY;
45+
3946
final class DataNodeRequest extends TransportRequest implements IndicesRequest.Replaceable {
47+
private static final Logger logger = LogManager.getLogger(DataNodeRequest.class);
48+
4049
private final String sessionId;
4150
private final Configuration configuration;
4251
private final String clusterAlias;
43-
private final List<ShardId> shardIds;
4452
private final Map<Index, AliasFilter> aliasFilters;
4553
private final PhysicalPlan plan;
54+
private List<ShardId> shardIds;
4655
private String[] indices;
4756
private final IndicesOptions indicesOptions;
4857

@@ -115,6 +124,10 @@ public String[] indices() {
115124
@Override
116125
public IndicesRequest indices(String... indices) {
117126
this.indices = indices;
127+
if (Arrays.equals(NO_INDICES_OR_ALIASES_ARRAY, indices) || Arrays.asList(indices).contains(NO_INDEX_PLACEHOLDER)) {
128+
logger.trace(() -> format("Indices empty after index resolution, also clearing shardIds %s", shardIds));
129+
this.shardIds = Collections.emptyList();
130+
}
118131
return this;
119132
}
120133

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ClusterRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
6565
protected ClusterComputeRequest createTestInstance() {
6666
var sessionId = randomAlphaOfLength(10);
6767
String query = randomQuery();
68-
PhysicalPlan physicalPlan = DataNodeRequestTests.mapAndMaybeOptimize(parse(query));
68+
PhysicalPlan physicalPlan = DataNodeRequestSerializationTests.mapAndMaybeOptimize(parse(query));
6969
OriginalIndices originalIndices = new OriginalIndices(
7070
generateRandomStringArray(10, 10, false, false),
7171
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.action.support.IndicesOptions;
11+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.index.Index;
15+
import org.elasticsearch.index.IndexMode;
16+
import org.elasticsearch.index.query.TermQueryBuilder;
17+
import org.elasticsearch.index.shard.ShardId;
18+
import org.elasticsearch.search.SearchModule;
19+
import org.elasticsearch.search.internal.AliasFilter;
20+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
21+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
22+
import org.elasticsearch.xpack.esql.analysis.Analyzer;
23+
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
24+
import org.elasticsearch.xpack.esql.core.type.EsField;
25+
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
26+
import org.elasticsearch.xpack.esql.index.EsIndex;
27+
import org.elasticsearch.xpack.esql.index.IndexResolution;
28+
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
29+
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
30+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
31+
import org.elasticsearch.xpack.esql.optimizer.PhysicalPlanOptimizer;
32+
import org.elasticsearch.xpack.esql.parser.EsqlParser;
33+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
34+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
35+
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
36+
37+
import java.io.IOException;
38+
import java.util.ArrayList;
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
43+
import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomTables;
44+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
45+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
46+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyPolicyResolution;
47+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
48+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
49+
50+
public class DataNodeRequestSerializationTests extends AbstractWireSerializingTestCase<DataNodeRequest> {
51+
52+
@Override
53+
protected Writeable.Reader<DataNodeRequest> instanceReader() {
54+
return DataNodeRequest::new;
55+
}
56+
57+
@Override
58+
protected NamedWriteableRegistry getNamedWriteableRegistry() {
59+
List<NamedWriteableRegistry.Entry> writeables = new ArrayList<>();
60+
writeables.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables());
61+
writeables.addAll(new EsqlPlugin().getNamedWriteables());
62+
return new NamedWriteableRegistry(writeables);
63+
}
64+
65+
@Override
66+
protected DataNodeRequest createTestInstance() {
67+
var sessionId = randomAlphaOfLength(10);
68+
String query = randomFrom("""
69+
from test
70+
| where round(emp_no) > 10
71+
| eval c = salary
72+
| stats x = avg(c)
73+
""", """
74+
from test
75+
| sort last_name
76+
| limit 10
77+
| where round(emp_no) > 10
78+
| eval c = first_name
79+
| stats x = avg(salary)
80+
""");
81+
List<ShardId> shardIds = randomList(1, 10, () -> new ShardId("index-" + between(1, 10), "n/a", between(1, 10)));
82+
PhysicalPlan physicalPlan = mapAndMaybeOptimize(parse(query));
83+
Map<Index, AliasFilter> aliasFilters = Map.of(
84+
new Index("concrete-index", "n/a"),
85+
AliasFilter.of(new TermQueryBuilder("id", "1"), "alias-1")
86+
);
87+
DataNodeRequest request = new DataNodeRequest(
88+
sessionId,
89+
randomConfiguration(query, randomTables()),
90+
randomAlphaOfLength(10),
91+
shardIds,
92+
aliasFilters,
93+
physicalPlan,
94+
generateRandomStringArray(10, 10, false, false),
95+
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
96+
);
97+
request.setParentTask(randomAlphaOfLength(10), randomNonNegativeLong());
98+
return request;
99+
}
100+
101+
@Override
102+
protected DataNodeRequest mutateInstance(DataNodeRequest in) throws IOException {
103+
return switch (between(0, 8)) {
104+
case 0 -> {
105+
var request = new DataNodeRequest(
106+
randomAlphaOfLength(20),
107+
in.configuration(),
108+
in.clusterAlias(),
109+
in.shardIds(),
110+
in.aliasFilters(),
111+
in.plan(),
112+
in.indices(),
113+
in.indicesOptions()
114+
);
115+
request.setParentTask(in.getParentTask());
116+
yield request;
117+
}
118+
case 1 -> {
119+
var request = new DataNodeRequest(
120+
in.sessionId(),
121+
randomConfiguration(),
122+
in.clusterAlias(),
123+
in.shardIds(),
124+
in.aliasFilters(),
125+
in.plan(),
126+
in.indices(),
127+
in.indicesOptions()
128+
);
129+
request.setParentTask(in.getParentTask());
130+
yield request;
131+
}
132+
case 2 -> {
133+
List<ShardId> shardIds = randomList(1, 10, () -> new ShardId("new-index-" + between(1, 10), "n/a", between(1, 10)));
134+
var request = new DataNodeRequest(
135+
in.sessionId(),
136+
in.configuration(),
137+
in.clusterAlias(),
138+
shardIds,
139+
in.aliasFilters(),
140+
in.plan(),
141+
in.indices(),
142+
in.indicesOptions()
143+
);
144+
request.setParentTask(in.getParentTask());
145+
yield request;
146+
}
147+
case 3 -> {
148+
String newQuery = randomFrom("""
149+
from test
150+
| where round(emp_no) > 100
151+
| eval c = salary
152+
| stats x = avg(c)
153+
""", """
154+
from test
155+
| sort last_name
156+
| limit 10
157+
| where round(emp_no) > 100
158+
| eval c = first_name
159+
| stats x = avg(salary)
160+
""");
161+
var request = new DataNodeRequest(
162+
in.sessionId(),
163+
in.configuration(),
164+
in.clusterAlias(),
165+
in.shardIds(),
166+
in.aliasFilters(),
167+
mapAndMaybeOptimize(parse(newQuery)),
168+
in.indices(),
169+
in.indicesOptions()
170+
);
171+
request.setParentTask(in.getParentTask());
172+
yield request;
173+
}
174+
case 4 -> {
175+
final Map<Index, AliasFilter> aliasFilters;
176+
if (randomBoolean()) {
177+
aliasFilters = Map.of();
178+
} else {
179+
aliasFilters = Map.of(new Index("concrete-index", "n/a"), AliasFilter.of(new TermQueryBuilder("id", "2"), "alias-2"));
180+
}
181+
var request = new DataNodeRequest(
182+
in.sessionId(),
183+
in.configuration(),
184+
in.clusterAlias(),
185+
in.shardIds(),
186+
aliasFilters,
187+
in.plan(),
188+
in.indices(),
189+
in.indicesOptions()
190+
);
191+
request.setParentTask(request.getParentTask());
192+
yield request;
193+
}
194+
case 5 -> {
195+
var request = new DataNodeRequest(
196+
in.sessionId(),
197+
in.configuration(),
198+
in.clusterAlias(),
199+
in.shardIds(),
200+
in.aliasFilters(),
201+
in.plan(),
202+
in.indices(),
203+
in.indicesOptions()
204+
);
205+
request.setParentTask(
206+
randomValueOtherThan(request.getParentTask().getNodeId(), () -> randomAlphaOfLength(10)),
207+
randomNonNegativeLong()
208+
);
209+
yield request;
210+
}
211+
case 6 -> {
212+
var clusterAlias = randomValueOtherThan(in.clusterAlias(), () -> randomAlphaOfLength(10));
213+
var request = new DataNodeRequest(
214+
in.sessionId(),
215+
in.configuration(),
216+
clusterAlias,
217+
in.shardIds(),
218+
in.aliasFilters(),
219+
in.plan(),
220+
in.indices(),
221+
in.indicesOptions()
222+
);
223+
request.setParentTask(request.getParentTask());
224+
yield request;
225+
}
226+
case 7 -> {
227+
var indices = randomValueOtherThan(in.indices(), () -> generateRandomStringArray(10, 10, false, false));
228+
var request = new DataNodeRequest(
229+
in.sessionId(),
230+
in.configuration(),
231+
in.clusterAlias(),
232+
in.shardIds(),
233+
in.aliasFilters(),
234+
in.plan(),
235+
indices,
236+
in.indicesOptions()
237+
);
238+
request.setParentTask(request.getParentTask());
239+
yield request;
240+
}
241+
case 8 -> {
242+
var indicesOptions = randomValueOtherThan(
243+
in.indicesOptions(),
244+
() -> IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
245+
);
246+
var request = new DataNodeRequest(
247+
in.sessionId(),
248+
in.configuration(),
249+
in.clusterAlias(),
250+
in.shardIds(),
251+
in.aliasFilters(),
252+
in.plan(),
253+
in.indices(),
254+
indicesOptions
255+
);
256+
request.setParentTask(request.getParentTask());
257+
yield request;
258+
}
259+
default -> throw new AssertionError("invalid value");
260+
};
261+
}
262+
263+
static LogicalPlan parse(String query) {
264+
Map<String, EsField> mapping = loadMapping("mapping-basic.json");
265+
EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD));
266+
IndexResolution getIndexResult = IndexResolution.valid(test);
267+
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(TEST_CFG));
268+
var analyzer = new Analyzer(
269+
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, emptyPolicyResolution()),
270+
TEST_VERIFIER
271+
);
272+
return logicalOptimizer.optimize(analyzer.analyze(new EsqlParser().createStatement(query)));
273+
}
274+
275+
static PhysicalPlan mapAndMaybeOptimize(LogicalPlan logicalPlan) {
276+
var physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(TEST_CFG));
277+
var mapper = new Mapper();
278+
var physical = mapper.map(logicalPlan);
279+
if (randomBoolean()) {
280+
physical = physicalPlanOptimizer.optimize(physical);
281+
}
282+
return physical;
283+
}
284+
285+
@Override
286+
protected List<String> filteredWarnings() {
287+
return withDefaultLimitWarning(super.filteredWarnings());
288+
}
289+
}

0 commit comments

Comments
 (0)