Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .translation-init
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Translation initialization: 2025-09-18T03:36:24.323851
Translation initialization: 2025-09-24T01:35:39.530531
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import FunctionDescription from '@site/src/components/FunctionDescription';

<FunctionDescription description="Introduced or updated: v1.2.738"/>

CREATE TASK 语句用于定义一个新任务(Task),该任务可按计划或基于有向无环图(DAG)的任务图执行指定的 SQL 语句。
CREATE TASK 语句用于定义一个新任务,该任务可按计划或基于 DAG(有向无环图)的任务图执行指定的 SQL 语句。

**注意:** 此功能仅在 Databend Cloud 中开箱即用。

Expand All @@ -17,7 +17,7 @@ CREATE TASK 语句用于定义一个新任务(Task),该任务可按计划
CREATE [ OR REPLACE ] TASK [ IF NOT EXISTS ] <name>
WAREHOUSE = <string>
SCHEDULE = { <num> MINUTE | <num> SECOND | USING CRON <expr> <time_zone> }
[ AFTER <string> [ , <string> , ... ]]
[ AFTER <string>
[ WHEN <boolean_expr> ]
[ SUSPEND_TASK_AFTER_NUM_FAILURES = <num> ]
[ ERROR_INTEGRATION = <string> ]
Expand All @@ -29,39 +29,39 @@ AS

| 参数 | 描述 |
| ------------------------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| IF NOT EXISTS | 可选。若指定,仅当同名任务不存在时才创建任务。 |
| name | 任务名称必填。 |
| WAREHOUSE | 必填。指定任务使用的虚拟计算集群(Virtual Warehouse)。 |
| SCHEDULE | 必填。定义任务运行计划,可按分钟指定,或使用 CRON 表达式与时区。 |
| SUSPEND_TASK_AFTER_NUM_FAILURES | 可选。连续失败多少次后自动挂起任务。 |
| AFTER | 列出必须完成后才启动此任务的任务。 |
| IF NOT EXISTS | 可选。若指定,则仅当同名任务不存在时才创建任务。 |
| name | 任务名称必填。 |
| WAREHOUSE | 必填。指定任务使用的虚拟计算集群(Warehouse)。 |
| SCHEDULE | 必填。定义任务运行计划。可用分钟数或 CRON 表达式加时区表示。 |
| SUSPEND_TASK_AFTER_NUM_FAILURES | 可选。连续失败多少次后任务将自动挂起。 |
| AFTER | 列出在此任务开始前必须完成的任务。 |
| WHEN boolean_expr | 任务运行前必须为真的条件。 |
| [ERROR_INTEGRATION](../16-notification/index.md) | 可选。用于任务错误通知的通知集成(Notification Integration)名称,并应用特定的[任务错误负载](./10-task-error-integration-payload.md)。 |
| [ERROR_INTEGRATION](../16-notification/index.md) | 可选。用于任务错误通知的集成名称,附带特定的[任务错误负载](./10-task-error-integration-payload.md)。 |
| COMMENT | 可选。作为任务注释或描述的字符串字面量。 |
| session_parameter | 可选。指定任务运行时的会话参数。注意会话参数必须放在 CREATE TASK 语句中所有其他任务参数之后。 |
| sql | 任务将执行的 SQL 语句,可为单条语句或脚本必填。 |
| session_parameter | 可选。指定任务运行时的会话参数。注意会话参数必须放在 CREATE TASK 语句中所有其他任务参数之后。 |
| sql | 任务将执行的 SQL 语句,可为单条语句或脚本必填。 |

### 使用须知

- 必须为独立任务或任务 DAG 中的根任务定义计划;否则,任务仅在手动执行 `EXECUTE TASK` 时运行
- 不能为 DAG 中的子任务指定计划
- 创建任务后,必须执行 `ALTER TASK … RESUME`,任务才会按定义中的参数运行
- 独立任务或任务 DAG 中的根任务必须定义计划;否则,任务仅在使用 EXECUTE TASK 手动执行时运行
- DAG 中的子任务不能指定计划
- 创建任务后,必须执行 ALTER TASK … RESUME,任务才会按定义参数运行
- WHEN 条件仅支持 `<boolean_expression>` 的子集。
任务 WHEN 子句支持以下内容:
任务的 WHEN 子句支持以下内容:

- SQL 表达式中支持 [STREAM_STATUS](../../../20-sql-functions/17-table-functions/stream-status.md) 函数求值。该函数指示指定 Stream 是否包含变更跟踪数据。可在当前运行开始前评估指定 Stream 是否包含变更数据;若结果为 FALSE,则任务不运行。
- [STREAM_STATUS](../../../20-sql-functions/17-table-functions/stream-status.md) 可在 SQL 表达式中求值。该函数指示指定流是否包含变更跟踪数据。可在当前运行开始前评估指定流是否包含变更数据。若结果为 FALSE,则任务不运行。
- 布尔运算符,如 AND、OR、NOT 等。
- 数值、字符串与布尔类型之间的类型转换
- 数值、字符串与布尔类型之间的强制转换
- 比较运算符,如等于、不等于、大于、小于等。

:::note
警告:在任务中使用 STREAM_STATUS 时,引用 Stream 必须包含数据库名(例如 `STREAM_STATUS('mydb.stream_name')`)。
警告:在任务中使用 STREAM_STATUS 时,引用流必须包含数据库名(例如 `STREAM_STATUS('mydb.stream_name')`)。
:::

- 多个任务从同一 Table Stream 消费变更数据时,会获取不同的增量。当某任务通过 DML 语句消费 Stream 中的变更数据后,Stream 会推进 Offset,变更数据将不再对后续任务可用。当前建议仅让一个任务消费同一 Stream 的变更数据;可为同一表创建多个 Stream,由不同任务分别消费。
- 任务每次执行不会重试;执行均为串行。脚本中的 SQL 逐一执行,无并行,确保任务执行顺序与依赖关系
- 基于间隔的任务严格遵循固定间隔点。若当前任务执行时间超过间隔单位,则下一任务立即执行;否则,下一任务等待至下一间隔单位触发。例如,若任务定义 1 秒间隔,而某次执行耗时 1.5 秒,则下一任务立即执行;若耗时 0.5 秒,则下一任务等待至下一 1 秒间隔开始
- 创建任务时可指定会话参数,也可后续通过 `ALTER TASK` 修改,例如:
- 多个任务从同一表流消费变更数据时,会获取不同的增量。当某任务使用 DML 语句消费流中的变更数据后,流会推进偏移量,变更数据不再对后续任务可见。当前建议仅让一个任务消费同一流。可为同一表创建多个流,由不同任务分别消费。
- 任务每次执行不会重试;执行串行进行。脚本中的 SQL 按顺序逐条执行,无并行。这确保了任务执行的顺序与依赖关系
- 基于间隔的任务会严格按固定间隔点触发。若当前任务执行耗时超过间隔单位,则下一次任务立即执行;否则,等待下一间隔单位。例如,任务间隔为 1 秒,若某次执行耗时 1.5 秒,则下一次立即执行;若耗时 0.5 秒,则等待至下一个 1 秒刻度
- 会话参数既可在创建任务时指定,也可后续使用 ALTER TASK 语句修改。例如:
```sql
ALTER TASK simple_task SET
enable_query_result_cache = 1,
Expand All @@ -73,11 +73,11 @@ AS
- `SCHEDULE` 参数中的 cron 表达式必须**恰好包含 6 个字段**。
- 各字段含义如下:
1. **秒**(0–59)
2. **分钟**(0–59)
3. **小时**(0–23)
2. ****(0–59)
3. ****(0–23)
4. **日**(1–31)
5. **月**(1–12 或 JAN–DEC)
6. **星期**(0–6,0 表示星期日,或 SUN–SAT)
6. **星期**(0–6,0 表示周日,或 SUN–SAT)

#### Cron 表达式示例:

Expand All @@ -86,23 +86,23 @@ AS

- **每分钟:**
- `USING CRON '0 * * * * *' 'UTC'`
- 在每分钟开始时运行任务
- 在每分钟的第 0 秒运行

- **每小时第 15 分钟:**
- **每小时的第 15 分钟:**
- `USING CRON '0 15 * * * *' 'UTC'`
- 在每小时的第 15 分钟运行任务
- 在每小时的 15 分 0 秒运行

- **每周一中午 12:00:00:**
- `USING CRON '0 0 12 * * 1' 'UTC'`
- 在每周一中午运行任务
- 在每周一中午运行

- **每月第一天午夜:**
- **每月首日午夜:**
- `USING CRON '0 0 0 1 * *' 'UTC'`
- 在每月第一天的午夜运行任务
- 在每月 1 日 00:00:00 运行

- **每个工作日上午 8:30:00:**
- **每个工作日 8:30:00 AM:**
- `USING CRON '0 30 8 * * 1-5' 'UTC'`
- 在周一至周五上午 8:30 运行任务
- 在周一至周五的 8:30 AM 运行

## 使用示例

Expand All @@ -117,7 +117,7 @@ AS
INSERT INTO summary_table SELECT * FROM source_table;
```

本例创建名为 `my_daily_task` 的任务(Task)。它使用 **compute_wh** 计算集群(Warehouse)运行 SQL,将数据从 source_table 插入 summary_table,并按 **CRON 表达式** 于**太平洋时间每天上午 9 点**执行。
本例创建名为 `my_daily_task` 的任务,使用 **compute_wh** 计算集群,将 source_table 的数据插入 summary_table,并按 **CRON 表达式**于**太平洋时间每天上午 9 点**执行。

### 自动挂起

Expand All @@ -130,7 +130,7 @@ AS
INSERT INTO compaction_test.test VALUES((1));
```

本例创建名为 `mytask` 的任务(Task)(若不存在)。该任务分配至 **system** 计算集群(Warehouse),计划**每 2 分钟**运行一次,**连续失败 3 次**将**自动挂起**,并对 compaction_test.test 表执行 INSERT。
本例创建名为 `mytask` 的任务(如不存在),分配给 **system** 计算集群,每 **2 分钟**运行一次,**连续失败 3 次后自动挂起**,对 compaction_test.test 表执行 INSERT。

### 秒级调度

Expand All @@ -144,19 +144,19 @@ AS
GROUP BY sales_date;
```

本例创建名为 `daily_sales_summary` 的任务(Task),具备**秒级调度**,计划**每 30 秒**运行一次。它使用 **analytics** 计算集群(Warehouse),聚合 sales_data 表数据计算每日销售汇总
本例创建名为 `daily_sales_summary` 的任务,采用**秒级调度**,每 **30 秒**运行一次,使用 **analytics** 计算集群,对 sales_data 表汇总每日销售额

### 任务依赖

```sql
CREATE TASK IF NOT EXISTS process_orders
WAREHOUSE = 'etl'
AFTER task1, task2
AFTER task1
AS
INSERT INTO data_warehouse.orders SELECT * FROM staging.orders;
```

本例创建名为 `process_orders` 的任务(Task),定义为在 **task1** 与 **task2** **成功完成后**运行,用于在任务 **DAG** 中建立**依赖关系**。它使用 **etl** 计算集群(Warehouse),将数据从 Staging Area 传输至 Data Warehouse
本例创建名为 `process_orders` 的任务,在 **task1** 成功完成后运行,用于在任务 **DAG** 中建立**依赖关系**。任务使用 **etl** 计算集群,将数据从 staging 区传输到数据仓库

> 提示:使用 AFTER 参数时无需设置 SCHEDULE 参数。

Expand All @@ -172,7 +172,7 @@ AS
WHERE archived_date < DATEADD(HOUR, -24, CURRENT_TIMESTAMP());
```

本例创建名为 `hourly_data_cleanup` 的任务(Task)。它使用 **maintenance** 计算集群(Warehouse),计划**每小时**运行,删除 archived_data 表中 24 小时前的数据,并仅在 **STREAM_STATUS** 函数确认 `db1.change_stream` 包含变更数据时运行
本例创建名为 `hourly_data_cleanup` 的任务,使用 **maintenance** 计算集群**每小时**运行,删除 archived_data 表中超过 24 小时的数据。仅当 **STREAM_STATUS** 函数检测到 `db1.change_stream` 包含变更数据时才运行

### 错误集成

Expand All @@ -190,7 +190,7 @@ AS
END;
```

本例创建名为 `mytask` 的任务(Task)。它使用 **mywh** 计算集群(Warehouse),计划**每 30 秒**运行一次,执行包含 INSERT 与 DELETE 语句的 **BEGIN 块**,并在两条语句执行后提交事务。任务失败时将触发名为 **myerror** 的**错误集成(Error Integration)**。
本例创建名为 `mytask` 的任务,使用 **mywh** 计算集群,每 **30 秒**运行一次,执行包含 INSERT 与 DELETE **BEGIN 块**,完成后提交事务。任务失败时将触发名为 **myerror** 的**错误集成**。

### 会话参数

Expand All @@ -208,4 +208,4 @@ AS
GROUP BY product_category;
```

本例创建名为 `cache_enabled_task` 的任务(Task),并启用查询结果缓存的**会话参数(Session Parameter)**。任务计划**每 5 分钟**运行,使用 **analytics** 计算集群(Warehouse)。会话参数(Session Parameter) **`enable_query_result_cache = 1`** 与 **`query_result_cache_min_execute_secs = 5`** 置于**所有其他任务参数之后**,为执行时间 ≥5 秒的查询启用结果缓存。若底层数据未变,可提升相同任务后续执行的性能
本例创建名为 `cache_enabled_task` 的任务,带启用查询结果缓存的**会话参数**,每 **5 分钟**运行一次,使用 **analytics** 计算集群。会话参数 **`enable_query_result_cache = 1`** 与 **`query_result_cache_min_execute_secs = 5`** 置于所有其他参数之后,为执行时间 ≥5 秒的查询启用缓存,若底层数据未变,可**提升**后续执行的**性能**