Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1000,12 +1000,56 @@ private Pair<List<RexNode>, List<AggCall>> aggregateWithTrimming(
Pair<List<RexNode>, List<AggCall>> reResolved =
resolveAttributesForAggregation(groupExprList, aggExprList, context);

List<String> intendedGroupKeyAliases = getGroupKeyNamesAfterAggregation(reResolved.getLeft());
context.relBuilder.aggregate(
context.relBuilder.groupKey(reResolved.getLeft()), reResolved.getRight());
// During aggregation, Calcite projects both input dependencies and output group-by fields.
// When names conflict, Calcite adds numeric suffixes (e.g., "value0").
// Apply explicit renaming to restore the intended aliases.
context.relBuilder.rename(intendedGroupKeyAliases);

return Pair.of(reResolved.getLeft(), reResolved.getRight());
}

/**
* Imitates {@code Registrar.registerExpression} of {@link RelBuilder} to derive the output order
* of group-by keys after aggregation.
*
* <p>The projected input reference comes first, while any other computed expression follows.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Registrar.registerExpression, seems the other computed expression won't promise following the original order if there is expression duplication.

But since our PPL only allow span expr in our group by and it cannot be combined with other span expr. This logic may be right and I cannot find any bad case so far.

Copy link
Copy Markdown
Collaborator Author

@yuancu yuancu Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a bad case: stats count() by value, value, @timestamp. I'll fix it.

Update: Fixed by checking duplication

*/
private List<String> getGroupKeyNamesAfterAggregation(List<RexNode> nodes) {
List<RexNode> reordered = new ArrayList<>();
List<RexNode> left = new ArrayList<>();
for (RexNode n : nodes) {
// The same group-key won't be added twice
if (reordered.contains(n) || left.contains(n)) {
continue;
}
if (isInputRef(n)) {
reordered.add(n);
} else {
left.add(n);
}
}
reordered.addAll(left);
return reordered.stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(RexLiteral::stringValue)
.toList();
}

/** Whether a rex node is an aliased input reference */
private boolean isInputRef(RexNode node) {
return switch (node.getKind()) {
case AS, DESCENDING, NULLS_FIRST, NULLS_LAST -> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any case that we have DESCENDING, NULLS_FIRST, NULLS_LAST in our stats .. by ... command

Copy link
Copy Markdown
Collaborator Author

@yuancu yuancu Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I didn't manage to create any. It seems there is always a projection after sorting and before aggregation.

E.g.

LogicalAggregate(group=[{0}], count()=[COUNT()])
  LogicalProject(value=[$2])
    LogicalSort(sort0=[$2], dir0=[DESC-nulls-last])

final List<RexNode> operands = ((RexCall) node).operands;
yield isInputRef(operands.getFirst());
}
default -> node instanceof RexInputRef;
};
}
Comment on lines +1043 to +1051
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the PlanUtil.getInputRefs be used to replace this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they serve different purposes. PlanUtil.getInputRefs returns all referred input refs. Besides, if a node refers multiple inputs, it will return all of them. Yet here I just want to check whether a node is an input ref (optionally aliased), keeping the node as is.


/**
* Resolve attributes for aggregation.
*
Expand Down Expand Up @@ -1104,7 +1148,7 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
aggregationAttributes.getLeft().stream()
.map(this::extractAliasLiteral)
.flatMap(Optional::stream)
.map(ref -> ((RexLiteral) ref).getValueAs(String.class))
.map(ref -> ref.getValueAs(String.class))
.map(context.relBuilder::field)
.map(f -> (RexNode) f)
.toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
setup:
- do:
indices.create:
index: time_test
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : true

- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "time_test"}}'
- '{"category":"A","value":1000,"@timestamp":"2024-01-01T00:00:00Z"}'
- '{"index": {"_index": "time_test"}}'
- '{"category":"B","value":2000,"@timestamp":"2024-01-01T00:05:00Z"}'
- '{"index": {"_index": "time_test"}}'
- '{"category":"A","value":1500,"@timestamp":"2024-01-01T00:10:00Z"}'
- '{"index": {"_index": "time_test"}}'
- '{"category":"C","value":3000,"@timestamp":"2024-01-01T00:20:00Z"}'

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false

---
"Test span aggregation with field name collision - basic case":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count() by span(value, 1000) as value

- match: { total: 3 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "value", "type": "bigint"}] }
- match: { datarows: [[2, 1000], [1, 2000], [1, 3000]] }

---
"Test span aggregation with field name collision - multiple aggregations":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count(), avg(value) by span(value, 1000) as value

- match: { total: 3 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "avg(value)", "type": "double"}, {"name": "value", "type": "bigint"}] }
- match: { datarows: [[2, 1250.0, 1000], [1, 2000.0, 2000], [1, 3000.0, 3000]] }

---
"Test span aggregation without name collision - multiple group-by":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count() by span(@timestamp, 10min) as @timestamp, category, value

- match: { total: 4 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "category", "type": "string"}, {"name": "value", "type": "bigint"}] }
- match: { datarows: [[1, "2024-01-01 00:00:00", "A", 1000], [1, "2024-01-01 00:10:00", "A", 1500], [1, "2024-01-01 00:00:00", "B", 2000], [1, "2024-01-01 00:20:00", "C", 3000]] }

---
"Test span aggregation with duplicated group keys":
- skip:
features:
- headers
- allowed_warnings
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=time_test | stats count() by value, value, span(@timestamp, 10min) as @timestamp

- match: { total: 4 }
- match: { schema: [{"name": "count()", "type": "bigint"}, {"name": "@timestamp", "type": "timestamp"}, {"name": "value", "type": "bigint"}, {"name": "value0", "type": "bigint"}] }
- match: { datarows: [[1, "2024-01-01 00:00:00", 1000, 1000], [1, "2024-01-01 00:10:00", 1500, 1500], [1, "2024-01-01 00:00:00", 2000, 2000], [1, "2024-01-01 00:20:00", 3000, 3000]] }
Loading