55
66package org .opensearch .sql .ast .tree ;
77
8+ import static org .opensearch .sql .ast .dsl .AstDSL .aggregate ;
9+ import static org .opensearch .sql .ast .dsl .AstDSL .doubleLiteral ;
10+ import static org .opensearch .sql .ast .dsl .AstDSL .eval ;
11+ import static org .opensearch .sql .ast .dsl .AstDSL .function ;
12+ import static org .opensearch .sql .ast .dsl .AstDSL .stringLiteral ;
13+ import static org .opensearch .sql .ast .expression .IntervalUnit .MILLISECOND ;
14+ import static org .opensearch .sql .ast .tree .Chart .PerFunctionRateExprBuilder .timestampadd ;
15+ import static org .opensearch .sql .ast .tree .Chart .PerFunctionRateExprBuilder .timestampdiff ;
16+ import static org .opensearch .sql .expression .function .BuiltinFunctionName .DIVIDE ;
17+ import static org .opensearch .sql .expression .function .BuiltinFunctionName .MULTIPLY ;
18+ import static org .opensearch .sql .expression .function .BuiltinFunctionName .SUM ;
19+ import static org .opensearch .sql .expression .function .BuiltinFunctionName .TIMESTAMPADD ;
20+ import static org .opensearch .sql .expression .function .BuiltinFunctionName .TIMESTAMPDIFF ;
21+
822import com .google .common .collect .ImmutableList ;
923import java .util .List ;
24+ import java .util .Locale ;
25+ import java .util .Map ;
26+ import java .util .Optional ;
1027import lombok .AllArgsConstructor ;
1128import lombok .EqualsAndHashCode ;
1229import lombok .Getter ;
30+ import lombok .RequiredArgsConstructor ;
1331import lombok .ToString ;
1432import org .opensearch .sql .ast .AbstractNodeVisitor ;
1533import org .opensearch .sql .ast .dsl .AstDSL ;
34+ import org .opensearch .sql .ast .expression .AggregateFunction ;
35+ import org .opensearch .sql .ast .expression .Alias ;
1636import org .opensearch .sql .ast .expression .Argument ;
37+ import org .opensearch .sql .ast .expression .Field ;
38+ import org .opensearch .sql .ast .expression .Function ;
39+ import org .opensearch .sql .ast .expression .IntervalUnit ;
40+ import org .opensearch .sql .ast .expression .Let ;
1741import org .opensearch .sql .ast .expression .Literal ;
42+ import org .opensearch .sql .ast .expression .Span ;
43+ import org .opensearch .sql .ast .expression .SpanUnit ;
1844import org .opensearch .sql .ast .expression .UnresolvedExpression ;
45+ import org .opensearch .sql .calcite .utils .PlanUtils ;
1946
2047/** AST node represent chart command. */
2148@ Getter
@@ -39,8 +66,8 @@ public class Chart extends UnresolvedPlan {
3966
4067 @ Override
4168 public UnresolvedPlan attach (UnresolvedPlan child ) {
42- this . child = child ;
43- return this ;
69+ // Transform after child attached to avoid unintentionally overriding it
70+ return toBuilder (). child ( child ). build (). transformPerFunction () ;
4471 }
4572
4673 @ Override
@@ -52,4 +79,131 @@ public List<UnresolvedPlan> getChild() {
5279 public <T , C > T accept (AbstractNodeVisitor <T , C > nodeVisitor , C context ) {
5380 return nodeVisitor .visitChart (this , context );
5481 }
82+
83+ /**
84+ * Transform per function to eval-based post-processing on sum result by chart. Specifically,
85+ * calculate how many seconds are in the time bucket based on the span option dynamically, then
86+ * divide the aggregated sum value by the number of seconds to get the per-second rate.
87+ *
88+ * <p>For example, with span=5m per_second(field): per second rate = sum(field) / 300 seconds
89+ *
90+ * @return eval+chart if per function present, or the original chart otherwise.
91+ */
92+ private UnresolvedPlan transformPerFunction () {
93+ Optional <PerFunction > perFuncOpt = PerFunction .from (aggregationFunction );
94+ if (perFuncOpt .isEmpty ()) {
95+ return this ;
96+ }
97+
98+ PerFunction perFunc = perFuncOpt .get ();
99+ // For chart, the rowSplit should contain the span information
100+ UnresolvedExpression spanExpr = rowSplit ;
101+ if (rowSplit instanceof Alias ) {
102+ spanExpr = ((Alias ) rowSplit ).getDelegated ();
103+ }
104+ if (!(spanExpr instanceof Span )) {
105+ return this ; // Cannot transform without span information
106+ }
107+
108+ Span span = (Span ) spanExpr ;
109+ Field spanStartTime = AstDSL .implicitTimestampField ();
110+ Function spanEndTime = timestampadd (span .getUnit (), span .getValue (), spanStartTime );
111+ Function spanMillis = timestampdiff (MILLISECOND , spanStartTime , spanEndTime );
112+ final int SECOND_IN_MILLISECOND = 1000 ;
113+ return eval (
114+ chart (AstDSL .alias (perFunc .aggName , PerFunctionRateExprBuilder .sum (perFunc .aggArg ))),
115+ let (perFunc .aggName )
116+ .multiply (perFunc .seconds * SECOND_IN_MILLISECOND )
117+ .dividedBy (spanMillis ));
118+ }
119+
120+ private Chart chart (UnresolvedExpression newAggregationFunction ) {
121+ return this .toBuilder ().aggregationFunction (newAggregationFunction ).build ();
122+ }
123+
124+ @ RequiredArgsConstructor
125+ static class PerFunction {
126+ private static final Map <String , Integer > UNIT_SECONDS =
127+ Map .of (
128+ "per_second" , 1 ,
129+ "per_minute" , 60 ,
130+ "per_hour" , 3600 ,
131+ "per_day" , 86400 );
132+ private final String aggName ;
133+ private final UnresolvedExpression aggArg ;
134+ private final int seconds ;
135+
136+ static Optional <PerFunction > from (UnresolvedExpression aggExpr ) {
137+ if (aggExpr instanceof Alias ) {
138+ return from (((Alias ) aggExpr ).getDelegated ());
139+ }
140+ ;
141+ if (!(aggExpr instanceof AggregateFunction )) {
142+ return Optional .empty ();
143+ }
144+
145+ AggregateFunction aggFunc = (AggregateFunction ) aggExpr ;
146+ String aggFuncName = aggFunc .getFuncName ().toLowerCase (Locale .ROOT );
147+ if (!UNIT_SECONDS .containsKey (aggFuncName )) {
148+ return Optional .empty ();
149+ }
150+
151+ String aggName = toAggName (aggFunc );
152+ return Optional .of (
153+ new PerFunction (aggName , aggFunc .getField (), UNIT_SECONDS .get (aggFuncName )));
154+ }
155+
156+ private static String toAggName (AggregateFunction aggFunc ) {
157+ String fieldName =
158+ (aggFunc .getField () instanceof Field )
159+ ? ((Field ) aggFunc .getField ()).getField ().toString ()
160+ : aggFunc .getField ().toString ();
161+ return String .format (Locale .ROOT , "%s(%s)" , aggFunc .getFuncName (), fieldName );
162+ }
163+ }
164+
165+ private PerFunctionRateExprBuilder let (String fieldName ) {
166+ return new PerFunctionRateExprBuilder (AstDSL .field (fieldName ));
167+ }
168+
169+ /** Fluent builder for creating Let expressions with mathematical operations. */
170+ static class PerFunctionRateExprBuilder {
171+ private final Field field ;
172+ private UnresolvedExpression expr ;
173+
174+ PerFunctionRateExprBuilder (Field field ) {
175+ this .field = field ;
176+ this .expr = field ;
177+ }
178+
179+ PerFunctionRateExprBuilder multiply (Integer multiplier ) {
180+ // Promote to double literal to avoid integer division in downstream
181+ this .expr =
182+ function (
183+ MULTIPLY .getName ().getFunctionName (), expr , doubleLiteral (multiplier .doubleValue ()));
184+ return this ;
185+ }
186+
187+ Let dividedBy (UnresolvedExpression divisor ) {
188+ return AstDSL .let (field , function (DIVIDE .getName ().getFunctionName (), expr , divisor ));
189+ }
190+
191+ static UnresolvedExpression sum (UnresolvedExpression field ) {
192+ return aggregate (SUM .getName ().getFunctionName (), field );
193+ }
194+
195+ static Function timestampadd (
196+ SpanUnit unit , UnresolvedExpression value , UnresolvedExpression timestampField ) {
197+ UnresolvedExpression intervalUnit =
198+ stringLiteral (PlanUtils .spanUnitToIntervalUnit (unit ).toString ());
199+ return function (
200+ TIMESTAMPADD .getName ().getFunctionName (), intervalUnit , value , timestampField );
201+ }
202+
203+ static Function timestampdiff (
204+ IntervalUnit unit , UnresolvedExpression start , UnresolvedExpression end ) {
205+ return function (
206+ TIMESTAMPDIFF .getName ().getFunctionName (), stringLiteral (unit .toString ()), start , end );
207+ }
208+ }
55209}
0 commit comments