@@ -11,14 +11,14 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.junit.Before;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -27,7 +27,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.equalTo;

@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug")
// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug")
public class ForkIT extends AbstractEsqlIntegTestCase {

@Before
@@ -800,6 +800,79 @@ public void testWithKeep() {
}
}

public void testWithUnsupportedFieldsWithSameBranches() {
var query = """
FROM test-other
| FORK
( WHERE id == "3")
( WHERE id == "2" )
| SORT _fork
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("content", "embedding", "id", "_fork"));
assertColumnTypes(resp.columns(), List.of("keyword", "unsupported", "keyword", "keyword"));
Iterable<Iterable<Object>> expectedValues = List.of(
Arrays.stream(new Object[] { "This dog is really brown", null, "3", "fork1" }).toList(),
Arrays.stream(new Object[] { "This is a brown dog", null, "2", "fork2" }).toList()
);
assertValues(resp.values(), expectedValues);
}
}

public void testWithUnsupportedFieldsWithDifferentBranches() {
var query = """
FROM test-other
| FORK
( STATS x = count(*))
( WHERE id == "2" )
| SORT _fork
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("x", "_fork", "content", "embedding", "id"));
assertColumnTypes(resp.columns(), List.of("long", "keyword", "keyword", "unsupported", "keyword"));
Iterable<Iterable<Object>> expectedValues = List.of(
Arrays.stream(new Object[] { 3L, "fork1", null, null, null }).toList(),
Arrays.stream(new Object[] { null, "fork2", "This is a brown dog", null, "2" }).toList()
);
assertValues(resp.values(), expectedValues);
}
}

public void testWithUnsupportedFieldsAndConflicts() {
var firstQuery = """
FROM test-other
| FORK
( STATS embedding = count(*))
( WHERE id == "2" )
| SORT _fork
""";
var e = expectThrows(VerificationException.class, () -> run(firstQuery));
assertTrue(e.getMessage().contains("Column [embedding] has conflicting data types"));

var secondQuery = """
FROM test-other
| FORK
( WHERE id == "2" )
( STATS embedding = count(*))
| SORT _fork
""";
e = expectThrows(VerificationException.class, () -> run(secondQuery));
assertTrue(e.getMessage().contains("Column [embedding] has conflicting data types"));

var thirdQuery = """
FROM test-other
| FORK
( WHERE id == "2" )
( WHERE id == "3" )
( STATS embedding = count(*))
| SORT _fork
""";
e = expectThrows(VerificationException.class, () -> run(thirdQuery));
assertTrue(e.getMessage().contains("Column [embedding] has conflicting data types"));
}

public void testWithEvalWithConflictingTypes() {
var query = """
FROM test
@@ -976,12 +1049,21 @@ private void createAndPopulateIndices() {

createRequest = client.prepareCreate(otherTestIndex)
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.setMapping("id", "type=keyword", "content", "type=keyword");
.setMapping("id", "type=keyword", "content", "type=keyword", "embedding", "type=sparse_vector");
assertAcked(createRequest);
client().prepareBulk()
.add(new IndexRequest(otherTestIndex).id("1").source("id", "1", "content", "This is a brown fox"))
.add(new IndexRequest(otherTestIndex).id("2").source("id", "2", "content", "This is a brown dog"))
.add(new IndexRequest(otherTestIndex).id("3").source("id", "3", "content", "This dog is really brown"))
.add(
new IndexRequest(otherTestIndex).id("1")
.source("id", "1", "content", "This is a brown fox", "embedding", Map.of("abc", 1.0))
)
.add(
new IndexRequest(otherTestIndex).id("2")
.source("id", "2", "content", "This is a brown dog", "embedding", Map.of("def", 2.0))
)
.add(
new IndexRequest(otherTestIndex).id("3")
.source("id", "3", "content", "This dog is really brown", "embedding", Map.of("ghi", 1.0))
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow(indexName);
@@ -760,6 +760,7 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
List<LogicalPlan> newSubPlans = new ArrayList<>();
List<Attribute> outputUnion = Fork.outputUnion(fork.children());
List<String> forkColumns = outputUnion.stream().map(Attribute::name).toList();
Set<String> unsupportedAttributeNames = Fork.outputUnsupportedAttributeNames(fork.children());

for (LogicalPlan logicalPlan : fork.children()) {
Source source = logicalPlan.source();
@@ -773,7 +774,12 @@
}
}

List<Alias> aliases = missing.stream().map(attr -> new Alias(source, attr.name(), Literal.of(attr, null))).toList();
List<Alias> aliases = missing.stream().map(attr -> {
// We cannot assign an UNSUPPORTED data type to an alias, so we substitute a supported type.
// This way the missing column, containing only null values, can still be added to the fork branch output.
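// For example: a branch missing `embedding`, whose sparse_vector mapping is unsupported in ES|QL,
// receives `embedding = null` typed as KEYWORD here instead of an UNSUPPORTED null.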
var attrType = attr.dataType() == UNSUPPORTED ? KEYWORD : attr.dataType();
return new Alias(source, attr.name(), new Literal(attr.source(), null, attrType));
}).toList();

// add the missing columns
if (aliases.size() > 0) {
@@ -785,7 +791,6 @@
// We need to add an explicit Keep even if the outputs align
// This is because at the moment the sub plans are executed and optimized separately and the output might change
// during optimizations. Once we add streaming we might not need to add a Keep when the outputs already align.
// Note that until we add explicit support for KEEP in FORK branches, this condition will always be true.
if (logicalPlan instanceof Keep == false || subPlanColumns.equals(forkColumns) == false) {
changed = true;
List<Attribute> newOutput = new ArrayList<>();
@@ -810,7 +815,7 @@

// We don't want to keep the same attributes that are outputted by the FORK branches.
// Keeping the same attributes can have unintended side effects when applying optimizations like constant folding.
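// Building the new output from the union (rather than from the first branch's output) also
// preserves UNSUPPORTED column types that a branch may have replaced with a null KEYWORD stand-in.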
for (Attribute attr : newSubPlans.getFirst().output()) {
for (Attribute attr : outputUnion) {
newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType()));
}

@@ -77,14 +77,14 @@ private static void checkMissingFork(QueryPlan<?> plan, Failures failures) {

private static void checkMissingForkBranch(QueryPlan<?> plan, AttributeSet forkOutputSet, Failures failures) {
Map<String, DataType> attributeTypes = forkOutputSet.stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType));
AttributeSet missing = AttributeSet.of();
Set<Attribute> missing = new HashSet<>();

Set<String> commonAttrs = new HashSet<>();

// get the missing attributes from the sub plan
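// Note that an expected type of UNSUPPORTED is tolerated below: FORK substitutes a null KEYWORD
// literal in branches that lack such a column, so the branch-local type can legitimately differ.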
plan.output().forEach(attribute -> {
var attrType = attributeTypes.get(attribute.name());
if (attrType == null || attrType != attribute.dataType()) {
if (attrType == null || (attrType != attribute.dataType() && attrType != DataType.UNSUPPORTED)) {
missing.add(attribute);
}
commonAttrs.add(attribute.name());
@@ -99,7 +99,12 @@

if (missing.isEmpty() == false) {
failures.add(
fail(plan, "Plan [{}] optimized incorrectly due to missing attributes in subplans", plan.nodeString(), missing.toString())
fail(
plan,
"Plan [{}] optimized incorrectly due to missing attributes in subplans: [{}]",
plan.nodeString(),
missing.toString()
)
);
}
}
@@ -103,9 +103,18 @@ public List<Attribute> output() {
public static List<Attribute> outputUnion(List<LogicalPlan> subplans) {
List<Attribute> output = new ArrayList<>();
Set<String> names = new HashSet<>();
// these are attribute names we know should have an UNSUPPORTED data type in the FORK output
Set<String> unsupportedAttributesNames = outputUnsupportedAttributeNames(subplans);

for (var subPlan : subplans) {
for (var attr : subPlan.output()) {
// When multiple attributes share the same name, the ones with a supported data type take priority.
// We only add an attribute with an unsupported data type if no other FORK branch outputs an
// attribute with the same name and a supported data type.
if (attr.dataType() == DataType.UNSUPPORTED && unsupportedAttributesNames.contains(attr.name()) == false) {
continue;
}

if (names.contains(attr.name()) == false && attr.name().equals(Analyzer.NO_FIELDS_NAME) == false) {
names.add(attr.name());
output.add(attr);
@@ -115,6 +124,34 @@ public static List<Attribute> outputUnion(List<LogicalPlan> subplans) {
return output;
}

/**
* Returns the set of attribute names that will need to have the {@code UNSUPPORTED} data type in the FORK output.
* These are attributes that are either {@code UNSUPPORTED} or missing in every FORK branch.
* If two branches output the same attribute name but the data type is {@code UNSUPPORTED} in only one of them,
* this constitutes a data type conflict, so the name is not returned by this function.
* Data type conflicts are checked later, in {@code postAnalysisPlanVerification}.
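* For example, if one branch outputs {@code embedding} with type {@code UNSUPPORTED} and the other
* branches do not output {@code embedding} at all, the name is returned. If another branch outputs
* {@code embedding} as {@code LONG}, the name is not returned and the conflict is reported later.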
*/
public static Set<String> outputUnsupportedAttributeNames(List<LogicalPlan> subplans) {
Set<String> unsupportedAttributes = new HashSet<>();
Set<String> names = new HashSet<>();

for (var subPlan : subplans) {
for (var attr : subPlan.output()) {
var attrName = attr.name();
if (unsupportedAttributes.contains(attrName) == false
&& attr.dataType() == DataType.UNSUPPORTED
&& names.contains(attrName) == false) {
unsupportedAttributes.add(attrName);
} else if (unsupportedAttributes.contains(attrName) && attr.dataType() != DataType.UNSUPPORTED) {
unsupportedAttributes.remove(attrName);
}
names.add(attrName);
}
}

return unsupportedAttributes;
}

@Override
public int hashCode() {
return Objects.hash(Fork.class, children());
@@ -152,16 +189,20 @@ private static void checkFork(LogicalPlan plan, Failures failures) {
failures.add(Failure.fail(otherFork, "Only a single FORK command is allowed, but found multiple"));
});

Map<String, DataType> outputTypes = fork.children()
.getFirst()
.output()
.stream()
.collect(Collectors.toMap(Attribute::name, Attribute::dataType));
Map<String, DataType> outputTypes = fork.output().stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType));

fork.children().stream().skip(1).forEach(subPlan -> {
fork.children().forEach(subPlan -> {
for (Attribute attr : subPlan.output()) {
var actual = attr.dataType();
var expected = outputTypes.get(attr.name());

// If the FORK output has an UNSUPPORTED data type, we know there is no conflict.
// We only assign an UNSUPPORTED attribute in the FORK output when no FORK branch outputs
// an attribute with the same name and a supported data type.
if (expected == DataType.UNSUPPORTED) {
continue;
}

var actual = attr.dataType();
if (actual != expected) {
failures.add(
Failure.fail(
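For reference, below is a minimal standalone sketch of the union and priority rules that Fork.outputUnion and Fork.outputUnsupportedAttributeNames implement. It is an illustration only, under simplifying assumptions: the class name, the "UNSUPPORTED" string, and the name-to-type maps are hypothetical stand-ins for LogicalPlan outputs and DataType, not Elasticsearch APIs.

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the FORK output union: each branch output is a name -> type map,
// and "UNSUPPORTED" stands in for DataType.UNSUPPORTED.
public class ForkUnionSketch {

    // Names whose type is UNSUPPORTED in every branch that outputs them.
    static Set<String> unsupportedNames(List<Map<String, String>> branches) {
        Set<String> unsupported = new HashSet<>();
        Set<String> seen = new HashSet<>();
        for (Map<String, String> branch : branches) {
            for (Map.Entry<String, String> attr : branch.entrySet()) {
                boolean isUnsupported = "UNSUPPORTED".equals(attr.getValue());
                if (isUnsupported && seen.contains(attr.getKey()) == false) {
                    unsupported.add(attr.getKey());    // first sighting is unsupported
                } else if (isUnsupported == false) {
                    unsupported.remove(attr.getKey()); // a supported type takes priority
                }
                seen.add(attr.getKey());
            }
        }
        return unsupported;
    }

    // Union of branch columns, skipping unsupported duplicates of supported columns.
    static Map<String, String> outputUnion(List<Map<String, String>> branches) {
        Set<String> keepUnsupported = unsupportedNames(branches);
        Map<String, String> union = new LinkedHashMap<>();
        for (Map<String, String> branch : branches) {
            for (Map.Entry<String, String> attr : branch.entrySet()) {
                if ("UNSUPPORTED".equals(attr.getValue()) && keepUnsupported.contains(attr.getKey()) == false) {
                    continue; // another branch provides a supported type for this name
                }
                union.putIfAbsent(attr.getKey(), attr.getValue());
            }
        }
        return union;
    }

    public static void main(String[] args) {
        // Mirrors testWithUnsupportedFieldsWithDifferentBranches:
        // branch 1: STATS x = count(*)    branch 2: WHERE id == "2"
        Map<String, String> branch1 = Map.of("x", "LONG");
        Map<String, String> branch2 = new LinkedHashMap<>();
        branch2.put("content", "KEYWORD");
        branch2.put("embedding", "UNSUPPORTED");
        branch2.put("id", "KEYWORD");
        System.out.println(outputUnion(List.of(branch1, branch2)));
        // prints {x=LONG, content=KEYWORD, embedding=UNSUPPORTED, id=KEYWORD}
    }
}

Run against the branch outputs from testWithUnsupportedFieldsWithDifferentBranches, the sketch yields {x=LONG, content=KEYWORD, embedding=UNSUPPORTED, id=KEYWORD}, matching the column set that test expects (the _fork discriminator is added separately).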