diff --git a/docs/en/14-reference/03-taos-sql/41-stream.md b/docs/en/14-reference/03-taos-sql/41-stream.md index 35d5f2ac4a74..29a9707116cb 100755 --- a/docs/en/14-reference/03-taos-sql/41-stream.md +++ b/docs/en/14-reference/03-taos-sql/41-stream.md @@ -75,8 +75,8 @@ PERIOD(period_time[, offset_time]) A scheduled trigger is driven by a fixed interval based on the system time, essentially functioning as a scheduled task. It does not belong to the category of window triggers. Parameter definitions are as follows: -- period_time: The scheduling interval. Supported time units include milliseconds (a), seconds (s), minutes (m), hours (h), and days (d). The supported range is [10a, 3650d]. -- offset_time: (Optional) The scheduling offset. Supported units include milliseconds (a), seconds (s), minutes (m), and hours (h). The offset value must be less than 1 day. +- period_time: The scheduling interval. Supported time units include milliseconds (a), seconds (s), minutes (m), hours (h), days (d), weeks (w), months (n), and years (y). The supported range is [10a, 3650d]. +- offset_time: (Optional) The scheduling offset. Supported units include milliseconds (a), seconds (s), minutes (m), hours (h), and days (d). For week/month/year units, the offset must be strictly less than the trigger period; for month units, validation is based on 28 days/month (e.g., `PERIOD(1n, 28d)` is invalid). Usage Notes: @@ -84,9 +84,11 @@ Usage Notes: - If the scheduling interval is 5 hours 30 minutes, the trigger times for the day will be [00:00, 05:30, 11:00, 16:30, 22:00]. The trigger times for subsequent days will be the same. - With the same interval but an offset of 1 minute, the trigger times will be [00:01, 05:31, 11:01, 16:31, 22:01] each day. - Under the same conditions, if the stream is created when the system time is 12:00, the trigger times for the current day will be [16:31, 22:01]. From the next day onwards, the trigger times will be [00:01, 05:31, 11:01, 16:31, 22:01]. -- When the scheduling interval is greater than or equal to 1 day, the base time is calculated as midnight (00:00) of the current day plus the scheduling offset, and it will not reset on subsequent days. For example: - - If the scheduling interval is 1 day 1 hour and the stream is created when the system time is 05-01 12:00, the trigger times will be [05-02 01:00, 05-03 02:00, 05-04 03:00, 05-05 04:00, …]. - - Under the same conditions, if the time offset is 1 minute, the trigger times will be [05-02 01:01, 05-03 02:02, 05-04 03:03, 05-05 04:04, …]. +- When the scheduling interval is greater than or equal to 1 day, the base time is calculated from the server timezone's Unix epoch (1970-01-01 00:00:00) plus the scheduling offset, aligned by integer multiples of the trigger interval to ensure global consistency across all tasks. For example: + - With a scheduling interval of 2 days, all tasks using this interval will trigger at times that are integer multiples of 2 days from the epoch (e.g., 1970-01-03 00:00:00, 1970-01-05 00:00:00, ...), ensuring global alignment. + - With a scheduling interval of 1 week (`PERIOD(1w)`), triggers align to every Monday at 00:00:00; `PERIOD(1w, 1d)` triggers every Tuesday at 00:00:00. + - With a scheduling interval of 1 month (`PERIOD(1n)`), triggers align to the 1st of each month at 00:00:00; `PERIOD(1n, 14d)` triggers on the 15th of each month at 00:00:00. + - With a scheduling interval of 1 year (`PERIOD(1y)`), triggers align to January 1st at 00:00:00 each year; `PERIOD(1y, 31d)` triggers on February 1st at 00:00:00 each year. Applicable scenarios: Situations requiring scheduled computation driven continuously by system time, such as generating daily statistics every hour, or sending scheduled statistical reports once a day. @@ -1018,3 +1020,72 @@ CREATE stream stream_consumer_energy FROM meters WHERE ts >= cast(_tprev_localtime/1000000 AS timestamp) AND ts <= cast(_tlocaltime/1000000 AS timestamp); ``` + +- Every Monday at 00:00:00, compute the weekly device operation summary for the previous week and write the results to the weekly_summary table. + +```SQL +CREATE STREAM weekly_device_summary + PERIOD(1w) + FROM meters PARTITION BY location + INTO weekly_summary + AS + SELECT _wstart AS week_start, + location, + AVG(current) AS avg_current, + MAX(voltage) AS max_voltage, + COUNT(*) AS record_count + FROM meters + INTERVAL(1w) + PARTITION BY location; +``` + +- On the 1st of each month at 00:00:00, compute the energy consumption bill for the previous month and write the results to the monthly_bill table. + +```SQL +CREATE STREAM monthly_energy_bill + PERIOD(1n) + FROM meters PARTITION BY location, groupId + INTO monthly_bill + AS + SELECT _wstart AS month_start, + location, + groupId, + SUM(current * voltage) AS total_energy + FROM meters + INTERVAL(1n) + PARTITION BY location, groupId; +``` + +- On the 15th of each month at 00:00:00, compute the mid-month settlement report (using the offset parameter). + +```SQL +CREATE STREAM mid_month_settlement + PERIOD(1n, 14d) + FROM meters PARTITION BY location + INTO mid_month_settlement_table + AS + SELECT _wstart AS period_start, + location, + SUM(current * voltage) AS total_energy + FROM meters + INTERVAL(1n) + PARTITION BY location; +``` + +- On January 1st at 00:00:00 each year, archive the full data from the previous year. + +```SQL +CREATE STREAM yearly_archive + PERIOD(1y) + FROM meters PARTITION BY location, groupId + INTO yearly_archive_table + AS + SELECT _wstart AS year_start, + location, + groupId, + AVG(current) AS avg_current, + SUM(current * voltage) AS total_energy + FROM meters + INTERVAL(1y) + PARTITION BY location, groupId; +``` diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 4182d6ac50d1..e695c45b7c45 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -559,6 +559,8 @@ Below are the business error codes for each module. | 0x8000268E | Invalid table type | Incorrect Table type | Check and correct the SQL statement | | 0x8000268F | Invalid ref column type | Virtual table's column type and data source column's type are different | Check and correct the SQL statement | | 0x80002690 | Create child table using virtual super table | Create non-virtual child table using virtual super table | Check and correct the SQL statement | +| 0x800026AF | Invalid offset unit | Invalid time unit used in offset clause | Check and correct the SQL statement | +| 0x800026B0 | Invalid offset value | Invalid offset value in time window | Check and correct the SQL statement | | 0x80002696 | Invalid sliding offset | Invalid sliding offset | Check and correct the SQL statement | | 0x80002697 | Invalid interval offset | Invalid interval offset | Check and correct the SQL statement | | 0x80002698 | Invalid extend value | Invalid extend value | Check and correct the SQL statement | diff --git a/docs/zh/14-reference/03-taos-sql/41-stream.md b/docs/zh/14-reference/03-taos-sql/41-stream.md index d3fed59bcec3..c2f7b7e1bada 100755 --- a/docs/zh/14-reference/03-taos-sql/41-stream.md +++ b/docs/zh/14-reference/03-taos-sql/41-stream.md @@ -74,8 +74,8 @@ PERIOD(period_time[, offset_time]) 定时触发通过系统时间的固定间隔来驱动,本质上就是我们常说的定时任务。定时触发不属于窗口触发。各参数含义如下: -- period_time:定时间隔,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d),支持的时间范围为 `[10a, 3650d]`。 -- offset_time:可选,定时偏移,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h),偏移大小应该小于 1 天。 +- period_time:定时间隔,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d)、周 (w)、月 (n)、年 (y),支持的时间范围为 `[10a, 3650d]`。 +- offset_time:可选,定时偏移,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d)。对于周/月/年单位,offset 必须严格小于触发周期;对于月单位,以 28 天/月为基准静态校验(如 `PERIOD(1n, 28d)` 非法)。 使用说明: @@ -83,9 +83,11 @@ PERIOD(period_time[, offset_time]) - 定时间隔为 5 小时 30 分钟,那么当天的触发时刻为 `[00:00, 05:30, 11:00, 16:30, 22:00]`,后续每一天的触发时刻都是相同的。 - 同样的定时间隔,如果指定时间偏移为 1 分钟,那么当天的触发时刻为 `[00:01, 05:31, 11:01, 16:31, 22:01]`,后续每一天的触发时刻都是相同的。 - 同样条件下,如果建流时当前系统时间为 `12:00`,那么当天的触发时刻为 `[16:31, 22:01]`,后续每一天内的触发时刻为 `[00:01, 05:31, 11:01, 16:31, 22:01]`。 -- 定时间隔大于等于 1 天时,基准时间点为当日的零点加定时偏移,后续不会重置。例如: - - 定时间隔为 1 天 1 小时,建流时当前系统时间为 `05-01 12:00`,那么在当天及随后几天的触发时刻为 `[05-02 01:00, 05-03 02:00, 05-04 03:00, 05-05 04:00, ……]`。 - - 同样条件下,如果指定时间偏移为 1 分钟,那么当天及随后几天的触发时刻为 `[05-02 01:01, 05-03 02:02, 05-04 03:03, 05-05 04:04, ……]`。 +- 定时间隔大于等于 1 天时,基准时间点为服务端时区的 Unix epoch(1970-01-01 00:00:00)加定时偏移,按触发间隔整除对齐,保证所有任务触发时刻全局一致。例如: + - 定时间隔为 2 天,所有使用该间隔的任务都会在距离 epoch 整数倍 2 天的时刻触发(如 1970-01-03 00:00:00, 1970-01-05 00:00:00, ...),确保全局对齐。 + - 定时间隔为 1 周(`PERIOD(1w)`),触发时刻对齐每周一 00:00:00;`PERIOD(1w, 1d)` 则在每周二 00:00:00 触发。 + - 定时间隔为 1 月(`PERIOD(1n)`),触发时刻对齐每月 1 日 00:00:00;`PERIOD(1n, 14d)` 则在每月 15 日 00:00:00 触发。 + - 定时间隔为 1 年(`PERIOD(1y)`),触发时刻对齐每年 1 月 1 日 00:00:00;`PERIOD(1y, 31d)` 则在每年 2 月 1 日 00:00:00 触发。 适用场景:需要按照系统时间连续定时驱动计算的场景,例如每小时计算生成一次当天的统计数据,每天定时发送统计报告等。 @@ -1016,3 +1018,72 @@ CREATE stream stream_consumer_energy FROM meters WHERE ts >= cast(_tprev_localtime/1000000 AS timestamp) AND ts <= cast(_tlocaltime/1000000 AS timestamp); ``` + +- 每周一 00:00:00 计算上周的设备运行汇总,计算结果写入 weekly_summary 表。 + +```SQL +CREATE STREAM weekly_device_summary + PERIOD(1w) + FROM meters PARTITION BY location + INTO weekly_summary + AS + SELECT _wstart AS week_start, + location, + AVG(current) AS avg_current, + MAX(voltage) AS max_voltage, + COUNT(*) AS record_count + FROM meters + INTERVAL(1w) + PARTITION BY location; +``` + +- 每月 1 日 00:00:00 计算上月的能耗账单,计算结果写入 monthly_bill 表。 + +```SQL +CREATE STREAM monthly_energy_bill + PERIOD(1n) + FROM meters PARTITION BY location, groupId + INTO monthly_bill + AS + SELECT _wstart AS month_start, + location, + groupId, + SUM(current * voltage) AS total_energy + FROM meters + INTERVAL(1n) + PARTITION BY location, groupId; +``` + +- 每月 15 日 00:00:00 计算半月结算报表(使用 offset 参数)。 + +```SQL +CREATE STREAM mid_month_settlement + PERIOD(1n, 14d) + FROM meters PARTITION BY location + INTO mid_month_settlement_table + AS + SELECT _wstart AS period_start, + location, + SUM(current * voltage) AS total_energy + FROM meters + INTERVAL(1n) + PARTITION BY location; +``` + +- 每年 1 月 1 日 00:00:00 归档上一年的全量数据。 + +```SQL +CREATE STREAM yearly_archive + PERIOD(1y) + FROM meters PARTITION BY location, groupId + INTO yearly_archive_table + AS + SELECT _wstart AS year_start, + location, + groupId, + AVG(current) AS avg_current, + SUM(current * voltage) AS total_energy + FROM meters + INTERVAL(1y) + PARTITION BY location, groupId; +``` diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index bac283c89261..49b9dbcb31e3 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -557,6 +557,8 @@ TSDB 错误码包括 taosc 客户端和服务端,所有语言的连接器无 | 0x8000268E | Invalid table type | 表类型不正确 | 检查并修正 SQL 语句 | | 0x8000268F | Invalid ref column type | 虚拟表列的数据类型与数据源的数据类型不同 | 检查并修正 SQL 语句 | | 0x80002690 | Create child table using virtual super table | 创建非虚拟子表 USING 了虚拟超级表 | 检查并修正 SQL 语句 | +| 0x800026AF | Invalid offset unit | offset 子句中使用了无效的时间单位 | 检查并修正 SQL 语句 | +| 0x800026B0 | Invalid offset value | 时间窗口中的 offset 值无效 | 检查并修正 SQL 语句 | | 0x80002696 | Invalid sliding offset | sliding 窗口偏移量非法 | 检查并修正 SQL 语句 | | 0x80002697 | Invalid interval offset | interval 窗口偏移量非法 | 检查并修正 SQL 语句 | | 0x80002698 | Invalid extend value | extend 参数非法 | 检查并修正 SQL 语句 | diff --git a/include/common/ttime.h b/include/common/ttime.h index 022c5c6cdcc1..825be4fe81e4 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -83,6 +83,7 @@ int32_t convertCalendarTimeFromUnitToPrecision(int64_t time, char fromUnit, int int32_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit, int64_t* pRes); int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal, timezone_t tz, void* charsetCxt); int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision); +int64_t alignToNaturalBoundary(int64_t timestamp, char unit, int64_t value, int64_t offset, int32_t precision, timezone_t tz); int32_t taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision); char* formatTimestampLocal(char* buf, int64_t val, int precision); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ce6384e1ffc4..1ea781b3b9f9 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1078,6 +1078,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_PAR_NOT_ALLOWED_FILL_MODE TAOS_DEF_ERROR_CODE(0, 0x26AC) #define TSDB_CODE_PAR_NOT_ALLOWED_FILL_VALUES TAOS_DEF_ERROR_CODE(0, 0x26AD) #define TSDB_CODE_PAR_INVALID_SURROUND_TIME_VALUES TAOS_DEF_ERROR_CODE(0, 0x26AE) +#define TSDB_CODE_PAR_INVALID_OFFSET_UNIT TAOS_DEF_ERROR_CODE(0, 0x26AF) +#define TSDB_CODE_PAR_INVALID_OFFSET_VALUE TAOS_DEF_ERROR_CODE(0, 0x26B0) // #define TSDB_CODE_PAR_PRIV_TYPE_TARGET_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x26E0) #define TSDB_CODE_PAR_COL_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x26E1) diff --git a/include/util/tringbuf.h b/include/util/tringbuf.h index 4cae6fd7c217..4add6546396d 100644 --- a/include/util/tringbuf.h +++ b/include/util/tringbuf.h @@ -43,8 +43,10 @@ extern "C" { } \ } while (0) +typedef TRINGBUF(void) TRingBufGeneral; + static FORCE_INLINE int32_t tringbufExtend(void *rbuf, int32_t expSize, int32_t eleSize) { - TRINGBUF(void) *rb = rbuf; + TRingBufGeneral *rb = (TRingBufGeneral *)rbuf; int32_t capacity = TRINGBUF_CAPACITY(rb); if (expSize <= capacity) return 0; @@ -72,7 +74,7 @@ static FORCE_INLINE int32_t tringbufExtend(void *rbuf, int32_t expSize, int32_t } static FORCE_INLINE int32_t tringbufPushBatch(void *rbuf, const void *elePtr, int32_t numEle, int32_t eleSize) { - TRINGBUF(void) *rb = rbuf; + TRingBufGeneral *rb = (TRingBufGeneral *)rbuf; int32_t ret = 0; if (rb->size + numEle > rb->capacity) { diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c index ffc9b3ec22aa..9a3d8a4eb32c 100644 --- a/source/common/src/msg/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -681,6 +681,9 @@ int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerD } case WINDOW_TYPE_PERIOD: { // period trigger + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset)); break; @@ -1253,6 +1256,9 @@ int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployM case WINDOW_TYPE_PERIOD: // period trigger + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision)); TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period)); TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset)); break; diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 9117dc44a93c..afee017c4537 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -696,6 +696,116 @@ int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecisi TAOS_RETURN(TSDB_CODE_SUCCESS); } +/** + * @brief Align timestamp to natural calendar boundary (Monday/Month start/Year start) + * + * @param timestamp Input timestamp in specified precision + * @param unit Time unit: 'w' (week), 'n' (month), 'y' (year) + * @param value Period multiplier (e.g., 2 for 2 weeks) + * @param offset Offset from boundary in same precision as timestamp + * @param precision Time precision (TSDB_TIME_PRECISION_MILLI/MICRO/NANO) + * @param tz Timezone for calendar calculations + * @return Aligned timestamp at natural boundary + offset + * + * Examples: + * - alignToNaturalBoundary(ts, 'w', 1, 0, ...) -> Monday 00:00:00 + * - alignToNaturalBoundary(ts, 'n', 1, 0, ...) -> 1st of month 00:00:00 + * - alignToNaturalBoundary(ts, 'y', 1, 0, ...) -> Jan 1st 00:00:00 + */ +int64_t alignToNaturalBoundary(int64_t timestamp, char unit, int64_t value, int64_t offset, int32_t precision, + timezone_t tz) { + // Convert timestamp to seconds for calendar calculations + int64_t precisionFactor = TSDB_TICK_PER_SECOND(precision); + time_t t = timestamp / precisionFactor; + struct tm tm; + if (taosLocalTime(&t, &tm, NULL, 0, tz) == NULL){ + uError("%s failed to get local time, code:%d", __FUNCTION__, ERRNO); + return timestamp; + } + + int64_t aligned = 0; + + switch (unit) { + case 'w': { + // Align to Monday 00:00:00 + int daysSinceMonday = (tm.tm_wday + 6) % 7; // Convert Sunday=0 to Monday=0 + tm.tm_mday -= daysSinceMonday; + tm.tm_hour = 0; + tm.tm_min = 0; + tm.tm_sec = 0; + + // For multi-week periods, align based on epoch + time_t mondayTime = taosMktime(&tm, tz); + if (value > 1) { + // Calculate epoch Monday (1970-01-05 00:00:00) in the same timezone + struct tm epochTm = {0}; + epochTm.tm_year = 70; // 1970 + epochTm.tm_mon = 0; // January + epochTm.tm_mday = 5; // 5th (first Monday) + epochTm.tm_hour = 0; + epochTm.tm_min = 0; + epochTm.tm_sec = 0; + epochTm.tm_isdst = -1; + time_t epochMonday = taosMktime(&epochTm, tz); + + int64_t weeksSinceEpoch = (mondayTime - epochMonday) / (7 * 86400); + int64_t alignedWeeks = (weeksSinceEpoch / value) * value; + mondayTime = epochMonday + alignedWeeks * 7 * 86400; + } + + aligned = (int64_t)mondayTime * precisionFactor; + break; + } + + case 'n': { + // Align to 1st of month 00:00:00 + tm.tm_mday = 1; + tm.tm_hour = 0; + tm.tm_min = 0; + tm.tm_sec = 0; + + // For multi-month periods, align based on epoch + if (value > 1) { + int monthsSinceEpoch = (tm.tm_year - 70) * 12 + tm.tm_mon; + int alignedMonths = (monthsSinceEpoch / value) * value; + tm.tm_year = 70 + alignedMonths / 12; + tm.tm_mon = alignedMonths % 12; + } + + aligned = (int64_t)taosMktime(&tm, tz) * precisionFactor; + break; + } + + case 'y': { + // Align to Jan 1st 00:00:00 + tm.tm_mon = 0; + tm.tm_mday = 1; + tm.tm_hour = 0; + tm.tm_min = 0; + tm.tm_sec = 0; + + // For multi-year periods, align based on epoch + if (value > 1) { + int yearsSinceEpoch = tm.tm_year - 70; + int alignedYears = (yearsSinceEpoch / value) * value; + tm.tm_year = 70 + alignedYears; + } + + aligned = (int64_t)taosMktime(&tm, tz) * precisionFactor; + break; + } + + default: + // For other units, return timestamp as-is + return timestamp; + } + + // Apply offset to the aligned boundary + // Offset is already in the same precision as the timestamp + // Example: PERIOD(1w, 1d) -> Monday 00:00:00 + 1 day = Tuesday 00:00:00 + return aligned + offset; +} + /* * n - months * y - Years @@ -2302,4 +2412,4 @@ int32_t taosParseShortWeekday(const char* str) { } } return -1; -} \ No newline at end of file +} diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 2672cccda74d..f9be76cb4182 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -33,6 +33,20 @@ add_test( COMMAND dataformatTest ) +# ttimeNaturalUnitsTest.cpp +add_executable(ttimeNaturalUnitsTest "ttimeNaturalUnitsTest.cpp") +DEP_ext_gtest(ttimeNaturalUnitsTest) +target_link_libraries(ttimeNaturalUnitsTest PRIVATE common) +target_include_directories( + ttimeNaturalUnitsTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME ttimeNaturalUnitsTest + COMMAND ttimeNaturalUnitsTest +) + # # cosCpTest.cpp # add_executable(cosCpTest "cosCpTest.cpp") # DEP_ext_gtest(cosCpTest) diff --git a/source/common/test/ttimeNaturalUnitsTest.cpp b/source/common/test/ttimeNaturalUnitsTest.cpp new file mode 100644 index 000000000000..bd759874b9b2 --- /dev/null +++ b/source/common/test/ttimeNaturalUnitsTest.cpp @@ -0,0 +1,472 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include "ttime.h" + +/** + * Test suite for natural time unit boundary alignment + * + * Tests the alignToNaturalBoundary() function for: + * - Week unit alignment (Monday 00:00:00) + * - Month unit alignment (1st of month 00:00:00) + * - Year unit alignment (Jan 1st 00:00:00) + * - Multi-period alignment (2w, 3n, 2y) + * - Offset application + */ + +class TimeNaturalUnitsTest : public ::testing::Test { + protected: + void SetUp() override { + // Use Asia/Shanghai timezone for consistent testing + tz = NULL; + taosSetGlobalTimezone("Asia/Shanghai"); + } + + /** + * Helper function to convert human-readable datetime to timestamp + * @param year Year (e.g., 2026) + * @param month Month (1-12) + * @param day Day (1-31) + * @param hour Hour (0-23) + * @param minute Minute (0-59) + * @param second Second (0-59) + * @param precision Time precision (TSDB_TIME_PRECISION_MILLI/MICRO/NANO) + * @return Timestamp in specified precision + */ + int64_t makeTimestamp(int year, int month, int day, int hour, int minute, int second, + int8_t precision = TSDB_TIME_PRECISION_MILLI) { + struct tm tm = {0}; + tm.tm_year = year - 1900; + tm.tm_mon = month - 1; + tm.tm_mday = day; + tm.tm_hour = hour; + tm.tm_min = minute; + tm.tm_sec = second; + tm.tm_isdst = -1; + + time_t t = taosMktime(&tm, tz); + int64_t ts = (int64_t)t; + + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: + return ts * 1000LL; + case TSDB_TIME_PRECISION_MICRO: + return ts * 1000000LL; + case TSDB_TIME_PRECISION_NANO: + return ts * 1000000000LL; + default: + return ts * 1000LL; + } + } + + timezone_t tz; +}; + +/** + * Test week unit alignment to Monday 00:00:00 + */ +TEST_F(TimeNaturalUnitsTest, WeekAlignmentBasic) { + // Test timestamp: 2026-03-10 15:30:00 (Tuesday) + // Expected: align to 2026-03-09 00:00:00 (Monday) + + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); + int64_t result = alignToNaturalBoundary(ts, 'w', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + + // Convert result to struct tm to verify + time_t t = result / 1000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + std::cout << "Original timestamp: " << ts << ", Aligned timestamp: " << result << std::endl; + + // Should be Monday (tm_wday = 1) + EXPECT_EQ(tm.tm_wday, 1); + // Should be 00:00:00 + EXPECT_EQ(tm.tm_hour, 0); + EXPECT_EQ(tm.tm_min, 0); + EXPECT_EQ(tm.tm_sec, 0); +} + +/** + * Test month unit alignment to 1st of month 00:00:00 + */ +TEST_F(TimeNaturalUnitsTest, MonthAlignmentBasic) { + // Test timestamp: 2026-03-15 12:00:00 + // Expected: align to 2026-03-01 00:00:00 + + int64_t ts = makeTimestamp(2026, 3, 15, 12, 0, 0); + int64_t result = alignToNaturalBoundary(ts, 'n', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + + time_t t = result / 1000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + std::cout << "Original timestamp: " << ts << ", Aligned timestamp: " << result << std::endl; + + // Should be 1st of month + EXPECT_EQ(tm.tm_mday, 1); + // Should be 00:00:00 + EXPECT_EQ(tm.tm_hour, 0); + EXPECT_EQ(tm.tm_min, 0); + EXPECT_EQ(tm.tm_sec, 0); +} + +/** + * Test year unit alignment to Jan 1st 00:00:00 + */ +TEST_F(TimeNaturalUnitsTest, YearAlignmentBasic) { + // Test timestamp: 2026-06-15 12:00:00 + // Expected: align to 2026-01-01 00:00:00 + + int64_t ts = makeTimestamp(2026, 6, 15, 12, 0, 0); + int64_t result = alignToNaturalBoundary(ts, 'y', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + + time_t t = result / 1000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + std::cout << "Original timestamp: " << ts << ", Aligned timestamp: " << result << std::endl; + + // Should be Jan 1st + EXPECT_EQ(tm.tm_mon, 0); // January = 0 + EXPECT_EQ(tm.tm_mday, 1); + // Should be 00:00:00 + EXPECT_EQ(tm.tm_hour, 0); + EXPECT_EQ(tm.tm_min, 0); + EXPECT_EQ(tm.tm_sec, 0); +} + +/** + * Test week unit with offset (1 day = Tuesday) + */ +TEST_F(TimeNaturalUnitsTest, WeekAlignmentWithOffset) { + // Test timestamp: 2026-03-10 15:30:00 (Tuesday) + // With 1 day offset, should align to Tuesday 00:00:00 + + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); + int64_t offset = 24LL * 60LL * 60LL * 1000LL; // 1 day in milliseconds + int64_t result = alignToNaturalBoundary(ts, 'w', 1, offset, TSDB_TIME_PRECISION_MILLI, tz); + + time_t t = result / 1000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + std::cout << "Original timestamp: " << ts << ", Aligned timestamp: " << result << std::endl; + + // Should be Tuesday (tm_wday = 2) + EXPECT_EQ(tm.tm_wday, 2); + // Should be 00:00:00 + EXPECT_EQ(tm.tm_hour, 0); + EXPECT_EQ(tm.tm_min, 0); + EXPECT_EQ(tm.tm_sec, 0); +} + +/** + * Test multi-period week alignment (2 weeks) + */ +TEST_F(TimeNaturalUnitsTest, MultiPeriodWeekAlignment) { + // Test that 2-week periods align consistently + // Two timestamps within the same 2-week period should align to the same boundary + + int64_t ts1 = makeTimestamp(2026, 3, 10, 0, 0, 0); // 2026-03-10 (Tuesday, week 1) + int64_t ts2 = makeTimestamp(2026, 3, 13, 0, 0, 0); // 2026-03-13 (Friday, week 1) + + int64_t result1 = alignToNaturalBoundary(ts1, 'w', 2, 0, TSDB_TIME_PRECISION_MILLI, tz); + int64_t result2 = alignToNaturalBoundary(ts2, 'w', 2, 0, TSDB_TIME_PRECISION_MILLI, tz); + std::cout << "Timestamp 1: " << ts1 << ", Aligned 1: " << result1 << std::endl; + std::cout << "Timestamp 2: " << ts2 << ", Aligned 2: " << result2 << std::endl; + + // Both should align to the same 2-week boundary + EXPECT_EQ(result1, result2); + + // Test that adjacent 2-week periods align to different boundaries + int64_t ts3 = makeTimestamp(2026, 3, 17, 0, 0, 0); // 2026-03-17 (next week) + int64_t result3 = alignToNaturalBoundary(ts3, 'w', 2, 0, TSDB_TIME_PRECISION_MILLI, tz); + std::cout << "Timestamp 3: " << ts3 << ", Aligned 3: " << result3 << std::endl; + EXPECT_NE(result1, result3); // Should be different 2-week boundary + + // Window duration should be 14 days + int64_t duration = result3 - result1; + EXPECT_EQ(duration, 14LL * 24LL * 60LL * 60LL * 1000LL); +} + +/** + * Test multi-period month alignment (3 months / quarterly) + */ +TEST_F(TimeNaturalUnitsTest, MultiPeriodMonthAlignment) { + // Test that 3-month periods align consistently + // Timestamps within the same quarter should align to the same boundary + + int64_t ts1 = makeTimestamp(2026, 2, 15, 0, 0, 0); // 2026-02-15 (Q1) + int64_t ts2 = makeTimestamp(2026, 3, 20, 0, 0, 0); // 2026-03-20 (Q1) + + int64_t result1 = alignToNaturalBoundary(ts1, 'n', 3, 0, TSDB_TIME_PRECISION_MILLI, tz); + int64_t result2 = alignToNaturalBoundary(ts2, 'n', 3, 0, TSDB_TIME_PRECISION_MILLI, tz); + std::cout << "Timestamp 1: " << ts1 << ", Aligned 1: " << result1 << std::endl; + std::cout << "Timestamp 2: " << ts2 << ", Aligned 2: " << result2 << std::endl; + + // Both should align to Q1 start (2026-01-01) + EXPECT_EQ(result1, result2); + + // Verify it's January 1st + time_t t = result1 / 1000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + EXPECT_EQ(tm.tm_mon, 0); // January + EXPECT_EQ(tm.tm_mday, 1); + EXPECT_EQ(tm.tm_hour, 0); + + // Test that next quarter aligns to different boundary + int64_t ts3 = makeTimestamp(2026, 4, 15, 0, 0, 0); // 2026-04-15 (Q2) + int64_t result3 = alignToNaturalBoundary(ts3, 'n', 3, 0, TSDB_TIME_PRECISION_MILLI, tz); + std::cout << "Timestamp 3: " << ts3 << ", Aligned 3: " << result3 << std::endl; + EXPECT_NE(result1, result3); + + // Verify Q2 starts at April 1st + time_t t3 = result3 / 1000; + struct tm tm3; + taosLocalTime(&t3, &tm3, NULL, 0, NULL); + EXPECT_EQ(tm3.tm_mon, 3); // April + EXPECT_EQ(tm3.tm_mday, 1); +} + +/** + * Test multi-period year alignment (2 years) + */ +TEST_F(TimeNaturalUnitsTest, MultiPeriodYearAlignment) { + // Test that 2-year periods align consistently + // Timestamps within the same 2-year period should align to the same boundary + + int64_t ts1 = makeTimestamp(2026, 3, 15, 0, 0, 0); // 2026-03-15 + int64_t ts2 = makeTimestamp(2027, 8, 20, 0, 0, 0); // 2027-08-20 + + int64_t result1 = alignToNaturalBoundary(ts1, 'y', 2, 0, TSDB_TIME_PRECISION_MILLI, tz); + int64_t result2 = alignToNaturalBoundary(ts2, 'y', 2, 0, TSDB_TIME_PRECISION_MILLI, tz); + std::cout << "Timestamp 1: " << ts1 << ", Aligned 1: " << result1 << std::endl; + std::cout << "Timestamp 2: " << ts2 << ", Aligned 2: " << result2 << std::endl; + + // Both should align to the same 2-year boundary (2026-01-01) + EXPECT_EQ(result1, result2); + + // Verify it's 2026-01-01 + time_t t = result1 / 1000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + EXPECT_EQ(tm.tm_year, 126); // 2026 + EXPECT_EQ(tm.tm_mon, 0); // January + EXPECT_EQ(tm.tm_mday, 1); + EXPECT_EQ(tm.tm_hour, 0); + + // Test that next 2-year period aligns to different boundary + int64_t ts3 = makeTimestamp(2028, 6, 15, 0, 0, 0); // 2028-06-15 + int64_t result3 = alignToNaturalBoundary(ts3, 'y', 2, 0, TSDB_TIME_PRECISION_MILLI, tz); + std::cout << "Timestamp 3: " << ts3 << ", Aligned 3: " << result3 << std::endl; + EXPECT_NE(result1, result3); + + // Verify next period starts at 2028-01-01 + time_t t3 = result3 / 1000; + struct tm tm3; + taosLocalTime(&t3, &tm3, NULL, 0, NULL); + EXPECT_EQ(tm3.tm_year, 128); // 2028 + EXPECT_EQ(tm3.tm_mon, 0); // January + EXPECT_EQ(tm3.tm_mday, 1); +} + +/** + * Test microsecond precision + */ +TEST_F(TimeNaturalUnitsTest, MicrosecondPrecision) { + // Test with microsecond precision + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0, TSDB_TIME_PRECISION_MICRO); + int64_t result = alignToNaturalBoundary(ts, 'w', 1, 0, TSDB_TIME_PRECISION_MICRO, tz); + + // Result should be in microseconds + EXPECT_GT(result, 1000000000000LL); // Should be > 1 trillion (microseconds) + + // Convert to seconds for verification + time_t t = result / 1000000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + + EXPECT_EQ(tm.tm_wday, 1); // Monday + EXPECT_EQ(tm.tm_hour, 0); + EXPECT_EQ(tm.tm_min, 0); + EXPECT_EQ(tm.tm_sec, 0); +} + +/** + * Test nanosecond precision + */ +TEST_F(TimeNaturalUnitsTest, NanosecondPrecision) { + // Test with nanosecond precision + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0, TSDB_TIME_PRECISION_NANO); + int64_t result = alignToNaturalBoundary(ts, 'n', 1, 0, TSDB_TIME_PRECISION_NANO, tz); + + // Result should be in nanoseconds + EXPECT_GT(result, 1000000000000000LL); // Should be > 1 quadrillion (nanoseconds) + + // Convert to seconds for verification + time_t t = result / 1000000000; + struct tm tm; + taosLocalTime(&t, &tm, NULL, 0, NULL); + + EXPECT_EQ(tm.tm_mday, 1); // 1st of month + EXPECT_EQ(tm.tm_hour, 0); + EXPECT_EQ(tm.tm_min, 0); + EXPECT_EQ(tm.tm_sec, 0); +} + +/** + * Test that non-natural units return timestamp as-is + */ +TEST_F(TimeNaturalUnitsTest, NonNaturalUnitsPassthrough) { + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); + + // Test with 'd' (day) unit - should return as-is + int64_t result = alignToNaturalBoundary(ts, 'd', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + EXPECT_EQ(result, ts); + + // Test with 'h' (hour) unit - should return as-is + result = alignToNaturalBoundary(ts, 'h', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + EXPECT_EQ(result, ts); +} + +/** + * Test getDuration() function for week unit conversion + * Verifies that 'w' unit is correctly converted to 7 days + */ +TEST_F(TimeNaturalUnitsTest, GetDurationWeekUnit) { + int64_t result = 0; + int32_t code; + + // Test 1 week = 7 days in milliseconds + code = getDuration(1, 'w', &result, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + EXPECT_EQ(result, 7LL * 24LL * 60LL * 60LL * 1000LL); // 7 days in ms + + // Test 2 weeks = 14 days in milliseconds + code = getDuration(2, 'w', &result, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + EXPECT_EQ(result, 14LL * 24LL * 60LL * 60LL * 1000LL); // 14 days in ms + + // Test with microsecond precision + code = getDuration(1, 'w', &result, TSDB_TIME_PRECISION_MICRO); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + EXPECT_EQ(result, 7LL * 24LL * 60LL * 60LL * 1000000LL); // 7 days in us + + // Test with nanosecond precision + code = getDuration(1, 'w', &result, TSDB_TIME_PRECISION_NANO); + EXPECT_EQ(code, TSDB_CODE_SUCCESS); + EXPECT_EQ(result, 7LL * 24LL * 60LL * 60LL * 1000000000LL); // 7 days in ns + + // Test overflow protection (very large value) + code = getDuration(INT64_MAX / (7LL * 24LL * 60LL * 60LL * 1000LL) + 1, 'w', &result, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(code, TSDB_CODE_OUT_OF_RANGE); +} + +/** + * Performance test for alignToNaturalBoundary() function + * Verify that the function executes in < 1ms on average (SC-006) + */ +TEST_F(TimeNaturalUnitsTest, PerformanceAlignToNaturalBoundary) { + const int iterations = 10000; + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); + + // Test week unit performance + auto start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < iterations; i++) { + alignToNaturalBoundary(ts + i * 1000, 'w', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + } + auto end = std::chrono::high_resolution_clock::now(); + auto duration_week = std::chrono::duration_cast(end - start).count(); + double avg_week = duration_week / (double)iterations; + + std::cout << "Week alignment: " << iterations << " iterations in " << duration_week << " us" << std::endl; + std::cout << "Average time per call: " << avg_week << " us" << std::endl; + EXPECT_LT(avg_week, 1000.0); // Should be < 1ms (1000 us) + + // Test month unit performance + start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < iterations; i++) { + alignToNaturalBoundary(ts + i * 1000, 'n', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + } + end = std::chrono::high_resolution_clock::now(); + auto duration_month = std::chrono::duration_cast(end - start).count(); + double avg_month = duration_month / (double)iterations; + + std::cout << "Month alignment: " << iterations << " iterations in " << duration_month << " us" << std::endl; + std::cout << "Average time per call: " << avg_month << " us" << std::endl; + EXPECT_LT(avg_month, 1000.0); // Should be < 1ms (1000 us) + + // Test year unit performance + start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < iterations; i++) { + alignToNaturalBoundary(ts + i * 1000, 'y', 1, 0, TSDB_TIME_PRECISION_MILLI, tz); + } + end = std::chrono::high_resolution_clock::now(); + auto duration_year = std::chrono::duration_cast(end - start).count(); + double avg_year = duration_year / (double)iterations; + + std::cout << "Year alignment: " << iterations << " iterations in " << duration_year << " us" << std::endl; + std::cout << "Average time per call: " << avg_year << " us" << std::endl; + EXPECT_LT(avg_year, 1000.0); // Should be < 1ms (1000 us) +} + +/** + * Performance test for getDuration() function + * Verify that the function executes in < 1ms on average (SC-006) + */ +TEST_F(TimeNaturalUnitsTest, PerformanceGetDuration) { + const int iterations = 10000; + int64_t result = 0; + + // Test week unit performance + auto start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < iterations; i++) { + getDuration(1 + (i % 100), 'w', &result, TSDB_TIME_PRECISION_MILLI); + } + auto end = std::chrono::high_resolution_clock::now(); + auto duration_week = std::chrono::duration_cast(end - start).count(); + double avg_week = duration_week / (double)iterations; + + std::cout << "getDuration (week): " << iterations << " iterations in " << duration_week << " us" << std::endl; + std::cout << "Average time per call: " << avg_week << " us" << std::endl; + EXPECT_LT(avg_week, 1000.0); // Should be < 1ms (1000 us) + + // Test day unit performance + start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < iterations; i++) { + getDuration(1 + (i % 100), 'd', &result, TSDB_TIME_PRECISION_MILLI); + } + end = std::chrono::high_resolution_clock::now(); + auto duration_day = std::chrono::duration_cast(end - start).count(); + double avg_day = duration_day / (double)iterations; + + std::cout << "getDuration (day): " << iterations << " iterations in " << duration_day << " us" << std::endl; + std::cout << "Average time per call: " << avg_day << " us" << std::endl; + EXPECT_LT(avg_day, 1000.0); // Should be < 1ms (1000 us) + + // Test hour unit performance + start = std::chrono::high_resolution_clock::now(); + for (int i = 0; i < iterations; i++) { + getDuration(1 + (i % 100), 'h', &result, TSDB_TIME_PRECISION_MILLI); + } + end = std::chrono::high_resolution_clock::now(); + auto duration_hour = std::chrono::duration_cast(end - start).count(); + double avg_hour = duration_hour / (double)iterations; + + std::cout << "getDuration (hour): " << iterations << " iterations in " << duration_hour << " us" << std::endl; + std::cout << "Average time per call: " << avg_hour << " us" << std::endl; + EXPECT_LT(avg_hour, 1000.0); // Should be < 1ms (1000 us) +} diff --git a/source/libs/new-stream/inc/streamTriggerTask.h b/source/libs/new-stream/inc/streamTriggerTask.h index 8fe0dda9c3c9..b2365f7ec4ab 100644 --- a/source/libs/new-stream/inc/streamTriggerTask.h +++ b/source/libs/new-stream/inc/streamTriggerTask.h @@ -479,7 +479,10 @@ int32_t stTriggerTaskExecute(SStreamTriggerTask *pTask, const SStreamMsg *pMsg); // helper function in trigger task // check whether the state data equals to the zeroth state -int32_t stIsStateEqualZeroth(void *pStateData, void *pZeroth, bool *pIsEqual); +int32_t stIsStateEqualZeroth(void *pStateData, void *pZeroth, bool *pIsEqual); +STimeWindow stTriggerTaskGetTimeWindow(SStreamTriggerTask *pTask, int64_t ts); +void stTriggerTaskPrevTimeWindow(SStreamTriggerTask *pTask, STimeWindow *pWindow); +void stTriggerTaskNextTimeWindow(SStreamTriggerTask *pTask, STimeWindow *pWindow); #ifdef __cplusplus } diff --git a/source/libs/new-stream/src/streamTriggerTask.c b/source/libs/new-stream/src/streamTriggerTask.c index 11b7bce64971..dddda4a2c778 100644 --- a/source/libs/new-stream/src/streamTriggerTask.c +++ b/source/libs/new-stream/src/streamTriggerTask.c @@ -287,9 +287,38 @@ static int32_t stTriggerTaskAllocAhandle(SStreamTriggerTask *pTask, int64_t sess return code; } -static STimeWindow stTriggerTaskGetTimeWindow(SStreamTriggerTask *pTask, int64_t ts) { +/** + * @brief Get the time window containing the given timestamp + * + * For natural time units (week/month/year), this function aligns the window + * to natural boundaries (Monday for weeks, 1st of month, Jan 1st for years). + * For other units, it uses standard interval truncation. + * + * @param pTask Trigger task containing interval configuration + * @param ts Timestamp to find the window for (in configured precision) + * @return STimeWindow Window containing the timestamp [skey, ekey] (closed interval) + * + * @note Window semantics: [boundary + 1, nextBoundary] closed interval + * - skey: start of window (previous boundary + 1) + * - ekey: end of window (next boundary, trigger time) + * + * @note For natural units (week/month/year): + * - Week: Window aligns to Monday 00:00:00 + offset + * - Month: Window aligns to 1st of month 00:00:00 + offset + * - Year: Window aligns to Jan 1st 00:00:00 + offset + * - Multi-period: Aligns to epoch-based boundaries (e.g., 2w, 3n, 2y) + * + * @note For regular units (a/s/m/h/d): + * - Window aligns based on interval and sliding configuration + * + * @example PERIOD(1w): Returns window [last Monday + 1, next Monday] + * @example PERIOD(1w, 1d): Returns window [last Tuesday + 1, next Tuesday] + * @example PERIOD(1n): Returns window [1st of last month + 1, 1st of next month] + */ +STimeWindow stTriggerTaskGetTimeWindow(SStreamTriggerTask *pTask, int64_t ts) { SInterval *pInterval = &pTask->interval; STimeWindow win = {0}; + if (pInterval->interval > 0) { win.skey = taosTimeTruncate(ts, pInterval); win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval); @@ -297,24 +326,70 @@ static STimeWindow stTriggerTaskGetTimeWindow(SStreamTriggerTask *pTask, int64_t win.ekey = INT64_MAX; } } else { + char unit = pInterval->intervalUnit; int64_t day = convertTimePrecision(24 * 60 * 60 * 1000, TSDB_TIME_PRECISION_MILLI, pInterval->precision); - // truncate to the start of day - SInterval interval = {.intervalUnit = 'd', - .slidingUnit = 'd', - .offsetUnit = pInterval->offsetUnit, - .precision = pInterval->precision, - .interval = day, - .sliding = day}; - int64_t first = taosTimeTruncate(ts, &interval) + pInterval->offset; - if (pInterval->sliding > day) { - if (first >= ts) { - win.skey = first - pInterval->sliding + 1; - win.ekey = first; + if (unit == 'w' || unit == 'n' || unit == 'y') { + // Use natural boundary alignment for week/month/year units + // For week: pInterval->sliding is time value (in precision units) + // For month/year: pInterval->sliding is period count (month/year count) + int64_t periodCount = (unit == 'w') ? 1 : pInterval->sliding; + if (unit == 'w') { + // Calculate week count from time value + int64_t week = 7 * day; + periodCount = pInterval->sliding / week; + if (periodCount < 1) periodCount = 1; + } + + int64_t boundary = + alignToNaturalBoundary(ts, unit, periodCount, pInterval->offset, pInterval->precision, pInterval->timezone); + if (boundary == ts) { + // ts is exactly on boundary, return previous window [prevBoundary + 1, boundary] + int64_t prevBoundary = 0; + if (unit == 'w') { + prevBoundary = boundary - pInterval->sliding; + } else { + prevBoundary = taosTimeAdd(boundary, -1 * periodCount, unit, pInterval->precision, pInterval->timezone); + } + win.skey = prevBoundary + 1; + win.ekey = boundary; } else { - win.skey = first + 1; - win.ekey = first + pInterval->sliding; + // ts is between boundaries, return current window [boundary + 1, nextBoundary] + int64_t nextBoundary = 0; + if (unit == 'w') { + nextBoundary = boundary + pInterval->sliding; + } else { + nextBoundary = taosTimeAdd(boundary, periodCount, unit, pInterval->precision, pInterval->timezone); + } + win.skey = boundary + 1; + win.ekey = nextBoundary; + } + } else if (pInterval->sliding > day) { + SInterval interval = {.intervalUnit = pInterval->slidingUnit, + .slidingUnit = pInterval->slidingUnit, + .offsetUnit = pInterval->offsetUnit, + .precision = pInterval->precision, + .interval = pInterval->sliding, + .sliding = pInterval->sliding, + .offset = pInterval->offset}; + int64_t boundary = taosTimeTruncate(ts, &interval); + if (boundary == ts) { + int64_t prevBoundary = boundary - pInterval->sliding; + win.skey = prevBoundary + 1; + win.ekey = boundary; + } else { + int64_t nextBoundary = boundary + pInterval->sliding; + win.skey = boundary + 1; + win.ekey = nextBoundary; } } else { + // truncate to the start of day + SInterval interval = {.intervalUnit = 'd', + .slidingUnit = 'd', + .offsetUnit = pInterval->offsetUnit, + .precision = pInterval->precision, + .interval = day, + .sliding = day}; + int64_t first = taosTimeTruncate(ts, &interval) + pInterval->offset; if (first >= ts) { int64_t prev = first - day; win.skey = (ts - prev - 1) / pInterval->sliding * pInterval->sliding + prev + 1; @@ -328,8 +403,9 @@ static STimeWindow stTriggerTaskGetTimeWindow(SStreamTriggerTask *pTask, int64_t return win; } -static void stTriggerTaskPrevTimeWindow(SStreamTriggerTask *pTask, STimeWindow *pWindow) { +void stTriggerTaskPrevTimeWindow(SStreamTriggerTask *pTask, STimeWindow *pWindow) { SInterval *pInterval = &pTask->interval; + if (pInterval->interval > 0) { TSKEY prevStart = taosTimeAdd(pWindow->skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL); @@ -338,8 +414,30 @@ static void stTriggerTaskPrevTimeWindow(SStreamTriggerTask *pTask, STimeWindow * pWindow->skey = prevStart; pWindow->ekey = taosTimeGetIntervalEnd(prevStart, pInterval); } else { + char unit = pInterval->intervalUnit; int64_t day = convertTimePrecision(24 * 60 * 60 * 1000, TSDB_TIME_PRECISION_MILLI, pInterval->precision); - if (pInterval->sliding > day) { + // Window semantics: [boundary + 1, nextBoundary] + // Prev window: [prevBoundary + 1, boundary] + if (unit == 'w' || unit == 'n' || unit == 'y') { + // For week: pInterval->sliding is time value + // For month/year: pInterval->sliding is period count + int64_t periodCount = (unit == 'w') ? 1 : pInterval->sliding; + if (unit == 'w') { + int64_t week = 7 * day; + periodCount = pInterval->sliding / week; + if (periodCount < 1) periodCount = 1; + } + + int64_t boundary = pWindow->skey - 1; + int64_t prevBoundary = 0; + if (unit == 'w') { + prevBoundary = boundary - pInterval->sliding; + } else { + prevBoundary = taosTimeAdd(boundary, -1 * periodCount, unit, pInterval->precision, pInterval->timezone); + } + pWindow->skey = prevBoundary + 1; + pWindow->ekey = boundary; + } else if (pInterval->sliding > day) { pWindow->skey -= pInterval->sliding; pWindow->ekey -= pInterval->sliding; } else { @@ -361,8 +459,36 @@ static void stTriggerTaskPrevTimeWindow(SStreamTriggerTask *pTask, STimeWindow * } } -static void stTriggerTaskNextTimeWindow(SStreamTriggerTask *pTask, STimeWindow *pWindow) { +/** + * @brief Advance the time window to the next period + * + * For natural time units (week/month/year), this function advances the window + * by one period using calendar calculation to handle variable-length periods + * (e.g., months with different days, leap years). + * For other units, it uses standard interval sliding. + * + * @param pTask Trigger task containing interval configuration + * @param pWindow[in,out] Window to advance (modified in place) + * + * @note Window semantics: [boundary + 1, nextBoundary] closed interval + * - Current window: [currentBoundary + 1, nextBoundary] + * - Next window: [nextBoundary + 1, nextNextBoundary] + * + * @note For natural units (week/month/year): + * - Week: Adds N * 7 days to boundaries + * - Month: Uses calendar calculation (handles 28/29/30/31 day months) + * - Year: Uses calendar calculation (handles leap years) + * - new skey = old ekey + 1 + * - new ekey = old ekey + N periods (calendar-based) + * + * @note For regular units (a/s/m/h/d): + * - Advances by sliding interval + * + * @example Current window [2026-03-01 + 1, 2026-04-01] → Next window [2026-04-01 + 1, 2026-05-01] + */ +void stTriggerTaskNextTimeWindow(SStreamTriggerTask *pTask, STimeWindow *pWindow) { SInterval *pInterval = &pTask->interval; + if (pInterval->interval > 0) { TSKEY nextStart = taosTimeAdd(pWindow->skey, -1 * pInterval->offset, pInterval->offsetUnit, pInterval->precision, NULL); @@ -371,8 +497,30 @@ static void stTriggerTaskNextTimeWindow(SStreamTriggerTask *pTask, STimeWindow * pWindow->skey = nextStart; pWindow->ekey = taosTimeGetIntervalEnd(nextStart, pInterval); } else { + char unit = pInterval->intervalUnit; int64_t day = convertTimePrecision(24 * 60 * 60 * 1000, TSDB_TIME_PRECISION_MILLI, pInterval->precision); - if (pInterval->sliding > day) { + // Window semantics: [boundary + 1, nextBoundary] + // Next window: [nextBoundary + 1, nextNextBoundary] + if (unit == 'w' || unit == 'n' || unit == 'y') { + // For week: pInterval->sliding is time value + // For month/year: pInterval->sliding is period count + int64_t periodCount = (unit == 'w') ? 1 : pInterval->sliding; + if (unit == 'w') { + int64_t week = 7 * day; + periodCount = pInterval->sliding / week; + if (periodCount < 1) periodCount = 1; + } + + int64_t nextBoundary = pWindow->ekey; + int64_t nextNextBoundary = 0; + if (unit == 'w') { + nextNextBoundary = nextBoundary + pInterval->sliding; + } else { + nextNextBoundary = taosTimeAdd(nextBoundary, periodCount, unit, pInterval->precision, pInterval->timezone); + } + pWindow->skey = nextBoundary + 1; + pWindow->ekey = nextNextBoundary; + } else if (pInterval->sliding > day) { pWindow->skey += pInterval->sliding; pWindow->ekey += pInterval->sliding; } else { diff --git a/source/libs/new-stream/test/CMakeLists.txt b/source/libs/new-stream/test/CMakeLists.txt index 47f9b9251bff..7f26236b3080 100644 --- a/source/libs/new-stream/test/CMakeLists.txt +++ b/source/libs/new-stream/test/CMakeLists.txt @@ -48,4 +48,21 @@ add_test( NAME checkpointEncryptionTest COMMAND checkpointEncryptionTest ) + +ADD_EXECUTABLE(streamTriggerTaskTest streamTriggerTaskTest.cpp) +DEP_ext_gtest(streamTriggerTaskTest) +TARGET_INCLUDE_DIRECTORIES( + streamTriggerTaskTest + PUBLIC "${TD_SOURCE_DIR}/include/libs/new-stream" + PUBLIC "${TD_SOURCE_DIR}/include/libs/executor" + PUBLIC "${TD_SOURCE_DIR}/include/libs/qcom" + PUBLIC "${TD_SOURCE_DIR}/include/common" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +TARGET_LINK_LIBRARIES(streamTriggerTaskTest PUBLIC taos os common executor function index new-stream) + +add_test( + NAME streamTriggerTaskTest + COMMAND streamTriggerTaskTest +) endif(NOT ${TD_WINDOWS}) diff --git a/source/libs/new-stream/test/streamTriggerTaskTest.cpp b/source/libs/new-stream/test/streamTriggerTaskTest.cpp new file mode 100644 index 000000000000..5ffb3cdad73d --- /dev/null +++ b/source/libs/new-stream/test/streamTriggerTaskTest.cpp @@ -0,0 +1,639 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include + +extern "C" { +#include "streamTriggerTask.h" +#include "taosdef.h" +#include "taoserror.h" +#include "ttime.h" +} + +/** + * Test suite for stream trigger task window calculation with natural time units + * + * Tests the window calculation functions for: + * - Week unit window calculation and advancement + * - Month unit window calculation and advancement + * - Year unit window calculation and advancement + * - Offset application + * - Multi-period alignment + */ + +// Helper function to create a mock trigger task +static SStreamTriggerTask* createMockTriggerTask(char unit, int64_t interval, int64_t offset, int32_t precision) { + SStreamTriggerTask* pTask = (SStreamTriggerTask*)taosMemoryCalloc(1, sizeof(SStreamTriggerTask)); + if (!pTask) return NULL; + + pTask->interval.intervalUnit = unit; + pTask->interval.slidingUnit = unit; // For period trigger, slidingUnit == intervalUnit + pTask->interval.interval = 0; // For period trigger, interval is 0 + + // Convert period count to sliding value based on getDuration() logic + int64_t slidingValue = interval; + if (unit == 'w') { + // For week: getDuration converts to time value + int64_t week_ms = 7LL * 24LL * 60LL * 60LL * 1000LL; + slidingValue = interval * convertTimePrecision(week_ms, TSDB_TIME_PRECISION_MILLI, precision); + } else if (unit == 'n' || unit == 'y') { + // For month/year: getDuration does NOT handle these, sliding stores the count directly + slidingValue = interval; + } + + pTask->interval.sliding = slidingValue; + pTask->interval.offset = offset; + pTask->interval.offsetUnit = 'a'; // milliseconds + pTask->interval.precision = precision; + pTask->interval.timezone = NULL; + + return pTask; +} + +// Helper function to free mock trigger task +static void freeMockTriggerTask(SStreamTriggerTask* pTask) { + if (pTask) { + taosMemoryFree(pTask); + } +} + +class StreamTriggerTaskTest : public ::testing::Test { + protected: + void SetUp() override { + // Test setup + // Use Asia/Shanghai timezone for consistent testing + taosSetGlobalTimezone("Asia/Shanghai"); + } + + void TearDown() override { + // Test cleanup + } + + /** + * Helper function to convert human-readable datetime to timestamp + * @param year Year (e.g., 2026) + * @param month Month (1-12) + * @param day Day (1-31) + * @param hour Hour (0-23) + * @param minute Minute (0-59) + * @param second Second (0-59) + * @param precision Time precision (TSDB_TIME_PRECISION_MILLI/MICRO/NANO) + * @return Timestamp in specified precision + */ + int64_t makeTimestamp(int year, int month, int day, int hour, int minute, int second, + int8_t precision = TSDB_TIME_PRECISION_MILLI) { + struct tm tm = {0}; + tm.tm_year = year - 1900; + tm.tm_mon = month - 1; + tm.tm_mday = day; + tm.tm_hour = hour; + tm.tm_min = minute; + tm.tm_sec = second; + tm.tm_isdst = -1; + + time_t t = taosMktime(&tm, NULL); + int64_t ts = (int64_t)t; + + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: + return ts * 1000LL; + case TSDB_TIME_PRECISION_MICRO: + return ts * 1000000LL; + case TSDB_TIME_PRECISION_NANO: + return ts * 1000000000LL; + default: + return ts * 1000LL; + } + } + + /** + * Helper function to verify window boundaries + * @param win Window to verify + * @param precision Time precision + * @param skeyYear Expected skey year (e.g., 2026), -1 to skip + * @param skeyMonth Expected skey month (1-12), -1 to skip + * @param skeyDay Expected skey day (1-31), -1 to skip + * @param skeyHour Expected skey hour (0-23), -1 to skip + * @param skeyMin Expected skey minute (0-59), -1 to skip + * @param skeySec Expected skey second (0-59), -1 to skip + * @param skeyWday Expected skey day of week (0-6, 0=Sunday), -1 to skip + * @param ekeyYear Expected ekey year (e.g., 2026), -1 to skip + * @param ekeyMonth Expected ekey month (1-12), -1 to skip + * @param ekeyDay Expected ekey day (1-31), -1 to skip + * @param ekeyHour Expected ekey hour (0-23), -1 to skip + * @param ekeyMin Expected ekey minute (0-59), -1 to skip + * @param ekeySec Expected ekey second (0-59), -1 to skip + * @param ekeyWday Expected ekey day of week (0-6, 0=Sunday), -1 to skip + * @param expectedDurationMs Expected duration in milliseconds (closed interval), -1 to skip + */ + void verifyWindow(const STimeWindow& win, int8_t precision, int skeyYear = -1, int skeyMonth = -1, int skeyDay = -1, + int skeyHour = -1, int skeyMin = -1, int skeySec = -1, int skeyWday = -1, int ekeyYear = -1, + int ekeyMonth = -1, int ekeyDay = -1, int ekeyHour = -1, int ekeyMin = -1, int ekeySec = -1, + int ekeyWday = -1, int64_t expectedDurationMs = -1) { + // Convert precision factor + int64_t precisionFactor = 1; + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: + precisionFactor = 1000LL; + break; + case TSDB_TIME_PRECISION_MICRO: + precisionFactor = 1000000LL; + break; + case TSDB_TIME_PRECISION_NANO: + precisionFactor = 1000000000LL; + break; + } + + // Verify skey + time_t t_skey = win.skey / precisionFactor; + struct tm tm_skey; + taosLocalTime(&t_skey, &tm_skey, NULL, 0, NULL); + + if (skeyYear >= 0) EXPECT_EQ(tm_skey.tm_year, skeyYear - 1900); + if (skeyMonth >= 0) EXPECT_EQ(tm_skey.tm_mon, skeyMonth - 1); + if (skeyDay >= 0) EXPECT_EQ(tm_skey.tm_mday, skeyDay); + if (skeyHour >= 0) EXPECT_EQ(tm_skey.tm_hour, skeyHour); + if (skeyMin >= 0) EXPECT_EQ(tm_skey.tm_min, skeyMin); + if (skeySec >= 0) EXPECT_EQ(tm_skey.tm_sec, skeySec); + if (skeyWday >= 0) EXPECT_EQ(tm_skey.tm_wday, skeyWday); + + // Verify ekey + time_t t_ekey = win.ekey / precisionFactor; + struct tm tm_ekey; + taosLocalTime(&t_ekey, &tm_ekey, NULL, 0, NULL); + + if (ekeyYear >= 0) EXPECT_EQ(tm_ekey.tm_year, ekeyYear - 1900); + if (ekeyMonth >= 0) EXPECT_EQ(tm_ekey.tm_mon, ekeyMonth - 1); + if (ekeyDay >= 0) EXPECT_EQ(tm_ekey.tm_mday, ekeyDay); + if (ekeyHour >= 0) EXPECT_EQ(tm_ekey.tm_hour, ekeyHour); + if (ekeyMin >= 0) EXPECT_EQ(tm_ekey.tm_min, ekeyMin); + if (ekeySec >= 0) EXPECT_EQ(tm_ekey.tm_sec, ekeySec); + if (ekeyWday >= 0) EXPECT_EQ(tm_ekey.tm_wday, ekeyWday); + + // Verify duration (closed interval: ekey - skey + 1) + if (expectedDurationMs >= 0) { + int64_t duration = win.ekey - win.skey + 1; + int64_t expectedDuration = expectedDurationMs * (precisionFactor / 1000LL); + EXPECT_EQ(duration, expectedDuration); + } + } +}; + +/** + * Test week unit window calculation + * Verify that window aligns to Monday 00:00:00 + */ +TEST_F(StreamTriggerTaskTest, WeekWindowCalculation) { + // Create task with PERIOD(1w) + SStreamTriggerTask* pTask = createMockTriggerTask('w', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp: 2026-03-10 15:30:00 (Tuesday) + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Window should be [2026-03-09 00:00:00 Monday, 2026-03-16 00:00:00 Monday] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 3, 9, 0, 0, 0, 1, // skey: 2026-03-09 00:00:00 Monday + 2026, 3, 16, 0, 0, 0, 1, // ekey: 2026-03-16 00:00:00 Monday + 7LL * 24LL * 60LL * 60LL * 1000LL); // 7 days + + freeMockTriggerTask(pTask); +} + +/** + * Test month unit window calculation + * Verify that window aligns to 1st of month 00:00:00 + */ +TEST_F(StreamTriggerTaskTest, MonthWindowCalculation) { + // Create task with PERIOD(1n) + SStreamTriggerTask* pTask = createMockTriggerTask('n', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp: 2026-03-15 12:00:00 + int64_t ts = makeTimestamp(2026, 3, 15, 12, 0, 0); + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Window should be [2026-03-01 00:00:00, 2026-04-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 3, 1, 0, 0, 0, -1, // skey: 2026-03-01 00:00:00 + 2026, 4, 1, 0, 0, 0, -1); // ekey: 2026-04-01 00:00:00 + + freeMockTriggerTask(pTask); +} + +/** + * Test year unit window calculation + * Verify that window aligns to Jan 1st 00:00:00 + */ +TEST_F(StreamTriggerTaskTest, YearWindowCalculation) { + // Create task with PERIOD(1y) + SStreamTriggerTask* pTask = createMockTriggerTask('y', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp: 2026-06-15 12:00:00 + int64_t ts = makeTimestamp(2026, 6, 15, 12, 0, 0); + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Window should be [2026-01-01 00:00:00, 2027-01-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 1, 1, 0, 0, 0, -1, // skey: 2026-01-01 00:00:00 + 2027, 1, 1, 0, 0, 0, -1); // ekey: 2027-01-01 00:00:00 + + freeMockTriggerTask(pTask); +} + +/** + * Test window advancement for week unit + * Verify that next window's skey = current window's ekey + 1 + */ +TEST_F(StreamTriggerTaskTest, WeekWindowAdvancement) { + // Create task with PERIOD(1w) + SStreamTriggerTask* pTask = createMockTriggerTask('w', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Get initial window + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); // 2026-03-10 15:30:00 (Tuesday) + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Verify first window: [2026-03-09 00:00:00 Monday, 2026-03-16 00:00:00 Monday] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 3, 9, 0, 0, 0, 1, // skey: 2026-03-09 00:00:00 Monday + 2026, 3, 16, 0, -1, -1, 1, // ekey: 2026-03-16 00:00:00 Monday + 7LL * 24LL * 60LL * 60LL * 1000LL); // 7 days + + int64_t firstEkey = win.ekey; + + // Advance to next window + stTriggerTaskNextTimeWindow(pTask, &win); + + // Next window's skey should equal previous window's ekey + 1 (closed interval) + EXPECT_EQ(win.skey, firstEkey + 1); + + // Verify second window: [2026-03-16 00:00:00 Monday, 2026-03-23 00:00:00 Monday] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 3, 16, 0, 0, 0, 1, // skey: 2026-03-16 00:00:00 Monday + 2026, 3, 23, 0, 0, 0, 1, // ekey: 2026-03-23 00:00:00 Monday + 7LL * 24LL * 60LL * 60LL * 1000LL); // 7 days + + freeMockTriggerTask(pTask); +} + +/** + * Test window advancement for month unit + * Verify correct handling of variable month lengths + */ +TEST_F(StreamTriggerTaskTest, MonthWindowAdvancement) { + // Create task with PERIOD(1n) + SStreamTriggerTask* pTask = createMockTriggerTask('n', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Start with January (31 days) + int64_t ts = makeTimestamp(2025, 1, 2, 0, 0, 0); // 2025-01-02 00:00:00 + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Verify first window: [2025-01-01 00:00:00, 2025-02-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 1, 1, 0, 0, 0, -1, // skey: 2025-01-01 00:00:00 + 2025, 2, 1, 0, 0, 0, -1, // ekey: 2025-02-01 00:00:00 + 31LL * 24LL * 60LL * 60LL * 1000LL); // January has 31 days + + int64_t janEkey = win.ekey; + + // Advance to February + stTriggerTaskNextTimeWindow(pTask, &win); + EXPECT_EQ(win.skey, janEkey + 1); + + // Verify second window: [2025-02-01 00:00:00, 2025-03-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 2, 1, 0, 0, 0, -1, // skey: 2025-02-01 00:00:00 + 2025, 3, 1, 0, 0, 0, -1, // ekey: 2025-03-01 00:00:00 + 28LL * 24LL * 60LL * 60LL * 1000LL); // February has 28 days + + freeMockTriggerTask(pTask); +} + +/** + * Test offset application for week unit + * Verify that window time = natural boundary + offset + */ +TEST_F(StreamTriggerTaskTest, WeekWindowWithOffset) { + // Create task with PERIOD(1w, 1d) - trigger on Tuesday + int64_t oneDayMs = 24LL * 60LL * 60LL * 1000LL; + SStreamTriggerTask* pTask = createMockTriggerTask('w', 1, oneDayMs, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp: 2026-03-10 15:30:00 (Tuesday) + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0); + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Window should be [2026-03-10 00:00:00 Tuesday, 2026-03-17 00:00:00 Tuesday] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 3, 10, 0, 0, 0, 2, // skey: 2026-03-10 00:00:00 Tuesday + 2026, 3, 17, 0, 0, 0, 2); // ekey: 2026-03-17 00:00:00 Tuesday + + freeMockTriggerTask(pTask); +} + +/** + * Test multi-period week alignment (2 weeks) + * Verify epoch-based alignment + */ +TEST_F(StreamTriggerTaskTest, MultiPeriodWeekAlignment) { + // Create task with PERIOD(2w) + SStreamTriggerTask* pTask = createMockTriggerTask('w', 2, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Two timestamps in different weeks but same 2-week period should align to the same boundary + // Use timestamps that are 7 days apart (1 week) within the same 2-week period + int64_t ts1 = makeTimestamp(2026, 3, 03, 0, 0, 0); // 2026-03-03 (Tuesday, week 1) + int64_t ts2 = makeTimestamp(2026, 3, 10, 0, 0, 0); // 2026-03-10 (Tuesday, week 2) + + STimeWindow win1 = stTriggerTaskGetTimeWindow(pTask, ts1); + STimeWindow win2 = stTriggerTaskGetTimeWindow(pTask, ts2); + + // They are in the same 2-week period + EXPECT_EQ(win1.ekey, win2.ekey); + + // Window should be [2026-03-02 00:00:00 Monday, 2026-03-16 00:00:00 Monday] + verifyWindow(win1, TSDB_TIME_PRECISION_MILLI, 2026, 3, 2, 0, 0, 0, 1, // skey: 2026-03-02 00:00:00 Monday + 2026, 3, 16, 0, 0, 0, 1, // ekey: 2026-03-16 00:00:00 Monday + 14LL * 24LL * 60LL * 60LL * 1000LL); // 14 days + + freeMockTriggerTask(pTask); +} + +/** + * Test multi-period month alignment (3 months) + * Verify epoch-based alignment for quarterly periods + */ +TEST_F(StreamTriggerTaskTest, MultiPeriodMonthAlignment) { + // Create task with PERIOD(3n) - quarterly + SStreamTriggerTask* pTask = createMockTriggerTask('n', 3, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp in Q1 2026 + int64_t ts = makeTimestamp(2026, 2, 1, 0, 0, 0); // 2026-02-01 00:00:00 + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Window should be [2026-01-01 00:00:00, 2026-04-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2026, 1, 1, 0, 0, 0, -1, // skey: 2026-01-01 00:00:00 + 2026, 4, 1, 0, 0, 0, -1); // ekey: 2026-04-01 00:00:00 + + freeMockTriggerTask(pTask); +} + +/** + * Test microsecond precision + */ +TEST_F(StreamTriggerTaskTest, MicrosecondPrecision) { + // Create task with PERIOD(1w) in microsecond precision + SStreamTriggerTask* pTask = createMockTriggerTask('w', 1, 0, TSDB_TIME_PRECISION_MICRO); + ASSERT_NE(pTask, nullptr); + + // Test timestamp in microseconds + int64_t ts = makeTimestamp(2026, 3, 10, 15, 30, 0, TSDB_TIME_PRECISION_MICRO); // 2026-03-10 15:30:00 in microseconds + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Result should be in microseconds + EXPECT_GT(win.skey, 1000000000000LL); // Should be > 1 trillion (microseconds) + EXPECT_GT(win.ekey, 1000000000000LL); + + // Window should be [2026-03-09 00:00:00, 2026-03-16 00:00:00] in microseconds + verifyWindow(win, TSDB_TIME_PRECISION_MICRO, 2026, 3, 9, 0, 0, 0, -1, // skey: 2026-03-09 00:00:00 + 2026, 3, 16, 0, 0, 0, -1, // ekey: 2026-03-16 00:00:00 + 7LL * 24LL * 60LL * 60LL * 1000LL); // 7 days (in ms, will be converted) + + freeMockTriggerTask(pTask); +} + +/** + * Test year window advancement + * Verify correct handling of leap years + */ +TEST_F(StreamTriggerTaskTest, YearWindowAdvancement) { + // Create task with PERIOD(1y) + SStreamTriggerTask* pTask = createMockTriggerTask('y', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Start with 2024 (leap year) + int64_t ts = makeTimestamp(2024, 1, 2, 0, 0, 0); // 2024-01-01 00:00:00 + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Verify first window: [2024-01-01 00:00:00, 2025-01-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2024, 1, 1, 0, 0, 0, -1, // skey: 2024-01-01 00:00:00 + 2025, 1, 1, 0, -1, -1, -1, // ekey: 2025-01-01 00:00:00 + 366LL * 24LL * 60LL * 60LL * 1000LL); // 2024 is leap year (366 days) + + stTriggerTaskNextTimeWindow(pTask, &win); + + // Verify second window: [2025-01-01 00:00:00, 2026-01-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 1, 1, 0, 0, 0, -1, // skey: 2025-01-01 00:00:00 + 2026, 1, 1, 0, 0, 0, -1, // ekey: 2026-01-01 00:00:00 + 365LL * 24LL * 60LL * 60LL * 1000LL); // 2025 is normal year (365 days) + + freeMockTriggerTask(pTask); +} + +/** + * Test leap year February 29th handling + * Verify correct window calculation for leap year boundary + */ +TEST_F(StreamTriggerTaskTest, LeapYearFebruary29) { + // Create task with PERIOD(1n) + SStreamTriggerTask* pTask = createMockTriggerTask('n', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp: 2024-02-29 12:00:00 (leap year) + int64_t ts = makeTimestamp(2024, 2, 29, 12, 0, 0); + + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Window should be [2024-02-01 00:00:00, 2024-03-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2024, 2, 1, 0, 0, 0, -1, // skey: 2024-02-01 00:00:00 + 2024, 3, 1, 0, 0, 0, -1, // ekey: 2024-03-01 00:00:00 + 29LL * 24LL * 60LL * 60LL * 1000LL); // February 2024 has 29 days + + freeMockTriggerTask(pTask); +} + +/** + * Test month boundary transitions (small to large month) + * Verify correct handling of February to March transition + */ +TEST_F(StreamTriggerTaskTest, MonthBoundarySmallToLarge) { + // Create task with PERIOD(1n) + SStreamTriggerTask* pTask = createMockTriggerTask('n', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Start with February 2025 (28 days, non-leap year) + int64_t ts = makeTimestamp(2025, 2, 2, 0, 0, 0); // 2025-02-02 00:00:00 + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Verify February window: [2025-02-01 00:00:00, 2025-03-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 2, 1, 0, 0, 0, -1, // skey: 2025-02-01 00:00:00 + 2025, 3, 1, 0, 0, 0, -1, // ekey: 2025-03-01 00:00:00 + 28LL * 24LL * 60LL * 60LL * 1000LL); // February has 28 days + + int64_t febEkey = win.ekey; + + // Advance to March (31 days) + stTriggerTaskNextTimeWindow(pTask, &win); + EXPECT_EQ(win.skey, febEkey + 1); + + // Verify March window: [2025-03-01 00:00:00, 2025-04-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 3, 1, 0, 0, 0, -1, // skey: 2025-03-01 00:00:00 + 2025, 4, 1, 0, 0, 0, -1, // ekey: 2025-04-01 00:00:00 + 31LL * 24LL * 60LL * 60LL * 1000LL); // March has 31 days + + freeMockTriggerTask(pTask); +} + +/** + * Test month boundary transitions (large to small month) + * Verify correct handling of January to February transition + */ +TEST_F(StreamTriggerTaskTest, MonthBoundaryLargeToSmall) { + // Create task with PERIOD(1n) + SStreamTriggerTask* pTask = createMockTriggerTask('n', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Start with January 2025 (31 days) + int64_t ts = makeTimestamp(2025, 1, 2, 0, 0, 0); // 2025-01-01 00:00:00 + STimeWindow win = stTriggerTaskGetTimeWindow(pTask, ts); + + // Verify January window: [2025-01-01 00:00:00, 2025-02-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 1, 1, 0, 0, 0, -1, // skey: 2025-01-01 00:00:00 + 2025, 2, 1, 0, 0, 0, -1, // ekey: 2025-02-01 00:00:00 + 31LL * 24LL * 60LL * 60LL * 1000LL); // January has 31 days + + int64_t janEkey = win.ekey; + + // Advance to February (28 days, non-leap year) + stTriggerTaskNextTimeWindow(pTask, &win); + EXPECT_EQ(win.skey, janEkey + 1); + + // Verify February window: [2025-02-01 00:00:00, 2025-03-01 00:00:00] + verifyWindow(win, TSDB_TIME_PRECISION_MILLI, 2025, 2, 1, 0, 0, 0, -1, // skey: 2025-02-01 00:00:00 + 2025, 3, 1, 0, 0, 0, -1, // ekey: 2025-03-01 00:00:00 + 28LL * 24LL * 60LL * 60LL * 1000LL); // February has 28 days + + freeMockTriggerTask(pTask); +} + +/** + * Test multi-period epoch alignment verification + * Verify that 2-week periods align consistently across different timestamps + */ +TEST_F(StreamTriggerTaskTest, EpochAlignmentVerification) { + // Create task with PERIOD(2w) + SStreamTriggerTask* pTask = createMockTriggerTask('w', 2, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test multiple timestamps across different 2-week periods + // All should align to epoch-based 2-week boundaries + + // Timestamp 1: 2026-01-06 (week 1 of 2026) + int64_t ts1 = makeTimestamp(2026, 1, 6, 0, 0, 0); + STimeWindow win1 = stTriggerTaskGetTimeWindow(pTask, ts1); + + // Timestamp 2: 2026-01-20 (week 3 of 2026, should be in next 2-week period) + int64_t ts2 = makeTimestamp(2026, 1, 20, 0, 0, 0); + STimeWindow win2 = stTriggerTaskGetTimeWindow(pTask, ts2); + + // Windows should be different (different 2-week periods) + EXPECT_NE(win1.skey, win2.skey); + + // But win2.skey should equal win1.ekey + 1 (continuous, closed interval) + EXPECT_EQ(win2.skey, win1.ekey + 1); + + // Window 1 should be [2026-01-05 00:00:00 Monday, 2026-01-19 00:00:00 Monday] + // Window 2 should be [2026-01-19 00:00:00 Monday, 2026-02-02 00:00:00 Monday] + verifyWindow(win1, TSDB_TIME_PRECISION_MILLI, 2026, 1, 5, 0, 0, 0, 1, // skey: 2026-01-05 00:00:00 Monday + 2026, 1, 19, 0, 0, 0, 1, // ekey: 2026-01-19 00:00:00 Monday + 14LL * 24LL * 60LL * 60LL * 1000LL); // 14 days + + verifyWindow(win2, TSDB_TIME_PRECISION_MILLI, 2026, 1, 19, 0, 0, 0, 1, // skey: 2026-01-19 00:00:00 Monday + 2026, 2, 2, 0, 0, 0, 1, // ekey: 2026-02-02 00:00:00 Monday + 14LL * 24LL * 60LL * 60LL * 1000LL); // 14 days + + freeMockTriggerTask(pTask); +} + +/** + * Test daylight saving time transition (if applicable) + * Note: This test assumes server timezone observes DST + * The trigger time should remain at 00:00:00 local time regardless of DST + */ +TEST_F(StreamTriggerTaskTest, DaylightSavingTimeTransition) { + // Create task with PERIOD(1w) + SStreamTriggerTask* pTask = createMockTriggerTask('w', 1, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp during DST transition period (example: March 2026 in US) + // Note: Actual DST dates vary by timezone + // This test verifies that mktime() handles DST correctly + + // Before DST: 2026-03-02 (Monday before DST starts) + int64_t ts1 = makeTimestamp(2026, 3, 2, 0, 0, 0); + STimeWindow win1 = stTriggerTaskGetTimeWindow(pTask, ts1); + + // After DST: 2026-03-16 (Monday after DST starts) + int64_t ts2 = makeTimestamp(2026, 3, 16, 0, 0, 0); + STimeWindow win2 = stTriggerTaskGetTimeWindow(pTask, ts2); + + // Both windows should align to Monday 00:00:00 local time + verifyWindow(win1, TSDB_TIME_PRECISION_MILLI, 2026, -1, -1, 0, -1, -1, 1, // skey: 2026 Monday 00:00:00 + -1, -1, -1, -1, -1, -1, -1); + + verifyWindow(win2, TSDB_TIME_PRECISION_MILLI, 2026, -1, -1, 0, -1, -1, 1, // skey: 2026 Monday 00:00:00 + -1, -1, -1, -1, -1, -1, -1); + + freeMockTriggerTask(pTask); +} + +/** + * Test multi-period year alignment (2 years) + * Verify epoch-based alignment for biennial periods + */ +TEST_F(StreamTriggerTaskTest, MultiPeriodYearAlignment) { + // Create task with PERIOD(2y) + SStreamTriggerTask* pTask = createMockTriggerTask('y', 2, 0, TSDB_TIME_PRECISION_MILLI); + ASSERT_NE(pTask, nullptr); + + // Test timestamp in 2024 (even year from epoch 1970) + int64_t ts1 = makeTimestamp(2024, 1, 1, 0, 0, 0); // 2024-01-01 00:00:00 + STimeWindow win1 = stTriggerTaskGetTimeWindow(pTask, ts1); + + // Test timestamp in 2025 (odd year from epoch 1970) + int64_t ts2 = makeTimestamp(2025, 1, 1, 0, 0, 0); // 2025-01-01 00:00:00 + STimeWindow win2 = stTriggerTaskGetTimeWindow(pTask, ts2); + + // 2024 and 2025 are in different 2-year periods + EXPECT_NE(win1.skey, win2.skey); + EXPECT_NE(win1.ekey, win2.ekey); + + // win2.skey should equal win1.ekey + 1 (consecutive windows) + EXPECT_EQ(win2.skey, win1.ekey + 1); + + // Window 1 should be [2022-01-01 00:00:00, 2024-01-01 00:00:00] + // Window 2 should be [2024-01-01 00:00:00, 2026-01-01 00:00:00] + verifyWindow(win1, TSDB_TIME_PRECISION_MILLI, 2022, 1, 1, 0, 0, 0, -1, // skey: 2022-01-01 00:00:00 + 2024, 1, 1, 0, 0, 0, -1); // ekey: 2024-01-01 00:00:00 + + verifyWindow(win2, TSDB_TIME_PRECISION_MILLI, 2024, 1, 1, 0, 0, 0, -1, // skey: 2024-01-01 00:00:00 + 2026, 1, 1, 0, 0, 0, -1); // ekey: 2026-01-01 00:00:00 + + freeMockTriggerTask(pTask); +} diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3d4c5d256276..a5f30fd20130 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8719,21 +8719,79 @@ static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSe static const int64_t periodLowerBound = 10; static const int64_t periodUpperBound = (int64_t)3650 * 24 * 60 * 60 * 1000; // 10 years in milliseconds -static const int64_t offsetUpperBound = (int64_t)24 * 60 * 60 * 1000; // 1 day in milliseconds +/** + * @brief Validate PERIOD trigger window parameters + * + * Validates period interval and offset parameters for stream PERIOD triggers. + * Performs unit validation, range checking, and offset-period relationship validation. + * + * @param pCxt Translation context for error reporting + * @param pPeriod Period window node containing interval and offset + * @return TSDB_CODE_SUCCESS on valid parameters, error code otherwise + * + * @note Period unit validation: + * - Natural units: n (month), y (year) + * - Fixed units: a (ms), s (second), m (minute), h (hour), d (day), w (week) + * @note Period range validation: + * - Month: [1n, 120n] (10 years) + * - Year: [1y, 10y] + * - Fixed units: [10a, 3650d] + * @note Offset validation: + * - Supported units: a, s, m, h, d (week/month/year not allowed) + * - Must satisfy: offset < period (strict inequality) + * - Month unit special case: offset < N * 28 days (shortest month) + */ static int32_t checkPeriodWindow(STranslateContext* pCxt, SPeriodWindowNode* pPeriod) { uint8_t precision = TSDB_TIME_PRECISION_MILLI; SValueNode* pPer = (SValueNode*)pPeriod->pPeroid; SValueNode* pOffset = (SValueNode*)pPeriod->pOffset; if (pPer) { - if (pPer->unit != 'a' && pPer->unit != 's' && pPer->unit != 'm' && pPer->unit != 'h' && pPer->unit != 'd') { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PERIOD_UNIT, pPer->unit); + if (pPer->unit != 'a' && pPer->unit != 's' && pPer->unit != 'm' && pPer->unit != 'h' && pPer->unit != 'd' && + pPer->unit != 'w' && pPer->unit != 'n' && pPer->unit != 'y') { + char errMsg[256]; + snprintf(errMsg, sizeof(errMsg), + "Invalid time unit '%c' in PERIOD interval. " + "Supported interval units: a (millisecond), s (second), m (minute), h (hour), d (day), w (week), n (month), y (year)", + pPer->unit); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PERIOD_UNIT, errMsg); + } + + // Range validation based on unit type + // Note: For 'n' and 'y', datum.i is the original value (e.g., 3 for "3n") + // For other units, datum.i is already converted by getDuration() to the target precision + int64_t value = pPer->datum.i; + bool outOfRange = false; + const char* rangeMsg = NULL; + + switch (pPer->unit) { + case 'n': // Month: [1n, 120n] (10 years) + // datum.i is the original value for natural units + if (value < 1 || value > 120) { + outOfRange = true; + rangeMsg = "Period value out of range [1n, 120n]"; + } + break; + case 'y': // Year: [1y, 10y] + // datum.i is the original value for natural units + if (value < 1 || value > 10) { + outOfRange = true; + rangeMsg = "Period value out of range [1y, 10y]"; + } + break; + default: // Fixed duration units: a/s/m/h/d/w + // datum.i is already converted to target precision + if (pPer->datum.i / getPrecisionMultiple(precision) < periodLowerBound || + pPer->datum.i / getPrecisionMultiple(precision) > periodUpperBound) { + outOfRange = true; + rangeMsg = "Period value out of range [10a, 3650d]"; + } + break; } - if (pPer->datum.i / getPrecisionMultiple(precision) < periodLowerBound || - pPer->datum.i / getPrecisionMultiple(precision) > periodUpperBound) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PERIOD_RANGE, - "Period value out of range [10a, 3650d]"); + + if (outOfRange) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_PERIOD_RANGE, rangeMsg); } } @@ -8747,13 +8805,29 @@ static int32_t checkPeriodWindow(STranslateContext* pCxt, SPeriodWindowNode* pPe generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_TRIGGER, "Negative period offset value")); } - if (pOffset->unit != 'a' && pOffset->unit != 's' && pOffset->unit != 'm' && pOffset->unit != 'h') { - PAR_ERR_RET(generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WIN_OFFSET_UNIT, pOffset->unit)); - } - - if (pOffset->datum.i > offsetUpperBound) { - PAR_ERR_RET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_STREAM_INVALID_TRIGGER, - "Period offset value should less than 1d")); + if (pOffset->unit != 'a' && pOffset->unit != 's' && pOffset->unit != 'm' && pOffset->unit != 'h' && pOffset->unit != 'd') { + char errMsg[256]; + snprintf(errMsg, sizeof(errMsg), + "Invalid time unit '%c' in PERIOD offset. " + "Supported offset units: a (millisecond), s (second), m (minute), h (hour), d (day)", + pOffset->unit); + PAR_ERR_RET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_OFFSET_UNIT, errMsg)); + } + + // Validate offset < period (strict inequality) + // Note: offset datum.i is already converted by getDuration() to target precision + // period datum.i is converted for non-natural units, original value for 'n'/'y' + // Convert both to milliseconds for comparison + int64_t offsetValue = pOffset->datum.i; + int64_t periodValue = pPer->datum.i; + if (pPer->unit == 'n') { + periodValue = convertTimePrecision(periodValue * 28LL * MILLISECOND_PER_DAY, TSDB_TIME_PRECISION_MILLI, precision); // Convert N months to milliseconds using 28 days/month + } else if (pPer->unit == 'y') { + periodValue = convertTimePrecision(periodValue * 365LL * MILLISECOND_PER_DAY, TSDB_TIME_PRECISION_MILLI, precision); // Convert N years to milliseconds using 365 days/year + } + if (offsetValue >= periodValue) { + PAR_ERR_RET(generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_OFFSET_VALUE, + "Offset must be strictly less than period")); } } diff --git a/source/libs/parser/test/parStreamTest.cpp b/source/libs/parser/test/parStreamTest.cpp index e2e5bd8a994b..aa0c2ee7b307 100644 --- a/source/libs/parser/test/parStreamTest.cpp +++ b/source/libs/parser/test/parStreamTest.cpp @@ -1734,14 +1734,13 @@ TEST_F(ParserStreamTest, TestErrorTriggerWindow) { // invalid period unit run("create stream stream_streamdb.s1 period(1u) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_UNIT); run("create stream stream_streamdb.s1 period(1b) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_UNIT); - run("create stream stream_streamdb.s1 period(1w) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_UNIT); - run("create stream stream_streamdb.s1 period(1n) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_UNIT); - run("create stream stream_streamdb.s1 period(1y) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_UNIT); // period val out of [10a, 3650d] run("create stream stream_streamdb.s1 period(9a) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_RANGE); run("create stream stream_streamdb.s1 period(3660d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_RANGE); - + run("create stream stream_streamdb.s1 period(522w) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_RANGE); + run("create stream stream_streamdb.s1 period(121n) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_RANGE); + run("create stream stream_streamdb.s1 period(11y) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _twstart, avg(c1) from stream_querydb.stream_t2", TSDB_CODE_PAR_INVALID_PERIOD_RANGE); } TEST_F(ParserStreamTest, TestErrorTriggerTable) { @@ -1931,4 +1930,55 @@ TEST_F(ParserStreamTest, TestErrorQueryPlaceHolder) { } +TEST_F(ParserStreamTest, TestPeriodOffset) { + setAsyncFlag("-1"); + + // Valid offset cases + // PERIOD(1w, 1d) - week with day offset + run("create stream stream_streamdb.s1 period(1w, 1d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)"); + + // PERIOD(1w, 12h) - week with hour offset + run("create stream stream_streamdb.s1 period(1w, 12h) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)"); + + // PERIOD(1n, 14d) - month with day offset + run("create stream stream_streamdb.s1 period(1n, 14d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1n)"); + + // PERIOD(1y, 31d) - year with day offset + run("create stream stream_streamdb.s1 period(1y, 31d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1y)"); + + // Invalid offset cases + // offset >= period (should fail) + run("create stream stream_streamdb.s1 period(1w, 7d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)", TSDB_CODE_PAR_INVALID_OFFSET_VALUE); + + // offset > period (should fail) + run("create stream stream_streamdb.s1 period(1w, 8d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)", TSDB_CODE_PAR_INVALID_OFFSET_VALUE); + + // Month unit offset overflow (>= 28 days for 1 month) + run("create stream stream_streamdb.s1 period(1n, 28d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1n)", TSDB_CODE_PAR_INVALID_OFFSET_VALUE); + + // Month unit offset overflow (>= 56 days for 2 months) + run("create stream stream_streamdb.s1 period(2n, 56d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(2n)", TSDB_CODE_PAR_INVALID_OFFSET_VALUE); + + // Valid month offset (< 28 days) + run("create stream stream_streamdb.s1 period(1n, 27d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1n)"); + + // Invalid offset unit (week not allowed for offset) + run("create stream stream_streamdb.s1 period(1n, 1w) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1n)", TSDB_CODE_PAR_INVALID_OFFSET_UNIT); + + // Invalid offset unit (month not allowed for offset) + run("create stream stream_streamdb.s1 period(1y, 1n) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1y)", TSDB_CODE_PAR_INVALID_OFFSET_UNIT); + + // Invalid offset unit (year not allowed for offset) + run("create stream stream_streamdb.s1 period(1y, 1y) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1y)", TSDB_CODE_PAR_INVALID_OFFSET_UNIT); + + // Multi-unit combinations + // Week + various offset units + run("create stream stream_streamdb.s1 period(1w, 3600000a) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)"); // 1 hour in ms + run("create stream stream_streamdb.s1 period(1w, 3600s) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)"); // 1 hour in seconds + run("create stream stream_streamdb.s1 period(1w, 60m) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)"); // 1 hour in minutes + + // Negative offset (should fail with syntax error) + run("create stream stream_streamdb.s1 period(1w, -1d) from stream_triggerdb.stream_t1 into stream_outdb.stream_out as select _wstart, avg(c1) from stream_querydb.stream_t2 interval(1w)", TSDB_CODE_PAR_SYNTAX_ERROR, PARSER_STAGE_PARSE); +} + } // namespace ParserTest diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d1c7ecf6ad37..77e3c484fed1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -177,6 +177,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FAIL_GENERATE_JSON, "failed to generate JS TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR, "bind number out of range or not match") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PERIOD_UNIT, "Invalid period unit") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PERIOD_RANGE, "Invalid period range") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_OFFSET_UNIT, "Invalid offset unit") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_OFFSET_VALUE, "Invalid offset value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DROP_VTABLE, "Invalid drop vtable") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMN_REF, "Invalid column reference") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SLIDING_OFFSET, "Invalid sliding offset") diff --git a/test/cases/18-StreamProcessing/03-TriggerMode/test_period_natural_units.py b/test/cases/18-StreamProcessing/03-TriggerMode/test_period_natural_units.py new file mode 100644 index 000000000000..f240ccdbfe62 --- /dev/null +++ b/test/cases/18-StreamProcessing/03-TriggerMode/test_period_natural_units.py @@ -0,0 +1,344 @@ +from new_test_framework.utils import tdLog, tdSql, tdStream + + +class TestPeriodNaturalUnits: + def setup_class(self): + tdLog.debug("start to execute %s" % __file__) + + def prepare_env(self): + """Prepare test environment""" + tdLog.info("=== Prepare test environment ===") + tdStream.dropAllStreamsAndDbs() + tdStream.createSnode() + tdStream.init_database("test_period_natural") + tdSql.execute("use test_period_natural") + tdSql.execute("create table meters (ts timestamp, current float, voltage int)") + tdLog.success("Test environment prepared") + + # ========== Week Unit Tests ========== + + def _test_week_basic(self): + """Test basic week unit syntax - PERIOD(1w)""" + tdLog.info("=== Test PERIOD(1w) basic syntax ===") + + tdSql.execute("drop stream if exists s_week_1w") + tdSql.execute(""" + create stream s_week_1w period(1w) into s_week_1w_output + as select _wstart, avg(current) from meters interval(1w) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_week_1w'") + tdSql.checkRows(1) + + tdLog.success("PERIOD(1w) basic syntax test passed") + + def _test_week_multi_period(self): + """Test multi-week period syntax - PERIOD(2w), PERIOD(4w)""" + tdLog.info("=== Test multi-week period syntax ===") + + # PERIOD(2w) - bi-weekly + tdSql.execute("drop stream if exists s_week_2w") + tdSql.execute(""" + create stream s_week_2w period(2w) into s_week_2w_output + as select _wstart, avg(current) from meters interval(2w) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_week_2w'") + tdSql.checkRows(1) + + # PERIOD(4w) - monthly (4 weeks) + tdSql.execute("drop stream if exists s_week_4w") + tdSql.execute(""" + create stream s_week_4w period(4w) into s_week_4w_output + as select _wstart, avg(current) from meters interval(4w) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_week_4w'") + tdSql.checkRows(1) + + tdLog.success("Multi-week period syntax test passed") + + def _test_week_errors(self): + """Test week unit error cases""" + tdLog.info("=== Test week unit errors ===") + + # 0w - should fail (minimum is 1w) + tdSql.error(""" + create stream s_week_0w period(0w) into s_week_0w_output + as select _wstart, avg(current) from meters interval(1w) + """) + + # 522w - should fail (maximum is 521w) + tdSql.error(""" + create stream s_week_522w period(522w) into s_week_522w_output + as select _wstart, avg(current) from meters interval(522w) + """) + + # Invalid unit 'x' + tdSql.error(""" + create stream s_invalid_unit period(1x) into s_invalid_unit_output + as select _wstart, avg(current) from meters interval(1w) + """) + + tdLog.success("Week unit errors test passed") + + # ========== Month Unit Tests ========== + + def _test_month_basic(self): + """Test basic month unit syntax - PERIOD(1n)""" + tdLog.info("=== Test month unit basic syntax ===") + + tdSql.execute("drop stream if exists s_month_1n") + tdSql.execute(""" + create stream s_month_1n period(1n) into s_month_1n_output + as select _wstart, avg(current) from meters interval(1n) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_month_1n'") + tdSql.checkRows(1) + + # PERIOD(3n) - quarterly + tdSql.execute("drop stream if exists s_month_3n") + tdSql.execute(""" + create stream s_month_3n period(3n) into s_month_3n_output + as select _wstart, avg(current) from meters interval(3n) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_month_3n'") + tdSql.checkRows(1) + + tdLog.success("Month unit basic syntax test passed") + + def _test_month_errors(self): + """Test month unit error cases""" + tdLog.info("=== Test month unit errors ===") + + # 0n - should fail + tdSql.error(""" + create stream s_month_0n period(0n) into s_month_0n_output + as select _wstart, avg(current) from meters interval(1n) + """) + + # 121n - should fail (maximum is 120n) + tdSql.error(""" + create stream s_month_121n period(121n) into s_month_121n_output + as select _wstart, avg(current) from meters interval(121n) + """) + + tdLog.success("Month unit errors test passed") + + # ========== Year Unit Tests ========== + + def _test_year_basic(self): + """Test basic year unit syntax - PERIOD(1y)""" + tdLog.info("=== Test year unit basic syntax ===") + + tdSql.execute("drop stream if exists s_year_1y") + tdSql.execute(""" + create stream s_year_1y period(1y) into s_year_1y_output + as select _wstart, avg(current) from meters interval(1y) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_year_1y'") + tdSql.checkRows(1) + + # PERIOD(2y) + tdSql.execute("drop stream if exists s_year_2y") + tdSql.execute(""" + create stream s_year_2y period(2y) into s_year_2y_output + as select _wstart, avg(current) from meters interval(2y) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_year_2y'") + tdSql.checkRows(1) + + tdLog.success("Year unit basic syntax test passed") + + def _test_year_errors(self): + """Test year unit error cases""" + tdLog.info("=== Test year unit errors ===") + + # 0y - should fail + tdSql.error(""" + create stream s_year_0y period(0y) into s_year_0y_output + as select _wstart, avg(current) from meters interval(1y) + """) + + # 11y - should fail (maximum is 10y) + tdSql.error(""" + create stream s_year_11y period(11y) into s_year_11y_output + as select _wstart, avg(current) from meters interval(11y) + """) + + tdLog.success("Year unit errors test passed") + + # ========== Offset Tests ========== + + def _test_offset_basic(self): + """Test basic offset syntax""" + tdLog.info("=== Test basic offset syntax ===") + + # PERIOD(1w, 1d) - trigger on Tuesday (Monday + 1 day) + tdSql.execute("drop stream if exists s_offset_1w_1d") + tdSql.execute(""" + create stream s_offset_1w_1d period(1w, 1d) into s_offset_1w_1d_output + as select _wstart, avg(current) from meters interval(1w) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_offset_1w_1d'") + tdSql.checkRows(1) + + # PERIOD(1w, 12h) - trigger on Monday noon + tdSql.execute("drop stream if exists s_offset_1w_12h") + tdSql.execute(""" + create stream s_offset_1w_12h period(1w, 12h) into s_offset_1w_12h_output + as select _wstart, avg(current) from meters interval(1w) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_offset_1w_12h'") + tdSql.checkRows(1) + + tdLog.success("Basic offset syntax test passed") + + def _test_offset_cross_unit(self): + """Test cross-unit offset combinations""" + tdLog.info("=== Test cross-unit offset ===") + + # Week + day offset + tdSql.execute("drop stream if exists s_offset_week_day") + tdSql.execute(""" + create stream s_offset_week_day period(2w, 3d) into s_offset_week_day_output + as select _wstart, avg(current) from meters interval(2w) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_offset_week_day'") + tdSql.checkRows(1) + + # Month + day offset + tdSql.execute("drop stream if exists s_offset_month_day") + tdSql.execute(""" + create stream s_offset_month_day period(1n, 14d) into s_offset_month_day_output + as select _wstart, avg(current) from meters interval(1n) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_offset_month_day'") + tdSql.checkRows(1) + + # Year + day offset + tdSql.execute("drop stream if exists s_offset_year_day") + tdSql.execute(""" + create stream s_offset_year_day period(1y, 31d) into s_offset_year_day_output + as select _wstart, avg(current) from meters interval(1y) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_offset_year_day'") + tdSql.checkRows(1) + + tdLog.success("Cross-unit offset test passed") + + def _test_offset_validation_errors(self): + """Test offset validation error cases""" + tdLog.info("=== Test offset validation errors ===") + + # offset >= period (should fail) + tdSql.error(""" + create stream s_offset_equal period(1w, 7d) into s_offset_equal_output + as select _wstart, avg(current) from meters interval(1w) + """) + + # offset > period (should fail) + tdSql.error(""" + create stream s_offset_greater period(1w, 8d) into s_offset_greater_output + as select _wstart, avg(current) from meters interval(1w) + """) + + # Negative offset (should fail) + tdSql.error(""" + create stream s_offset_negative period(1w, -1d) into s_offset_negative_output + as select _wstart, avg(current) from meters interval(1w) + """) + + tdLog.success("Offset validation errors test passed") + + def _test_offset_month_overflow(self): + """Test month unit offset overflow validation""" + tdLog.info("=== Test month offset overflow ===") + + # PERIOD(1n, 28d) - should fail (>= 28 days) + tdSql.error(""" + create stream s_month_overflow_28d period(1n, 28d) into s_month_overflow_28d_output + as select _wstart, avg(current) from meters interval(1n) + """) + + # PERIOD(2n, 56d) - should fail (>= 2*28 days) + tdSql.error(""" + create stream s_month_overflow_56d period(2n, 56d) into s_month_overflow_56d_output + as select _wstart, avg(current) from meters interval(2n) + """) + + # PERIOD(1n, 27d) - should succeed (< 28 days) + tdSql.execute("drop stream if exists s_month_valid_27d") + tdSql.execute(""" + create stream s_month_valid_27d period(1n, 27d) into s_month_valid_27d_output + as select _wstart, avg(current) from meters interval(1n) + """) + tdSql.query("select stream_name from information_schema.ins_streams where stream_name='s_month_valid_27d'") + tdSql.checkRows(1) + + tdLog.success("Month offset overflow test passed") + + def _test_offset_invalid_units(self): + """Test invalid offset units""" + tdLog.info("=== Test invalid offset units ===") + + # Week unit not allowed for offset + tdSql.error(""" + create stream s_offset_invalid_w period(1n, 1w) into s_offset_invalid_w_output + as select _wstart, avg(current) from meters interval(1n) + """) + + # Month unit not allowed for offset + tdSql.error(""" + create stream s_offset_invalid_n period(1y, 1n) into s_offset_invalid_n_output + as select _wstart, avg(current) from meters interval(1y) + """) + + # Year unit not allowed for offset + tdSql.error(""" + create stream s_offset_invalid_y period(1y, 1y) into s_offset_invalid_y_output + as select _wstart, avg(current) from meters interval(1y) + """) + + tdLog.success("Invalid offset units test passed") + + def test_period_natural_units(self): + """ + Test PERIOD trigger with natural time units (week/month/year) + + Purpose: Comprehensive tests for stream PERIOD trigger with natural time units + Steps: + 1. Test week unit (PERIOD(1w), PERIOD(2w)) + 2. Test month unit (PERIOD(1n), PERIOD(3n)) + 3. Test year unit (PERIOD(1y), PERIOD(2y)) + 4. Test offset parameter (PERIOD(1w, 1d), PERIOD(1n, 14d)) + 5. Test validation errors (invalid units, out-of-range, offset overflow) + Expected: All natural time unit syntax accepted, proper validation, metadata stored correctly + + Since: 3.4.1.0 + + Labels: stream, common, ci + + Feishu: https://project.feishu.cn/taosdata_td/feature/detail/6490755304 + + History: + - 2026-03-10 Initial implementation for natural time units feature + """ + self.prepare_env() + + # Week unit tests + self._test_week_basic() + self._test_week_multi_period() + self._test_week_errors() + + # Month unit tests + self._test_month_basic() + self._test_month_errors() + + # Year unit tests + self._test_year_basic() + self._test_year_errors() + + # Offset tests + self._test_offset_basic() + self._test_offset_cross_unit() + self._test_offset_validation_errors() + self._test_offset_month_overflow() + self._test_offset_invalid_units() diff --git a/test/cases/18-StreamProcessing/20-UseCase/tobacco_data/idmp/stream.json b/test/cases/18-StreamProcessing/20-UseCase/tobacco_data/idmp/stream.json index 7b2db1547f13..4ff8202785d2 100644 --- a/test/cases/18-StreamProcessing/20-UseCase/tobacco_data/idmp/stream.json +++ b/test/cases/18-StreamProcessing/20-UseCase/tobacco_data/idmp/stream.json @@ -350,7 +350,7 @@ { "id": 10, "name": "ana_电子皮带秤_f1w1a_belt_scale_08_定时窗口_平均测量值", - "create": "CREATE STREAM IF NOT EXISTS `idmp`.`%STREAM_NAME` PERIOD(10s,10s) FROM `idmp`.`vt_电子皮带秤_f1w1a_belt_scale_08` STREAM_OPTIONS(IGNORE_DISORDER) NOTIFY('ws://idmp:6042/eventReceive') ON(WINDOW_OPEN|WINDOW_CLOSE) INTO `idmp`.`%STREAM_NAME` AS SELECT cast(_tlocaltime/1000000 as timestamp) as output_timestamp, AVG(`测量值`) AS `测量值` FROM idmp.`vt_电子皮带秤_f1w1a_belt_scale_08` WHERE ts <= cast(_tlocaltime/1000000 as timestamp);", + "create": "CREATE STREAM IF NOT EXISTS `idmp`.`%STREAM_NAME` PERIOD(10s, 9s) FROM `idmp`.`vt_电子皮带秤_f1w1a_belt_scale_08` STREAM_OPTIONS(IGNORE_DISORDER) NOTIFY('ws://idmp:6042/eventReceive') ON(WINDOW_OPEN|WINDOW_CLOSE) INTO `idmp`.`%STREAM_NAME` AS SELECT cast(_tlocaltime/1000000 as timestamp) as output_timestamp, AVG(`测量值`) AS `测量值` FROM idmp.`vt_电子皮带秤_f1w1a_belt_scale_08` WHERE ts <= cast(_tlocaltime/1000000 as timestamp);", "data": [ "INSERT INTO `f1w1a_belt_scale_08`(ts,measured_value) VALUES(now, 0.0),(now+1s, 1.0),(now+2s, 2.0),(now+3s, 3.0),(now+4s, 4.0),(now+5s, 5.0),(now+6s, 6.0),(now+7s, 7.0),(now+8s, 8.0),(now+9s, 9.0);" ], @@ -457,4 +457,4 @@ } ] } -] \ No newline at end of file +] diff --git a/test/ci/cases.task b/test/ci/cases.task index fe4f41d8f4c2..2b6df13049c7 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -730,6 +730,7 @@ ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_state.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_ts_7622.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_window_true_for.py +,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_period_natural_units.py ## 04-Options ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/04-Options/test_abnormal_data_table.py ,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/04-Options/test_abnormal_data_vtable.py