Skip to content

Commit e0a03f0

Browse files
committed
Refactor timewrap: extract TimewrapUtils, add precise calendar arithmetic
- Extract all timewrap helper methods to TimewrapUtils.java in calcite/utils/ - Add precise EXTRACT-based period computation for month/quarter/year - Add cumDaysBeforeMonth with leap year CASE expression for precise quarter offset - Month/quarter/year period assignment now uses calendar arithmetic instead of approximate fixed-length conversions Signed-off-by: Jialiang Li <jialiang.li@hey.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 9c02b16 commit e0a03f0

File tree

3 files changed

+442
-183
lines changed

3 files changed

+442
-183
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 95 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,8 @@
8787
import org.opensearch.sql.ast.expression.Alias;
8888
import org.opensearch.sql.ast.expression.AllFields;
8989
import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta;
90-
import org.opensearch.sql.ast.expression.And;
9190
import org.opensearch.sql.ast.expression.Argument;
9291
import org.opensearch.sql.ast.expression.Argument.ArgumentMap;
93-
import org.opensearch.sql.ast.expression.Compare;
9492
import org.opensearch.sql.ast.expression.Field;
9593
import org.opensearch.sql.ast.expression.Function;
9694
import org.opensearch.sql.ast.expression.Let;
@@ -168,6 +166,7 @@
168166
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
169167
import org.opensearch.sql.calcite.utils.PPLHintUtils;
170168
import org.opensearch.sql.calcite.utils.PlanUtils;
169+
import org.opensearch.sql.calcite.utils.TimewrapUtils;
171170
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
172171
import org.opensearch.sql.calcite.utils.WildcardUtils;
173172
import org.opensearch.sql.common.patterns.PatternUtils;
@@ -3096,19 +3095,14 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) {
30963095
return relBuilder.peek();
30973096
}
30983097

3099-
private static final int TIMEWRAP_MAX_PERIODS = 20;
3100-
31013098
@Override
31023099
public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) {
31033100
visitChildren(node, context);
31043101

31053102
// Signal the execution engine to strip all-null columns and rename with absolute offsets
31063103
CalcitePlanContext.stripNullColumns.set(true);
3107-
// Both align=now and align=end use _before suffix (matching Splunk behavior).
3108-
// align=end would use search end time as reference, but PPL has no search time range
3109-
// context, so both modes currently use query execution time.
31103104
CalcitePlanContext.timewrapUnitName.set(
3111-
timewrapUnitBaseName(node.getUnit(), node.getValue()) + "|_before");
3105+
TimewrapUtils.unitBaseName(node.getUnit(), node.getValue()) + "|_before");
31123106

31133107
RelBuilder b = context.relBuilder;
31143108
RexBuilder rx = context.rexBuilder;
@@ -3118,62 +3112,100 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) {
31183112
String tsFieldName = fieldNames.get(0);
31193113
List<String> valueFieldNames = fieldNames.subList(1, fieldNames.size());
31203114

3121-
long spanSec = timewrapSpanToSeconds(node.getUnit(), node.getValue());
3115+
boolean variableLength = TimewrapUtils.isVariableLengthUnit(node.getUnit());
31223116
RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
31233117

3124-
// Step 1: Convert timestamps to epoch seconds via UNIX_TIMESTAMP, add MAX OVER()
3125-
RexNode tsEpochExpr =
3126-
rx.makeCast(
3127-
bigintType,
3128-
rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)),
3129-
true);
3130-
b.projectPlus(
3131-
b.alias(tsEpochExpr, "__ts_epoch__"),
3132-
b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__"));
3133-
3134-
// Step 2: Compute period_number and offset
3135-
RexNode tsEpoch = b.field("__ts_epoch__");
3136-
RexNode maxEpoch = b.field("__max_epoch__");
3137-
RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec));
3138-
3139-
// period = (max_epoch - ts_epoch) / span_sec + 1 (integer division truncates)
3140-
RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch);
3141-
RexNode periodNum =
3142-
rx.makeCall(
3143-
SqlStdOperatorTable.PLUS,
3144-
rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit),
3145-
rx.makeExactLiteral(BigDecimal.ONE, bigintType));
3146-
3147-
// offset_sec = ts_epoch MOD span_sec
3148-
// Convert back to actual timestamp: latest_period_start + offset
3149-
RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit);
3150-
RexNode latestPeriodStart =
3151-
rx.makeCall(
3152-
SqlStdOperatorTable.MINUS,
3153-
maxEpoch,
3154-
rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit));
3155-
RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, latestPeriodStart, offsetSec);
3156-
RexNode displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch);
3157-
3158-
// Compute base_offset for absolute period naming in execution engine.
3159-
// align=now: reference = current time
3160-
// align=end: reference = WHERE upper bound (search end time), fallback to now
3118+
RexNode periodNum;
3119+
RexNode displayTimestamp;
31613120
RexNode baseOffset;
3162-
long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000;
3163-
Long referenceEpoch = null;
3164-
if ("end".equals(node.getAlign())) {
3165-
// Try to extract the upper bound from a WHERE clause on the timestamp field
3166-
referenceEpoch = extractTimestampUpperBound(node);
3167-
}
3168-
if (referenceEpoch == null) {
3169-
referenceEpoch = nowEpochSec;
3170-
}
3171-
RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch));
3172-
baseOffset =
3173-
rx.makeCall(
3174-
SqlStdOperatorTable.DIVIDE,
3175-
rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch),
3176-
spanLit);
3121+
3122+
if (variableLength) {
3123+
// --- Variable-length units (month, quarter, year): EXTRACT-based calendar arithmetic ---
3124+
RexNode tsField = b.field(tsFieldName);
3125+
RexNode tsUnitNum =
3126+
TimewrapUtils.calendarUnitNumber(rx, tsField, node.getUnit(), node.getValue());
3127+
3128+
b.projectPlus(b.aggregateCall(SqlStdOperatorTable.MAX, tsField).over().as("__max_ts__"));
3129+
RexNode maxTs = b.field("__max_ts__");
3130+
RexNode maxUnitNum =
3131+
TimewrapUtils.calendarUnitNumber(rx, maxTs, node.getUnit(), node.getValue());
3132+
3133+
periodNum =
3134+
rx.makeCall(
3135+
SqlStdOperatorTable.PLUS,
3136+
rx.makeCall(SqlStdOperatorTable.MINUS, maxUnitNum, tsUnitNum),
3137+
rx.makeExactLiteral(BigDecimal.ONE, bigintType));
3138+
3139+
RexNode tsEpoch =
3140+
rx.makeCast(bigintType, rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, tsField), true);
3141+
RexNode unitStartEpoch = TimewrapUtils.calendarUnitStartEpoch(rx, tsField, node.getUnit());
3142+
RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MINUS, tsEpoch, unitStartEpoch);
3143+
RexNode maxUnitStartEpoch = TimewrapUtils.calendarUnitStartEpoch(rx, maxTs, node.getUnit());
3144+
RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, maxUnitStartEpoch, offsetSec);
3145+
displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch);
3146+
3147+
long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000;
3148+
Long referenceEpoch = null;
3149+
if ("end".equals(node.getAlign())) {
3150+
referenceEpoch = TimewrapUtils.extractTimestampUpperBound(node);
3151+
}
3152+
if (referenceEpoch == null) {
3153+
referenceEpoch = nowEpochSec;
3154+
}
3155+
long refUnitNum =
3156+
TimewrapUtils.calendarUnitNumberFromEpoch(
3157+
referenceEpoch, node.getUnit(), node.getValue());
3158+
RexNode refUnitNumLit = rx.makeBigintLiteral(BigDecimal.valueOf(refUnitNum));
3159+
baseOffset = rx.makeCall(SqlStdOperatorTable.MINUS, refUnitNumLit, maxUnitNum);
3160+
3161+
} else {
3162+
// --- Fixed-length units (sec, min, hr, day, week): epoch-based arithmetic ---
3163+
long spanSec = TimewrapUtils.spanToSeconds(node.getUnit(), node.getValue());
3164+
3165+
RexNode tsEpochExpr =
3166+
rx.makeCast(
3167+
bigintType,
3168+
rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)),
3169+
true);
3170+
b.projectPlus(
3171+
b.alias(tsEpochExpr, "__ts_epoch__"),
3172+
b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__"));
3173+
3174+
RexNode tsEpoch = b.field("__ts_epoch__");
3175+
RexNode maxEpoch = b.field("__max_epoch__");
3176+
RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec));
3177+
3178+
RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch);
3179+
periodNum =
3180+
rx.makeCall(
3181+
SqlStdOperatorTable.PLUS,
3182+
rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit),
3183+
rx.makeExactLiteral(BigDecimal.ONE, bigintType));
3184+
3185+
RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit);
3186+
RexNode latestPeriodStart =
3187+
rx.makeCall(
3188+
SqlStdOperatorTable.MINUS,
3189+
maxEpoch,
3190+
rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit));
3191+
RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, latestPeriodStart, offsetSec);
3192+
displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch);
3193+
3194+
long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000;
3195+
Long referenceEpoch = null;
3196+
if ("end".equals(node.getAlign())) {
3197+
referenceEpoch = TimewrapUtils.extractTimestampUpperBound(node);
3198+
}
3199+
if (referenceEpoch == null) {
3200+
referenceEpoch = nowEpochSec;
3201+
}
3202+
RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch));
3203+
baseOffset =
3204+
rx.makeCall(
3205+
SqlStdOperatorTable.DIVIDE,
3206+
rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch),
3207+
spanLit);
3208+
}
31773209

31783210
// Step 3: Project [display_timestamp, value_columns..., base_offset, period]
31793211
// base_offset is included in the group key so it survives the PIVOT
@@ -3193,8 +3225,8 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) {
31933225
b.groupKey(b.field(tsFieldName), b.field("__base_offset__")),
31943226
valueFieldNames.stream().map(f -> (RelBuilder.AggCall) b.max(b.field(f)).as("")).toList(),
31953227
ImmutableList.of(b.field("__period__")),
3196-
IntStream.rangeClosed(1, TIMEWRAP_MAX_PERIODS)
3197-
.map(i -> TIMEWRAP_MAX_PERIODS + 1 - i) // reverse: oldest period first
3228+
IntStream.rangeClosed(1, TimewrapUtils.MAX_PERIODS)
3229+
.map(i -> TimewrapUtils.MAX_PERIODS + 1 - i) // reverse: oldest period first
31983230
.mapToObj(
31993231
i ->
32003232
Map.entry(
@@ -3228,117 +3260,6 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) {
32283260
return b.peek();
32293261
}
32303262

3231-
/**
3232-
* Convert a span unit and value to approximate seconds. Variable-length units use standard
3233-
* approximations: month=30 days, quarter=91 days, year=365 days.
3234-
*/
3235-
private long timewrapSpanToSeconds(SpanUnit unit, int value) {
3236-
return switch (unit.getName()) {
3237-
case "s" -> value;
3238-
case "m" -> value * 60L;
3239-
case "h" -> value * 3_600L;
3240-
case "d" -> value * 86_400L;
3241-
case "w" -> value * 7L * 86_400L;
3242-
case "M" -> value * 30L * 86_400L; // month ≈ 30 days
3243-
case "q" -> value * 91L * 86_400L; // quarter ≈ 91 days
3244-
case "y" -> value * 365L * 86_400L; // year ≈ 365 days
3245-
default ->
3246-
throw new SemanticCheckException("Unsupported time unit in timewrap: " + unit.getName());
3247-
};
3248-
}
3249-
3250-
/**
3251-
* Get the timescale base name for timewrap column naming. Returns singular and plural forms
3252-
* separated by "|", e.g., "day|days". Used by the execution engine to build absolute period names
3253-
* like "501days_before".
3254-
*/
3255-
private String timewrapUnitBaseName(SpanUnit unit, int value) {
3256-
String singular =
3257-
switch (unit.getName()) {
3258-
case "s" -> "second";
3259-
case "m" -> "minute";
3260-
case "h" -> "hour";
3261-
case "d" -> "day";
3262-
case "w" -> "week";
3263-
case "M" -> "month";
3264-
case "q" -> "quarter";
3265-
case "y" -> "year";
3266-
default -> "period";
3267-
};
3268-
String plural = singular + "s";
3269-
// Encode value so execution engine can compute totalUnits = (base_offset + period) * value
3270-
return value + "|" + singular + "|" + plural;
3271-
}
3272-
3273-
/**
3274-
* Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp
3275-
* field (e.g., @timestamp <= '2024-07-03 18:00:00'). Returns the upper bound as epoch seconds, or
3276-
* null if not found.
3277-
*/
3278-
private Long extractTimestampUpperBound(Timewrap node) {
3279-
// Walk: Timewrap → Chart → Filter → inspect condition
3280-
Node current = node;
3281-
while (current != null && !current.getChild().isEmpty()) {
3282-
current = current.getChild().get(0);
3283-
if (current instanceof Filter filter) {
3284-
return findUpperBound(filter.getCondition());
3285-
}
3286-
}
3287-
return null;
3288-
}
3289-
3290-
/** Recursively search an expression tree for a timestamp upper bound (<=). */
3291-
private Long findUpperBound(UnresolvedExpression expr) {
3292-
if (expr instanceof And) {
3293-
And and = (And) expr;
3294-
Long left = findUpperBound(and.getLeft());
3295-
Long right = findUpperBound(and.getRight());
3296-
// If both sides have upper bounds, use the smaller one (tighter bound)
3297-
if (left != null && right != null) return Math.min(left, right);
3298-
return left != null ? left : right;
3299-
}
3300-
if (expr instanceof Compare cmp) {
3301-
String op = cmp.getOperator();
3302-
// Check for @timestamp <= X or @timestamp < X
3303-
if (("<=".equals(op) || "<".equals(op)) && isTimestampField(cmp.getLeft())) {
3304-
return parseTimestampLiteral(cmp.getRight());
3305-
}
3306-
// Check for X >= @timestamp or X > @timestamp
3307-
if ((">=".equals(op) || ">".equals(op)) && isTimestampField(cmp.getRight())) {
3308-
return parseTimestampLiteral(cmp.getLeft());
3309-
}
3310-
}
3311-
return null;
3312-
}
3313-
3314-
private boolean isTimestampField(UnresolvedExpression expr) {
3315-
if (expr instanceof Field field) {
3316-
String name = field.getField().toString();
3317-
return "@timestamp".equals(name) || "timestamp".equals(name);
3318-
}
3319-
return false;
3320-
}
3321-
3322-
private Long parseTimestampLiteral(UnresolvedExpression expr) {
3323-
if (expr instanceof Literal lit && lit.getValue() instanceof String s) {
3324-
try {
3325-
// Parse "yyyy-MM-dd HH:mm:ss" format
3326-
java.time.LocalDateTime ldt =
3327-
java.time.LocalDateTime.parse(
3328-
s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
3329-
return ldt.toEpochSecond(java.time.ZoneOffset.UTC);
3330-
} catch (Exception e) {
3331-
// Try ISO format
3332-
try {
3333-
return java.time.Instant.parse(s).getEpochSecond();
3334-
} catch (Exception ignored) {
3335-
return null;
3336-
}
3337-
}
3338-
}
3339-
return null;
3340-
}
3341-
33423263
/**
33433264
* Aggregate by column split then rank by grand total (summed value of each category). The output
33443265
* is <code>[col-split, grand-total, row-number]</code>

0 commit comments

Comments
 (0)