Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,26 @@ public void testIndicesDontExist() throws IOException {
ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo"))));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo]"), containsString("Unknown index [remote_cluster:foo]")));
assertThat(
e.getMessage(),
anyOf(
containsString("Unknown index [foo]"),
containsString("Unknown index [*:foo]"),
containsString("Unknown index [remote_cluster:foo]")
)
);

e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo*"))));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo*]"), containsString("Unknown index [remote_cluster:foo*]")));
assertThat(
e.getMessage(),
anyOf(
containsString("Unknown index [foo*]"),
containsString("Unknown index [*:foo*]"),
containsString("Unknown index [remote_cluster:foo*]")
)
);

e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo, test1")));
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,19 @@ public void testQueryAgainstNonMatchingClusterWildcardPattern() {

// since this wildcarded expression does not resolve to a valid remote cluster, it is not considered
// a cross-cluster search and thus should not throw a license error
String q = "FROM xremote*:events";
{
String limit1 = q + " | STATS count(*)";
try (EsqlQueryResponse resp = runQuery(limit1, requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}
try (EsqlQueryResponse resp = runQuery("FROM xremote*:events | STATS count(*)", requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}

String limit0 = q + " | LIMIT 0";
try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
assertThat(getValuesList(resp).size(), equalTo(0));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}
try (EsqlQueryResponse resp = runQuery("FROM xremote*:events | LIMIT 0", requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
assertThat(getValuesList(resp).size(), equalTo(0));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,24 +383,24 @@ protected void testSearchesAgainstNonMatchingIndices(boolean exceptionWithSkipUn
// an error is thrown if there is a concrete index that does not match
{
String q = "FROM nomatch*,cluster-a:nomatch";
String expectedError = "Unknown index [cluster-a:nomatch,nomatch*]";
String expectedError = "Unknown index [nomatch*,cluster-a:nomatch]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and in 3 below cases the error message index now matches the original input (notice that the order was different).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ordering stable - i.e. is it going to always match original order, no matter how many patterns and clusters we have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, error uses indexPattern. We are supplying original user input into it now.

}

// an error is thrown if there are no matching indices at all - local with wildcard, remote with wildcard
{
String q = "FROM nomatch*,cluster-a:nomatch*";
String expectedError = "Unknown index [cluster-a:nomatch*,nomatch*]";
String expectedError = "Unknown index [nomatch*,cluster-a:nomatch*]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}
{
String q = "FROM nomatch,cluster-a:nomatch";
String expectedError = "Unknown index [cluster-a:nomatch,nomatch]";
String expectedError = "Unknown index [nomatch,cluster-a:nomatch]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}
{
String q = "FROM nomatch,cluster-a:nomatch*";
String expectedError = "Unknown index [cluster-a:nomatch*,nomatch]";
String expectedError = "Unknown index [nomatch,cluster-a:nomatch*]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {

protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
Holder<IndexMode> indexMode = new Holder<>();
Holder<IndexPattern> index = new Holder<>();
Holder<IndexPattern> indexPattern = new Holder<>();
List<IndexPattern> lookupIndices = new ArrayList<>();
plan.forEachUp(UnresolvedRelation.class, p -> {
if (p.indexMode() == IndexMode.LOOKUP) {
lookupIndices.add(p.indexPattern());
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
indexMode.set(p.indexMode());
index.set(p.indexPattern());
indexPattern.set(p.indexPattern());
} else {
throw new IllegalStateException("index mode is already set");
}
Expand Down Expand Up @@ -92,7 +92,7 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {

return new PreAnalysis(
indexMode.get(),
index.get(),
indexPattern.get(),
unresolvedEnriches,
lookupIndices,
indexMode.get() == IndexMode.TIME_SERIES || supportsAggregateMetricDouble.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,30 +155,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
}
}

static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
// Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out.
if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) {
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
} else {
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
for (String index : indexExpression.split(",")) {
sb.append(clusterAlias).append(':').append(index).append(',');
}
}
}
}

if (sb.length() > 0) {
return sb.substring(0, sb.length() - 1);
} else {
return "";
}
}

static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo, String localPattern) {
if (executionInfo.getClusters().isEmpty()) {
return localPattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,15 +695,14 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SYSTEM_READ
);
if (preAnalysis.indexPattern() != null) {
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
if (executionInfo.clusterAliases().isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're back to the same question of whether this is right or not when we call it the second time. Is this intentional to have it here, or it's just sneaked in from merging the previous patch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to #135353 (comment)

OK, I see! But we are calling preAnalyzeMainIndices twice - once at the beginning (now) and once if the filtered analysis fails. Is that also true for the second invocation? In any case, I'd add a comment to it - with this change, the existing comment is no longer true, and would be just confusing.

There are several aspects to this question:

  • today this behaves different from local only resolution: local does not allow empty resolution. We would like to change it to alight with DSL behavior. Once that is done this check should actually go away.
  • with respect for retrying analysis without filter: this would retry the same expression (even if some clusters should be skipped resulting in duplicated connection errors). I feel like this falls into ES-12978 issue category. With flat expression it is very tricky to manipulate input to exclude those clusters. Since we are going to be restarting analysis from scratch I would rather clear errors and start analysis fresh with no filter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All considered, IMHO this seems reasonable.

I feel like this falls into ES-12978

ES-12978 is for CPS, but I guess we have the same problem with CCS now.
My assumption is that we'll address this problem before 9.3 FF.

Since we are going to be restarting analysis from scratch I would rather clear errors and start analysis fresh

++

// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(
result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
);
} else {
indexResolver.resolveAsMergedMapping(
indexExpressionToResolve,
preAnalysis.indexPattern().indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (preAnalysis.indexMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ private static FieldCapabilitiesRequest createFieldCapsRequest(
req.fields(fieldNames.toArray(String[]::new));
req.includeUnmapped(true);
req.indexFilter(requestFilter);
req.returnLocalAll(false);
// lenient because we throw our own errors looking at the response e.g. if something was not resolved
// also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable
req.indicesOptions(FIELD_CAPS_INDICES_OPTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,62 +59,6 @@ public class EsqlCCSUtilsTests extends ESTestCase {
private final String REMOTE1_ALIAS = "remote1";
private final String REMOTE2_ALIAS = "remote2";

public void testCreateIndexExpressionFromAvailableClusters() {
var skipped = EsqlExecutionInfo.Cluster.Status.SKIPPED;
// no clusters marked as skipped
{
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
assertIndexPattern(
EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo),
containsInAnyOrder("logs*", "remote1:*", "remote2:mylogs1", "remote2:mylogs2", "remote2:logs*")
);
}

// one cluster marked as skipped, so not present in revised index expression
{
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true));
executionInfo.swapCluster(
REMOTE2_ALIAS,
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
);
assertIndexPattern(
EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo),
containsInAnyOrder("logs*", "remote1:*", "remote1:foo")
);
}

// two clusters marked as skipped, so only local cluster present in revised index expression
{
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, skipped));
executionInfo.swapCluster(
REMOTE2_ALIAS,
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
);
assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*"));
}

// only remotes present and all marked as skipped, so in revised index expression should be empty string
{
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
executionInfo.swapCluster(
REMOTE1_ALIAS,
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED)
);
executionInfo.swapCluster(
REMOTE2_ALIAS,
(k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, skipped)
);
assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo(""));
}
}

public void testCreateQualifiedLookupIndexExpressionFromAvailableClusters() {

var skipped = EsqlExecutionInfo.Cluster.Status.SKIPPED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -986,16 +983,7 @@ public void testAlias() throws Exception {
Request request = esqlRequest("FROM " + index + " | KEEP emp_id | SORT emp_id | LIMIT 100");
ResponseException error = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(request));
assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(400));
String expectedIndexExpressionInError = index.replace("*", "my_remote_cluster");
Pattern p = Pattern.compile("Unknown index \\[([^\\]]+)\\]");
Matcher m = p.matcher(error.getMessage());
assertTrue("Pattern matcher to parse error message did not find matching string: " + error.getMessage(), m.find());
String unknownIndexExpressionInErrorMessage = m.group(1);
Set<String> actualUnknownIndexes = org.elasticsearch.common.Strings.commaDelimitedListToSet(
unknownIndexExpressionInErrorMessage
);
Set<String> expectedUnknownIndexes = org.elasticsearch.common.Strings.commaDelimitedListToSet(expectedIndexExpressionInError);
assertThat(actualUnknownIndexes, equalTo(expectedUnknownIndexes));
assertThat(error.getMessage(), containsString("Unknown index [" + index + "]"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Match original query pattern, not semi resolved pattern

}

for (var index : List.of(
Expand Down