Conversation

@nik9000 (Member) commented Sep 11, 2024

This adds support to the compute engine for "stateful grouping functions". Think of these like ExpressionEvaluators but they can:

  • Encode extra state to be passed to the coordinating node as part of the agg
  • Use that extra state to transform the group keys on the coordinating node
  • Apply a transformation using the output after the aggregation is complete
  • Use an intermediate representation that differs from the final representation (think, "I group on an integer, but when finished I transform it into a string")
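
A rough sketch of the shape this implies, with made-up method names rather than the actual API in this PR (the real types here are GroupingKey and GroupingKey.Thing):

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;

// Illustrative only: what a "stateful grouping function" roughly needs to be able to do.
interface StatefulGroupingKeySketch extends Releasable {
    // Data node: turn the raw input into (intermediate) group keys.
    Block evalRawInput(Page page);

    // Data node: encode extra state (e.g. a category dictionary) to ship to the coordinator alongside the agg.
    Block buildIntermediateState(BlockFactory blockFactory, int positionCount);

    // Coordinating node: rebuild/transform the group keys using that extra state.
    Block evalIntermediateInput(BlockFactory blockFactory, Page page);

    // After the aggregation completes: turn the intermediate representation (say, an int id) into the final one (say, a string).
    Block toFinal(BlockFactory blockFactory, Block intermediateKeys);
}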

import java.util.ArrayList;
import java.util.List;

public record GroupingKey(AggregatorMode mode, Thing thing) implements EvalOperator.ExpressionEvaluator {
Member Author

I've somewhat mirrored the way we do grouping aggs with this and it seems to have worked out fairly well. It's not perfect, but it's a lot less confusing than I thought it would be.

}

blockHash.add(wrapPage(page), add);
blockHash.add(new Page(keys), add);
Member Author

I'd like to modify BlockHash to take a Block[] with the blocks in the right position. But that seems like something for another time.
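
For illustration, that change would roughly swap the Page wrapper for a Block[] overload - a sketch only, the signature is an assumption:

// today, per the snippet above: the key blocks get wrapped in a Page
blockHash.add(new Page(keys), add);

// hypothetical overload taking the key blocks directly, already in their final positions
blockHash.add(keys, add);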


blockHash.add(wrapPage(page), add);
blockHash.add(new Page(keys), add);
hashNanos += System.nanoTime() - add.hashStart;
Member Author

It's probably worth timing the evaluation here.
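
Something along these lines, mirroring how hashNanos is tracked above - evalNanos and evaluateKeys are hypothetical names standing in for whatever currently produces keys:

long evalStart = System.nanoTime();
Block[] keys = evaluateKeys(page);           // hypothetical: the grouping-key evaluation being timed
evalNanos += System.nanoTime() - evalStart;  // hypothetical counter, alongside hashNanos

blockHash.add(new Page(keys), add);
hashNanos += System.nanoTime() - add.hashStart;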

int[] aggBlockCounts = new int[aggregators.size()];
for (int a = 0; a < aggregators.size(); a++) {
aggBlockCounts[a] = aggregators.get(a).evaluateBlockCount();
blockCount += aggBlockCounts[a];
Member Author

I found it a lot easier to read if I encoded the resultOffsets into the GroupKeys. It'd be even easier to read if the offsets were encoded into the aggregators too. Or if we returned Block[].
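
A sketch of the "return Block[]" alternative mentioned above; evaluateToBlocks is not a real method, just an illustration:

// Hypothetical: each aggregator returns its own result blocks instead of writing into a
// shared Block[] at a precomputed offset; the caller concatenates them in order.
List<Block> out = new ArrayList<>();
for (GroupingAggregator aggregator : aggregators) {
    Collections.addAll(out, aggregator.evaluateToBlocks(selected, driverContext));
}
Block[] blocks = out.toArray(Block[]::new);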

@nik9000 (Member Author) commented Sep 17, 2024

@jan-elastic have a look at this one. It's closer, I think.

Once we figure out how this is supposed to work in the unit test, I think we can iterate some more on the language side to figure out how to build that.


@Override
public void replaceIntermediateKeys(BlockFactory blockFactory, Block[] blocks) {
// NOCOMMIT this offset can't be the same in the result array and intermediate array
Member Author

I bumped into this a few days ago and I think I need to dig some more - in this brave new world there are two "shapes" of data coming out of these grouping functions - the intermediate shape and the final shape. This is pretty similar to how aggs work - which is something I never fully understood, to be honest. Anyway, I'm using resultOffset here - but that's the result offset of the intermediate data, not the final offset. So it can't be right.
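
One way to make the two shapes explicit, sketched with made-up names:

// Hypothetical: carry both positions instead of a single resultOffset, so the intermediate
// shape (key plus serialized state) and the final shape can be addressed separately.
record KeyOffsets(int intermediateOffset, int finalOffset) {
    // the serialized categorizer state travels in the block next to the intermediate key
    int intermediateStateOffset() {
        return intermediateOffset + 1;
    }
}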

@dnhatn (Member) left a comment

Thanks @nik9000. I might be missing some context, but it seems we're trying to include SerializableTokenListCategory alongside the aggregated results for each driver. Could we resolve this by adding infrastructure to support SerializableTokenListCategory (or a variant) as the new block hash key?
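
To make the block-hash idea concrete, here is a self-contained toy (plain Java, not Elasticsearch code) showing the id remapping it would rely on: data nodes assign local category ids, and the coordinator merges the serialized categories and remaps local ids to global ones before the final aggregation - analogous to mergeWireCategory in the snippets further down.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy categorizer for illustration only; the real one is the ML TokenListCategorizer.
final class ToyCategorizer {
    private final Map<String, Integer> idsByKey = new HashMap<>();
    private final List<String> keysById = new ArrayList<>();

    // Data-node side: categorize a message (here, trivially, by its first token) into a local id.
    int categoryId(String message) {
        return idForKey(message.split("\\s+")[0]);
    }

    // "Intermediate state": the categories, indexed by local id, to ship to the coordinator.
    List<String> serialize() {
        return List.copyOf(keysById);
    }

    // Coordinator side: merge another node's categories and return a local-id -> global-id remapping.
    int[] merge(List<String> remoteKeys) {
        int[] remap = new int[remoteKeys.size()];
        for (int i = 0; i < remoteKeys.size(); i++) {
            remap[i] = idForKey(remoteKeys.get(i));
        }
        return remap;
    }

    private int idForKey(String key) {
        return idsByKey.computeIfAbsent(key, k -> {
            keysById.add(k);
            return keysById.size() - 1;
        });
    }
}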

CategorizationPartOfSpeechDictionary.getInstance(),
0.70f
);
evaluator = new CategorizeEvaluator(
Member

It seems the CategorizeEvaluator will be executed twice: here and in toEvaluator?

Contributor

I was wondering the same.

However, when running the CategorizeOperatorTests it seems that Categorize::toEvaluator is never executed.

@ivancea ivancea self-requested a review September 30, 2024 15:27
@alex-spies alex-spies self-requested a review September 30, 2024 15:28
@costin costin self-requested a review September 30, 2024 15:28
@iverase iverase self-requested a review September 30, 2024 15:29
@craigtaverner craigtaverner self-requested a review September 30, 2024 15:52
@ivancea (Contributor) left a comment

Partial review with some questions

int offset = 0;
for (int g = 0; g < groups.size(); g++) {
blocks[offset] = keys[g];
groups.get(g).finish(blocks, selected, driverContext);
Contributor

No offset passed to the finish() here? How does it know where to place the blocks?

return mode.isInputPartial() ? thing.evalIntermediateInput(blockFactory, page) : thing.evalRawInput(page);
}

public int finishBlockCount() {
Contributor

Nit: We're calling this "finish", while in the aggregator it's "evaluate". Is there a reason to keep those names separate? From what I understand, the operation is nearly the same (?)

maxPageSize,
false
List.of(
// NOCOMMIT double check the mode
Contributor

Commenting just in case this was forgotten

import java.util.List;

public record GroupingKey(AggregatorMode mode, Thing thing, BlockFactory blockFactory) implements EvalOperator.ExpressionEvaluator {
public interface Thing extends Releasable {
Contributor

Is "Thing" the final name here?

Contributor

A smidgen of javadoc would also greatly help understand what the "Thing" is supposed to do.

@nik9000 (Member Author) commented Oct 4, 2024

Could we resolve this by adding infrastructure to support SerializableTokenListCategory (or a variant) as the new block hash key?

I missed this comment two weeks ago and it's pretty important.

So I didn't make it a special BlockHash implementation because I thought "we need this to be available at any position and to be able to combine with other functions". But that's not actually true. We're fine, at least right now, with not doing that - with only supporting GROUP BY CATEGORIZE(message) and no other groupings.

In fact! When we get to supporting stuff like GROUP BY DATE_TRUNC(timestamp), CATEGORIZE(message), we want a very specific behavior for CATEGORIZE.

I suppose my objection is "this is a function in my mind". But I think you're right. It doesn't have to be.

I'm going to see what it'd look like to build this as a specialized BlockHash implementation.

@nik9000 (Member Author) commented Oct 4, 2024

Thanks @iverase for pointing out that comment. I'd like to blame missing it on being sick. But I dunno. It's really important!

@nik9000 (Member Author) commented Oct 4, 2024

OK! I tried it and I think it will work but my head's spinning a bit so I'm going to work on something easier for a little while and get back to it.

@alex-spies (Contributor) left a comment

Had a first look from the query planning/optimization perspective and mostly left a bunch of questions :)

}

public GroupingKey.Supplier groupingKey(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
return mode -> new GroupingKeyFactory(source(), toEvaluator.apply(field), mode);
Contributor

Question: Is it correct that we plan to have expressions like STATS ... BY CATEGORIZE(SUBSTRING(field, 10)) be fully evaluated as part of running the aggregation? I'm asking because we're passing an expression evaluator here, which makes this feel close to a normal function that could be run in an EVAL (except that it needs another counterpart on the coordinator node).

Currently, we move evaluations out of the agg into a separate EVAL step beforehand, so that STATS ... BY 2*field becomes EVAL $$temp = 2*field | STATS ... BY $$temp, and I wonder how this will work with CATEGORIZE.

Contributor

I'd say ideally that works. Even though the use case is pretty narrow, I think it's a bit ugly if it doesn't work.

What problem occurs with categorize? I think the following queries are equivalent:

... | STATS ... BY CATEGORIZE(SUBSTRING(field, 10))
... | EVAL s = SUBSTRING(field, 10) | STATS ... BY CATEGORIZE(s)

More complicated is the following:

... | STATS ... BY SOME_FUNCTION(CATEGORIZE(field))

Currently there is no function that works on a category. However, if we let CATEGORIZE output the category regexes (as a string), then you could do:

... | STATS ... BY SUBSTRING(CATEGORIZE(field), 10)

which I guess is a pain to evaluate.

It's somewhat equivalent to

... | STATS ... BY c = CATEGORIZE(field) | EVAL s = SUBSTRING(c, 10) | STATS BY ... c

But first you need to finalize the CATEGORIZE on the coordinating node, next do the SUBSTRING, and finally aggregate the first STATS one more time. Unless I'm overlooking something...

Contributor

... | STATS ... BY SOME_FUNCTION(CATEGORIZE(field))

is very interesting. For now, I think we should just consider this invalid, but there are options to make this work:

  1. CATEGORIZE actually is a pipeline breaker, i.e. it needs to be finalized on the coordinator. We could declare it its own, standalone physical execution plan node, and the execution plans could be something like categorize (initial) -> exchange (send to coord.) -> categorize (final) -> eval (some_function) -> stats (on the coordinator). In cases without a function on top of the categorize, the optimizer could fuse the stats and the categorize into one combined node.
  2. We could use a 2-phase approach, where we determine the full categorizer in the first phase, so that categorize can be executed like a regular function in the second phase.
... | STATS ... BY CATEGORIZE(SUBSTRING(field, 10))
... | EVAL s = SUBSTRING(field, 10) | STATS ... BY CATEGORIZE(s)

For this equivalence, our optimizer already has a mechanism to turn the former into the latter; we just need to teach it about CATEGORIZE.

Comment on lines +192 to +194
new CategorizationBytesRefHash(new BytesRefHash(2048, context.bigArrays())),
CategorizationPartOfSpeechDictionary.getInstance(),
0.70f
Contributor

nit: let's extract these constants into final statics with good names, or let's have a helper method for creating the token list categorizer since there's some repetition in this file.
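
For instance, something along these lines (a fragment in the style of a suggested change; the names are made up and the enclosing TokenListCategorizer constructor is assumed from the context above):

private static final int CATEGORIZATION_HASH_CAPACITY = 2048;
private static final float CATEGORIZATION_SIMILARITY_THRESHOLD = 0.70f;

// Hypothetical helper collecting the repeated construction in this file.
private static TokenListCategorizer newCategorizer(DriverContext context) {
    return new TokenListCategorizer(
        new CategorizationBytesRefHash(new BytesRefHash(CATEGORIZATION_HASH_CAPACITY, context.bigArrays())),
        CategorizationPartOfSpeechDictionary.getInstance(),
        CATEGORIZATION_SIMILARITY_THRESHOLD
    );
}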

}
}

private static class GroupingKeyThing implements GroupingKey.Thing {
Contributor

Let's give this a better name + javadoc


@Override
public void fetchIntermediateState(BlockFactory blockFactory, Block[] blocks, int positionCount) {
blocks[resultOffset + 1] = buildIntermediateBlock(blockFactory, positionCount);
Contributor

What meaning does resultOffset + 1 have vs. resultOffset? Is this the offset of the block holding the final category?

Can we abstract this into a private member variable, so this becomes clearer, like blocks[finalCategoryBlockOffset] or so?
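
i.e., roughly (field names are guesses at the intended meaning, per the question above):

// Hypothetical fields, replacing the bare resultOffset / resultOffset + 1 arithmetic.
private final int intermediateKeyBlockOffset;    // where the intermediate key block goes
private final int intermediateStateBlockOffset;  // intermediateKeyBlockOffset + 1: the serialized categorizer state

@Override
public void fetchIntermediateState(BlockFactory blockFactory, Block[] blocks, int positionCount) {
    blocks[intermediateStateBlockOffset] = buildIntermediateBlock(blockFactory, positionCount);
}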

}

private static class GroupingKeyThing implements GroupingKey.Thing {
private final int resultOffset;
Contributor

Maybe this could be more meaningful?

Suggested change
private final int resultOffset;
private final int initialCategoryBlockOffset;

Provided my suggested name is even correct :D


@Override
public void close() {
evaluator.close();
Contributor

It looks like we also have to close the categorizer?
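
e.g., assuming the categorizer here is (or wraps) a Releasable, the usual Releasables.close helper covers both:

@Override
public void close() {
    // assumption: categorizer is Releasable and owned by this object
    Releasables.close(evaluator, categorizer);
}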

for (int oldCategoryId = 0; oldCategoryId < count; oldCategoryId++) {
SerializableTokenListCategory category = new SerializableTokenListCategory(in);
int newCategoryId = categorizer.mergeWireCategory(category).getId();
System.err.println("category id map: " + oldCategoryId + " -> " + newCategoryId + " (" + category.getRegex() + ")");
Contributor

Leftover

record GroupingKeyFactory(Source source, ExpressionEvaluator.Factory field, AggregatorMode mode) implements GroupingKey.Factory {
@Override
public GroupingKey apply(DriverContext context, int resultOffset) {
ExpressionEvaluator field = this.field.get(context);
Contributor

Why do we need an expression evaluator here? So we can compute expressions like STATS ... BY CATEGORIZE(SUBSTRING(field, 10))?

If CATEGORIZE is only run as part of a STATS, this may not be the best way to generalize it to nested expressions; for aggs, our current way is to extract nested expressions into an EVAL - like EVAL $$temp = SUBSTRING(field, 10) | STATS ... BY CATEGORIZE($$temp).

Related and important: Currently our optimizer expects the BY ... groups to contain only field and reference attributes after the initial substitution step; I understand that CATEGORIZE breaks this assumption, so we'll have to go and update some optimizer rules to expect references, fields and CATEGORIZE in the groupings.

Contributor

Bleh, never mind. I realized we require running the initial categorization as an Eval and the second one as part of the final Aggregate, like so

Coordinator:
AggregateExec[[categorize(x){r}#57],[COUNT([2a][KEYWORD]) AS count(*), categorize(x){r}#57],FINAL,[categorize(x){r}#57, count{r
}#62, seen{r}#63],58]
\_ExchangeSourceExec[[categorize(x){r}#57, count{r}#62, seen{r}#63],true]

Local:
ExchangeSinkExec[[categorize(x){r}#72, count{r}#77, seen{r}#78],true]
\_AggregateExec[[categorize(x){r}#72],[COUNT([2a][KEYWORD]) AS count(*), categorize(x){r}#72],INITIAL,[categorize(x){r}#72, count
{r}#81, seen{r}#82],58]
  \_EvalExec[[CATEGORIZE(x{f}#75) AS categorize(x)]]
    \_FieldExtractExec[x{f}#75]<[]>
      \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#83], limit[], sort[] estimatedRowSize[104]

@alex-spies (Contributor) left a comment

Hey, I think I'm starting to wrap my head around this.

I have a couple of concerns. The biggest one is that this looks like we're changing how aggs work quite a lot in a way that seems very specific to CATEGORIZE; I wonder if this is needed.

More specifically, I can see that the physical plans for a query like FROM test | STATS count(*) BY categorize(x) look like:

Coordinator:
AggregateExec[[categorize(x){r}#57],[COUNT([2a][KEYWORD]) AS count(*), categorize(x){r}#57],FINAL,[categorize(x){r}#57, count{r
}#62, seen{r}#63],58] // final categorization, aggregation, and replacing category ID by category STRING
\_ExchangeSourceExec[[categorize(x){r}#57, count{r}#62, seen{r}#63],true]

Local:
ExchangeSinkExec[[categorize(x){r}#72, count{r}#77, seen{r}#78],true]
\_AggregateExec[[categorize(x){r}#72],[COUNT([2a][KEYWORD]) AS count(*), categorize(x){r}#72],INITIAL,[categorize(x){r}#72, count
{r}#81, seen{r}#82],58]
  \_EvalExec[[CATEGORIZE(x{f}#75) AS categorize(x)]] // initial categorization
...

So we

  • first run an EVAL to get the initial categorization, then
  • we perform the initial aggregation based on the computed, local category id, and finally
  • on the coordinator, we compute the global category id, do the final aggregation, and transform the output, all in the final AggregateExec node.

I'm afraid that may lead to a lot of special casing, and it prevents us from easily computing stuff like STATS ... BY SUBSTRING(CATEGORIZE(field), 10), because that requires computing the final category id before performing the aggregation.

So, my main point is: Do the aggregation and the categorization aggregation need to be so intimately intertwined? Couldn't we make this work with fewer changes by pulling this apart into an execution plan like

EvalExec[[transform_categorize(x)]] // replace category ID by category STRING
\_AggregateExec[[categorize(x){r}#57],[COUNT([2a][KEYWORD]) AS count(*), categorize(x){r}#57],FINAL,[categorize(x){r}#57, count{r
}#62, seen{r}#63],58] // ONLY aggregate based on category ID
  \_EvalExec[[final_categorize(x)]] // compute final category ids
    \_ExchangeSourceExec[[categorize(x){r}#57, count{r}#62, seen{r}#63],true]

Local:
ExchangeSinkExec[[categorize(x){r}#72, count{r}#77, seen{r}#78],true]
\_AggregateExec[[categorize(x){r}#72],[COUNT([2a][KEYWORD]) AS count(*), categorize(x){r}#72],INITIAL,[categorize(x){r}#72, count
{r}#81, seen{r}#82],58]
  \_EvalExec[[CATEGORIZE(x{f}#75) AS categorize(x)]] // initial categorization
...

Of course, this requires that the aggregation passes through the mapping from category id to category string rather than consuming both immediately.


Block evalIntermediateInput(BlockFactory blockFactory, Page page);

void fetchIntermediateState(BlockFactory blockFactory, Block[] blocks, int positionCount);
Contributor

naming nit: I've seen in the implementation that this actually builds the intermediate state, rather than just fetching it.

GroupingAggregator.Factory valuesAggregatorForGroupingsInTimeSeries(int timeBucketChannel);
}

public static GroupingKey.Supplier forStatelessGrouping(int channel, ElementType elementType) {
Contributor

nit: a short javadoc would help understand what this does and what "stateless" means in this context.
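
For example, something along these lines (the wording is a guess at the intended semantics):

/**
 * Builds a {@link GroupingKey.Supplier} for groupings that carry no extra state:
 * the key is read straight from {@code channel} as {@code elementType}, nothing
 * beyond the key itself is sent to the coordinating node, and no transformation
 * is applied after the aggregation completes.
 */
public static GroupingKey.Supplier forStatelessGrouping(int channel, ElementType elementType) {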


ElementType intermediateElementType();

GroupingAggregator.Factory valuesAggregatorForGroupingsInTimeSeries(int timeBucketChannel);
Contributor

I'd love some javadoc to understand what this does.

Comment on lines +162 to +164
List<GroupingKey.Factory> groupings = new ArrayList<>(groupSpecs.size());
for (GroupSpec group : groupSpecs) {
groupings.add(group.toGroupingKey().get(aggregatorMode));
Contributor

This doesn't do anything yet, does it? Currently, the groups contain only references to CATEGORIZE expressions, but the reference attributes have no knowledge of what to do in the FINAL aggregator mode, no?

@alex-spies (Contributor) left a comment

A few other things I noticed:

  • In follow ups, we'll need to test lots of edge cases with different queries to ensure interaction with other commands and with optimization rules is correct; as always with new language features :)
  • Currently, the implementation seems incomplete, right? At least queries like FROM test | STATS count(*) by categorize(x) and ROW x = "asdf fdsa" | STATS count(*) by categorize(x) result in an NPE.
  • In terms of the logical and physical plans, I think we need to model CATEGORIZE's execution a bit cleaner:
    • The intermediate attributes in AggregateExec don't mention that CATEGORIZE outputs 2 columns, and that the final AggregateExec consumes both of those columns.
    • The EVAL that does the initial categorization doesn't say that it outputs 2 columns.
    • The physical plan on the coordinator node doesn't seem to actually represent that it does something special with the columns from CATEGORIZE, it looks like a regular aggregation.

@nik9000 (Member Author) commented Jan 6, 2025

This has been replaced by #114317. CATEGORIZE is mostly done now. This was a lovely prototype to learn from, but Nhat pointed us to a better way and we abandoned this code. The new approach is much nicer.

@nik9000 nik9000 closed this Jan 6, 2025