diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java index 3a530c5627115..00051264cbf6c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.operator.DriverProfile; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.parser.ParsingException; import org.junit.Before; @@ -19,6 +18,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -27,7 +27,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.hamcrest.Matchers.equalTo; -@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug") +// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug") public class ForkIT extends AbstractEsqlIntegTestCase { @Before @@ -800,6 +800,79 @@ public void testWithKeep() { } } + public void testWithUnsupportedFieldsWithSameBranches() { + var query = """ + FROM test-other + | FORK + ( WHERE id == "3") + ( WHERE id == "2" ) + | SORT _fork + """; + + try (var resp = run(query)) { + assertColumnNames(resp.columns(), List.of("content", "embedding", "id", "_fork")); + assertColumnTypes(resp.columns(), List.of("keyword", "unsupported", "keyword", "keyword")); + Iterable> expectedValues = List.of( + Arrays.stream(new Object[] { "This dog is really brown", null, "3", "fork1" }).toList(), + Arrays.stream(new Object[] { "This is a brown dog", null, "2", "fork2" }).toList() + ); + assertValues(resp.values(), expectedValues); + } + } + + public void testWithUnsupportedFieldsWithDifferentBranches() { + var query = """ + FROM test-other + | FORK + ( STATS x = count(*)) + ( WHERE id == "2" ) + | SORT _fork + """; + + try (var resp = run(query)) { + assertColumnNames(resp.columns(), List.of("x", "_fork", "content", "embedding", "id")); + assertColumnTypes(resp.columns(), List.of("long", "keyword", "keyword", "unsupported", "keyword")); + Iterable> expectedValues = List.of( + Arrays.stream(new Object[] { 3L, "fork1", null, null, null }).toList(), + Arrays.stream(new Object[] { null, "fork2", "This is a brown dog", null, "2" }).toList() + ); + assertValues(resp.values(), expectedValues); + } + } + + public void testWithUnsupportedFieldsAndConflicts() { + var firstQuery = """ + FROM test-other + | FORK + ( STATS embedding = count(*)) + ( WHERE id == "2" ) + | SORT _fork + """; + var e = expectThrows(VerificationException.class, () -> run(firstQuery)); + assertTrue(e.getMessage().contains("Column [embedding] has conflicting data types")); + + var secondQuery = """ + FROM test-other + | FORK + ( WHERE id == "2" ) + ( STATS embedding = count(*)) + | SORT _fork + """; + e = expectThrows(VerificationException.class, () -> run(secondQuery)); + assertTrue(e.getMessage().contains("Column [embedding] has conflicting data types")); + + var thirdQuery = """ + FROM test-other + | FORK + ( WHERE id == "2" ) + ( WHERE id == "3" ) + ( STATS embedding = count(*)) + | SORT _fork + """; + e = expectThrows(VerificationException.class, () -> run(thirdQuery)); + assertTrue(e.getMessage().contains("Column [embedding] has conflicting data types")); + } + public void testWithEvalWithConflictingTypes() { var query = """ FROM test @@ -976,12 +1049,21 @@ private void createAndPopulateIndices() { createRequest = client.prepareCreate(otherTestIndex) .setSettings(Settings.builder().put("index.number_of_shards", 1)) - .setMapping("id", "type=keyword", "content", "type=keyword"); + .setMapping("id", "type=keyword", "content", "type=keyword", "embedding", "type=sparse_vector"); assertAcked(createRequest); client().prepareBulk() - .add(new IndexRequest(otherTestIndex).id("1").source("id", "1", "content", "This is a brown fox")) - .add(new IndexRequest(otherTestIndex).id("2").source("id", "2", "content", "This is a brown dog")) - .add(new IndexRequest(otherTestIndex).id("3").source("id", "3", "content", "This dog is really brown")) + .add( + new IndexRequest(otherTestIndex).id("1") + .source("id", "1", "content", "This is a brown fox", "embedding", Map.of("abc", 1.0)) + ) + .add( + new IndexRequest(otherTestIndex).id("2") + .source("id", "2", "content", "This is a brown dog", "embedding", Map.of("def", 2.0)) + ) + .add( + new IndexRequest(otherTestIndex).id("3") + .source("id", "3", "content", "This dog is really brown", "embedding", Map.of("ghi", 1.0)) + ) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); ensureYellow(indexName); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 9f041263d302e..903d1af5b46d7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -760,6 +760,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { List newSubPlans = new ArrayList<>(); List outputUnion = Fork.outputUnion(fork.children()); List forkColumns = outputUnion.stream().map(Attribute::name).toList(); + Set unsupportedAttributeNames = Fork.outputUnsupportedAttributeNames(fork.children()); for (LogicalPlan logicalPlan : fork.children()) { Source source = logicalPlan.source(); @@ -773,7 +774,12 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { } } - List aliases = missing.stream().map(attr -> new Alias(source, attr.name(), Literal.of(attr, null))).toList(); + List aliases = missing.stream().map(attr -> { + // We cannot assign an alias with an UNSUPPORTED data type, so we use another type that is + // supported. This way we can add this missing column containing only null values to the fork branch output. + var attrType = attr.dataType() == UNSUPPORTED ? KEYWORD : attr.dataType(); + return new Alias(source, attr.name(), new Literal(attr.source(), null, attrType)); + }).toList(); // add the missing columns if (aliases.size() > 0) { @@ -785,7 +791,6 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { // We need to add an explicit Keep even if the outputs align // This is because at the moment the sub plans are executed and optimized separately and the output might change // during optimizations. Once we add streaming we might not need to add a Keep when the outputs already align. - // Note that until we add explicit support for KEEP in FORK branches, this condition will always be true. if (logicalPlan instanceof Keep == false || subPlanColumns.equals(forkColumns) == false) { changed = true; List newOutput = new ArrayList<>(); @@ -810,7 +815,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) { // We don't want to keep the same attributes that are outputted by the FORK branches. // Keeping the same attributes can have unintended side effects when applying optimizations like constant folding. - for (Attribute attr : newSubPlans.getFirst().output()) { + for (Attribute attr : outputUnion) { newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType())); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java index 07809543ca3b1..f8def965b8ab1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java @@ -77,14 +77,14 @@ private static void checkMissingFork(QueryPlan plan, Failures failures) { private static void checkMissingForkBranch(QueryPlan plan, AttributeSet forkOutputSet, Failures failures) { Map attributeTypes = forkOutputSet.stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType)); - AttributeSet missing = AttributeSet.of(); + Set missing = new HashSet<>(); Set commonAttrs = new HashSet<>(); // get the missing attributes from the sub plan plan.output().forEach(attribute -> { var attrType = attributeTypes.get(attribute.name()); - if (attrType == null || attrType != attribute.dataType()) { + if (attrType == null || (attrType != attribute.dataType() && attrType != DataType.UNSUPPORTED)) { missing.add(attribute); } commonAttrs.add(attribute.name()); @@ -99,7 +99,12 @@ private static void checkMissingForkBranch(QueryPlan plan, AttributeSet forkO if (missing.isEmpty() == false) { failures.add( - fail(plan, "Plan [{}] optimized incorrectly due to missing attributes in subplans", plan.nodeString(), missing.toString()) + fail( + plan, + "Plan [{}] optimized incorrectly due to missing attributes in subplans: [{}]", + plan.nodeString(), + missing.toString() + ) ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java index c0e8b736fcb6c..d4848bbe6c226 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java @@ -103,9 +103,18 @@ public List output() { public static List outputUnion(List subplans) { List output = new ArrayList<>(); Set names = new HashSet<>(); + // these are attribute names we know should have an UNSUPPORTED data type in the FORK output + Set unsupportedAttributesNames = outputUnsupportedAttributeNames(subplans); for (var subPlan : subplans) { for (var attr : subPlan.output()) { + // When we have multiple attributes with the same name, the ones that have a supported data type take priority. + // We only add an attribute with an unsupported data type if we know that in the output of the rest of the FORK branches + // there exists no attribute with the same name and with a supported data type. + if (attr.dataType() == DataType.UNSUPPORTED && unsupportedAttributesNames.contains(attr.name()) == false) { + continue; + } + if (names.contains(attr.name()) == false && attr.name().equals(Analyzer.NO_FIELDS_NAME) == false) { names.add(attr.name()); output.add(attr); @@ -115,6 +124,34 @@ public static List outputUnion(List subplans) { return output; } + /** + * Returns a list of attribute names that will need to have the @{code UNSUPPORTED} data type in FORK output. + * These are attributes that are either {@code UNSUPPORTED} or missing in each FORK branch. + * If two branches have the same attribute name, but only in one of them the data type is {@code UNSUPPORTED}, this constitutes + * data type conflict, and so this attribute name will not be returned by this function. + * Data type conflicts are later on checked in {@code postAnalysisPlanVerification}. + */ + public static Set outputUnsupportedAttributeNames(List subplans) { + Set unsupportedAttributes = new HashSet<>(); + Set names = new HashSet<>(); + + for (var subPlan : subplans) { + for (var attr : subPlan.output()) { + var attrName = attr.name(); + if (unsupportedAttributes.contains(attrName) == false + && attr.dataType() == DataType.UNSUPPORTED + && names.contains(attrName) == false) { + unsupportedAttributes.add(attrName); + } else if (unsupportedAttributes.contains(attrName) && attr.dataType() != DataType.UNSUPPORTED) { + unsupportedAttributes.remove(attrName); + } + names.add(attrName); + } + } + + return unsupportedAttributes; + } + @Override public int hashCode() { return Objects.hash(Fork.class, children()); @@ -152,16 +189,20 @@ private static void checkFork(LogicalPlan plan, Failures failures) { failures.add(Failure.fail(otherFork, "Only a single FORK command is allowed, but found multiple")); }); - Map outputTypes = fork.children() - .getFirst() - .output() - .stream() - .collect(Collectors.toMap(Attribute::name, Attribute::dataType)); + Map outputTypes = fork.output().stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType)); - fork.children().stream().skip(1).forEach(subPlan -> { + fork.children().forEach(subPlan -> { for (Attribute attr : subPlan.output()) { - var actual = attr.dataType(); var expected = outputTypes.get(attr.name()); + + // If the FORK output has an UNSUPPORTED data type, we know there is no conflict. + // We only assign an UNSUPPORTED attribute in the FORK output when there exists no attribute with the + // same name and supported data type in any of the FORK branches. + if (expected == DataType.UNSUPPORTED) { + continue; + } + + var actual = attr.dataType(); if (actual != expected) { failures.add( Failure.fail(