Commit 9022ccc

ESQL: CATEGORIZE as a BlockHash (#114317)
Re-implement `CATEGORIZE` in a way that works for multi-node clusters. This requires that data is first categorized on each data node in a first pass, then the categorizers from each data node are merged on the coordinator node and previously categorized rows are re-categorized.

BlockHashes, used in HashAggregations, already work in a very similar way. For queries like `... | STATS ... BY field1, field2`, they map the values of `field1` and `field2` to unique integer ids that are then passed to the actual aggregate functions to identify which "bucket" a row belongs to (a toy sketch of this mapping follows below). When passed from the data nodes to the coordinator, the BlockHashes are also merged to obtain unique ids for every value in `field1, field2` that is seen on the coordinator (not only on the local data nodes). Therefore, we re-implement `CATEGORIZE` as a special BlockHash.

To choose the correct BlockHash when a query plan is mapped to physical operations, the `AggregateExec` query plan node needs to know that we will be categorizing the field `message` in a query containing `... | STATS ... BY c = CATEGORIZE(message)`. For this reason, _we do not extract the expression_ `c = CATEGORIZE(message)` into an `EVAL` node, in contrast to e.g. `STATS ... BY b = BUCKET(field, 10)`. The expression `c = CATEGORIZE(message)` simply remains inside the `AggregateExec`'s groupings.

**Important limitation:** for now, `CATEGORIZE` must be the only grouping in a `STATS` command that uses it.
1 parent 418cbbf commit 9022ccc
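
To make that id mapping concrete, here is a toy illustration (not code from this commit; real BlockHashes work on Blocks and Pages backed by big arrays, not `java.util` collections):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a BlockHash: assigns each distinct combination of
// grouping values a stable integer id that aggregators use as a bucket key.
class ToyGroupIds {
    private final Map<List<Object>, Integer> ids = new HashMap<>();

    int groupId(Object field1, Object field2) {
        // The first sighting of a combination allocates the next free id.
        return ids.computeIfAbsent(List.of(field1, field2), k -> ids.size());
    }
}
```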

File tree: 35 files changed (+1660, -325 lines)

docs/changelog/114317.yaml

Lines changed: 5 additions & 0 deletions (new file)

```yaml
pr: 114317
summary: "ESQL: CATEGORIZE as a `BlockHash`"
area: ES|QL
type: enhancement
issues: []
```

docs/reference/esql/functions/kibana/definition/categorize.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

docs/reference/esql/functions/types/categorize.asciidoc

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

muted-tests.yml

Lines changed: 0 additions & 18 deletions
```diff
@@ -67,9 +67,6 @@ tests:
 - class: org.elasticsearch.xpack.transform.integration.TransformIT
   method: testStopWaitForCheckpoint
   issue: https://github.com/elastic/elasticsearch/issues/106113
-- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
-  method: test {categorize.Categorize SYNC}
-  issue: https://github.com/elastic/elasticsearch/issues/113722
 - class: org.elasticsearch.kibana.KibanaThreadPoolIT
   method: testBlockedThreadPoolsRejectUserRequests
   issue: https://github.com/elastic/elasticsearch/issues/113939
@@ -126,12 +123,6 @@ tests:
 - class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT
   method: testLookbackWithIndicesOptions
   issue: https://github.com/elastic/elasticsearch/issues/116127
-- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
-  method: test {categorize.Categorize SYNC}
-  issue: https://github.com/elastic/elasticsearch/issues/113054
-- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
-  method: test {categorize.Categorize ASYNC}
-  issue: https://github.com/elastic/elasticsearch/issues/113055
 - class: org.elasticsearch.xpack.test.rest.XPackRestIT
   method: test {p0=transform/transforms_start_stop/Test start already started transform}
   issue: https://github.com/elastic/elasticsearch/issues/98802
@@ -153,9 +144,6 @@ tests:
 - class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
   method: testAllocationPreventedForRemoval
   issue: https://github.com/elastic/elasticsearch/issues/116363
-- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
-  method: test {categorize.Categorize ASYNC}
-  issue: https://github.com/elastic/elasticsearch/issues/116373
 - class: org.elasticsearch.threadpool.SimpleThreadPoolIT
   method: testThreadPoolMetrics
   issue: https://github.com/elastic/elasticsearch/issues/108320
@@ -168,9 +156,6 @@ tests:
 - class: org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsCanMatchOnCoordinatorIntegTests
   method: testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQueryingAnyNodeWhenTheyAreOutsideOfTheQueryRange
   issue: https://github.com/elastic/elasticsearch/issues/116523
-- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
-  method: test {categorize.Categorize}
-  issue: https://github.com/elastic/elasticsearch/issues/116434
 - class: org.elasticsearch.upgrades.SearchStatesIT
   method: testBWCSearchStates
   issue: https://github.com/elastic/elasticsearch/issues/116617
@@ -229,9 +214,6 @@ tests:
 - class: org.elasticsearch.xpack.test.rest.XPackRestIT
   method: test {p0=transform/transforms_reset/Test reset running transform}
   issue: https://github.com/elastic/elasticsearch/issues/117473
-- class: org.elasticsearch.xpack.esql.qa.single_node.FieldExtractorIT
-  method: testConstantKeywordField
-  issue: https://github.com/elastic/elasticsearch/issues/117524
 - class: org.elasticsearch.xpack.esql.qa.multi_node.FieldExtractorIT
   method: testConstantKeywordField
   issue: https://github.com/elastic/elasticsearch/issues/117524
```
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/AbstractCategorizeBlockHash.java

Lines changed: 105 additions & 0 deletions (new file)

```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationBytesRefHash;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationPartOfSpeechDictionary;
import org.elasticsearch.xpack.ml.aggs.categorization.SerializableTokenListCategory;
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer;

import java.io.IOException;

/**
 * Base BlockHash implementation for {@code Categorize} grouping function.
 */
public abstract class AbstractCategorizeBlockHash extends BlockHash {
    // TODO: this should probably also take an emitBatchSize
    private final int channel;
    private final boolean outputPartial;
    protected final TokenListCategorizer.CloseableTokenListCategorizer categorizer;

    AbstractCategorizeBlockHash(BlockFactory blockFactory, int channel, boolean outputPartial) {
        super(blockFactory);
        this.channel = channel;
        this.outputPartial = outputPartial;
        this.categorizer = new TokenListCategorizer.CloseableTokenListCategorizer(
            new CategorizationBytesRefHash(new BytesRefHash(2048, blockFactory.bigArrays())),
            CategorizationPartOfSpeechDictionary.getInstance(),
            0.70f
        );
    }

    protected int channel() {
        return channel;
    }

    @Override
    public Block[] getKeys() {
        return new Block[] { outputPartial ? buildIntermediateBlock() : buildFinalBlock() };
    }

    @Override
    public IntVector nonEmpty() {
        return IntVector.range(0, categorizer.getCategoryCount(), blockFactory);
    }

    @Override
    public BitArray seenGroupIds(BigArrays bigArrays) {
        throw new UnsupportedOperationException();
    }

    @Override
    public final ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
        throw new UnsupportedOperationException();
    }

    /**
     * Serializes the intermediate state into a single BytesRef block, or an empty Null block if there are no categories.
     */
    private Block buildIntermediateBlock() {
        if (categorizer.getCategoryCount() == 0) {
            return blockFactory.newConstantNullBlock(0);
        }
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            // TODO be more careful here.
            out.writeVInt(categorizer.getCategoryCount());
            for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
                category.writeTo(out);
            }
            // We're returning a block with N positions just because the Page must have all blocks with the same position count!
            return blockFactory.newConstantBytesRefBlockWith(out.bytes().toBytesRef(), categorizer.getCategoryCount());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Block buildFinalBlock() {
        try (BytesRefVector.Builder result = blockFactory.newBytesRefVectorBuilder(categorizer.getCategoryCount())) {
            BytesRefBuilder scratch = new BytesRefBuilder();
            for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
                scratch.copyChars(category.getRegex());
                result.appendBytesRef(scratch.get());
                scratch.clear();
            }
            return result.build().asBlock();
        }
    }
}
```
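
The intermediate block above is a length-prefixed list of serialized categories. Its consumer, `CategorizedIntermediateBlockHash` (part of this commit but not shown in this excerpt), has to reverse the process. A minimal sketch of that read side, assuming `SerializableTokenListCategory` can be reconstructed from a `StreamInput` in the usual `Writeable` fashion:

```java
import java.io.IOException;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.ml.aggs.categorization.SerializableTokenListCategory;

// Sketch only: deserializing the state written by buildIntermediateBlock().
// The SerializableTokenListCategory(StreamInput) constructor is an assumption here.
class IntermediateStateReader {
    static void read(BytesRef serialized) throws IOException {
        try (StreamInput in = new BytesArray(serialized).streamInput()) {
            int categoryCount = in.readVInt(); // matches the writeVInt() on the write side
            for (int i = 0; i < categoryCount; i++) {
                SerializableTokenListCategory category = new SerializableTokenListCategory(in);
                // Merge `category` into the local categorizer here; the loop index i is the
                // sending node's category id, which must be remapped to the merged id.
            }
        }
    }
}
```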

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

Lines changed: 24 additions & 4 deletions

```diff
@@ -14,6 +14,7 @@
 import org.elasticsearch.common.util.Int3Hash;
 import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.common.util.LongLongHash;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
 import org.elasticsearch.compute.aggregation.SeenGroupIds;
 import org.elasticsearch.compute.data.Block;
@@ -58,9 +59,7 @@
  * leave a big gap, even if we never see {@code null}.
  * </p>
  */
-public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
-    permits BooleanBlockHash, BytesRefBlockHash, DoubleBlockHash, IntBlockHash, LongBlockHash, BytesRef2BlockHash, BytesRef3BlockHash, //
-    NullBlockHash, PackedValuesBlockHash, BytesRefLongBlockHash, LongLongBlockHash, TimeSeriesBlockHash {
+public abstract class BlockHash implements Releasable, SeenGroupIds {
 
     protected final BlockFactory blockFactory;
 
@@ -107,7 +106,15 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
     @Override
     public abstract BitArray seenGroupIds(BigArrays bigArrays);
 
-    public record GroupSpec(int channel, ElementType elementType) {}
+    /**
+     * @param isCategorize Whether this group is a CATEGORIZE() or not.
+     *                     May be changed in the future when more stateful grouping functions are added.
+     */
+    public record GroupSpec(int channel, ElementType elementType, boolean isCategorize) {
+        public GroupSpec(int channel, ElementType elementType) {
+            this(channel, elementType, false);
+        }
+    }
 
     /**
      * Creates a specialized hash table that maps one or more {@link Block}s to ids.
@@ -159,6 +166,19 @@ public static BlockHash buildPackedValuesBlockHash(List<GroupSpec> groups, Block
         return new PackedValuesBlockHash(groups, blockFactory, emitBatchSize);
     }
 
+    /**
+     * Builds a BlockHash for the Categorize grouping function.
+     */
+    public static BlockHash buildCategorizeBlockHash(List<GroupSpec> groups, AggregatorMode aggregatorMode, BlockFactory blockFactory) {
+        if (groups.size() != 1) {
+            throw new IllegalArgumentException("only a single CATEGORIZE group can be used");
+        }
+
+        return aggregatorMode.isInputPartial()
+            ? new CategorizedIntermediateBlockHash(groups.get(0).channel, blockFactory, aggregatorMode.isOutputPartial())
+            : new CategorizeRawBlockHash(groups.get(0).channel, blockFactory, aggregatorMode.isOutputPartial());
+    }
+
     /**
      * Creates a specialized hash table that maps a {@link Block} of the given input element type to ids.
      */
```
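
To see which implementation each phase gets, here is a hedged sketch, assuming the compute engine's standard `AggregatorMode` phases (`INITIAL` on data nodes, `FINAL` on the coordinator); everything else follows the factory above:

```java
import java.util.List;

import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;

class CategorizePhases {
    // Data node: AggregatorMode.INITIAL has isInputPartial() == false,
    // so the factory returns a CategorizeRawBlockHash over raw messages.
    static BlockHash dataNodeHash(BlockFactory blockFactory, int channel) {
        return BlockHash.buildCategorizeBlockHash(
            List.of(new BlockHash.GroupSpec(channel, ElementType.BYTES_REF, true)),
            AggregatorMode.INITIAL,
            blockFactory
        );
    }

    // Coordinator: AggregatorMode.FINAL has isInputPartial() == true, so the
    // factory returns a CategorizedIntermediateBlockHash that merges the
    // serialized categorizers sent by the data nodes.
    static BlockHash coordinatorHash(BlockFactory blockFactory, int channel) {
        return BlockHash.buildCategorizeBlockHash(
            List.of(new BlockHash.GroupSpec(channel, ElementType.BYTES_REF, true)),
            AggregatorMode.FINAL,
            blockFactory
        );
    }
}
```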
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeRawBlockHash.java

Lines changed: 137 additions & 0 deletions (new file)

```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.analysis.CharFilterFactory;
import org.elasticsearch.index.analysis.CustomAnalyzer;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;

/**
 * BlockHash implementation for {@code Categorize} grouping function.
 * <p>
 * This implementation expects rows, and can't deserialize intermediate states coming from other nodes.
 * </p>
 */
public class CategorizeRawBlockHash extends AbstractCategorizeBlockHash {
    private final CategorizeEvaluator evaluator;

    CategorizeRawBlockHash(int channel, BlockFactory blockFactory, boolean outputPartial) {
        super(blockFactory, channel, outputPartial);
        CategorizationAnalyzer analyzer = new CategorizationAnalyzer(
            // TODO: should be the same analyzer as used in Production
            new CustomAnalyzer(
                TokenizerFactory.newFactory("whitespace", WhitespaceTokenizer::new),
                new CharFilterFactory[0],
                new TokenFilterFactory[0]
            ),
            true
        );
        this.evaluator = new CategorizeEvaluator(analyzer, categorizer, blockFactory);
    }

    @Override
    public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
        try (IntBlock result = (IntBlock) evaluator.eval(page.getBlock(channel()))) {
            addInput.add(0, result);
        }
    }

    @Override
    public void close() {
        evaluator.close();
    }

    /**
     * Similar implementation to an Evaluator.
     */
    public static final class CategorizeEvaluator implements Releasable {
        private final CategorizationAnalyzer analyzer;

        private final TokenListCategorizer.CloseableTokenListCategorizer categorizer;

        private final BlockFactory blockFactory;

        public CategorizeEvaluator(
            CategorizationAnalyzer analyzer,
            TokenListCategorizer.CloseableTokenListCategorizer categorizer,
            BlockFactory blockFactory
        ) {
            this.analyzer = analyzer;
            this.categorizer = categorizer;
            this.blockFactory = blockFactory;
        }

        public Block eval(BytesRefBlock vBlock) {
            BytesRefVector vVector = vBlock.asVector();
            if (vVector == null) {
                return eval(vBlock.getPositionCount(), vBlock);
            }
            IntVector vector = eval(vBlock.getPositionCount(), vVector);
            return vector.asBlock();
        }

        public IntBlock eval(int positionCount, BytesRefBlock vBlock) {
            try (IntBlock.Builder result = blockFactory.newIntBlockBuilder(positionCount)) {
                BytesRef vScratch = new BytesRef();
                for (int p = 0; p < positionCount; p++) {
                    if (vBlock.isNull(p)) {
                        result.appendNull();
                        continue;
                    }
                    int first = vBlock.getFirstValueIndex(p);
                    int count = vBlock.getValueCount(p);
                    if (count == 1) {
                        result.appendInt(process(vBlock.getBytesRef(first, vScratch)));
                        continue;
                    }
                    int end = first + count;
                    result.beginPositionEntry();
                    for (int i = first; i < end; i++) {
                        result.appendInt(process(vBlock.getBytesRef(i, vScratch)));
                    }
                    result.endPositionEntry();
                }
                return result.build();
            }
        }

        public IntVector eval(int positionCount, BytesRefVector vVector) {
            try (IntVector.FixedBuilder result = blockFactory.newIntVectorFixedBuilder(positionCount)) {
                BytesRef vScratch = new BytesRef();
                for (int p = 0; p < positionCount; p++) {
                    result.appendInt(p, process(vVector.getBytesRef(p, vScratch)));
                }
                return result.build();
            }
        }

        private int process(BytesRef v) {
            return categorizer.computeCategory(v.utf8ToString(), analyzer).getId();
        }

        @Override
        public void close() {
            Releasables.closeExpectNoException(analyzer, categorizer);
        }
    }
}
```
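
As a usage note, the `process()` call above is where raw messages collapse into categories. A hedged, standalone sketch of that behavior, using `BigArrays.NON_RECYCLING_INSTANCE` in place of a `BlockFactory`'s big arrays, with sample log lines invented for illustration; messages differing only in non-dictionary tokens such as IPs should land in the same category:

```java
import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.index.analysis.CharFilterFactory;
import org.elasticsearch.index.analysis.CustomAnalyzer;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationBytesRefHash;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationPartOfSpeechDictionary;
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;

// Sketch: standalone use of the categorizer wired up in the constructor above.
class CategorizerDemo {
    public static void main(String[] args) throws Exception {
        try (
            // Same whitespace analyzer as the CategorizeRawBlockHash constructor.
            CategorizationAnalyzer analyzer = new CategorizationAnalyzer(
                new CustomAnalyzer(
                    TokenizerFactory.newFactory("whitespace", WhitespaceTokenizer::new),
                    new CharFilterFactory[0],
                    new TokenFilterFactory[0]
                ),
                true
            );
            var categorizer = new TokenListCategorizer.CloseableTokenListCategorizer(
                new CategorizationBytesRefHash(new BytesRefHash(2048, BigArrays.NON_RECYCLING_INSTANCE)),
                CategorizationPartOfSpeechDictionary.getInstance(),
                0.70f
            )
        ) {
            int a = categorizer.computeCategory("Connected to 10.1.0.1", analyzer).getId();
            int b = categorizer.computeCategory("Connected to 10.1.0.2", analyzer).getId();
            // The IPs differ but the token pattern matches, so a == b is expected.
            System.out.println("same category: " + (a == b));
        }
    }
}
```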
