Merged
Changes from 2 commits
@@ -997,13 +997,54 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
    // because that Mapping only works for RexNode, but we need both AggCall and RexNode list.
    Pair<List<RexNode>, List<AggCall>> reResolved =
        resolveAttributesForAggregation(groupExprList, aggExprList, context);

    List<String> names = getGroupKeyNamesAfterAggregation(reResolved.getLeft());
Member

Can you rename the variable `names` to something more meaningful?

Collaborator Author

Renamed

    context.relBuilder.aggregate(
        context.relBuilder.groupKey(reResolved.getLeft()), reResolved.getRight());

    // During aggregation, Calcite projects both input dependencies and output group-by fields.
    // When names conflict, Calcite adds numeric suffixes (e.g., "value0").
    // Apply explicit renaming to restore the intended aliases.
    if (names.size() == reResolved.getLeft().size()) {
Member

When would names.size() differ from reResolved.getLeft().size()? The condition seems to be always true.

Collaborator Author (@yuancu, Oct 17, 2025)

The lengths differ when a group key is not aliased. In that case extractAliasLiteral returns empty:

private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
  if (node == null) {
    return Optional.empty();
  } else if (node.getKind() == AS) {
    return Optional.of((RexLiteral) ((RexCall) node).getOperands().get(1));
  } else {
    return Optional.empty();
  }
}

Although all group keys seem to be aliased in practice, this defensive check guards against future changes that could otherwise cause names to be applied to the wrong fields. Should I remove it?
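
For illustration only, the effect of the guard can be sketched without Calcite. In the sketch below, `Expr`, `Ref`, `Alias`, and `safeRenameList` are hypothetical stand-ins (for RexNode, RexInputRef, an AS call, and the size check). It is not the PR's code; it only demonstrates that the alias list comes out shorter than the group-key list as soon as one key is not aliased.

```java
import java.util.List;
import java.util.Optional;

final class AliasGuardSketch {
  /** Hypothetical stand-in for RexNode. */
  interface Expr {}

  /** Stand-in for a plain input reference such as $2. */
  record Ref(int index) implements Expr {}

  /** Stand-in for an AS call: an operand plus its alias. */
  record Alias(Expr operand, String name) implements Expr {}

  /** Mirrors the shape of extractAliasLiteral: empty unless the node is an alias. */
  static Optional<String> extractAlias(Expr node) {
    if (node instanceof Alias alias) {
      return Optional.of(alias.name());
    }
    return Optional.empty(); // also covers null, since instanceof is false for null
  }

  /** Returns the names to rename with, or empty when any group key lacks an alias. */
  static Optional<List<String>> safeRenameList(List<Expr> groupKeys) {
    List<String> names =
        groupKeys.stream()
            .map(AliasGuardSketch::extractAlias)
            .flatMap(Optional::stream)
            .toList();
    // Unaliased keys are silently dropped above, so a size mismatch means
    // the names no longer line up one-to-one with the group keys.
    return names.size() == groupKeys.size() ? Optional.of(names) : Optional.empty();
  }

  public static void main(String[] args) {
    List<Expr> allAliased =
        List.of(new Alias(new Ref(0), "value"), new Alias(new Ref(1), "category"));
    List<Expr> oneBare = List.of(new Alias(new Ref(0), "value"), new Ref(1));
    System.out.println(safeRenameList(allAliased));
    System.out.println(safeRenameList(oneBare));
  }
}
```

With every key aliased the sketch returns the full name list; with one bare reference it returns empty, which corresponds to skipping the rename.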

      // Defense check: if any group-by key is not aliased, do not rename
      context.relBuilder.rename(names);
    }
    return Pair.of(reResolved.getLeft(), reResolved.getRight());
  }

  /**
   * Imitates {@code Registrar.registerExpression} of {@link RelBuilder} to derive the output order
   * of group-by keys after aggregation.
   *
   * <p>The projected input reference comes first, while any other computed expression follows.
Collaborator

In Registrar.registerExpression, the other computed expressions do not seem guaranteed to keep their original order when an expression is duplicated.

But since PPL only allows a span expression as a computed group-by key, and it cannot be combined with another span expression, this logic may be right. I have not found a bad case so far.

Collaborator Author (@yuancu, Oct 20, 2025)

I found a bad case: stats count() by value, value, @timestamp. I'll fix it.

Update: Fixed by checking duplication
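
One way to read "checking duplication" is sketched below. This is an assumption about the shape of the fix, not the code in the PR: `Key` and `namesAfterAggregation` are hypothetical, and the sketch assumes that a repeated group-by expression collapses into a single output column. It keeps the first occurrence of each expression, lists input references first, and appends computed expressions after them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class GroupKeyOrderSketch {
  /**
   * Hypothetical stand-in for an aliased group key: the underlying expression
   * (as text), its alias, and whether the expression is a plain input reference.
   */
  record Key(String expr, String alias, boolean inputRef) {}

  /** Input references first, computed expressions after, duplicates skipped. */
  static List<String> namesAfterAggregation(List<Key> keys) {
    Set<String> seen = new HashSet<>();
    List<String> refs = new ArrayList<>();
    List<String> computed = new ArrayList<>();
    for (Key key : keys) {
      if (!seen.add(key.expr())) {
        continue; // the same expression was already registered once
      }
      if (key.inputRef()) {
        refs.add(key.alias());
      } else {
        computed.add(key.alias());
      }
    }
    refs.addAll(computed);
    return refs;
  }

  public static void main(String[] args) {
    // Models: stats count() by value, value, @timestamp
    List<Key> keys =
        List.of(
            new Key("value", "value", true),
            new Key("value", "value", true),
            new Key("@timestamp", "@timestamp", true));
    System.out.println(namesAfterAggregation(keys));
  }
}
```

Without the duplicate check, the sketch would produce three names for what is assumed to be two output columns, which is the kind of mismatch the bad case exposes.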

   */
  private List<String> getGroupKeyNamesAfterAggregation(List<RexNode> nodes) {
    List<RexNode> reordered = new ArrayList<>();
    List<RexNode> left = new ArrayList<>();
    for (RexNode n : nodes) {
      if (isInputRef(n)) {
        reordered.add(n);
      } else {
        left.add(n);
      }
    }
    reordered.addAll(left);
    return reordered.stream()
        .map(this::extractAliasLiteral)
        .flatMap(Optional::stream)
        .map(RexLiteral::stringValue)
        .toList();
  }

  /** Whether a rex node is an aliased input reference */
  private boolean isInputRef(RexNode node) {
    return switch (node.getKind()) {
      case AS, DESCENDING, NULLS_FIRST, NULLS_LAST -> {
Collaborator

Is there any case where DESCENDING, NULLS_FIRST, or NULLS_LAST appears in our stats ... by ... command?

Collaborator Author (@yuancu, Oct 20, 2025)

No, I didn't manage to create any. It seems there is always a projection after sorting and before aggregation.

E.g.

LogicalAggregate(group=[{0}], count()=[COUNT()])
  LogicalProject(value=[$2])
    LogicalSort(sort0=[$2], dir0=[DESC-nulls-last])

        final List<RexNode> operands = ((RexCall) node).operands;
        yield isInputRef(operands.getFirst());
      }
      default -> node instanceof RexInputRef;
    };
  }
Comment on lines +1043 to +1051
Member

Can PlanUtil.getInputRefs be used to replace this?

Collaborator Author

I think they serve different purposes. PlanUtil.getInputRefs returns every input ref that a node refers to, so a node that refers to multiple inputs yields all of them. Here I only want to check whether a node is itself an input ref (optionally aliased), keeping the node as is.
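
The distinction can be shown with a small Calcite-free sketch. All types and both methods below are hypothetical stand-ins: `collectInputRefs` only models the behavior described in the reply above ("returns all referred input refs") and is not the real PlanUtil.getInputRefs signature, while `isInputRef` mirrors the shape of the check in this PR.

```java
import java.util.ArrayList;
import java.util.List;

final class InputRefCheckSketch {
  /** Hypothetical stand-in for RexNode. */
  interface Expr {}

  record Ref(int index) implements Expr {}

  record Alias(Expr operand, String name) implements Expr {}

  record Call(String op, List<Expr> operands) implements Expr {}

  /** True only when the node itself is an input ref, possibly wrapped in aliases. */
  static boolean isInputRef(Expr node) {
    if (node instanceof Alias alias) {
      return isInputRef(alias.operand());
    }
    return node instanceof Ref;
  }

  /** Collects every input index referenced anywhere inside the expression tree. */
  static List<Integer> collectInputRefs(Expr node) {
    List<Integer> out = new ArrayList<>();
    collect(node, out);
    return out;
  }

  private static void collect(Expr node, List<Integer> out) {
    if (node instanceof Ref ref) {
      out.add(ref.index());
    } else if (node instanceof Alias alias) {
      collect(alias.operand(), out);
    } else if (node instanceof Call call) {
      call.operands().forEach(operand -> collect(operand, out));
    }
  }

  public static void main(String[] args) {
    Expr aliasedRef = new Alias(new Ref(2), "value");
    Expr aliasedSpan = new Alias(new Call("SPAN", List.of(new Ref(2))), "value");
    System.out.println(isInputRef(aliasedRef) + " " + collectInputRefs(aliasedRef));
    System.out.println(isInputRef(aliasedSpan) + " " + collectInputRefs(aliasedSpan));
  }
}
```

Both expressions refer to the same input, so a collector cannot tell them apart, whereas the predicate separates the aliased reference from the computed span expression.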


  /**
   * Resolve attributes for aggregation.
   *
@@ -1102,7 +1143,7 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
         aggregationAttributes.getLeft().stream()
             .map(this::extractAliasLiteral)
             .flatMap(Optional::stream)
-            .map(ref -> ((RexLiteral) ref).getValueAs(String.class))
+            .map(ref -> ref.getValueAs(String.class))
             .map(context.relBuilder::field)
             .map(f -> (RexNode) f)
             .toList();
@@ -0,0 +1,81 @@
setup:
  - do:
      indices.create:
        index: time_test
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : true

  - do:
      bulk:
        refresh: true
        body:
          - '{"index": {"_index": "time_test"}}'
          - '{"category":"A","value":1000,"@timestamp":"2024-01-01T00:00:00Z"}'
          - '{"index": {"_index": "time_test"}}'
          - '{"category":"B","value":2000,"@timestamp":"2024-01-01T00:05:00Z"}'
          - '{"index": {"_index": "time_test"}}'
          - '{"category":"A","value":1500,"@timestamp":"2024-01-01T00:10:00Z"}'
          - '{"index": {"_index": "time_test"}}'
          - '{"category":"C","value":3000,"@timestamp":"2024-01-01T00:20:00Z"}'

---
teardown:
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : false

---
"Test span aggregation with field name collision - basic case":
  - skip:
      features:
        - headers
        - allowed_warnings
  - do:
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: source=time_test | stats count() by span(value, 1000) as value

  - match: { total: 3 }
  - match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "value", "type": "bigint"}] }
  - match: { datarows: [[2, 1000], [1, 2000], [1, 3000]] }

---
"Test span aggregation with field name collision - multiple aggregations":
  - skip:
      features:
        - headers
        - allowed_warnings
  - do:
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: source=time_test | stats count(), avg(value) by span(value, 1000) as value

  - match: { total: 3 }
  - match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "avg(value)", "type": "double"}, {"name": "value", "type": "bigint"}] }
  - match: { datarows: [[2, 1250.0, 1000], [1, 2000.0, 2000], [1, 3000.0, 3000]] }

---
"Test span aggregation without name collision - multiple group-by":
  - skip:
      features:
        - headers
        - allowed_warnings
  - do:
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: source=time_test | stats count() by span(@timestamp, 10min) as @timestamp, category, value

  - match: { total: 4 }
  - match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "category", "type": "string"}, {"name": "value", "type": "bigint"}] }
  - match: { datarows: [[1, "2024-01-01 00:00:00", "A", 1000], [1, "2024-01-01 00:10:00", "A", 1500], [1, "2024-01-01 00:00:00", "B", 2000], [1, "2024-01-01 00:20:00", "C", 3000]] }
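
As a sanity check on the expected rows of the two `span(value, 1000)` tests, the bucketing can be reproduced in a few lines of plain Java. The sketch assumes that a numeric span maps each value to floor(value / 1000) * 1000, which is consistent with the expected datarows but is not taken from the plugin's implementation; `SpanBucketSketch`, `bucket`, and `statsBySpan` are hypothetical names.

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class SpanBucketSketch {
  /** Assumed bucketing rule: round down to the nearest multiple of span. */
  static long bucket(long value, long span) {
    return Math.floorDiv(value, span) * span;
  }

  /** Groups values by bucket and keeps count and average per bucket, sorted by bucket. */
  static Map<Long, LongSummaryStatistics> statsBySpan(List<Long> values, long span) {
    return values.stream()
        .collect(
            Collectors.groupingBy(
                v -> bucket(v, span),
                TreeMap::new,
                Collectors.summarizingLong(Long::longValue)));
  }

  public static void main(String[] args) {
    // Same "value" column as the bulk-indexed test documents.
    statsBySpan(List.of(1000L, 2000L, 1500L, 3000L), 1000L)
        .forEach(
            (key, stats) ->
                System.out.println(
                    key + " count=" + stats.getCount() + " avg=" + stats.getAverage()));
  }
}
```

Under that assumption the values 1000 and 1500 share the 1000 bucket (count 2, average 1250.0), while 2000 and 3000 each form their own bucket, matching the rows asserted in the first two tests.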