Skip to content

Commit d3b27a1

Browse files
committed
Pull feedback
1 parent 31e0eb4 commit d3b27a1

File tree

6 files changed

+55
-53
lines changed

6 files changed

+55
-53
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
3030
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3131
import org.elasticsearch.xpack.esql.session.Configuration;
32-
import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils;
32+
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
3333

3434
import java.util.ArrayList;
3535
import java.util.List;
@@ -130,8 +130,8 @@ void startComputeOnRemoteCluster(
130130
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
131131
);
132132
}, e -> {
133-
if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
134-
EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards(
133+
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
134+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
135135
executionInfo,
136136
clusterAlias,
137137
EsqlExecutionInfo.Cluster.Status.SKIPPED,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717
import org.elasticsearch.transport.TransportRequest;
1818
import org.elasticsearch.transport.TransportService;
1919
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
20-
import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils;
20+
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
2121

2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.function.Supplier;
2525

26-
// Create group task for this cluster. This group task ensures that two branches of the computation:
27-
// the exchange sink and the cluster request, belong to the same group and each of them can cancel the other.
28-
// runAfter listeners below ensure that the group is finalized when both branches are done.
29-
// The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too.
26+
/**
27+
* Create group task for this cluster. This group task ensures that two branches of the computation:
28+
* the exchange sink and the cluster request, belong to the same group and each of them can cancel the other.
29+
* runAfter listeners below ensure that the group is finalized when both branches are done.
30+
* The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too.
31+
*/
3032
class RemoteListenerGroup {
3133
private final CancellableTask groupTask;
3234
private final ActionListener<Void> exchangeRequestListener;
@@ -72,8 +74,8 @@ class RemoteListenerGroup {
7274
private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
7375
return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
7476
taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
75-
if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
76-
EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards(
77+
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
78+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
7779
executionInfo,
7880
clusterAlias,
7981
EsqlExecutionInfo.Cluster.Status.PARTIAL,
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
import java.util.Objects;
4242
import java.util.Set;
4343

44-
public class EsqlSessionCCSUtils {
44+
public class EsqlCCSUtils {
4545

46-
private EsqlSessionCCSUtils() {}
46+
private EsqlCCSUtils() {}
4747

4848
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
4949
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
@@ -335,9 +335,9 @@ public static void checkForCcsLicense(
335335
}
336336

337337
/**
338-
* Mark cluster with a default cluster state with the given status and potentially failure from exception.
339-
* Most metrics are set to 0 except for "took" which is set to the total time taken so far.
340-
* The status must be the final state of the cluster, not RUNNING.
338+
* Mark cluster with a final status (success or failure).
339+
* Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far.
340+
* The status must be the final status of the cluster, not RUNNING.
341341
*/
342342
public static void markClusterWithFinalStateAndNoShards(
343343
EsqlExecutionInfo executionInfo,
@@ -366,6 +366,6 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo,
366366
}
367367

368368
return ExceptionsHelper.isRemoteUnavailableException(e)
369-
|| (e instanceof RemoteTransportException && e.getCause() instanceof TaskCancelledException);
369+
|| (e instanceof RemoteTransportException && ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null);
370370
}
371371
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
161161
parse(request.query(), request.params()),
162162
executionInfo,
163163
request.filter(),
164-
new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
164+
new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
165165
@Override
166166
public void onResponse(LogicalPlan analyzedPlan) {
167167
preMapper.preMapper(
@@ -188,7 +188,7 @@ public void executeOptimizedPlan(
188188
) {
189189
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
190190
// TODO: this could be snuck into the underlying listener
191-
EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
191+
EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
192192
// execute any potential subplans
193193
executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
194194
}
@@ -315,7 +315,7 @@ public void analyzedPlan(
315315
.collect(Collectors.toSet());
316316
final List<TableInfo> indices = preAnalysis.indices;
317317

318-
EsqlSessionCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
318+
EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
319319

320320
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
321321
indices.stream()
@@ -430,7 +430,7 @@ private void preAnalyzeIndices(
430430
}
431431
// if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search
432432
// based only on available clusters (which could now be an empty list)
433-
String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
433+
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
434434
if (indexExpressionToResolve.isEmpty()) {
435435
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
436436
listener.onResponse(
@@ -464,8 +464,8 @@ private boolean analyzeCCSIndices(
464464
ActionListener<PreAnalysisResult> l
465465
) {
466466
IndexResolution indexResolution = result.indices;
467-
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
468-
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
467+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
468+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
469469
if (executionInfo.isCrossClusterSearch()
470470
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
471471
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
139139
fields.put(name, field);
140140
}
141141

142-
Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(
142+
Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlCCSUtils.determineUnavailableRemoteClusters(
143143
fieldCapsResponse.getFailures()
144144
);
145145

0 commit comments

Comments
 (0)