Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/125636.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125636
summary: Make `numberOfChannels` consistent with layout map by removing duplicated
`ChannelSet`
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,32 @@ private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilde
assertEquals(List.of(List.of(false, 9.1), List.of(true, 8.1)), result.get("values"));
}

public void testMultipleBatchesWithLookupJoin() throws IOException {
assumeTrue(
"Requires new null alias ids for join with multiple batches",
EsqlCapabilities.Cap.REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES.isEnabled()
);
// Create more than 10 indices to trigger multiple batches of data node execution.
// The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout
for (int i = 1; i <= 20; i++) {
createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
}
bulkLoadTestDataLookupMode(10);
// lookup join with and without sort
for (String sort : List.of("", "| sort integer")) {
var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
Map<String, Object> result = runEsql(query);
var columns = as(result.get("columns"), List.class);
assertEquals(21, columns.size());
var values = as(result.get("values"), List.class);
assertEquals(10, values.size());
}
// clean up
for (int i = 1; i <= 20; i++) {
assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
}
}

public void testErrorMessageForLiteralDateMathOverflow() throws IOException {
List<String> dateMathOverflowExpressions = List.of(
"2147483647 day + 1 day",
Expand Down Expand Up @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) {
return "[" + value + ", " + value + "]";
}

private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
Request request = new Request("PUT", "/" + indexName);
String settings = "\"settings\" : {\"mode\" : \"lookup\"}, ";
request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}

public static RequestObjectBuilder requestObjectBuilder() throws IOException {
return new RequestObjectBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
10092 | 1 | English
10093 | 3 | Spanish
;

multipleBatchesWithSort
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| sort language_code, birth_date
| keep language_code
| limit 1
;

language_code:integer
1
;

multipleBatchesWithMvExpand
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| mv_expand birth_date
| sort birth_date, language_code
| limit 1
;

birth_date:datetime |language_code:integer
1952-02-27T00:00:00.000Z |null
;

multipleBatchesWithAggregate1
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats x=max(birth_date), y=min(language_code)
;

x:datetime |y:integer
1965-01-03T00:00:00.000Z |1
;

multipleBatchesWithAggregate2
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(birth_date) by language_code
| sort language_code
| limit 1
;

m:datetime |language_code:integer
null |1
;

multipleBatchesWithAggregate3
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(language_code) by birth_date
| sort birth_date
| limit 1
;

m:integer |birth_date:datetime
null |1952-02-27T00:00:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,12 @@ public enum Cap {
/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),

/**
* Create null alias with new id in ReplaceMissingFieldWithNull when there is lookup join with multiple batches.
*/
REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the name should reflect the current fix more clearly.


private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,16 @@ public Layout build() {
Map<NameId, ChannelAndType> layout = new HashMap<>();
int numberOfChannels = 0;
for (ChannelSet set : channels) {
int channel = numberOfChannels++;
boolean createNewChannel = true;
int channel = 0;
for (NameId id : set.nameIds) {
if (layout.containsKey(id)) {
continue;
}
if (createNewChannel) {
channel = numberOfChannels++;
createNewChannel = false;
}
ChannelAndType next = new ChannelAndType(channel, set.type);
ChannelAndType prev = layout.put(id, next);
// Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238
Expand Down
Loading