Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
530 changes: 148 additions & 382 deletions docs/cn/guides/40-load-data/05-continuous-data-pipelines/01-stream.md

Large diffs are not rendered by default.

247 changes: 116 additions & 131 deletions docs/cn/guides/40-load-data/05-continuous-data-pipelines/02-task.md
Original file line number Diff line number Diff line change
@@ -1,169 +1,154 @@
---
title: 使用任务自动化数据加载
sidebar_label: 任务
title: 使用任务(Task)自动化数据加载
sidebar_label: 任务(Task)
---

任务封装了特定的 SQL 语句,设计用于按预定间隔执行、由特定事件触发或作为任务序列的一部分执行。在 Databend Cloud 中,任务通常用于定期捕获流中的变更数据(例如新增记录),然后将这些数据同步到指定目标。此外,任务还支持 [Webhook](https://en.wikipedia.org/wiki/Webhook) 等消息系统,可按需发送错误消息和通知
Task 是“把 SQL 交给 Databend 代跑”的方式。你可以让它按固定频率运行、在另一任务结束后运行,或者在某个 Stream 报告有增量时再运行。下面先看定义 Task 时需要关注的几个开关,再通过两个动手示例理解它如何和 Stream 配合

## 创建任务
## Task 构建要素

本主题详细说明在 Databend Cloud 中创建任务的步骤。您可以使用 [CREATE TASK](/sql/sql-commands/ddl/task/ddl-create_task) 命令创建任务。创建任务时,请参照下图设计工作流:
- **名称与计算仓库** – 每个 Task 都需要一个 Warehouse。
```sql
CREATE TASK ingest_orders
WAREHOUSE = 'etl_wh'
AS SELECT 1;
```
- **触发方式** – `SCHEDULE = 2 MINUTE`、CRON,或 `AFTER <task>`(适用于 DAG)。
- **执行条件** – `WHEN STREAM_STATUS('mystream') = TRUE` 这类布尔表达式,只有条件满足才运行。
- **错误策略** – `SUSPEND_TASK_AFTER_NUM_FAILURES`、`ERROR_INTEGRATION` 等参数可在失败多次后暂停并发通知。
- **SQL 负载** – `AS` 后的内容就是 Task 要执行的语句,可以是一条 INSERT/COPY/MERGE,也可以是 BEGIN...END。

![alt text](/img/load/task.png)
## 示例 1:定时 COPY

1. 为任务设置名称。
2. 指定运行任务的计算集群。如需创建计算集群,请参阅 [使用计算集群](/guides/cloud/using-databend-cloud/warehouses)。
3. 确定任务触发方式。
持续生成 Parquet 并导入表。记得把 `'etl_wh_small'` 换成你自己的 Warehouse。

- 可通过指定分钟或秒为间隔来调度任务,或使用 CRON 表达式配合可选时区实现更精确的调度。
### 步骤 1: 准备演示对象

```sql title='示例:'
-- 此任务每 2 分钟运行一次
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
SCHEDULE = 2 MINUTE
AS ...
```sql
CREATE DATABASE IF NOT EXISTS task_demo;
USE task_demo;

CREATE OR REPLACE TABLE sensor_events (
event_time TIMESTAMP,
sensor_id INT,
temperature DOUBLE,
humidity DOUBLE
);

-- 此任务每天午夜(东京时间)在 Asia/Tokyo 时区运行
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
SCHEDULE = USING CRON '0 0 0 * * *' 'Asia/Tokyo'
AS ...
CREATE OR REPLACE STAGE sensor_events_stage;
```

- 或者,您可以在任务间建立依赖关系,将任务设置为 [有向无环图](https://en.wikipedia.org/wiki/Directed_acyclic_graph) 中的子任务。
### 步骤 2: Task 1 —— 生成文件

```sql title='示例:'
-- 此任务依赖于 DAG 中 'task_root' 任务的完成
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
AFTER task_root
AS ...
```sql
CREATE OR REPLACE TASK task_generate_data
WAREHOUSE = 'etl_wh_small'
SCHEDULE = 1 MINUTE
AS
COPY INTO @sensor_events_stage
FROM (
SELECT
NOW() AS event_time,
number AS sensor_id,
20 + RAND() * 5 AS temperature,
60 + RAND() * 10 AS humidity
FROM numbers(100)
)
FILE_FORMAT = (TYPE = PARQUET);
```

4. 指定任务执行条件,允许基于布尔表达式控制任务执行(可选)。
### 步骤 3: Task 2 —— 将文件导入表

```sql title='示例:'
-- 此任务每 2 分钟运行一次,仅当 'mystream' 包含数据变更时执行 AS 后的 SQL
CREATE TASK mytask
WAREHOUSE = 'default'
SCHEDULE = 2 MINUTE
// highlight-next-line
WHEN STREAM_STATUS('mystream') = TRUE
AS ...
```sql
CREATE OR REPLACE TASK task_consume_data
WAREHOUSE = 'etl_wh_small'
SCHEDULE = 1 MINUTE
AS
COPY INTO sensor_events
FROM @sensor_events_stage
PATTERN = '.*[.]parquet'
FILE_FORMAT = (TYPE = PARQUET)
PURGE = TRUE;
```

5. 指定任务出错时的处理方式,包括设置连续失败次数以暂停任务,以及指定错误通知的集成方式。有关设置错误通知的更多信息,请参阅 [配置通知集成](#configuring-notification-integrations)。

```sql title='示例:'
-- 此任务将在连续失败 3 次后暂停
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS ...

-- 此任务将使用 'my_webhook' 集成发送错误通知
CREATE TASK mytask
WAREHOUSE = 'default'
// highlight-next-line
ERROR_INTEGRATION = 'my_webhook'
AS ...
### 步骤 4: 恢复 Task

```sql
ALTER TASK task_generate_data RESUME;
ALTER TASK task_consume_data RESUME;
```

6. 指定任务将执行的 SQL 语句。
### 步骤 5: 观察运行情况

```sql title='示例:'
-- 此任务每年更新 'employees' 表中的 'age' 列,使其递增 1
CREATE TASK mytask
WAREHOUSE = 'default'
SCHEDULE = USING CRON '0 0 1 1 * *' 'UTC'
// highlight-next-line
AS
UPDATE employees
SET age = age + 1;
```sql
SHOW TASKS LIKE 'task_%';
LIST @sensor_events_stage;
SELECT * FROM sensor_events ORDER BY event_time DESC LIMIT 5;
SELECT * FROM task_history('task_consume_data', 5);
```

## 查看已创建的任务

要查看组织创建的所有任务,请登录 Databend Cloud 并转到 **数据** > **任务**。您可以查看每个任务的详细信息,包括状态和调度计划。

要查看任务运行历史记录,请转到 **监控** > **任务历史**。您可以查看每次任务运行的结果、完成时间等详细信息。

## 配置通知集成

Databend Cloud 允许您为任务配置错误通知,在任务执行出错时自动发送通知。当前支持 Webhook 集成,可实现错误事件与外部系统或服务的实时通信。

### 任务错误负载

任务错误负载指任务执行出错时作为错误通知发送的数据或信息。该负载通常包含错误详情,如错误代码、错误消息、时间戳以及其他有助于诊断和解决问题的上下文信息。

```json title='任务错误负载示例:'
{
"version": "1.0",
"messageId": "063e40ab-0b55-439e-9cd2-504c496e1566",
"messageType": "TASK_FAILED",
"timestamp": "2024-03-19T02:37:21.160705788Z",
"tenantId": "tn78p61xz",
"taskName": "my_task",
"taskId": "15",
"rootTaskName": "my_task",
"rootTaskId": "15",
"messages": [
{
"runId": "unknown",
"scheduledTime": "2024-03-19T02:37:21.157169855Z",
"queryStartTime": "2024-03-19T02:37:21.043090475Z",
"completedTime": "2024-03-19T02:37:21.157169205Z",
"queryId": "88bb9d5d-5d5e-4e52-92cc-b1953406245a",
"errorKind": "UnexpectedError",
"errorCode": "500",
"errorMessage": "query sync failed: All attempts fail:\n#1: query error: code: 1006, message: divided by zero while evaluating function `divide(1, 0)`"
}
]
}
### 步骤 6: 调整或改写 Task

```sql
ALTER TASK task_consume_data
SET SCHEDULE = 30 SECOND,
WAREHOUSE = 'etl_wh_medium';

ALTER TASK task_consume_data
MODIFY AS
COPY INTO sensor_events
FROM @sensor_events_stage
FILE_FORMAT = (TYPE = PARQUET);

ALTER TASK task_consume_data RESUME;

SELECT *
FROM task_history('task_consume_data', 5)
ORDER BY completed_time DESC;
```

### 使用示例
## 示例 2:Stream 条件 Task

在为任务配置错误通知前,您需要使用 [CREATE NOTIFICATION INTEGRATION](/sql/sql-commands/ddl/notification/ddl-create-notification) 命令创建通知集成。以下示例展示了如何为任务创建和配置通知集成。该示例使用 [Webhook.site](http://webhook.site) 模拟消息系统,接收来自 Databend Cloud 的负载
只有当 Stream 报告“有增量”时才运行,避免空跑

1. 在浏览器中打开 [Webhook.site](http://webhook.site),获取您的 Webhook URL。
### 步骤 1: 创建 Stream 与结果表

![alt text](/img/load/webhook-1.png)
```sql
CREATE OR REPLACE STREAM sensor_events_stream
ON TABLE sensor_events
APPEND_ONLY = false;

CREATE OR REPLACE TABLE sensor_events_latest AS
SELECT *
FROM sensor_events
WHERE 1 = 0;
```

2. 在 Databend Cloud 中创建通知集成,然后创建带通知集成的任务:
### 步骤 2: 定义条件 Task

```sql
-- 创建名为 'my_task' 的任务,每分钟运行一次,错误通知发送至 'my_webhook'
-- 故意除以零以生成错误
CREATE TASK my_task
WAREHOUSE = 'default'
SCHEDULE = 1 MINUTE
ERROR_INTEGRATION = 'my_webhook'
CREATE OR REPLACE TASK task_stream_merge
WAREHOUSE = 'etl_wh_small'
SCHEDULE = 1 MINUTE
WHEN STREAM_STATUS('task_demo.sensor_events_stream') = TRUE
AS
SELECT 1 / 0;

-- 创建名为 'my_webhook' 的通知集成,用于发送 webhook 通知
CREATE NOTIFICATION INTEGRATION my_webhook
TYPE = WEBHOOK
ENABLED = TRUE
WEBHOOK = (
url = '<YOUR-WEBHOOK_URL>',
method = 'POST'
);
INSERT INTO sensor_events_latest
SELECT *
FROM sensor_events_stream;

-- 创建后恢复任务
ALTER TASK my_task RESUME;
ALTER TASK task_stream_merge RESUME;
```

3. 稍等片刻,您将看到 webhook 开始接收来自创建的任务的负载。
### 步骤 3: 查看增量与历史

![alt text](/img/load/webhook-2.png)
```sql
SELECT *
FROM sensor_events_latest
ORDER BY event_time DESC
LIMIT 5;

## 使用示例
SELECT *
FROM task_history('task_stream_merge', 5);
```

完整演示如何通过流捕获数据变更并使用任务同步,请参阅 [示例:实时跟踪和转换数据](01-stream.md#example-tracking-and-transforming-data-in-real-time)
只要 `STREAM_STATUS('<database>.<stream_name>')` 返回 TRUE(例如 `task_demo.sensor_events_stream`),Task 就会运行;否则保持暂停,直到下一批增量到达
32 changes: 15 additions & 17 deletions docs/cn/guides/40-load-data/05-continuous-data-pipelines/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@
title: 持续数据管道
---

## 数据管道简介
在 Databend 中构建 CDC(Change Data Capture)流程只需两种原语:

数据管道能够自动化地将来自不同来源的数据移动并转换至 Databend。它们确保数据流畅传输,对于快速且持续地处理和分析数据至关重要。
- **Stream**:捕获表的 INSERT/UPDATE/DELETE,一直保留到被消费。
- **Task**:按计划或在 Stream 有新数据时自动运行 SQL。

在持续数据管道中,一项名为 **变更数据捕获 (CDC)** 的特殊功能发挥着关键作用。借助 Databend,CDC 变得简单高效,仅需通过 Streams 和 Tasks 执行几条简单命令即可实现。
## 快速入口

## 理解变更数据捕获 (CDC)
- [示例 1:仅追加 Stream](./01-stream.md#示例-1仅追加-stream) – 捕获插入并写入目标表。
- [示例 2:标准 Stream](./01-stream.md#示例-2标准-stream含-update-delete) – 了解更新、删除在 Stream 中的表现。
- [示例 3:增量 Join](./01-stream.md#示例-3增量-join--计算) – 使用 `WITH CONSUME` 做批式增量聚合。
- [示例 1:定时 COPY 任务](./02-task.md#示例-1定时-copy) – 两个任务生成并导入 Parquet。
- [示例 2:Stream 条件任务](./02-task.md#示例-2基于-stream-的条件任务) – 只有 Stream 有增量时才触发。

CDC 是指流对象捕获应用于数据库表的插入、更新和删除操作的过程。它包含有关每次变更的元数据,从而能够基于修改后的数据执行操作。Databend 中的 CDC 在源表中跟踪行级变更,创建一个"变更表"来反映两个事务时间点之间的数据修改。
## 为什么选择 Databend CDC

## 使用变更数据捕获 (CDC) 的优势
- **轻量**:Stream 只保存尚未消费的增量。
- **事务一致**:消费 Stream 的语句成功才会清空,失败即回滚。
- **增量友好**:配合 `WITH CONSUME` 能多次运行同一 SQL,每次只处理新数据。
- **自动化**:Task 让任何 SQL 都能定时/触发执行。

1. **快速实时数据加载**:优化来自事务数据库的实时数据加载流程,几乎可在秒级完成。
2. **不影响原始数据**:安全可靠,不会损坏数据或其来源系统。
3. **克服批量 ETL 的局限性**:超越传统的批量 ETL 方法,后者对于持续数据更新而言速度较慢且效率较低。

## Databend 持续数据管道的核心特性

Databend 通过以下特性增强了持续数据管道:

- **持续数据跟踪与转换**:支持数据的实时跟踪与转换。[通过 Streams 了解数据跟踪与转换的更多信息](./01-stream.md)。

- **循环任务**:支持调度和管理循环数据处理任务,确保数据管道的高效性和可靠性。该功能目前处于私有预览阶段。
先完成 Stream 示例,再组合 Task,即可搭建自己的持续数据管道。
Loading
Loading