Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 76 additions & 5 deletions docs/en/14-reference/03-taos-sql/41-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,20 @@ PERIOD(period_time[, offset_time])

A scheduled trigger is driven by a fixed interval based on the system time, essentially functioning as a scheduled task. It does not belong to the category of window triggers. Parameter definitions are as follows:

- period_time: The scheduling interval. Supported time units include milliseconds (a), seconds (s), minutes (m), hours (h), and days (d). The supported range is [10a, 3650d].
- offset_time: (Optional) The scheduling offset. Supported units include milliseconds (a), seconds (s), minutes (m), and hours (h). The offset value must be less than 1 day.
- period_time: The scheduling interval. Supported time units include milliseconds (a), seconds (s), minutes (m), hours (h), days (d), weeks (w), months (n), and years (y). The supported range is [10a, 3650d].
- offset_time: (Optional) The scheduling offset. Supported units include milliseconds (a), seconds (s), minutes (m), hours (h), and days (d). For week/month/year units, the offset must be strictly less than the trigger period; for month units, validation is based on 28 days/month (e.g., `PERIOD(1n, 28d)` is invalid).

Usage Notes:

- When the scheduling interval is less than one day, the base time is calculated as midnight (00:00) plus the scheduling offset. The next trigger time is determined based on this base time and the specified interval. The base time resets to midnight each day. The time between the last trigger of one day and the base time of the next day may be shorter than the scheduling interval. For example:
- If the scheduling interval is 5 hours 30 minutes, the trigger times for the day will be [00:00, 05:30, 11:00, 16:30, 22:00]. The trigger times for subsequent days will be the same.
- With the same interval but an offset of 1 minute, the trigger times will be [00:01, 05:31, 11:01, 16:31, 22:01] each day.
- Under the same conditions, if the stream is created when the system time is 12:00, the trigger times for the current day will be [16:31, 22:01]. From the next day onwards, the trigger times will be [00:01, 05:31, 11:01, 16:31, 22:01].
- When the scheduling interval is greater than or equal to 1 day, the base time is calculated as midnight (00:00) of the current day plus the scheduling offset, and it will not reset on subsequent days. For example:
- If the scheduling interval is 1 day 1 hour and the stream is created when the system time is 05-01 12:00, the trigger times will be [05-02 01:00, 05-03 02:00, 05-04 03:00, 05-05 04:00, …].
- Under the same conditions, if the time offset is 1 minute, the trigger times will be [05-02 01:01, 05-03 02:02, 05-04 03:03, 05-05 04:04, …].
- When the scheduling interval is greater than or equal to 1 day, the base time is calculated from the server timezone's Unix epoch (1970-01-01 00:00:00) plus the scheduling offset, aligned by integer multiples of the trigger interval to ensure global consistency across all tasks. For example:
- With a scheduling interval of 2 days, all tasks using this interval will trigger at times that are integer multiples of 2 days from the epoch (e.g., 1970-01-03 00:00:00, 1970-01-05 00:00:00, ...), ensuring global alignment.
- With a scheduling interval of 1 week (`PERIOD(1w)`), triggers align to every Monday at 00:00:00; `PERIOD(1w, 1d)` triggers every Tuesday at 00:00:00.
- With a scheduling interval of 1 month (`PERIOD(1n)`), triggers align to the 1st of each month at 00:00:00; `PERIOD(1n, 14d)` triggers on the 15th of each month at 00:00:00.
- With a scheduling interval of 1 year (`PERIOD(1y)`), triggers align to January 1st at 00:00:00 each year; `PERIOD(1y, 31d)` triggers on February 1st at 00:00:00 each year.

Applicable scenarios: Situations requiring scheduled computation driven continuously by system time, such as generating daily statistics every hour, or sending scheduled statistical reports once a day.

Expand Down Expand Up @@ -1018,3 +1020,72 @@ CREATE stream stream_consumer_energy
FROM meters
WHERE ts >= cast(_tprev_localtime/1000000 AS timestamp) AND ts <= cast(_tlocaltime/1000000 AS timestamp);
```

- Every Monday at 00:00:00, compute the weekly device operation summary for the previous week and write the results to the weekly_summary table.

```SQL
CREATE STREAM weekly_device_summary
PERIOD(1w)
FROM meters PARTITION BY location
INTO weekly_summary
AS
SELECT _wstart AS week_start,
location,
AVG(current) AS avg_current,
MAX(voltage) AS max_voltage,
COUNT(*) AS record_count
FROM meters
INTERVAL(1w)
PARTITION BY location;
```

- On the 1st of each month at 00:00:00, compute the energy consumption bill for the previous month and write the results to the monthly_bill table.

```SQL
CREATE STREAM monthly_energy_bill
PERIOD(1n)
FROM meters PARTITION BY location, groupId
INTO monthly_bill
AS
SELECT _wstart AS month_start,
location,
groupId,
SUM(current * voltage) AS total_energy
FROM meters
INTERVAL(1n)
PARTITION BY location, groupId;
```

- On the 15th of each month at 00:00:00, compute the mid-month settlement report (using the offset parameter).

```SQL
CREATE STREAM mid_month_settlement
PERIOD(1n, 14d)
FROM meters PARTITION BY location
INTO mid_month_settlement_table
AS
SELECT _wstart AS period_start,
location,
SUM(current * voltage) AS total_energy
FROM meters
INTERVAL(1n)
PARTITION BY location;
```

- On January 1st at 00:00:00 each year, archive the full data from the previous year.

```SQL
CREATE STREAM yearly_archive
PERIOD(1y)
FROM meters PARTITION BY location, groupId
INTO yearly_archive_table
AS
SELECT _wstart AS year_start,
location,
groupId,
AVG(current) AS avg_current,
SUM(current * voltage) AS total_energy
FROM meters
INTERVAL(1y)
PARTITION BY location, groupId;
```
2 changes: 2 additions & 0 deletions docs/en/14-reference/09-error-code.md
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,8 @@ Below are the business error codes for each module.
| 0x8000268E | Invalid table type | Incorrect Table type | Check and correct the SQL statement |
| 0x8000268F | Invalid ref column type | Virtual table's column type and data source column's type are different | Check and correct the SQL statement |
| 0x80002690 | Create child table using virtual super table | Create non-virtual child table using virtual super table | Check and correct the SQL statement |
| 0x800026AF | Invalid offset unit | Invalid time unit used in offset clause | Check and correct the SQL statement |
| 0x800026B0 | Invalid offset value | Invalid offset value in time window | Check and correct the SQL statement |
| 0x80002696 | Invalid sliding offset | Invalid sliding offset | Check and correct the SQL statement |
| 0x80002697 | Invalid interval offset | Invalid interval offset | Check and correct the SQL statement |
| 0x80002698 | Invalid extend value | Invalid extend value | Check and correct the SQL statement |
Expand Down
81 changes: 76 additions & 5 deletions docs/zh/14-reference/03-taos-sql/41-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,20 @@ PERIOD(period_time[, offset_time])

定时触发通过系统时间的固定间隔来驱动,本质上就是我们常说的定时任务。定时触发不属于窗口触发。各参数含义如下:

- period_time:定时间隔,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d),支持的时间范围为 `[10a, 3650d]`。
- offset_time:可选,定时偏移,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h),偏移大小应该小于 1 天
- period_time:定时间隔,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d)、周 (w)、月 (n)、年 (y),支持的时间范围为 `[10a, 3650d]`。
- offset_time:可选,定时偏移,支持的时间单位包括:毫秒 (a)、秒 (s)、分 (m)、小时 (h)、天 (d)。对于周/月/年单位,offset 必须严格小于触发周期;对于月单位,以 28 天/月为基准静态校验(如 `PERIOD(1n, 28d)` 非法)

使用说明:

- 定时间隔小于 1 天时,基准时间点为每日零点加定时偏移,根据定时间隔来确定下次触发的时间点。基准时间点在每日零点重置。每日最后一次触发的时间点与下一日的基准时间点之间的间隔可能小于定时间隔。例如:
- 定时间隔为 5 小时 30 分钟,那么当天的触发时刻为 `[00:00, 05:30, 11:00, 16:30, 22:00]`,后续每一天的触发时刻都是相同的。
- 同样的定时间隔,如果指定时间偏移为 1 分钟,那么当天的触发时刻为 `[00:01, 05:31, 11:01, 16:31, 22:01]`,后续每一天的触发时刻都是相同的。
- 同样条件下,如果建流时当前系统时间为 `12:00`,那么当天的触发时刻为 `[16:31, 22:01]`,后续每一天内的触发时刻为 `[00:01, 05:31, 11:01, 16:31, 22:01]`。
- 定时间隔大于等于 1 天时,基准时间点为当日的零点加定时偏移,后续不会重置。例如:
- 定时间隔为 1 天 1 小时,建流时当前系统时间为 `05-01 12:00`,那么在当天及随后几天的触发时刻为 `[05-02 01:00, 05-03 02:00, 05-04 03:00, 05-05 04:00, ……]`。
- 同样条件下,如果指定时间偏移为 1 分钟,那么当天及随后几天的触发时刻为 `[05-02 01:01, 05-03 02:02, 05-04 03:03, 05-05 04:04, ……]`。
- 定时间隔大于等于 1 天时,基准时间点为服务端时区的 Unix epoch(1970-01-01 00:00:00)加定时偏移,按触发间隔整除对齐,保证所有任务触发时刻全局一致。例如:
- 定时间隔为 2 天,所有使用该间隔的任务都会在距离 epoch 整数倍 2 天的时刻触发(如 1970-01-03 00:00:00, 1970-01-05 00:00:00, ...),确保全局对齐。
- 定时间隔为 1 周(`PERIOD(1w)`),触发时刻对齐每周一 00:00:00;`PERIOD(1w, 1d)` 则在每周二 00:00:00 触发。
- 定时间隔为 1 月(`PERIOD(1n)`),触发时刻对齐每月 1 日 00:00:00;`PERIOD(1n, 14d)` 则在每月 15 日 00:00:00 触发。
- 定时间隔为 1 年(`PERIOD(1y)`),触发时刻对齐每年 1 月 1 日 00:00:00;`PERIOD(1y, 31d)` 则在每年 2 月 1 日 00:00:00 触发。

适用场景:需要按照系统时间连续定时驱动计算的场景,例如每小时计算生成一次当天的统计数据,每天定时发送统计报告等。

Expand Down Expand Up @@ -1016,3 +1018,72 @@ CREATE stream stream_consumer_energy
FROM meters
WHERE ts >= cast(_tprev_localtime/1000000 AS timestamp) AND ts <= cast(_tlocaltime/1000000 AS timestamp);
```

- 每周一 00:00:00 计算上周的设备运行汇总,计算结果写入 weekly_summary 表。

```SQL
CREATE STREAM weekly_device_summary
PERIOD(1w)
FROM meters PARTITION BY location
INTO weekly_summary
AS
SELECT _wstart AS week_start,
location,
AVG(current) AS avg_current,
MAX(voltage) AS max_voltage,
COUNT(*) AS record_count
FROM meters
INTERVAL(1w)
PARTITION BY location;
```

- 每月 1 日 00:00:00 计算上月的能耗账单,计算结果写入 monthly_bill 表。

```SQL
CREATE STREAM monthly_energy_bill
PERIOD(1n)
FROM meters PARTITION BY location, groupId
INTO monthly_bill
AS
SELECT _wstart AS month_start,
location,
groupId,
SUM(current * voltage) AS total_energy
FROM meters
INTERVAL(1n)
PARTITION BY location, groupId;
```

- 每月 15 日 00:00:00 计算半月结算报表(使用 offset 参数)。

```SQL
CREATE STREAM mid_month_settlement
PERIOD(1n, 14d)
FROM meters PARTITION BY location
INTO mid_month_settlement_table
AS
SELECT _wstart AS period_start,
location,
SUM(current * voltage) AS total_energy
FROM meters
INTERVAL(1n)
PARTITION BY location;
```

- 每年 1 月 1 日 00:00:00 归档上一年的全量数据。

```SQL
CREATE STREAM yearly_archive
PERIOD(1y)
FROM meters PARTITION BY location, groupId
INTO yearly_archive_table
AS
SELECT _wstart AS year_start,
location,
groupId,
AVG(current) AS avg_current,
SUM(current * voltage) AS total_energy
FROM meters
INTERVAL(1y)
PARTITION BY location, groupId;
```
2 changes: 2 additions & 0 deletions docs/zh/14-reference/09-error-code.md
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ TSDB 错误码包括 taosc 客户端和服务端,所有语言的连接器无
| 0x8000268E | Invalid table type | 表类型不正确 | 检查并修正 SQL 语句 |
| 0x8000268F | Invalid ref column type | 虚拟表列的数据类型与数据源的数据类型不同 | 检查并修正 SQL 语句 |
| 0x80002690 | Create child table using virtual super table | 创建非虚拟子表 USING 了虚拟超级表 | 检查并修正 SQL 语句 |
| 0x800026AF | Invalid offset unit | offset 子句中使用了无效的时间单位 | 检查并修正 SQL 语句 |
| 0x800026B0 | Invalid offset value | 时间窗口中的 offset 值无效 | 检查并修正 SQL 语句 |
| 0x80002696 | Invalid sliding offset | sliding 窗口偏移量非法 | 检查并修正 SQL 语句 |
| 0x80002697 | Invalid interval offset | interval 窗口偏移量非法 | 检查并修正 SQL 语句 |
| 0x80002698 | Invalid extend value | extend 参数非法 | 检查并修正 SQL 语句 |
Expand Down
1 change: 1 addition & 0 deletions include/common/ttime.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ int32_t convertCalendarTimeFromUnitToPrecision(int64_t time, char fromUnit, int
int32_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit, int64_t* pRes);
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal, timezone_t tz, void* charsetCxt);
int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision);
int64_t alignToNaturalBoundary(int64_t timestamp, char unit, int64_t value, int64_t offset, int32_t precision, timezone_t tz);

int32_t taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision);
char* formatTimestampLocal(char* buf, int64_t val, int precision);
Expand Down
2 changes: 2 additions & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_PAR_NOT_ALLOWED_FILL_MODE TAOS_DEF_ERROR_CODE(0, 0x26AC)
#define TSDB_CODE_PAR_NOT_ALLOWED_FILL_VALUES TAOS_DEF_ERROR_CODE(0, 0x26AD)
#define TSDB_CODE_PAR_INVALID_SURROUND_TIME_VALUES TAOS_DEF_ERROR_CODE(0, 0x26AE)
#define TSDB_CODE_PAR_INVALID_OFFSET_UNIT TAOS_DEF_ERROR_CODE(0, 0x26AF)
#define TSDB_CODE_PAR_INVALID_OFFSET_VALUE TAOS_DEF_ERROR_CODE(0, 0x26B0)
//
#define TSDB_CODE_PAR_PRIV_TYPE_TARGET_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x26E0)
#define TSDB_CODE_PAR_COL_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x26E1)
Expand Down
6 changes: 4 additions & 2 deletions include/util/tringbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ extern "C" {
} \
} while (0)

typedef TRINGBUF(void) TRingBufGeneral;

static FORCE_INLINE int32_t tringbufExtend(void *rbuf, int32_t expSize, int32_t eleSize) {
TRINGBUF(void) *rb = rbuf;
TRingBufGeneral *rb = (TRingBufGeneral *)rbuf;

int32_t capacity = TRINGBUF_CAPACITY(rb);
if (expSize <= capacity) return 0;
Expand Down Expand Up @@ -72,7 +74,7 @@ static FORCE_INLINE int32_t tringbufExtend(void *rbuf, int32_t expSize, int32_t
}

static FORCE_INLINE int32_t tringbufPushBatch(void *rbuf, const void *elePtr, int32_t numEle, int32_t eleSize) {
TRINGBUF(void) *rb = rbuf;
TRingBufGeneral *rb = (TRingBufGeneral *)rbuf;

int32_t ret = 0;
if (rb->size + numEle > rb->capacity) {
Expand Down
6 changes: 6 additions & 0 deletions source/common/src/msg/streamMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,9 @@ int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerD
}
case WINDOW_TYPE_PERIOD: {
// period trigger
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.periodUnit));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.offsetUnit));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->trigger.period.precision));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.period));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->trigger.period.offset));
break;
Expand Down Expand Up @@ -1253,6 +1256,9 @@ int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployM

case WINDOW_TYPE_PERIOD:
// period trigger
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.periodUnit));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, (int8_t*)&pMsg->trigger.period.offsetUnit));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->trigger.period.precision));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.period));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->trigger.period.offset));
break;
Expand Down
Loading
Loading