Merged
Changes from 13 commits
@@ -74,7 +74,7 @@ public void wipeTestData() throws IOException {
}

private String getInexistentIndexErrorMessage() {
return "\"reason\" : \"Found 1 problem\\nline 1:1: Unknown index ";
return "Unknown index ";
Contributor
Is there any valid reason for this change?

Fyi, the line:column information there is relevant for the accuracy of the parsing error messaging system. Those two numbers are linked to the Node itself (which comes from the ANTLR parser) and are used by the Verifier to build a consistent error message, no matter what the error text is or which Node it comes from.
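To illustrate the point (a sketch with made-up names, not the actual ES|QL classes): the parser attaches a source position to each node, and error reporting prefixes it uniformly, which is what produces the `line 1:1:` part of the message above.

```java
// Illustrative sketch only: hypothetical Location/fail names, showing how a
// verifier can prefix every error with the offending node's parser position
// so messages stay consistent regardless of the error text.
public class VerifierSketch {
    record Location(int line, int column) {}

    static String fail(Location loc, String message) {
        // The position comes from the parsed node, the text from the check itself.
        return "line " + loc.line() + ":" + loc.column() + ": " + message;
    }

    public static void main(String[] args) {
        System.out.println(fail(new Location(1, 1), "Unknown index [foo]"));
    }
}
```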

Contributor Author

I think this test is too strict: it doesn't allow changing any details of how the errors are reported, and that led to some failures, though that may have been with older versions of the patch; I'll re-check why exactly it is needed. In general, our tests relying on specific error messages make some things hard to refactor or fix (e.g. some parts of the code produce "unknown index" and others "no such index", and a lot of tests rely on that, which means we are essentially testing implementation details instead of functionality, down to the exact wording of the error message). In this particular case, beyond checking that there is an unknown index, the test also checks that this is the only problem and that it is reported in a very specific way. If we changed how unknown indices are handled in a particular case (say, moved the check from runtime to planning time, as we may have to do), this test would likely break. It may not be necessary for this patch (going to check that), but I think it is something we may want to consider.
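As a sketch of the looser assertion style the comment argues for (hypothetical helper, not the actual test code): match only the functionally relevant fragment, so the test survives changes to how the error envelope is assembled.

```java
// Hypothetical helper: both the old strict message shape and a bare
// "Unknown index ..." message satisfy the same functional check.
public class ErrorAssertionSketch {
    static boolean reportsUnknownIndex(String errorMessage) {
        return errorMessage.contains("Unknown index");
    }

    public static void main(String[] args) {
        String strict = "\"reason\" : \"Found 1 problem\\nline 1:1: Unknown index ";
        String loose = "Unknown index ";
        // Both shapes pass, so refactoring the envelope doesn't break the test.
        System.out.println(reportsUnknownIndex(strict) && reportsUnknownIndex(loose));
    }
}
```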

}

public void testInexistentIndexNameWithWildcard() throws IOException {

Large diffs are not rendered by default.

@@ -132,4 +132,8 @@ public DataType type() {
public List<String> originalTypes() {
return originalTypes;
}

public String toString() {
Contributor Author

Not required for the fix strictly speaking, but it improves observability of columns and makes it much easier to inspect them when debugging.

return "ColumnInfoImpl{" + "name='" + name + '\'' + ", type=" + type + ", originalTypes=" + originalTypes + '}';
}
}
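For reference, a self-contained stand-in (hypothetical class name, simplified field types) showing the toString shape the diff adds; printing such an object in a log or debugger now shows its fields instead of an opaque `ClassName@hash`.

```java
import java.util.List;

// Stand-in for ColumnInfoImpl, with the same toString shape as the diff above;
// the real class uses a DataType for the type field.
public class ColumnInfoSketch {
    private final String name;
    private final String type;
    private final List<String> originalTypes;

    public ColumnInfoSketch(String name, String type, List<String> originalTypes) {
        this.name = name;
        this.type = type;
        this.originalTypes = originalTypes;
    }

    @Override
    public String toString() {
        return "ColumnInfoImpl{" + "name='" + name + '\'' + ", type=" + type + ", originalTypes=" + originalTypes + '}';
    }

    public static void main(String[] args) {
        System.out.println(new ColumnInfoSketch("a", "keyword", List.of("keyword")));
    }
}
```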
@@ -435,6 +435,10 @@ public void executePlan(
// starts computes on remote clusters
final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices);
for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) {
if (execInfo.getCluster(cluster.clusterAlias()).getStatus() != EsqlExecutionInfo.Cluster.Status.RUNNING) {
// if the cluster is already in the terminal state from the planning stage, no need to call it
continue;
}
clusterComputeHandler.startComputeOnRemoteCluster(
sessionId,
rootTask,
@@ -17,6 +17,7 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.transport.ConnectTransportException;
@@ -146,7 +147,8 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
// Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out.
if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) {
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
} else {
@@ -181,7 +183,11 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf
}
}
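The createIndexExpressionFromAvailableClusters change above can be sketched like this (simplified hypothetical types; the real method works on EsqlExecutionInfo): clusters that are SKIPPED or already SUCCESSFUL are excluded, the local cluster keeps its bare expression, and remotes are qualified with their alias.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Simplified sketch: build the search expression from the clusters that are
// still running, mirroring the status filter in the diff above.
public class IndexExpressionSketch {
    enum Status { RUNNING, SUCCESSFUL, SKIPPED, FAILED }

    // "" stands in here for the local cluster alias (LOCAL_CLUSTER_GROUP_KEY).
    static String build(Map<String, Status> statuses, Map<String, String> expressions) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, Status> e : statuses.entrySet()) {
            // Exclude clusters which are skipped, or already resolved as successful.
            if (e.getValue() != Status.SKIPPED && e.getValue() != Status.SUCCESSFUL) {
                String alias = e.getKey();
                String expr = expressions.get(alias);
                joiner.add(alias.isEmpty() ? expr : alias + ":" + expr);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Status> statuses = new LinkedHashMap<>();
        statuses.put("", Status.RUNNING);
        statuses.put("remote1", Status.RUNNING);
        statuses.put("remote2", Status.SKIPPED);
        Map<String, String> exprs = Map.of("", "logs*", "remote1", "logs*", "remote2", "logs*");
        System.out.println(build(statuses, exprs));
    }
}
```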

static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
static void updateExecutionInfoWithClustersWithNoMatchingIndices(
EsqlExecutionInfo executionInfo,
IndexResolution indexResolution,
QueryBuilder filter
) {
Set<String> clustersWithResolvedIndices = new HashSet<>();
// determine missing clusters
for (String indexName : indexResolution.resolvedIndices()) {
@@ -203,8 +209,8 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn
* Mark it as SKIPPED with 0 shards searched and took=0.
*/
for (String c : clustersWithNoMatchingIndices) {
if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
// if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite
if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) {
// if cluster was already in a terminal state, we don't need to check it again
continue;
}
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
@@ -218,16 +224,29 @@
} else {
fatalErrorMessage += "; " + error;
}
if (filter == null) {
// Not very useful since we don't send metadata on errors now, but it may be useful in the future
// We check for the filter since it may be the reason why the index is missing, in which case that is fine
markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FAILED, new VerificationException(error));
}
} else {
// no matching indices and no concrete index requested - just mark it as done, no error
markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null);
if (indexResolution.isValid()) {
// no matching indices and no concrete index requested - just mark it as done, no error
// We check that the resolution is valid because an empty resolution is still an error.
markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null);
}
}
}
if (fatalErrorMessage != null) {
throw new VerificationException(fatalErrorMessage);
}
}

// Filter-less version, mainly for testing where we don't need filter support
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, null);
}
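The branching in the hunk above can be summarized as a small decision function (hypothetical types; a sketch of the logic, not the actual implementation): terminal clusters are left alone, a missing concrete index is fatal unless a filter may explain it, and wildcard-only expressions succeed only when the resolution is valid.

```java
// Hedged sketch of the no-matching-indices decision in the diff above;
// the enum and method names are illustrative.
public class NoMatchDecisionSketch {
    enum Status { RUNNING, SUCCESSFUL, SKIPPED, FAILED }

    static Status decide(Status current, boolean concreteIndexRequested, boolean hasFilter, boolean resolutionValid) {
        if (current != Status.RUNNING) {
            return current; // already in a terminal state: don't touch it again
        }
        if (concreteIndexRequested) {
            // missing concrete index: fatal, unless the filter may have removed it
            return hasFilter ? current : Status.FAILED;
        }
        // wildcard only: mark as done, but only if the overall resolution is valid
        return resolutionValid ? Status.SUCCESSFUL : current;
    }
}
```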

// visible for testing
static boolean concreteIndexRequested(String indexExpression) {
if (Strings.isNullOrBlank(indexExpression)) {
@@ -355,15 +355,13 @@ public void analyzedPlan(
}).<PreAnalysisResult>andThen((l, result) -> {
// TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for
// invalid index resolution to updateExecutionInfo
if (result.indices.isValid()) {
// CCS indices and skip_unavailable cluster values can stop the analysis right here
if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
}
// If we run out of clusters to search due to unavailability we can stop the analysis right here
if (result.indices.isValid() && allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return;
// whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step
l.onResponse(result);
}).<PreAnalysisResult>andThen((l, result) -> {
// first attempt (maybe the only one) at analyzing the plan
analyzeAndMaybeRetry(analyzeAction, requestFilter, result, logicalPlanListener, l);
analyzeAndMaybeRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l);
}).<PreAnalysisResult>andThen((l, result) -> {
assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request";

@@ -374,6 +372,10 @@
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
LogicalPlan plan;
try {
// the order here is tricky - if the cluster has been filtered and later became unavailable,
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters());
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null);
plan = analyzeAction.apply(result);
} catch (Exception e) {
l.onFailure(e);
@@ -484,12 +486,12 @@ private boolean allCCSClustersSkipped(
ActionListener<LogicalPlan> logicalPlanListener
) {
IndexResolution indexResolution = result.indices;
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
// to let the LogicalPlanActionListener decide how to proceed
LOGGER.debug("No more clusters to search, ending analysis stage");
Contributor

Is this logging useful? If we see that in the logs, how does it help? Is it useful for debugging?

Contributor Author

I think it is, as it shows what is happening in the debug log; and yes, this is mainly for debugging.

logicalPlanListener.onFailure(new NoClustersToSearchException());
return true;
}
@@ -501,6 +503,7 @@ private static void analyzeAndMaybeRetry(
Function<PreAnalysisResult, LogicalPlan> analyzeAction,
QueryBuilder requestFilter,
PreAnalysisResult result,
EsqlExecutionInfo executionInfo,
ActionListener<LogicalPlan> logicalPlanListener,
ActionListener<PreAnalysisResult> l
) {
@@ -510,6 +513,10 @@
LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage);

try {
if (result.indices.isValid() || requestFilter != null) {
// Capture filtered out indices
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter);
}
plan = analyzeAction.apply(result);
} catch (Exception e) {
if (e instanceof VerificationException ve) {