Skip to content

Commit 685998f

Browse files
committed
Enable CSV tests
1 parent 5855935 commit 685998f

File tree

6 files changed

+78
-35
lines changed

6 files changed

+78
-35
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.ENABLE_LOOKUP_JOIN_ON_REMOTE;
5353
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
5454
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
55+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_SUPPORTS_REMOTE;
5556
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V11;
5657
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
5758
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
@@ -138,9 +139,18 @@ protected void shouldSkipTest(String testName) throws IOException {
138139
assumeFalse("can't test with _index metadata", (remoteMetadata == false) && hasIndexMetadata(testCase.query));
139140
Version oldVersion = Version.min(Clusters.localClusterVersion(), Clusters.remoteClusterVersion());
140141
assumeTrue("Test " + testName + " is skipped on " + oldVersion, isEnabled(testName, instructions, oldVersion));
141-
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
142-
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
143-
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V11.capabilityName()));
142+
if (testCase.requiredCapabilities.contains(INLINESTATS.capabilityName())
143+
|| testCase.requiredCapabilities.contains(INLINESTATS_V11.capabilityName())
144+
|| testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName())) {
145+
assumeTrue(
146+
"INLINESTATS in CCS not supported for this version",
147+
hasCapabilities(adminClient(), List.of(INLINESTATS_SUPPORTS_REMOTE.capabilityName()))
148+
);
149+
assumeTrue(
150+
"INLINESTATS in CCS not supported for this version",
151+
hasCapabilities(remoteClusterClient(), List.of(INLINESTATS_SUPPORTS_REMOTE.capabilityName()))
152+
);
153+
}
144154
if (testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())) {
145155
assumeTrue(
146156
"LOOKUP JOIN not yet supported in CCS",

x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,19 @@ FROM hosts METADATA _index
1111
eth0 |epsilon gw instance|epsilon |[fe80::cae2:65ff:fece:feb9, fe80::cae2:65ff:fece:fec0, fe80::cae2:65ff:fece:fec1]|fe80::cae2:65ff:fece:fec1|hosts |1 |null
1212
;
1313

14+
allFieldsReturnedNoIndex
15+
required_capability: inlinestats_v11
16+
17+
FROM hosts
18+
| INLINESTATS c = COUNT(*) BY host_group
19+
| SORT c
20+
| LIMIT 1
21+
;
22+
23+
card:keyword | description:text | host:keyword | ip0:ip | ip1:ip | c:long | host_group:text
24+
eth0 |epsilon gw instance|epsilon |[fe80::cae2:65ff:fece:feb9, fe80::cae2:65ff:fece:fec0, fe80::cae2:65ff:fece:fec1]|fe80::cae2:65ff:fece:fec1|1 |null
25+
;
26+
1427
maxOfInt
1528
required_capability: inlinestats_v11
1629
// tag::max-languages[]
@@ -1197,6 +1210,20 @@ description:text| host:keyword | ip0:ip | _index:keyword | x:ip | ip1
11971210
beta k8s server |beta |127.0.0.1 |hosts |127.0.0.2|2 |Kubernetes cluster|eth1
11981211
;
11991212

1213+
inlineStatsOverrideEVALed_FieldWithSameNameNoIndex
1214+
required_capability: inlinestats_v11
1215+
1216+
FROM hosts
1217+
| EVAL x = ip1
1218+
| INLINESTATS ip1 = COUNT(*) BY host_group, card
1219+
| SORT ip1 DESC, x
1220+
| LIMIT 1
1221+
;
1222+
1223+
description:text| host:keyword | ip0:ip | x:ip | ip1:long | host_group:text | card:keyword
1224+
beta k8s server |beta |127.0.0.1 |127.0.0.2|2 |Kubernetes cluster|eth1
1225+
;
1226+
12001227
doubleShadowing
12011228
required_capability: inlinestats_v11
12021229

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1442,7 +1442,10 @@ public enum Cap {
14421442
/**
14431443
* FORK with remote indices
14441444
*/
1445-
ENABLE_FORK_FOR_REMOTE_INDICES(Build.current().isSnapshot());
1445+
ENABLE_FORK_FOR_REMOTE_INDICES(Build.current().isSnapshot()),
1446+
1447+
/** INLINESTATS supports remote indices */
1448+
INLINESTATS_SUPPORTS_REMOTE;
14461449

14471450
private final boolean enabled;
14481451

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,11 +368,11 @@ public boolean isMainPlan() {
368368
return inSubplan == false;
369369
}
370370

371-
public void startSubPlan() {
371+
public void startSubPlans() {
372372
this.inSubplan = true;
373373
}
374374

375-
public void finishSubPlan() {
375+
public void finishSubPlans() {
376376
this.inSubplan = false;
377377
}
378378

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -158,20 +158,27 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
158158
.setSuccessfulShards(resp.getSuccessfulShards())
159159
.setSkippedShards(resp.getSkippedShards())
160160
.setFailedShards(resp.getFailedShards());
161-
// TODO: correctly aggregate took time from subplans
162-
if (resp.getTook() != null) {
163-
builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()));
161+
if (v.getTook() != null && resp.getTook() != null) {
162+
// This can happen when we had some subplan executions before the main plan - we need to accumulate the took time
163+
builder.setTook(TimeValue.timeValueNanos(v.getTook().nanos() + resp.getTook().nanos()));
164164
} else {
165-
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
166-
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
167-
builder.setTook(executionInfo.tookSoFar());
165+
if (resp.getTook() != null) {
166+
builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()));
167+
} else {
168+
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
169+
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
170+
builder.setTook(executionInfo.tookSoFar());
171+
}
168172
}
169-
if (executionInfo.isMainPlan() && v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
173+
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
174+
builder.addFailures(v.getFailures());
170175
builder.addFailures(resp.failures);
171-
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
172-
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
173-
} else {
174-
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
176+
if (executionInfo.isMainPlan()) {
177+
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
178+
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
179+
} else {
180+
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
181+
}
175182
}
176183
}
177184
return builder.build();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,16 @@ private void executeSubPlans(
241241
// TODO: merge into one method
242242
if (subPlan != null) {
243243
// code-path to execute subplans
244-
executeSubPlan(new DriverCompletionInfo.Accumulator(), optimizedPlan, subPlan, executionInfo, runner, request, listener);
244+
executeSubPlan(
245+
new DriverCompletionInfo.Accumulator(),
246+
optimizedPlan,
247+
subPlan,
248+
executionInfo,
249+
runner,
250+
request,
251+
// Ensure we don't have subplan flag stuck in there on failure
252+
ActionListener.runAfter(listener, executionInfo::finishSubPlans)
253+
);
245254
} else {
246255
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
247256
// execute main plan
@@ -262,13 +271,9 @@ private void executeSubPlan(
262271
// Create a physical plan out of the logical sub-plan
263272
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request);
264273

265-
executionInfo.startSubPlan();
266-
var listenerForSublan = listener.delegateResponse((l, e) -> {
267-
executionInfo.finishSubPlan();
268-
l.onFailure(e);
269-
});
274+
executionInfo.startSubPlans();
270275

271-
runner.run(physicalSubPlan, listenerForSublan.delegateFailureAndWrap((next, result) -> {
276+
runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> {
272277
try {
273278
// Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
274279
completionInfoAccumulator.accumulate(result.completionInfo());
@@ -288,8 +293,7 @@ private void executeSubPlan(
288293
var newSubPlan = firstSubPlan(newLogicalPlan);
289294

290295
if (newSubPlan == null) {// run the final "main" plan
291-
// TODO: failures!
292-
executionInfo.finishSubPlan();
296+
executionInfo.finishSubPlans();
293297
LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
294298
var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
295299
runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
@@ -299,15 +303,7 @@ private void executeSubPlan(
299303
);
300304
}));
301305
} else {// continue executing the subplans
302-
executeSubPlan(
303-
completionInfoAccumulator,
304-
newLogicalPlan,
305-
newSubPlan,
306-
executionInfo,
307-
runner,
308-
request,
309-
listenerForSublan
310-
);
306+
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener);
311307
}
312308
} finally {
313309
Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));

0 commit comments

Comments
 (0)