Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
41279c7
ES|QL categorize with multiple groupings.
jan-elastic Dec 6, 2024
9f770d5
Fix VerifierTests
jan-elastic Dec 7, 2024
d7af8bf
Close stuff when constructing CategorizePackedValuesBlockHash fails
jan-elastic Dec 9, 2024
e6ac068
CategorizePackedValuesBlockHashTests
jan-elastic Dec 9, 2024
35e9811
Improve categorize javadocs
jan-elastic Dec 9, 2024
b44663a
Update docs/changelog/118173.yaml
jan-elastic Dec 9, 2024
82a38f9
Create CategorizePackedValuesBlockHash's deletegate page differently
jan-elastic Dec 9, 2024
5121e9a
Double check in BlockHash builder for single categorize
jan-elastic Dec 11, 2024
3c0325a
Reuse blocks array
jan-elastic Dec 11, 2024
e05f860
More CSV tests
jan-elastic Dec 11, 2024
abfc211
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 11, 2024
54ac1bf
Remove assumeTrue categorize_v5
jan-elastic Dec 11, 2024
5eb5189
Rename test
jan-elastic Dec 11, 2024
1deb2d4
Two more verifier tests
jan-elastic Dec 11, 2024
a633806
more CSV tests
jan-elastic Dec 11, 2024
e639268
Add JavaDocs/comments
jan-elastic Dec 11, 2024
5a4c8bb
spotless
jan-elastic Dec 11, 2024
5ecc9f9
Refactor/unify recategorize
jan-elastic Dec 11, 2024
16680d2
Better memory accounting
jan-elastic Dec 12, 2024
9588e10
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 12, 2024
a9c3ed7
fix csv test
jan-elastic Dec 12, 2024
8dd15ac
randomize CategorizePackedValuesBlockHashTests
jan-elastic Dec 12, 2024
7f82b43
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 12, 2024
0b0be87
Add TODO
jan-elastic Dec 12, 2024
3bccc78
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cat…
jan-elastic Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,16 @@ private IntBlock addIntermediate(Page page) {
seenNull = true;
return blockFactory.newConstantIntBlockWith(NULL_ORD, 1);
}

Map<Integer, Integer> idMap = readIntermediate(categorizerState.getBytesRef(0, new BytesRef()));
try (IntBlock.Builder newIdsBuilder = blockFactory.newIntBlockBuilder(idMap.size())) {
int fromId = idMap.containsKey(0) ? 0 : 1;
int toId = fromId + idMap.size();
for (int i = fromId; i < toId; i++) {
newIdsBuilder.appendInt(idMap.get(i));
}
return newIdsBuilder.build();
}
int[] ids = recategorize(categorizerState.getBytesRef(0, new BytesRef()), null);
return blockFactory.newIntArrayVector(ids, ids.length).asBlock();
}

/**
* Read intermediate state from a block.
*
* @return a map from the old category id to the new one. The old ids go from 0 to {@code size - 1}.
* Reads the intermediate state from a block and recategorizes the provided IDs.
* If no IDs are provided, the IDs are the IDs in the categorizer's state in order.
* (So 0...N-1 or 1...N, depending on whether null is present.)
*/
Map<Integer, Integer> readIntermediate(BytesRef bytes) {
int[] recategorize(BytesRef bytes, int[] ids) {
Map<Integer, Integer> idMap = new HashMap<>();
try (StreamInput in = new BytesArray(bytes).streamInput()) {
if (in.readBoolean()) {
Expand All @@ -184,10 +176,20 @@ Map<Integer, Integer> readIntermediate(BytesRef bytes) {
// +1 because the 0 ordinal is reserved for null
idMap.put(oldCategoryId + 1, newCategoryId + 1);
}
return idMap;
} catch (IOException e) {
throw new RuntimeException(e);
}
if (ids == null) {
ids = new int[idMap.size()];
int idOffset = idMap.containsKey(0) ? 0 : 1;
for (int i = 0; i < idMap.size(); i++) {
ids[i] = i + idOffset;
}
}
for (int i = 0; i < ids.length; i++) {
ids[i] = idMap.get(ids[i]);
}
return ids;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* BlockHash implementation for {@code Categorize} grouping function as first
* grouping expression, followed by one or mode other grouping expressions.
* <p>
* For the first grouping (the {@code Categorize} grouping function), a
* {@code CategorizeBlockHash} is used, which outputs integers (category IDs).
* Next, a {@code PackedValuesBlockHash} is used on the category IDs and the
* other groupings (which are not {@code Categorize}s).
*/
public class CategorizePackedValuesBlockHash extends BlockHash {

Expand Down Expand Up @@ -91,17 +95,11 @@ private IntBlock getCategories(Page page) {
} else {
BytesRefBlock stateBlock = page.getBlock(0);
BytesRef stateBytes = stateBlock.getBytesRef(0, new BytesRef());

try (StreamInput in = new BytesArray(stateBytes).streamInput()) {
BytesRef categorizerState = in.readBytesRef();
Map<Integer, Integer> idMap = categorizeBlockHash.readIntermediate(categorizerState);
int[] oldIds = in.readIntArray();
try (IntBlock.Builder newIds = blockFactory.newIntBlockBuilder(page.getPositionCount())) {
for (int oldId : oldIds) {
newIds.appendInt(idMap.get(oldId));
}
return newIds.build();
}
int[] ids = in.readIntArray();
ids = categorizeBlockHash.recategorize(categorizerState, ids);
return blockFactory.newIntArrayVector(ids, ids.length).asBlock();
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -112,6 +110,7 @@ private IntBlock getCategories(Page page) {
public Block[] getKeys() {
Block[] keys = packedValuesBlockHash.getKeys();
if (aggregatorMode.isOutputPartial() == false) {
// For final output, the keys are the category regexes.
try (
BytesRefBlock regexes = (BytesRefBlock) categorizeBlockHash.getKeys()[0];
BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(keys[0].getPositionCount())
Expand All @@ -131,9 +130,15 @@ public Block[] getKeys() {
keys[0] = builder.build();
}
} else {
// For intermediate output, the keys are the delegate PackedValuesBlockHash's
// keys, with the category IDs replaced by the categorizer's internal state
// together with the list of category IDs.
BytesRef state;
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeBytesRef(categorizeBlockHash.serializeCategorizer());
// It's a bit inefficient to copy the IntVector's values into an int[]
// and discard the array soon after. IntVector should maybe expose the
// underlying array instead. TODO: investigate whether that's worth it
IntVector idsVector = (IntVector) keys[0].asVector();
int[] idsArray = new int[idsVector.getPositionCount()];
for (int i = 0; i < idsVector.getPositionCount(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ COUNT():long | VALUES(str):keyword | category:keyword
1 | [a, b, c] | .*?disconnected.*?
;

limit before stats
required_capability: categorize_v5

FROM sample_data | SORT message | LIMIT 4
| STATS count=COUNT() BY category=CATEGORIZE(message)
| SORT category
;

count:long | category:keyword
3 | .*?Connected.+?to.*?
1 | .*?Connection.+?error.*?
;

skips stopwords
required_capability: categorize_v5

Expand Down Expand Up @@ -647,6 +660,22 @@ count:long | category:keyword | timestamp:datetime
1 | .*?Disconnected.*? | 2023-10-23T13:00:00.000Z
;


multiple groupings with categorize and limit before stats
required_capability: categorize_v5

FROM sample_data | SORT message | LIMIT 5
| STATS count=COUNT() BY category=CATEGORIZE(message), client_ip
| SORT category, client_ip
;

count:long | category:keyword | client_ip:ip
1 | .*?Connected.+?to.*? | 172.21.2.113
1 | .*?Connected.+?to.*? | 172.21.2.162
1 | .*?Connected.+?to.*? | 172.21.3.15
2 | .*?Connection.+?error.*? | 172.21.3.15
;

multiple groupings with categorize and nulls
required_capability: categorize_multiple_groupings

Expand Down Expand Up @@ -741,3 +770,17 @@ lang_low:double | lang_high:double | category:keyword | gender:keyword
3.0 | 3.25 | .*?Architect.*? | F
3.75 | null | .*?Architect.*? | M
;

multiple groupings with categorize on null row
required_capability: categorize_multiple_groupings

ROW message = null, str = ["a", "b", "c"]
| STATS COUNT(), VALUES(str) BY category=CATEGORIZE(message), str
| SORT str
;

COUNT():long | VALUES(str):keyword | category:keyword | str:keyword
1 | [a, b, c] | null | a
1 | [a, b, c] | null | b
1 | [a, b, c] | null | c
;
Original file line number Diff line number Diff line change
Expand Up @@ -1894,9 +1894,7 @@ public void testIntervalAsString() {
);
}

public void testCategorizeSingleGrouping() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());

public void testCategorizeOnlyFirstGrouping() {
query("FROM test | STATS COUNT(*) BY CATEGORIZE(first_name)");
query("FROM test | STATS COUNT(*) BY cat = CATEGORIZE(first_name)");
query("FROM test | STATS COUNT(*) BY CATEGORIZE(first_name), emp_no");
Expand All @@ -1914,11 +1912,17 @@ public void testCategorizeSingleGrouping() {
"1:55: CATEGORIZE grouping function [CATEGORIZE(first_name)] can only be in the first grouping expression",
error("FROM test | STATS COUNT(*) BY CATEGORIZE(first_name), CATEGORIZE(first_name)")
);
assertEquals(
"1:63: CATEGORIZE grouping function [CATEGORIZE(last_name)] can only be in the first grouping expression",
error("FROM test | STATS COUNT(*) BY CATEGORIZE(first_name), emp_no, CATEGORIZE(last_name)")
);
assertEquals(
"1:63: CATEGORIZE grouping function [CATEGORIZE(first_name)] can only be in the first grouping expression",
error("FROM test | STATS COUNT(*) BY CATEGORIZE(first_name), emp_no, CATEGORIZE(first_name)")
);
}

public void testCategorizeNestedGrouping() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());

query("from test | STATS COUNT(*) BY CATEGORIZE(LENGTH(first_name)::string)");

assertEquals(
Expand All @@ -1932,8 +1936,6 @@ public void testCategorizeNestedGrouping() {
}

public void testCategorizeWithinAggregations() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());

query("from test | STATS MV_COUNT(cat), COUNT(*) BY cat = CATEGORIZE(first_name)");
query("from test | STATS MV_COUNT(CATEGORIZE(first_name)), COUNT(*) BY cat = CATEGORIZE(first_name)");
query("from test | STATS MV_COUNT(CATEGORIZE(first_name)), COUNT(*) BY CATEGORIZE(first_name)");
Expand Down Expand Up @@ -1962,8 +1964,6 @@ public void testCategorizeWithinAggregations() {
}

public void testCategorizeWithFilteredAggregations() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());

query("FROM test | STATS COUNT(*) WHERE first_name == \"John\" BY CATEGORIZE(last_name)");
query("FROM test | STATS COUNT(*) WHERE last_name == \"Doe\" BY CATEGORIZE(last_name)");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.TestBlockFactory;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
Expand Down Expand Up @@ -1212,8 +1211,6 @@ public void testCombineProjectionWithAggregationFirstAndAliasedGroupingUsedInAgg
* \_EsRelation[test][_meta_field{f}#23, emp_no{f}#17, first_name{f}#18, ..]
*/
public void testCombineProjectionWithCategorizeGrouping() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());

var plan = plan("""
from test
| eval k = first_name, k1 = k
Expand Down Expand Up @@ -3949,8 +3946,6 @@ public void testNestedExpressionsInGroups() {
* \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
*/
public void testNestedExpressionsInGroupsWithCategorize() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());

var plan = optimizedPlan("""
from test
| stats c = count(salary) by CATEGORIZE(CONCAT(first_name, "abc"))
Expand Down
Loading