Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.ConnectTransportException;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

// TODO: Move this test to the Serverless repo once the IT framework is ready there.
public class EsqlCpsDoesNotUseSkipUnavailableIT extends AbstractCrossClusterTestCase {

public static class CpsPlugin extends Plugin implements ClusterPlugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(CpsEnableSetting);
}
}

private static final Setting<String> CpsEnableSetting = Setting.simpleString(
"serverless.cross_project.enabled",
Setting.Property.NodeScope
);

@Override
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
}

@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
// Setting skip_unavailable=false results in a fatal error when the linked cluster is not available.
return Map.of(REMOTE_CLUSTER_1, false);
}

public void testCpsShouldNotUseSkipUnavailable() throws Exception {
// Add some dummy data to prove we are communicating fine with the remote.
assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index"));
client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();

// Shut down the linked cluster we'd be targeting in the search.
try {
cluster(REMOTE_CLUSTER_1).close();
} catch (Exception e) {
throw new AssertionError(e);
}

/*
* Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster
* and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true
* should not result in a fatal error.
*/

EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM *,*:* | limit 10");
request.allowPartialResults(true);
try (EsqlQueryResponse response = runQuery(request)) {
assertThat(response.isPartial(), is(true));
EsqlExecutionInfo info = response.getExecutionInfo();
assertThat(info.getCluster(REMOTE_CLUSTER_1).getStatus(), is(EsqlExecutionInfo.Cluster.Status.SKIPPED));
}

request = new EsqlQueryRequest().query("FROM *,*:* | limit 10");
try (EsqlQueryResponse response = runQuery(request)) {
fail("a fatal error should be thrown since allow_partial_results=false");
} catch (Exception e) {
assertThat(e, instanceOf(ConnectTransportException.class));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected void doResolvePolicies(
}

final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();
final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
Expand Down Expand Up @@ -304,6 +304,7 @@ private void lookupPolicies(
Collection<String> remoteClusters,
boolean includeLocal,
Collection<UnresolvedPolicy> unresolvedPolicies,
EsqlExecutionInfo executionInfo,
ActionListener<Map<String, LookupResponse>> listener
) {
final Map<String, LookupResponse> lookupResponses = ConcurrentCollections.newConcurrentMap();
Expand All @@ -316,7 +317,7 @@ private void lookupPolicies(
if (remotePolicies.isEmpty() == false) {
for (String cluster : remoteClusters) {
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() {
getRemoteConnection(cluster, executionInfo, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
transportService.sendRequest(
Expand All @@ -325,7 +326,7 @@ public void onResponse(Transport.Connection connection) {
new LookupRequest(cluster, remotePolicies),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)),
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, executionInfo, l)),
LookupResponse::new,
threadPool.executor(ThreadPool.Names.SEARCH)
)
Expand All @@ -334,7 +335,7 @@ public void onResponse(Transport.Connection connection) {

@Override
public void onFailure(Exception e) {
failIfSkipUnavailableFalse(e, cluster, lookupListener);
failIfSkipUnavailableFalse(e, cluster, executionInfo, lookupListener);
}
});
}
Expand All @@ -359,8 +360,13 @@ public void onFailure(Exception e) {
}
}

private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener<LookupResponse> lookupListener) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster).orElse(true)) {
private void failIfSkipUnavailableFalse(
Exception e,
String cluster,
EsqlExecutionInfo executionInfo,
ActionListener<LookupResponse> lookupListener
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here again (as below) we don't really need neither cluster nor EsqlExecutionInfo but only boolean executionInfo.shouldSkipOnFailure(cluster). Maybe we could simplify it by calculating this boolean above and pass it to the methods that need it? Just a suggestion.

if (ExceptionsHelper.isRemoteUnavailableException(e) && executionInfo.shouldSkipOnFailure(cluster)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think org.elasticsearch.xpack.esql.action.EsqlExecutionInfo#skipOnFailurePredicate is initialized with essentially the same:

return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true), includeCcsMetadata);

But that is definitely confusing that we use different things in different palces

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, now that predicate is the place where we centralize this behavior.

lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
Expand Down Expand Up @@ -471,11 +477,7 @@ protected Map<String, EnrichPolicy> availablePolicies() {
return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies();
}

protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {
remoteClusterService.maybeEnsureConnectedAndGetConnection(
cluster,
remoteClusterService.isSkipUnavailable(cluster).orElse(true) == false,
listener
);
protected void getRemoteConnection(String cluster, EsqlExecutionInfo executionInfo, ActionListener<Transport.Connection> listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not saying this is wrong, but here you don't actually need EsqlExecutionInfo, only one flag that describes executionInfo.shouldSkipOnFailure(cluster) == false which is ensureConnected value. So maybe this should be boolean ensureConnected too?

remoteClusterService.maybeEnsureConnectedAndGetConnection(cluster, executionInfo.shouldSkipOnFailure(cluster) == false, listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to ensure - the desired behavior on CPS is that we always want to try to reconnect immediately if the connection is bad, when allow_partial = true, and don't try to reconnect when allow_partial = false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit of a stretch, as allow_partial is not really about connection retries, but on the other hand the previous behavior relied on skip_unavailable, that is a bit closer to the concept of connection but not really about reconnection either.
In conclusion, IMHO this is fine in terms of consistency (also compared to stateful).

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,11 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
// include_ccs_metadata is considered only if include_execution_metadata is not set
includeCcsMetadata = Boolean.TRUE.equals(request.includeCCSMetadata());
}
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true), includeCcsMetadata);
Boolean allowPartialResults = request.allowPartialResults() != null ? request.allowPartialResults() : defaultAllowPartialResults;
return new EsqlExecutionInfo(
clusterAlias -> remoteClusterService.shouldSkipOnFailure(clusterAlias, allowPartialResults),
includeCcsMetadata
);
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,11 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
}

@Override
protected void getRemoteConnection(String remoteCluster, ActionListener<Transport.Connection> listener) {
protected void getRemoteConnection(
String remoteCluster,
EsqlExecutionInfo executionInfo,
ActionListener<Transport.Connection> listener
) {
assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY));
listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()));
}
Expand Down