|
39 | 39 | import java.util.concurrent.Executor; |
40 | 40 | import java.util.concurrent.atomic.AtomicBoolean; |
41 | 41 | import java.util.concurrent.atomic.AtomicReference; |
42 | | -import java.util.function.Function; |
43 | 42 |
|
44 | 43 | /** |
45 | 44 | * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. |
@@ -146,37 +145,27 @@ void startComputeOnRemoteCluster( |
146 | 145 | } |
147 | 146 |
|
148 | 147 | private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { |
149 | | - Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> { |
150 | | - if (status != EsqlExecutionInfo.Cluster.Status.RUNNING) { |
151 | | - return status; |
152 | | - } else if (executionInfo.isStopped() || resp.failedShards > 0) { |
153 | | - return EsqlExecutionInfo.Cluster.Status.PARTIAL; |
| 148 | + executionInfo.swapCluster(clusterAlias, (k, v) -> { |
| 149 | + var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(resp.getTotalShards()) |
| 150 | + .setSuccessfulShards(resp.getSuccessfulShards()) |
| 151 | + .setSkippedShards(resp.getSkippedShards()) |
| 152 | + .setFailedShards(resp.getFailedShards()); |
| 153 | + if (resp.getTook() != null) { |
| 154 | + builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos())); |
154 | 155 | } else { |
155 | | - return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; |
| 156 | + // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator |
| 157 | + // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response |
| 158 | + builder.setTook(executionInfo.tookSoFar()); |
156 | 159 | } |
157 | | - }; |
158 | | - if (resp.getTook() != null) { |
159 | | - var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()); |
160 | | - executionInfo.swapCluster( |
161 | | - clusterAlias, |
162 | | - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) |
163 | | - .setTook(tookTime) |
164 | | - .setTotalShards(resp.getTotalShards()) |
165 | | - .setSuccessfulShards(resp.getSuccessfulShards()) |
166 | | - .setSkippedShards(resp.getSkippedShards()) |
167 | | - .setFailedShards(resp.getFailedShards()) |
168 | | - .build() |
169 | | - ); |
170 | | - } else { |
171 | | - // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator |
172 | | - // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response |
173 | | - executionInfo.swapCluster( |
174 | | - clusterAlias, |
175 | | - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) |
176 | | - .setTook(executionInfo.tookSoFar()) |
177 | | - .build() |
178 | | - ); |
179 | | - } |
| 160 | + if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { |
| 161 | + if (executionInfo.isStopped() || resp.failedShards > 0) { |
| 162 | + builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL); |
| 163 | + } else { |
| 164 | + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); |
| 165 | + } |
| 166 | + } |
| 167 | + return builder.build(); |
| 168 | + }); |
180 | 169 | } |
181 | 170 |
|
182 | 171 | List<RemoteCluster> getRemoteClusters( |
|
0 commit comments