Merged
@@ -39,11 +39,8 @@
* <p>
* The generation code also looks for the optional methods {@code combineIntermediate}
* and {@code evaluateFinal} which are used to combine intermediate states and
* produce the final output. If the first is missing then the generated code will
* call the {@code combine} method to combine intermediate states. If the second
* is missing the generated code will make a block containing the primitive from
* the state. If either of those don't have sensible interpretations then the code
* generation code will throw an error, aborting the compilation.
* produce the final output. Please note, those are auto-generated when aggregating
* primitive types such as boolean, int, long, float, double.
* </p>
*/
@Target(ElementType.TYPE)
@@ -85,10 +85,6 @@
* To introduce your aggregation to the engine:
* <ul>
* <li>
* Add it to {@code org.elasticsearch.xpack.esql.planner.AggregateMapper}.
* Check all usages of other aggregations there, and replicate the logic.
* </li>
* <li>
* Implement serialization for your aggregation by implementing
* {@link org.elasticsearch.common.io.stream.NamedWriteable#getWriteableName},
* {@link org.elasticsearch.common.io.stream.NamedWriteable#writeTo},
@@ -97,16 +93,92 @@
* {@link org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateWritables#getNamedWriteables}.
* </li>
* <li>
* Do the same with {@link org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry}.
* Add it to {@link org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry}.
* </li>
* </ul>
* </li>
* </ol>
*
* <h3>Creating aggregators for your function</h3>
* <p>
* Aggregators contain the core logic of your aggregation. That is, how to combine values, what to store, how to process data, etc.
* Aggregators contain the core logic of how to combine values, what to store, how to process data, etc.
* Currently, we rely on code generation (per aggregation per type) in order to implement such functionality.
* This approach was picked for performance reasons (namely to avoid virtual method calls and boxing types).
* As a result, we cannot rely on interface implementations or generics.
* </p>
* <p>
* To implement the aggregation logic, create a class (typically named "${FunctionName}${Type}Aggregator").
* It must be placed in `org.elasticsearch.compute.aggregation` in order to be picked up by code generation.
* Annotate it with {@link org.elasticsearch.compute.ann.Aggregator} and {@link org.elasticsearch.compute.ann.GroupingAggregator}.
* The first one is responsible for aggregating an entire data set, while the second one is responsible for grouping within buckets.
* </p>
* <h4>Before you start implementing it, please note that:</h4>
* <ul>
* <li>All methods must be public static</li>
* <li>
* {@code init/initSingle/initGrouping} may declare optional {@link org.elasticsearch.common.util.BigArrays} or
* {@link org.elasticsearch.compute.operator.DriverContext} arguments, which are injected automatically.
* It is also possible to declare any number of arbitrary arguments, which must be provided via the generated Supplier.
* </li>
* <li>
* {@code combine, combineStates, combineIntermediate, evaluateFinal} methods (see below) can be generated automatically
* when the input type I and the mutable accumulator states AggregatorState and GroupingAggregatorState are all primitive (DOUBLE, INT).
* </li>
* <li>
* Code generation expects at least one IntermediateState field, which is used to keep
* the serialized state of the aggregation (e.g. AggregatorState and GroupingAggregatorState).
* It must be defined even if you rely on the autogenerated implementation for primitive types.
* </li>
* </ul>
* <h4>Aggregation expects:</h4>
* <ul>
* <li>
* type AggregatorState (a mutable state used to accumulate the result of the aggregation) to be public, not an inner class,
* and to implement {@link org.elasticsearch.compute.aggregation.AggregatorState}
* </li>
* <li>type I (input to your aggregation function), usually primitive types and {@link org.apache.lucene.util.BytesRef}</li>
* <li>{@code AggregatorState init()} or {@code AggregatorState initSingle()} returns empty initialized aggregation state</li>
* <li>
* {@code void combine(AggregatorState state, I input)} or {@code AggregatorState combine(AggregatorState state, I input)}
* adds input entry to the aggregation state
* </li>
* <li>
* {@code void combineIntermediate(AggregatorState state, intermediate states)} adds serialized aggregation state
* to the current aggregation state (used to combine results across different nodes)
* </li>
* <li>
* {@code Block evaluateFinal(AggregatorState state, DriverContext)} converts the inner state of the aggregation to the result
* column
* </li>
* </ul>
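The non-grouping contract listed above can be sketched in plain Java. This is only an illustration under stated assumptions, not the project's implementation: `MaxLongState`, `MaxLongAggregatorSketch` and the demo class are hypothetical names, the `@Aggregator` annotation and the `AggregatorState` interface are omitted so that the block compiles without Elasticsearch on the classpath, the "intermediate states" are assumed to be a `(long, boolean)` pair, and a boxed `Long` stands in for the `Block` that the real `evaluateFinal` returns.

```java
/** Hypothetical mutable accumulator; the real one would implement AggregatorState. */
final class MaxLongState {
    long max = Long.MIN_VALUE;
    boolean seen = false;
}

/** Hypothetical aggregator; the real class would carry the @Aggregator annotation. */
final class MaxLongAggregatorSketch {
    private MaxLongAggregatorSketch() {}

    // Returns an empty, initialized aggregation state.
    public static MaxLongState initSingle() {
        return new MaxLongState();
    }

    // Adds one input entry to the aggregation state.
    public static void combine(MaxLongState state, long input) {
        state.max = state.seen ? Math.max(state.max, input) : input;
        state.seen = true;
    }

    // Folds a serialized state (assumed here to be a value plus a "seen" flag) into the current state.
    public static void combineIntermediate(MaxLongState state, long max, boolean seen) {
        if (seen) {
            combine(state, max);
        }
    }

    // Stand-in for "Block evaluateFinal(AggregatorState, DriverContext)": null means no input was seen.
    public static Long evaluateFinal(MaxLongState state) {
        return state.seen ? Long.valueOf(state.max) : null;
    }
}

final class MaxLongAggregatorSketchDemo {
    public static void main(String[] args) {
        MaxLongState local = MaxLongAggregatorSketch.initSingle();
        for (long v : new long[] {3, 9, 4}) {
            MaxLongAggregatorSketch.combine(local, v);
        }
        // Simulate a partial result arriving from another node.
        MaxLongState remote = MaxLongAggregatorSketch.initSingle();
        MaxLongAggregatorSketch.combine(remote, 12);
        MaxLongAggregatorSketch.combineIntermediate(local, remote.max, remote.seen);
        System.out.println(MaxLongAggregatorSketch.evaluateFinal(local)); // prints 12
    }
}
```

Every method is `public static` and takes the state as its first argument, which matches the stated goal of avoiding virtual method calls and boxing on the hot path.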
* <h4>Grouping aggregation expects:</h4>
* <ul>
* <li>
* type GroupingAggregatorState (a mutable state used to accumulate the result of the grouping aggregation) to be public,
* not an inner class, and to implement {@link org.elasticsearch.compute.aggregation.GroupingAggregatorState}
* </li>
* <li>type I (input to your aggregation function), usually primitive types and {@link org.apache.lucene.util.BytesRef}</li>
* <li>
* {@code GroupingAggregatorState init()} or {@code GroupingAggregatorState initGrouping()} returns empty initialized grouping
* aggregation state
* </li>
* <li>
* {@code void combine(GroupingAggregatorState state, int groupId, I input)} adds input entry to the corresponding group (bucket)
* of the grouping aggregation state
* </li>
* <li>
* {@code void combineStates(GroupingAggregatorState targetState, int targetGroupId, GS otherState, int otherGroupId)}
* merges other grouped aggregation state into the first one
* </li>
* <li>
* {@code void combineIntermediate(GroupingAggregatorState current, int groupId, intermediate states)} adds serialized
* aggregation state to the current grouped aggregation state (used to combine results across different nodes)
* </li>
* <li>
* {@code Block evaluateFinal(GroupingAggregatorState state, IntVectorSelected, DriverContext)} converts the inner state
* of the grouping aggregation to the result column
* </li>
* </ul>
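The grouping contract listed above differs mainly in that every method addresses a group id. The following plain-Java sketch illustrates that shape under stated assumptions: `MaxLongGroupingState` and `MaxLongGroupingAggregatorSketch` are hypothetical names, the `@GroupingAggregator` annotation and the `GroupingAggregatorState` interface are omitted so the block compiles on its own, an `int[]` stands in for the selected-groups vector, and a `Long[]` stands in for the `Block` result.

```java
import java.util.Arrays;

/** Hypothetical per-group accumulator; the real one would implement GroupingAggregatorState. */
final class MaxLongGroupingState {
    long[] max = new long[0];
    boolean[] seen = new boolean[0];

    // Grows the backing arrays so that groupId is a valid index.
    void ensureCapacity(int groupId) {
        if (groupId >= max.length) {
            int newSize = Math.max(groupId + 1, max.length * 2);
            max = Arrays.copyOf(max, newSize);
            seen = Arrays.copyOf(seen, newSize);
        }
    }
}

/** Hypothetical aggregator; the real class would carry the @GroupingAggregator annotation. */
final class MaxLongGroupingAggregatorSketch {
    private MaxLongGroupingAggregatorSketch() {}

    // Returns an empty, initialized grouping aggregation state.
    public static MaxLongGroupingState initGrouping() {
        return new MaxLongGroupingState();
    }

    // Adds one input entry to the bucket identified by groupId.
    public static void combine(MaxLongGroupingState state, int groupId, long input) {
        state.ensureCapacity(groupId);
        state.max[groupId] = state.seen[groupId] ? Math.max(state.max[groupId], input) : input;
        state.seen[groupId] = true;
    }

    // Merges one group of another state into one group of the target state.
    public static void combineStates(
        MaxLongGroupingState target, int targetGroupId, MaxLongGroupingState other, int otherGroupId
    ) {
        if (otherGroupId < other.seen.length && other.seen[otherGroupId]) {
            combine(target, targetGroupId, other.max[otherGroupId]);
        }
    }

    // Folds a serialized per-group state (assumed: value plus "seen" flag) into the current state.
    public static void combineIntermediate(MaxLongGroupingState current, int groupId, long max, boolean seen) {
        if (seen) {
            combine(current, groupId, max);
        }
    }

    // Stand-in for evaluateFinal: one result per selected group id, null for groups without input.
    public static Long[] evaluateFinal(MaxLongGroupingState state, int[] selected) {
        Long[] result = new Long[selected.length];
        for (int i = 0; i < selected.length; i++) {
            int g = selected[i];
            if (g < state.seen.length && state.seen[g]) {
                result[i] = state.max[g];
            }
        }
        return result;
    }
}
```

Keeping the per-group values in parallel primitive arrays, rather than a map of boxed values, is one way to stay consistent with the no-boxing motivation given earlier; the real state classes may be organized differently.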
* <ol>
* <li>
* Copy an existing aggregator to use as a base. You'll usually make one per type. Check other classes to see the naming pattern.
@@ -117,31 +189,8 @@
* </p>
* </li>
* <li>
* The methods in the aggregator will define how it will work:
* <ul>
* <li>
* Adding the `type init()` method will autogenerate the code to manage the state, using your returned value
* as the initial value for each group.
* </li>
* <li>
* Adding the `type initSingle()` or `type initGrouping()` methods will use the state object you return there instead.
* <p>
* You will also have to provide `evaluateIntermediate()` and `evaluateFinal()` methods this way.
* </p>
* </li>
* </ul>
* Depending on the way you use, adapt your `combine*()` methods to receive one or other type as their first parameters.
* </li>
* <li>
* If it's also a {@link org.elasticsearch.compute.ann.GroupingAggregator}, you should provide the same methods as commented before:
* <ul>
* <li>
* Add an `initGrouping()`, unless you're using the `init()` method
* </li>
* <li>
* Add all the other methods, with the state parameter of the type of your `initGrouping()`.
* </li>
* </ul>
* Implement the methods from the above list (or create empty ones).
* Also check the {@link org.elasticsearch.compute.ann.Aggregator} JavaDoc, as it describes how the generated code uses these methods.
* </li>
* <li>
* Make a test for your aggregator.
@@ -152,16 +201,8 @@
* </p>
* </li>
* <li>
* Check the Javadoc of the {@link org.elasticsearch.compute.ann.Aggregator}
* and {@link org.elasticsearch.compute.ann.GroupingAggregator} annotations.
* Add/Modify them on your aggregator.
* </li>
* <li>
* The {@link org.elasticsearch.compute.ann.Aggregator} JavaDoc explains the static methods you should add.
* </li>
* <li>
* After implementing the required methods (Even if they have a dummy implementation),
* run the CsvTests to generate some extra required classes.
* Code generation is triggered when running the tests.
* Run the CsvTests to generate the code. Generated code should include:
* <p>
* One of them will be the {@code AggregatorFunctionSupplier} for your aggregator.
* Find it by its name ({@code <Aggregation-name><Type>AggregatorFunctionSupplier}),