Skip to content

Commit 3fbf04e

Browse files
authored
Fix CCS exchange when multi cluster aliases point to same cluster (#117297) (#117389)
[esql] > Unexpected error from Elasticsearch: illegal_state_exception - sink exchanger for id [ruxoDDxXTGW55oIPHoCT-g:964613010] already exists. This issue occurs when two or more clusterAliases point to the same physical remote cluster. The exchange service assumes that each destination is unique, which does not hold in this topology. This PR addresses the problem by appending a monotonically increasing number as a suffix to the session ID, ensuring that a distinct exchange is created for each alias in such cases. Another issue arising from this topology is that data on the remote cluster is processed multiple times, leading to incorrect results. I can work on a fix for that once we agree that it is an issue.
1 parent b9940e0 commit 3fbf04e

File tree

6 files changed

+93
-17
lines changed

6 files changed

+93
-17
lines changed

docs/changelog/117297.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117297
2+
summary: Fix CCS exchange when multi cluster aliases point to same cluster
3+
area: ES|QL
4+
type: bug
5+
issues: []

test/framework/src/main/java/org/elasticsearch/test/AbstractMultiClustersTestCase.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.client.internal.Client;
1818
import org.elasticsearch.common.network.NetworkModule;
1919
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.transport.TransportAddress;
2021
import org.elasticsearch.core.IOUtils;
2122
import org.elasticsearch.core.Strings;
2223
import org.elasticsearch.plugins.Plugin;
@@ -44,6 +45,7 @@
4445

4546
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
4647
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
48+
import static org.hamcrest.Matchers.contains;
4749
import static org.hamcrest.Matchers.empty;
4850
import static org.hamcrest.Matchers.hasKey;
4951
import static org.hamcrest.Matchers.not;
@@ -149,19 +151,23 @@ public static void stopClusters() throws IOException {
149151
}
150152

151153
protected void disconnectFromRemoteClusters() throws Exception {
152-
Settings.Builder settings = Settings.builder();
153154
final Set<String> clusterAliases = clusterGroup.clusterAliases();
154155
for (String clusterAlias : clusterAliases) {
155156
if (clusterAlias.equals(LOCAL_CLUSTER) == false) {
156-
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
157-
settings.putNull("cluster.remote." + clusterAlias + ".mode");
158-
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
157+
removeRemoteCluster(clusterAlias);
159158
}
160159
}
160+
}
161+
162+
protected void removeRemoteCluster(String clusterAlias) throws Exception {
163+
Settings.Builder settings = Settings.builder();
164+
settings.putNull("cluster.remote." + clusterAlias + ".seeds");
165+
settings.putNull("cluster.remote." + clusterAlias + ".mode");
166+
settings.putNull("cluster.remote." + clusterAlias + ".proxy_address");
161167
client().admin().cluster().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).get();
162168
assertBusy(() -> {
163169
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
164-
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), empty());
170+
assertThat(transportService.getRemoteClusterService().getRegisteredRemoteClusterNames(), not(contains(clusterAlias)));
165171
}
166172
});
167173
}
@@ -178,12 +184,17 @@ protected void configureAndConnectsToRemoteClusters() throws Exception {
178184
}
179185

180186
protected void configureRemoteCluster(String clusterAlias, Collection<String> seedNodes) throws Exception {
181-
final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
182-
Settings.Builder settings = Settings.builder();
183-
final List<String> seedAddresses = seedNodes.stream().map(node -> {
187+
final var seedAddresses = seedNodes.stream().map(node -> {
184188
final TransportService transportService = cluster(clusterAlias).getInstance(TransportService.class, node);
185-
return transportService.boundAddress().publishAddress().toString();
189+
return transportService.boundAddress().publishAddress();
186190
}).toList();
191+
configureRemoteClusterWithSeedAddresses(clusterAlias, seedAddresses);
192+
}
193+
194+
protected void configureRemoteClusterWithSeedAddresses(String clusterAlias, Collection<TransportAddress> seedNodes) throws Exception {
195+
final String remoteClusterSettingPrefix = "cluster.remote." + clusterAlias + ".";
196+
Settings.Builder settings = Settings.builder();
197+
final List<String> seedAddresses = seedNodes.stream().map(TransportAddress::toString).toList();
187198
boolean skipUnavailable = skipUnavailableForRemoteClusters().containsKey(clusterAlias)
188199
? skipUnavailableForRemoteClusters().get(clusterAlias)
189200
: DEFAULT_SKIP_UNAVAILABLE;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.util.Map;
43+
import java.util.Set;
4344
import java.util.concurrent.Executor;
4445
import java.util.concurrent.atomic.AtomicLong;
4546

@@ -339,6 +340,10 @@ public boolean isEmpty() {
339340
return sinks.isEmpty();
340341
}
341342

343+
public Set<String> sinkKeys() {
344+
return sinks.keySet();
345+
}
346+
342347
@Override
343348
protected void doStart() {
344349

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.action.ActionFuture;
1011
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
1112
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
1213
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -15,6 +16,7 @@
1516
import org.elasticsearch.action.support.WriteRequest;
1617
import org.elasticsearch.common.settings.Setting;
1718
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.transport.TransportAddress;
1820
import org.elasticsearch.compute.operator.DriverTaskRunner;
1921
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2022
import org.elasticsearch.core.TimeValue;
@@ -27,8 +29,10 @@
2729
import org.elasticsearch.search.lookup.SearchLookup;
2830
import org.elasticsearch.tasks.TaskInfo;
2931
import org.elasticsearch.test.AbstractMultiClustersTestCase;
32+
import org.elasticsearch.transport.TransportService;
3033
import org.elasticsearch.xcontent.XContentBuilder;
3134
import org.elasticsearch.xcontent.json.JsonXContent;
35+
import org.elasticsearch.xpack.esql.plugin.ComputeService;
3236
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
3337
import org.junit.Before;
3438

@@ -40,8 +44,10 @@
4044
import java.util.concurrent.CountDownLatch;
4145
import java.util.concurrent.TimeUnit;
4246

47+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
4348
import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas;
4449
import static org.hamcrest.Matchers.containsString;
50+
import static org.hamcrest.Matchers.equalTo;
4551
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4652
import static org.hamcrest.Matchers.hasSize;
4753

@@ -189,4 +195,44 @@ public void testCancel() throws Exception {
189195
Exception error = expectThrows(Exception.class, requestFuture::actionGet);
190196
assertThat(error.getMessage(), containsString("proxy timeout"));
191197
}
198+
199+
public void testSameRemoteClusters() throws Exception {
200+
TransportAddress address = cluster(REMOTE_CLUSTER).getInstance(TransportService.class).getLocalNode().getAddress();
201+
int moreClusters = between(1, 5);
202+
for (int i = 0; i < moreClusters; i++) {
203+
String clusterAlias = REMOTE_CLUSTER + "-" + i;
204+
configureRemoteClusterWithSeedAddresses(clusterAlias, List.of(address));
205+
}
206+
int numDocs = between(10, 100);
207+
createRemoteIndex(numDocs);
208+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
209+
request.query("FROM *:test | STATS total=sum(const) | LIMIT 1");
210+
request.pragmas(randomPragmas());
211+
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
212+
try {
213+
try {
214+
assertBusy(() -> {
215+
List<TaskInfo> tasks = client(REMOTE_CLUSTER).admin()
216+
.cluster()
217+
.prepareListTasks()
218+
.setActions(ComputeService.CLUSTER_ACTION_NAME)
219+
.get()
220+
.getTasks();
221+
assertThat(tasks, hasSize(moreClusters + 1));
222+
});
223+
} finally {
224+
PauseFieldPlugin.allowEmitting.countDown();
225+
}
226+
try (EsqlQueryResponse resp = future.actionGet(30, TimeUnit.SECONDS)) {
227+
// TODO: This produces incorrect results because data on the remote cluster is processed multiple times.
228+
long expectedCount = numDocs * (moreClusters + 1L);
229+
assertThat(getValuesList(resp), equalTo(List.of(List.of(expectedCount))));
230+
}
231+
} finally {
232+
for (int i = 0; i < moreClusters; i++) {
233+
String clusterAlias = REMOTE_CLUSTER + "-" + i;
234+
removeRemoteCluster(clusterAlias);
235+
}
236+
}
237+
}
192238
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,8 @@ protected void doRun() throws Exception {
401401
});
402402
sessionId = foundTasks.get(0).taskId().toString();
403403
assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
404-
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
404+
String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get();
405+
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId);
405406
waitedForPages = randomBoolean();
406407
if (waitedForPages) {
407408
// do not fail exchange requests until we have some pages

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.util.Set;
8383
import java.util.concurrent.Executor;
8484
import java.util.concurrent.atomic.AtomicBoolean;
85+
import java.util.concurrent.atomic.AtomicLong;
8586

8687
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
8788

@@ -101,6 +102,7 @@ public class ComputeService {
101102
private final EnrichLookupService enrichLookupService;
102103
private final LookupFromIndexService lookupFromIndexService;
103104
private final ClusterService clusterService;
105+
private final AtomicLong childSessionIdGenerator = new AtomicLong();
104106

105107
public ComputeService(
106108
SearchService searchService,
@@ -167,7 +169,7 @@ public void execute(
167169
return;
168170
}
169171
var computeContext = new ComputeContext(
170-
sessionId,
172+
newChildSession(sessionId),
171173
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
172174
List.of(),
173175
configuration,
@@ -330,22 +332,23 @@ private void startComputeOnDataNodes(
330332
// the new remote exchange sink, and initialize the computation on the target node via data-node-request.
331333
for (DataNode node : dataNodeResult.dataNodes()) {
332334
var queryPragmas = configuration.pragmas();
335+
var childSessionId = newChildSession(sessionId);
333336
ExchangeService.openExchange(
334337
transportService,
335338
node.connection,
336-
sessionId,
339+
childSessionId,
337340
queryPragmas.exchangeBufferSize(),
338341
esqlExecutor,
339342
refs.acquire().delegateFailureAndWrap((l, unused) -> {
340-
var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection);
343+
var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection);
341344
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
342345
ActionListener<ComputeResponse> computeResponseListener = computeListener.acquireCompute(clusterAlias);
343346
var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null));
344347
transportService.sendChildRequest(
345348
node.connection,
346349
DATA_ACTION_NAME,
347350
new DataNodeRequest(
348-
sessionId,
351+
childSessionId,
349352
configuration,
350353
clusterAlias,
351354
node.shardIds,
@@ -378,17 +381,18 @@ private void startComputeOnRemoteClusters(
378381
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
379382
try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) {
380383
for (RemoteCluster cluster : clusters) {
384+
final var childSessionId = newChildSession(sessionId);
381385
ExchangeService.openExchange(
382386
transportService,
383387
cluster.connection,
384-
sessionId,
388+
childSessionId,
385389
queryPragmas.exchangeBufferSize(),
386390
esqlExecutor,
387391
refs.acquire().delegateFailureAndWrap((l, unused) -> {
388-
var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection);
392+
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
389393
exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients());
390394
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
391-
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan);
395+
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
392396
var clusterListener = ActionListener.runBefore(
393397
computeListener.acquireCompute(cluster.clusterAlias()),
394398
() -> l.onResponse(null)
@@ -912,4 +916,8 @@ public List<SearchExecutionContext> searchExecutionContexts() {
912916
return searchContexts.stream().map(ctx -> ctx.getSearchExecutionContext()).toList();
913917
}
914918
}
919+
920+
private String newChildSession(String session) {
921+
return session + "/" + childSessionIdGenerator.incrementAndGet();
922+
}
915923
}

0 commit comments

Comments
 (0)