Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,12 @@ public static <E> AttributeMap<E> of(Attribute key, E value) {
return map;
}

public static <E> AttributeMap<E> mapAll(Collection<? extends E> collection, Function<E, Attribute> keyMapper) {
final AttributeMap<E> map = new AttributeMap<>();
collection.forEach(e -> map.add(keyMapper.apply(e), e));
return map;
}

public static <E> Builder<E> builder() {
return new Builder<>(new AttributeMap<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,18 @@ public void testValuesIteratorRemoval() {
it.remove();
assertThat(it.hasNext(), is(false));
}

public void testMappAll() {
var one = a("one");
var two = a("two");
var three = a("three");

Collection<Attribute> collection = asList(one, two, three);
var map = AttributeMap.mapAll(collection, NamedExpression::toAttribute);

var builder = AttributeMap.builder();
collection.forEach(e -> builder.put(e, e.toAttribute()));

assertThat(map, is(builder.build()));
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes make insure that the country field is part of the languages_lookup_non_unique_key schema. Otherwise the CSV tests generate an index resolution without it, which leads to failed queries when using the index in FROM.
The index was previously using similar languages schema, which lacks the fields, but which was added in dynamically by the data. Also, the index had only been used in LOOKUP JOIN before.
The change then also leads to updated order of the MV value ["Germany", "Austria"]

Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ public class CsvTestsDataLoader {
).withSetting("lookup-settings.json");
private static final TestDataset LANGUAGES = new TestDataset("languages");
private static final TestDataset LANGUAGES_LOOKUP = LANGUAGES.withIndex("languages_lookup").withSetting("lookup-settings.json");
private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_LOOKUP.withIndex("languages_lookup_non_unique_key")
.withData("languages_non_unique_key.csv");
private static final TestDataset LANGUAGES_NON_UNIQUE_KEY = new TestDataset("languages_non_unique_key");
private static final TestDataset LANGUAGES_LOOKUP_NON_UNIQUE_KEY = LANGUAGES_NON_UNIQUE_KEY.withIndex("languages_lookup_non_unique_key")
.withSetting("lookup-settings.json");
private static final TestDataset LANGUAGES_NESTED_FIELDS = new TestDataset(
"languages_nested_fields",
"mapping-languages_nested_fields.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3970,3 +3970,23 @@ FROM k8s-downsampled
2024-05-09T23:30:00.000Z | staging | three | {"min":341.0,"max":592.0,"sum":1956.0,"value_count":5} | 824.0
2024-05-09T23:30:00.000Z | staging | two | {"min":442.0,"max":1011.0,"sum":3850.0,"value_count":7} | 1419.0
;

selfShadowing
required_capability: inline_stats
required_capability: fix_join_output_merging

FROM languages_lookup_non_unique_key
| KEEP country, language_name
| EVAL language_code = null::integer
| INLINE STATS MAX(language_code) BY language_code
| SORT country
| LIMIT 5
;

country:text |language_name:keyword |MAX(language_code):integer |language_code:integer
Atlantis |null |null |null
[Austria, Germany]|German |null |null
Canada |English |null |null
Mv-Land |Mv-Lang |null |null
Mv-Land2 |Mv-Lang2 |null |null
;
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ language_code:integer | language_name:keyword
6 |null
;

selfShadowing
required_capability: join_lookup_v12
required_capability: fix_join_output_merging

FROM languages_lookup_non_unique_key
| EVAL language_code = null::integer
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| LIMIT 3
;

language_code:integer |country:text |country.keyword:keyword |language_name:keyword
null |null |null |null
null |null |null |null
null |null |null |null
;

nonUniqueLeftKeyOnTheDataNode
required_capability: join_lookup_v12

Expand Down Expand Up @@ -240,7 +256,7 @@ emp_no:integer | language_code:integer | language_name:keyword | country:text
10091 | 1 | null | United Kingdom
10091 | 1 | English | United States of America
10091 | 1 | English | null
10092 | 2 | German | [Germany, Austria]
10092 | 2 | German | [Austria, Germany]
10092 | 2 | German | Switzerland
10092 | 2 | German | null
10093 | 3 | null | null
Expand All @@ -265,7 +281,7 @@ emp_no:integer | language_code:integer | language_name:keyword | country:text
10001 | 1 | English | null
10001 | 1 | null | United Kingdom
10001 | 1 | English | United States of America
10002 | 2 | German | [Germany, Austria]
10002 | 2 | German | [Austria, Germany]
10002 | 2 | German | Switzerland
10002 | 2 | German | null
10003 | 3 | null | null
Expand Down Expand Up @@ -308,7 +324,7 @@ ROW language_code = 2

ignoreOrder:true
language_code:integer | country:text | language_name:keyword
2 | [Germany, Austria] | German
2 | [Austria, Germany] | German
2 | Switzerland | German
2 | null | German
;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"properties" : {
"language_code" : {
"type" : "integer"
},
"language_name" : {
"type" : "keyword"
},
"country": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,11 @@ public enum Cap {
*/
FIX_JOIN_MASKING_REGEX_EXTRACT,

/**
* Allow the merging of the children to use {@code Aliase}s, instead of just {@code ReferenceAttribute}s.
*/
FIX_JOIN_OUTPUT_MERGING,

/**
* Avid GROK and DISSECT attributes being removed when resolving fields.
* see <a href="https://github.com/elastic/elasticsearch/issues/127468"> ES|QL: Grok only supports KEYWORD or TEXT values, found expression [type] type [INTEGER] #127468 </a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.util.Holder;
Expand All @@ -29,7 +31,7 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputExpressions;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;

/**
Expand Down Expand Up @@ -217,16 +219,19 @@ public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
}

@Override
List<Attribute> computeOutput(List<Attribute> left, List<Attribute> right) {
public List<NamedExpression> computeOutputExpressions(List<? extends NamedExpression> left, List<? extends NamedExpression> right) {
JoinType joinType = config().type();
List<Attribute> output;
List<NamedExpression> output;
if (LEFT.equals(joinType)) {
List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList();
List<Attribute> rightWithAppendedKeys = new ArrayList<>(right);
rightWithAppendedKeys.removeAll(config().rightFields());
rightWithAppendedKeys.addAll(config().leftFields());

output = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);
List<? extends NamedExpression> leftOutputWithoutKeys = left.stream()
.filter(ne -> config().leftFields().contains(ne.toAttribute()) == false)
.toList();
List<NamedExpression> rightWithAppendedLeftKeys = new ArrayList<>(right);
rightWithAppendedLeftKeys.removeIf(ne -> config().rightFields().contains(ne.toAttribute()));
AttributeMap<NamedExpression> leftAttrMap = AttributeMap.mapAll(left, NamedExpression::toAttribute);
config().leftFields().forEach(lk -> rightWithAppendedLeftKeys.add(leftAttrMap.getOrDefault(lk, lk)));

output = mergeOutputExpressions(rightWithAppendedLeftKeys, leftOutputWithoutKeys);
} else {
throw new IllegalArgumentException(joinType.joinName() + " unsupported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ protected NodeInfo<Join> info() {
@Override
public List<Attribute> output() {
if (lazyOutput == null) {
lazyOutput = computeOutput(left().output(), right().output());
lazyOutput = Expressions.asAttributes(computeOutputExpressions(left().output(), right().output()));
}
return lazyOutput;
}
Expand Down Expand Up @@ -211,10 +211,6 @@ public List<Attribute> rightOutputFields() {
return rightOutputFields;
}

List<Attribute> computeOutput(List<Attribute> left, List<Attribute> right) {
return Expressions.asAttributes(computeOutputExpressions(left, right, config));
}

public List<NamedExpression> computeOutputExpressions(List<? extends NamedExpression> left, List<? extends NamedExpression> right) {
return computeOutputExpressions(left, right, config);
}
Expand Down