|
37 | 37 | import java.util.Map; |
38 | 38 | import java.util.Set; |
39 | 39 | import java.util.concurrent.Executor; |
40 | | -import java.util.concurrent.atomic.AtomicInteger; |
| 40 | +import java.util.concurrent.atomic.AtomicBoolean; |
41 | 41 | import java.util.concurrent.atomic.AtomicReference; |
42 | 42 | import java.util.function.Function; |
43 | 43 |
|
@@ -82,10 +82,10 @@ void startComputeOnRemoteCluster( |
82 | 82 | listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); |
83 | 83 | final var childSessionId = computeService.newChildSession(sessionId); |
84 | 84 | final String clusterAlias = cluster.clusterAlias(); |
85 | | - final AtomicInteger pagesFetched = new AtomicInteger(); |
| 85 | + final AtomicBoolean pagesFetched = new AtomicBoolean(); |
86 | 86 | final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>(); |
87 | 87 | listener = listener.delegateResponse((l, e) -> { |
88 | | - final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0; |
| 88 | + final boolean receivedResults = finalResponse.get() != null || pagesFetched.get(); |
89 | 89 | if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { |
90 | 90 | EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); |
91 | 91 | l.onResponse(List.of()); |
@@ -122,7 +122,7 @@ void startComputeOnRemoteCluster( |
122 | 122 | exchangeSource.addRemoteSink( |
123 | 123 | remoteSink, |
124 | 124 | failFast, |
125 | | - pagesFetched::incrementAndGet, |
| 125 | + () -> pagesFetched.set(true), |
126 | 126 | queryPragmas.concurrentExchangeClients(), |
127 | 127 | computeListener.acquireAvoid() |
128 | 128 | ); |
|
0 commit comments