-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ES|QL categorize with multiple groupings #118173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
41279c7
9f770d5
d7af8bf
e6ac068
35e9811
b44663a
82a38f9
5121e9a
3c0325a
e05f860
abfc211
54ac1bf
5eb5189
1deb2d4
a633806
e639268
5a4c8bb
5ecc9f9
16680d2
9588e10
a9c3ed7
8dd15ac
7f82b43
0b0be87
3bccc78
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 118173 | ||
| summary: ES|QL categorize with multiple groupings | ||
| area: Machine Learning | ||
| type: feature | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.aggregation.blockhash; | ||
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.common.bytes.BytesArray; | ||
| import org.elasticsearch.common.io.stream.BytesStreamOutput; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.unit.ByteSizeValue; | ||
| import org.elasticsearch.common.util.BigArrays; | ||
| import org.elasticsearch.common.util.BitArray; | ||
| import org.elasticsearch.compute.aggregation.AggregatorMode; | ||
| import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; | ||
| import org.elasticsearch.compute.data.Block; | ||
| import org.elasticsearch.compute.data.BlockFactory; | ||
| import org.elasticsearch.compute.data.BytesRefBlock; | ||
| import org.elasticsearch.compute.data.ElementType; | ||
| import org.elasticsearch.compute.data.IntBlock; | ||
| import org.elasticsearch.compute.data.IntVector; | ||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.core.ReleasableIterator; | ||
| import org.elasticsearch.core.Releasables; | ||
| import org.elasticsearch.index.analysis.AnalysisRegistry; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * BlockHash implementation for {@code Categorize} grouping function as first | ||
| * grouping expression, followed by one or mode other grouping expressions. | ||
| */ | ||
| public class CategorizePackedValuesBlockHash extends BlockHash { | ||
|
|
||
| private final List<GroupSpec> specs; | ||
| private final AggregatorMode aggregatorMode; | ||
| private final CategorizeBlockHash categorizeBlockHash; | ||
| private final PackedValuesBlockHash packedValuesBlockHash; | ||
|
|
||
| CategorizePackedValuesBlockHash( | ||
| List<GroupSpec> specs, | ||
| BlockFactory blockFactory, | ||
| AggregatorMode aggregatorMode, | ||
| AnalysisRegistry analysisRegistry, | ||
| int emitBatchSize | ||
| ) { | ||
| super(blockFactory); | ||
| this.specs = specs; | ||
| this.aggregatorMode = aggregatorMode; | ||
|
|
||
| List<GroupSpec> delegateSpecs = new ArrayList<>(); | ||
| delegateSpecs.add(new GroupSpec(0, ElementType.INT)); | ||
jan-elastic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| for (int i = 1; i < specs.size(); i++) { | ||
| delegateSpecs.add(new GroupSpec(i, specs.get(i).elementType())); | ||
| } | ||
|
|
||
| boolean success = false; | ||
| try { | ||
| categorizeBlockHash = new CategorizeBlockHash(blockFactory, specs.get(0).channel(), aggregatorMode, analysisRegistry); | ||
| packedValuesBlockHash = new PackedValuesBlockHash(delegateSpecs, blockFactory, emitBatchSize); | ||
| success = true; | ||
| } finally { | ||
| if (success == false) { | ||
| close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void add(Page page, GroupingAggregatorFunction.AddInput addInput) { | ||
| try (IntBlock categories = getCategories(page)) { | ||
| Block[] blocks = new Block[specs.size()]; | ||
jan-elastic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| blocks[0] = categories; | ||
| for (int i = 1; i < specs.size(); i++) { | ||
| blocks[i] = page.getBlock(specs.get(i).channel()); | ||
| } | ||
| packedValuesBlockHash.add(new Page(blocks), addInput); | ||
| } | ||
| } | ||
|
|
||
| private IntBlock getCategories(Page page) { | ||
| if (aggregatorMode.isInputPartial() == false) { | ||
| return categorizeBlockHash.addInitial(page); | ||
| } else { | ||
| BytesRefBlock stateBlock = page.getBlock(0); | ||
| BytesRef stateBytes = stateBlock.getBytesRef(0, new BytesRef()); | ||
|
|
||
| try (StreamInput in = new BytesArray(stateBytes).streamInput()) { | ||
| BytesRef categorizerState = in.readBytesRef(); | ||
| Map<Integer, Integer> idMap = categorizeBlockHash.readIntermediate(categorizerState); | ||
| int[] oldIds = in.readIntArray(); | ||
| try (IntBlock.Builder newIds = blockFactory.newIntBlockBuilder(page.getPositionCount())) { | ||
| for (int oldId : oldIds) { | ||
| newIds.appendInt(idMap.get(oldId)); | ||
| } | ||
| return newIds.build(); | ||
jan-elastic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Block[] getKeys() { | ||
| Block[] keys = packedValuesBlockHash.getKeys(); | ||
| if (aggregatorMode.isOutputPartial() == false) { | ||
| try ( | ||
| BytesRefBlock regexes = (BytesRefBlock) categorizeBlockHash.getKeys()[0]; | ||
| BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(keys[0].getPositionCount()) | ||
| ) { | ||
| IntVector idsVector = (IntVector) keys[0].asVector(); | ||
| int idsOffset = categorizeBlockHash.seenNull() ? 0 : -1; | ||
| BytesRef scratch = new BytesRef(); | ||
| for (int i = 0; i < idsVector.getPositionCount(); i++) { | ||
| int id = idsVector.getInt(i); | ||
| if (id == 0) { | ||
| builder.appendNull(); | ||
| } else { | ||
| builder.appendBytesRef(regexes.getBytesRef(id + idsOffset, scratch)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not for now: We're repeating, potentially, a lot of bytesref values here. I wonder if there is or it would make sense to have a BytesRefBlock that instead of all the bytesrefs, stores every value just once, and then a reference per position: -> @nik9000 Something to consider for later? Maybe it's too specific for this. And anyway, the next EVAL or whatever will duplicate the value again.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds like a nice thing to have, but definitely out of scope for this PR. However, the next If you have: then an efficient without ever duplicating.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For that SUBSTRING to not duplicate, we would need to add that "hashtable" strategy in the BytesRefBlockBuilder. It looks goo (?), but I wonder if using that by default could perform negatively in some scenarios. Something to try eventually probably
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like worth trying in the future. Are you making a note (issue) of this, so that the idea doesn't get lost?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure! I'll comment it with Nik, just in case it was considered and discarded already, and then I'll document it in an issue somewhere
jan-elastic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| keys[0].close(); | ||
| keys[0] = builder.build(); | ||
| } | ||
| } else { | ||
| BytesRef state; | ||
| try (BytesStreamOutput out = new BytesStreamOutput()) { | ||
jan-elastic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| out.writeBytesRef(categorizeBlockHash.serializeCategorizer()); | ||
| IntVector idsVector = (IntVector) keys[0].asVector(); | ||
| int[] idsArray = new int[idsVector.getPositionCount()]; | ||
jan-elastic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for (int i = 0; i < idsVector.getPositionCount(); i++) { | ||
| idsArray[i] = idsVector.getInt(i); | ||
| } | ||
| out.writeIntArray(idsArray); | ||
jan-elastic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| state = out.bytes().toBytesRef(); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| keys[0].close(); | ||
| keys[0] = blockFactory.newConstantBytesRefBlockWith(state, keys[0].getPositionCount()); | ||
| } | ||
| return keys; | ||
| } | ||
|
|
||
| @Override | ||
| public IntVector nonEmpty() { | ||
| return packedValuesBlockHash.nonEmpty(); | ||
| } | ||
|
|
||
| @Override | ||
| public BitArray seenGroupIds(BigArrays bigArrays) { | ||
| return packedValuesBlockHash.seenGroupIds(bigArrays); | ||
| } | ||
|
|
||
| @Override | ||
| public final ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| Releasables.close(categorizeBlockHash, packedValuesBlockHash); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.