Skip to content

Commit 3ef91e0

Browse files
authored
Fix CCS exchange when multi cluster aliases point to same cluster (#117297) (#117387)
[esql] > Unexpected error from Elasticsearch: illegal_state_exception - sink exchanger for id [ruxoDDxXTGW55oIPHoCT-g:964613010] already exists. This issue occurs when two or more clusterAliases point to the same physical remote cluster. The exchange service assumes the destination is unique, which is not true in this topology. This PR addresses the problem by appending a suffix using a monotonic increasing number, ensuring that different exchanges are created in such cases. Another issue arising from this behavior is that data on a remote cluster is processed multiple times, leading to incorrect results. I can work on the fix for this once we agree that this is an issue.
1 parent d552b30 commit 3ef91e0

File tree

6 files changed

+93
-17
lines changed

6 files changed

+93
-17
lines changed

docs/changelog/117297.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117297
2+
summary: Fix CCS exchange when multi cluster aliases point to same cluster
3+
area: ES|QL
4+
type: bug
5+
issues: []

test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.client.internal.Client;
1818
import org.elasticsearch.common.network.NetworkModule;
1919
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.transport.TransportAddress;
2021
import org.elasticsearch.core.IOUtils;
2122
import org.elasticsearch.core.Strings;
2223
import org.elasticsearch.plugins.Plugin;
@@ -44,6 +45,7 @@
4445

4546
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
4647
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
48+
import static org.hamcrest.Matchers.contains;
4749
import static org.hamcrest.Matchers.empty;
4850
import static org.hamcrest.Matchers.hasKey;
4951
import static org.hamcrest.Matchers.not;
@@ -149,19 +151,23 @@ public static void stopClusters() throws IOException {
149151
}
150152

151153
protected void disconnectFromRemoteClusters() throws Exception {
152-
Settings.Builder settings = Settings.builder();
153154
final Set<String> clusterAliases = clusterGroup.clusterAliases();
154155
for (String clusterAlias : clusterAliases) {
155156
if (clusterAlias.equals(LOCAL_CLUSTER) == false) {
156-
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
157-
settings.putNull("cluster.remote." + clusterAlias + ".mode");
158-
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
157+
removeRemoteCluster(clusterAlias);
159158
}
160159
}
160+
}
161+
162+
protected void removeRemoteCluster(String clusterAlias) throws Exception {
163+
Settings.Builder settings = Settings.builder();
164+
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
165+
settings.putNull("cluster.remote." + clusterAlias + ".mode");
166+
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
161167
client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get();
162168
assertBusy(() -> {
163169
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
164-
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), empty());
170+
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias)));
165171
}
166172
});
167173
}
@@ -178,12 +184,17 @@ protected void configureAndConnectsToRemoteClusters() throws Exception {
178184
}
179185

180186
protected void configureRemoteCluster(String clusterAlias, Collection<String> seedNodes) throws Exception {
181-
final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
182-
Settings.Builder settings = Settings.builder();
183-
final List<String> seedAddresses = seedNodes.stream().map(node -> {
187+
final var seedAddresses = seedNodes.stream().map(node -> {
184188
final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node);
185-
return transportService.boundAddress().publishAddress().toString();
189+
return transportService.boundAddress().publishAddress();
186190
}).toList();
191+
configureRemoteClusterWithSeedAddresses(clusterAlias, seedAddresses);
192+
}
193+
194+
protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Collection<TransportAddress> seedNodes) throws Exception {
195+
final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
196+
Settings.Builder settings = Settings.builder();
197+
final List<String> seedAddresses = seedNodes.stream().map(TransportAddress::toString).toList();
187198
boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias)
188199
? skipUnavailableForRemoteClusters().get(clusterAlias)
189200
: DEFAULT_SKIP_UNAVAILABLE;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.util.Map;
43+
import java.util.Set;
4344
import java.util.concurrent.Executor;
4445
import java.util.concurrent.atomic.AtomicLong;
4546

@@ -339,6 +340,10 @@ public boolean isEmpty() {
339340
return sinks.isEmpty();
340341
}
341342

343+
public Set<String> sinkKeys() {
344+
return sinks.keySet();
345+
}
346+
342347
@Override
343348
protected void doStart() {
344349

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.action.ActionFuture;
1011
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
1112
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
1213
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -15,6 +16,7 @@
1516
import org.elasticsearch.action.support.WriteRequest;
1617
import org.elasticsearch.common.settings.Setting;
1718
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.transport.TransportAddress;
1820
import org.elasticsearch.compute.operator.DriverTaskRunner;
1921
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2022
import org.elasticsearch.core.TimeValue;
@@ -27,8 +29,10 @@
2729
import org.elasticsearch.search.lookup.SearchLookup;
2830
import org.elasticsearch.tasks.TaskInfo;
2931
import org.elasticsearch.test.AbstractMultiClustersTestCase;
32+
import org.elasticsearch.transport.TransportService;
3033
import org.elasticsearch.xcontent.XContentBuilder;
3134
import org.elasticsearch.xcontent.json.JsonXContent;
35+
import org.elasticsearch.xpack.esql.plugin.ComputeService;
3236
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
3337
import org.junit.Before;
3438

@@ -40,8 +44,10 @@
4044
import java.util.concurrent.CountDownLatch;
4145
import java.util.concurrent.TimeUnit;
4246

47+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
4348
import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas;
4449
import static org.hamcrest.Matchers.containsString;
50+
import static org.hamcrest.Matchers.equalTo;
4551
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4652
import static org.hamcrest.Matchers.hasSize;
4753

@@ -189,4 +195,44 @@ public void testCancel() throws Exception {
189195
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
190196
assertThat(error.getMessage(), containsString("proxy timeout"));
191197
}
198+
199+
public void testSameRemoteClusters() throws Exception {
200+
TransportAddress address = cluster(REMOTE_CLUSTER).getInstance(TransportService.class).getLocalNode().getAddress();
201+
int moreClusters = between(1, 5);
202+
for (int i = 0; i < moreClusters; i++) {
203+
String clusterAlias = REMOTE_CLUSTER + "-" + i;
204+
configureRemoteClusterWithSeedAddresses(clusterAlias, List.of(address));
205+
}
206+
int numDocs = between(10, 100);
207+
createRemoteIndex(numDocs);
208+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
209+
request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
210+
request.pragmas(randomPragmas());
211+
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
212+
try {
213+
try {
214+
assertBusy(() -> {
215+
List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
216+
.cluster()
217+
.prepareListTasks()
218+
.setActions(ComputeService.CLUSTER_ACTION_NAME)
219+
.get()
220+
.getTasks();
221+
assertThat(tasks, hasSize(moreClusters + 1));
222+
});
223+
} finally {
224+
PauseFieldPlugin.allowEmitting.countDown();
225+
}
226+
try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) {
227+
// TODO: This produces incorrect results because data on the remote cluster is processed multiple times.
228+
long expectedCount = numDocs * (moreClusters + 1L);
229+
assertThat(getValuesList(resp), equalTo(List.of(List.of(expectedCount))));
230+
}
231+
} finally {
232+
for (int i = 0; i < moreClusters; i++) {
233+
String clusterAlias = REMOTE_CLUSTER + "-" + i;
234+
removeRemoteCluster(clusterAlias);
235+
}
236+
}
237+
}
192238
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,8 @@ protected void doRun() throws Exception {
401401
});
402402
sessionId = foundTasks.get(0).taskId().toString();
403403
assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
404-
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
404+
String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get();
405+
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId);
405406
waitedForPages = randomBoolean();
406407
if (waitedForPages) {
407408
// do not fail exchange requests until we have some pages

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import java.util.Set;
8282
import java.util.concurrent.Executor;
8383
import java.util.concurrent.atomic.AtomicBoolean;
84+
import java.util.concurrent.atomic.AtomicLong;
8485

8586
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
8687

@@ -99,6 +100,7 @@ public class ComputeService {
99100
private final ExchangeService exchangeService;
100101
private final EnrichLookupService enrichLookupService;
101102
private final ClusterService clusterService;
103+
private final AtomicLong childSessionIdGenerator = new AtomicLong();
102104

103105
public ComputeService(
104106
SearchService searchService,
@@ -163,7 +165,7 @@ public void execute(
163165
return;
164166
}
165167
var computeContext = new ComputeContext(
166-
sessionId,
168+
newChildSession(sessionId),
167169
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
168170
List.of(),
169171
configuration,
@@ -332,22 +334,23 @@ private void startComputeOnDataNodes(
332334
// the new remote exchange sink, and initialize the computation on the target node via data-node-request.
333335
for (DataNode node : dataNodeResult.dataNodes()) {
334336
var queryPragmas = configuration.pragmas();
337+
var childSessionId = newChildSession(sessionId);
335338
ExchangeService.openExchange(
336339
transportService,
337340
node.connection,
338-
sessionId,
341+
childSessionId,
339342
queryPragmas.exchangeBufferSize(),
340343
esqlExecutor,
341344
refs.acquire().delegateFailureAndWrap((l, unused) -> {
342-
var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection);
345+
var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection);
343346
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
344347
ActionListener<ComputeResponse> computeResponseListener = computeListener.acquireCompute(clusterAlias);
345348
var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null));
346349
transportService.sendChildRequest(
347350
node.connection,
348351
DATA_ACTION_NAME,
349352
new DataNodeRequest(
350-
sessionId,
353+
childSessionId,
351354
configuration,
352355
clusterAlias,
353356
node.shardIds,
@@ -380,17 +383,18 @@ private void startComputeOnRemoteClusters(
380383
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
381384
try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) {
382385
for (RemoteCluster cluster : clusters) {
386+
final var childSessionId = newChildSession(sessionId);
383387
ExchangeService.openExchange(
384388
transportService,
385389
cluster.connection,
386-
sessionId,
390+
childSessionId,
387391
queryPragmas.exchangeBufferSize(),
388392
esqlExecutor,
389393
refs.acquire().delegateFailureAndWrap((l, unused) -> {
390-
var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection);
394+
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
391395
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
392396
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
393-
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan);
397+
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
394398
var clusterListener = ActionListener.runBefore(
395399
computeListener.acquireCompute(cluster.clusterAlias()),
396400
() -> l.onResponse(null)
@@ -913,4 +917,8 @@ public List<SearchExecutionContext> searchExecutionContexts() {
913917
return searchContexts.stream().map(ctx -> ctx.getSearchExecutionContext()).toList();
914918
}
915919
}
920+
921+
private String newChildSession(String session) {
922+
return session + "/" + childSessionIdGenerator.incrementAndGet();
923+
}
916924
}

0 commit comments

Comments
 (0)