diff --git a/docs/cn/guides/40-load-data/05-continuous-data-pipelines/01-stream.md b/docs/cn/guides/40-load-data/05-continuous-data-pipelines/01-stream.md index fe0c2fc40a..799e97c827 100644 --- a/docs/cn/guides/40-load-data/05-continuous-data-pipelines/01-stream.md +++ b/docs/cn/guides/40-load-data/05-continuous-data-pipelines/01-stream.md @@ -1,452 +1,218 @@ --- -title: 通过 Stream 追踪与转换数据 -sidebar_label: Stream +title: 使用 Stream 追踪与转换数据 +sidebar_label: 流(Stream) --- -import StepsWrap from '@site/src/components/StepsWrap'; -import StepContent from '@site/src/components/Steps/step-content'; +Stream 是 Databend 用来记录行级变更的“增量表”。每次提交 INSERT/UPDATE/DELETE,Stream 都会缓存对应的最终状态,直到消费为止。本页沿用英语版的三个动手示例,帮助你快速体会两种模式以及增量计算的写法。 -Databend 中的 Stream 是一种动态实时记录表变更的机制。通过创建 Stream 可以捕获并跟踪关联表的修改,实现对数据变更的持续消费与分析。 +## 模式速览 -### Stream 工作原理 +| 模式 | 捕获内容 | 适用场景 | +| --- | --- | --- | +| Standard Stream(`APPEND_ONLY = false`) | INSERT + UPDATE + DELETE,并在被消费前合并为每行的最新状态 | 需要完整记录变更、可回放更新/删除 | +| Append-Only Stream(默认,`APPEND_ONLY = true`) | 只捕获 INSERT | 纯追加型事实/日志流水 | -Stream 支持两种工作模式:**标准模式** 和 **仅追加模式**。在 [CREATE STREAM](/sql/sql-commands/ddl/stream/create-stream) 时通过 `APPEND_ONLY` 参数指定(默认为 `true`)。 +Stream 不复制整张表,只保留“尚未消费的增量”。消费由谁触发、何时触发完全由你掌控。 -- **标准模式**:捕获所有类型的数据变更,包括插入、更新和删除。 -- **仅追加模式**:此模式下 Stream 仅包含数据插入记录,不捕获更新或删除操作。 +## 示例 1:Append-Only Stream -Databend Stream 的设计哲学是专注于捕获数据的最终状态。例如,如果您插入一个值然后多次更新它,Stream 只会保留该值在被消费前的最新状态。以下示例展示了两种模式下 Stream 的表现形式和工作原理。 - - - - -#### 创建 Stream 捕获变更 - -首先创建两个表,然后分别为它们创建不同模式的 Stream 来捕获表变更。 +### 步骤 1: 创建基表与 Stream ```sql --- 创建表并插入初始值 -CREATE TABLE t_standard(a INT); -CREATE TABLE t_append_only(a INT); +CREATE OR REPLACE TABLE sensor_readings ( + sensor_id INT, + temperature DOUBLE +); --- 创建两种模式的 Stream:标准模式和仅追加模式 -CREATE STREAM s_standard ON TABLE t_standard APPEND_ONLY=false; -CREATE STREAM s_append_only ON TABLE t_append_only APPEND_ONLY=true; +CREATE OR REPLACE STREAM sensor_readings_stream + ON TABLE sensor_readings; -- APPEND_ONLY 默认即为 true ``` -使用 [SHOW FULL STREAMS](/sql/sql-commands/ddl/stream/show-streams) 命令可以查看已创建的 Stream 及其模式: +### 步骤 2: 插入并查看增量 ```sql -SHOW FULL STREAMS; - -┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ created_on │ name │ database │ catalog │ table_on │ owner │ comment │ mode │ invalid_reason │ -├────────────────────────────┼───────────────┼──────────┼─────────┼───────────────────────┼──────────────────┼─────────┼─────────────┼────────────────┤ -│ 2024-02-18 16:39:58.996763 │ s_append_only │ default │ default │ default.t_append_only │ NULL │ │ append_only │ │ -│ 2024-02-18 16:39:58.966942 │ s_standard │ default │ default │ default.t_standard │ NULL │ │ standard │ │ -└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ -``` +INSERT INTO sensor_readings VALUES (1, 21.5), (2, 19.7); -现在向每个表插入两个值,观察 Stream 捕获的内容: - -```sql --- 插入两个新值 -INSERT INTO t_standard VALUES(2), (3); -INSERT INTO t_append_only VALUES(2), (3); - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 2 │ INSERT │ 8cd000827f8140d9921f897016e5a88e000000 │ false │ -│ 3 │ INSERT │ 8cd000827f8140d9921f897016e5a88e000001 │ false │ 
-└────────────────────────────────────────────────────────────────────────────────────────────────┘ - -SELECT * FROM s_append_only; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 2 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000000 │ -│ 3 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000001 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ +SELECT sensor_id, temperature, change$action, change$is_update +FROM sensor_readings_stream; ``` -以上结果表明两个 Stream 都成功捕获了新插入的记录。关于结果中 Stream 列的含义,请参阅 [Stream 列结构](#stream-columns) 。现在让我们更新并删除一个新插入的值,观察 Stream 捕获的差异。 - -```sql -UPDATE t_standard SET a = 4 WHERE a = 2; -UPDATE t_append_only SET a = 4 WHERE a = 2; - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -| 4 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000000 │ false | -│ 3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - -SELECT * FROM s_append_only; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 4 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000000 │ -│ 3 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000001 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ - -DELETE FROM t_standard WHERE a = 4; -DELETE FROM t_append_only WHERE a = 4; - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - -SELECT * FROM s_append_only; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 3 │ INSERT │ false │ bfed6c91f3e4402fa477b6853a2d2b58000001 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ ``` - -到目前为止,由于我们尚未处理 Stream,两种模式之间没有显著差异。所有变更都被整合并表现为 INSERT 操作。**Stream 可以通过任务、DML (数据操作语言) 操作或使用 [WITH CONSUME](/sql/sql-commands/query-syntax/with-consume) 和 [WITH Stream Hints](/sql/sql-commands/query-syntax/with-stream-hints) 的查询来消费**。消费后 Stream 将不包含数据,但可以继续捕获新的变更(如果有)。为了进一步分析差异,让我们继续消费 Stream 并检查输出。 - - - - -#### 消费 Stream - -创建两个新表,并将 Stream 捕获的内容插入其中。 - -```sql -CREATE TABLE t_consume_standard(b INT); -CREATE TABLE t_consume_append_only(b INT); - -INSERT INTO 
t_consume_standard SELECT a FROM s_standard; -INSERT INTO t_consume_append_only SELECT a FROM s_append_only; - -SELECT * FROM t_consume_standard; - -┌─────────────────┐ -│ b │ -├─────────────────┤ -│ 3 │ -└─────────────────┘ - -SELECT * FROM t_consume_append_only; - -┌─────────────────┐ -│ b │ -├─────────────────┤ -│ 3 │ -└─────────────────┘ +┌────────────┬───────────────┬───────────────┬──────────────────┐ +│ sensor_id │ temperature │ change$action │ change$is_update │ +├────────────┼───────────────┼───────────────┼──────────────────┤ +│ 1 │ 21.5 │ INSERT │ false │ +│ 2 │ 19.7 │ INSERT │ false │ +└────────────┴───────────────┴───────────────┴──────────────────┘ ``` -如果现在查询 Stream,会发现它们已为空,因为已被消费。 +### 步骤 3: 消费并写入目标表 ```sql --- 空结果 -SELECT * FROM s_standard; +CREATE OR REPLACE TABLE sensor_readings_latest AS +SELECT sensor_id, temperature +FROM sensor_readings_stream; --- 空结果 -SELECT * FROM s_append_only; +SELECT * FROM sensor_readings_stream; -- 已为空 ``` - - - -#### 捕获新变更 +## 示例 2:Standard Stream(含 UPDATE / DELETE) -现在将每个表中的值从 `3` 更新为 `4`,然后再次检查它们的 Stream: +### 步骤 1: 为同一张表建立 Standard Stream ```sql -UPDATE t_standard SET a = 4 WHERE a = 3; -UPDATE t_append_only SET a = 4 WHERE a = 3; - - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 3 │ DELETE │ 1dd5cab0b1b64328a112db89d602ca04000001 │ true │ -│ 4 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ true │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - --- 空结果 -SELECT * FROM s_append_only; +CREATE OR REPLACE STREAM sensor_readings_stream_std + ON TABLE sensor_readings + APPEND_ONLY = false; ``` -以上结果表明,标准模式 Stream 将 UPDATE 操作处理为两个动作的组合:一个 DELETE 动作移除旧值 (`3`),一个 INSERT 动作添加新值 (`4`)。当将 `3` 更新为 `4` 时,必须首先删除现有值 `3`,因为它已不在最终状态中,然后插入新值 `4`。这种行为反映了标准模式 Stream 如何仅捕获最终变更,将更新表示为同一行的删除(移除旧值)和插入(添加新值)序列。 - -另一方面,仅追加模式 Stream 没有捕获任何内容,因为它设计为仅记录新数据添加(INSERT)而忽略更新或删除。 - -如果现在删除值 `4`,可以得到以下结果: +### 步骤 2: 执行 UPDATE / DELETE / INSERT 并比较两个 Stream ```sql -DELETE FROM t_standard WHERE a = 4; -DELETE FROM t_append_only WHERE a = 4; +UPDATE sensor_readings SET temperature = 22 WHERE sensor_id = 1; -- 更新 +DELETE FROM sensor_readings WHERE sensor_id = 2; -- 删除 +INSERT INTO sensor_readings VALUES (3, 18.5); -- 新增 -SELECT * FROM s_standard; +SELECT * FROM sensor_readings_stream; -- 仍为空(Append-Only Stream 模式忽略 UPDATE / DELETE) -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 3 │ DELETE │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - --- 空结果 -SELECT * FROM s_append_only; +SELECT sensor_id, temperature, change$action, change$is_update +FROM sensor_readings_stream_std +ORDER BY change$row_id; ``` -我们可以看到,两种模式的 Stream 都能够捕获插入操作,以及在 Stream 被消费前对插入值所做的任何更新和删除。但在消费后,如果对先前插入的数据进行更新或删除,只有标准模式 Stream 能够捕获这些变更,并将其记录为 DELETE 和 INSERT 操作。 - - - - -### 流式消费的事务支持 - -在 Databend 中,流式消费在单语句事务中具有事务性保障。这意味着: - -**事务成功**:如果事务提交,则流被消费。例如: - -```sql 
-INSERT INTO table SELECT * FROM stream; ``` - -若该 `INSERT` 事务提交成功,则流数据被消费。 - -**事务失败**:若事务失败,流数据保持不变且可供后续消费。 - -**并发访问**:_同一时间只有一个事务能成功消费流数据_。若多个事务尝试消费同一流,仅首个提交的事务会成功,其他事务将失败。 - -### 流的表元数据 - -**流本身不存储表的任何数据**。为表创建流后,Databend 会为该表添加特定的隐藏元数据列用于变更追踪,包括: - -| 列名 | 描述 | -| ---------------------- | --------------------------------------------------------------------------------- | -| \_origin_version | 标识该行最初创建时的表版本。 | -| \_origin_block_id | 标识该行先前所属的数据块 ID。 | -| \_origin_block_row_num | 标识该行在先前所属数据块中的行号。 | - -此前文档中的隐藏列 `_row_version` 已被移除,现已不可用。 - -要查看这些列的值,可使用 SELECT 语句: - -```sql title='示例:' -CREATE TABLE t(a int); -INSERT INTO t VALUES (1); -CREATE STREAM s ON TABLE t; -INSERT INTO t VALUES (2); -SELECT - *, - _origin_version, - _origin_block_id, - _origin_block_row_num -FROM - t; - -┌───────────┬──────────────────┬──────────────────────┬───────────────────────┐ -│ a │ _origin_version │ _origin_block_id │ _origin_block_row_num │ -├───────────┼──────────────────┼──────────────────────┼───────────────────────┤ -│ 1 │ NULL │ NULL │ NULL │ -│ 2 │ NULL │ NULL │ NULL │ -└───────────┴──────────────────┴──────────────────────┴───────────────────────┘ - -UPDATE t SET a = 3 WHERE a = 2; -SELECT - *, - _origin_version, - _origin_block_id, - _origin_block_row_num -FROM - t; - -┌───────────┬──────────────────┬─────────────────────────────────────────────┬───────────────────────┐ -│ a │ _origin_version │ _origin_block_id │ _origin_block_row_num │ -├───────────┼──────────────────┼─────────────────────────────────────────────┼───────────────────────┤ -│ 3 │ 2317 │ 132795849016460663684755265365603707394 │ 0 │ -│ 1 │ NULL │ NULL │ NULL │ -└───────────┴──────────────────┴─────────────────────────────────────────────┴───────────────────────┘ -``` - -### 流专用列 - -可直接通过 SELECT 语句查询流以获取变更记录。查询时可包含以下隐藏列获取变更详情: - -| 列名 | 描述 | -| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| change$action | 变更类型:INSERT 或 DELETE。 | -| change$is_update | 标识该变更是否为 UPDATE 操作的一部分。在流中,UPDATE 操作会分解为 DELETE 和 INSERT 操作,此字段将标记为 `true`。 | -| change$row_id | 用于追踪变更的每行唯一标识符。 | - -```sql title='示例:' -CREATE TABLE t(a int); -INSERT INTO t VALUES (1); -CREATE STREAM s ON TABLE t; -INSERT INTO t VALUES (2); - -SELECT * FROM s; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 2 │ INSERT │ false │ a577745c6a404f3384fa95791eb43f22000000 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ - --- 若新增行后更新该行 --- 流会将变更合并为包含更新值的 INSERT 操作 -UPDATE t SET a = 3 WHERE a = 2; -SELECT * FROM s; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 3 │ INSERT │ false │ a577745c6a404f3384fa95791eb43f22000000 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ +┌────────────┬───────────────┬───────────────┬──────────────────┐ +│ sensor_id │ temperature │ change$action │ change$is_update │ +├────────────┼───────────────┼───────────────┼──────────────────┤ +│ 1 │ 21.5 │ DELETE │ true │ +│ 1 │ 22 │ INSERT │ true │ +│ 2 
│ 19.7 │ DELETE │ false │ +│ 3 │ 18.5 │ INSERT │ false │ +└────────────┴───────────────┴───────────────┴──────────────────┘ ``` -### 示例:实时数据追踪与转换 +更新对应“删除旧值 + 写入新值”,独立的 DELETE/INSERT 则各自占一条,因而在消费前始终能拿到最终状态。 -以下示例展示如何使用流实时捕获和追踪用户活动。 +## 示例 3:增量 Join 与计算 -#### 1. 创建表结构 +多条 Stream 可以在需要时“一次性消费”。借助 `WITH CONSUME`,每次只处理尚未消费的增量,既适合批式作业也能容忍不同步到达的变更。 -示例使用三个表: - -- `user_activities` 表记录用户活动 -- `user_profiles` 表存储用户档案 -- `user_activity_profiles` 表是前两者的合并视图 - -创建 `activities_stream` 流用于捕获 `user_activities` 表的实时变更,随后通过查询消费该流来更新 `user_activity_profiles` 表。 +### 步骤 1: 创建基表与 Stream ```sql --- 创建用户活动记录表 -CREATE TABLE user_activities ( - user_id INT, - activity VARCHAR, - timestamp TIMESTAMP +CREATE OR REPLACE TABLE customers ( + customer_id INT, + segment VARCHAR, + city VARCHAR ); --- 创建用户档案表 -CREATE TABLE user_profiles ( - user_id INT, - username VARCHAR, - location VARCHAR +CREATE OR REPLACE TABLE orders ( + order_id INT, + customer_id INT, + amount DOUBLE ); --- 向用户档案表插入数据 -INSERT INTO user_profiles VALUES (101, 'Alice', 'New York'); -INSERT INTO user_profiles VALUES (102, 'Bob', 'San Francisco'); -INSERT INTO user_profiles VALUES (103, 'Charlie', 'Los Angeles'); -INSERT INTO user_profiles VALUES (104, 'Dana', 'Chicago'); - --- 创建用户活动与档案的合并视图表 -CREATE TABLE user_activity_profiles ( - user_id INT, - username VARCHAR, - location VARCHAR, - activity VARCHAR, - activity_timestamp TIMESTAMP -); +CREATE OR REPLACE STREAM customers_stream ON TABLE customers; +CREATE OR REPLACE STREAM orders_stream ON TABLE orders; ``` -#### 2. 创建流 - -在 `user_activities` 表上创建流以捕获实时变更: +### 步骤 2: 首批数据 ```sql -CREATE STREAM activities_stream ON TABLE user_activities; -``` +INSERT INTO customers VALUES + (101, 'VIP', 'Shanghai'), + (102, 'Standard', 'Beijing'), + (103, 'VIP', 'Shenzhen'); -#### 3. 向源表插入数据 +INSERT INTO orders VALUES + (5001, 101, 199.0), + (5002, 101, 59.0), + (5003, 102, 89.0); +``` -向 `user_activities` 表插入数据以产生变更: +### 步骤 3: 第一次增量聚合 ```sql -INSERT INTO user_activities VALUES (102, 'logout', '2023-12-19 09:00:00'); -INSERT INTO user_activities VALUES (103, 'view_profile', '2023-12-19 09:15:00'); -INSERT INTO user_activities VALUES (104, 'edit_profile', '2023-12-19 10:00:00'); -INSERT INTO user_activities VALUES (101, 'purchase', '2023-12-19 10:30:00'); -INSERT INTO user_activities VALUES (102, 'login', '2023-12-19 11:00:00'); +WITH + orders_delta AS ( + SELECT customer_id, amount + FROM orders_stream WITH CONSUME + ), + customers_delta AS ( + SELECT customer_id, segment + FROM customers_stream WITH CONSUME + ) +SELECT + o.customer_id, + c.segment, + SUM(o.amount) AS incremental_sales +FROM orders_delta AS o +JOIN customers_delta AS c + ON o.customer_id = c.customer_id +GROUP BY o.customer_id, c.segment +ORDER BY o.customer_id; ``` -#### 4. 
消费流数据更新目标表 +``` +┌──────────────┬───────────┬────────────────────┐ +│ customer_id │ segment │ incremental_sales │ +├──────────────┼───────────┼────────────────────┤ +│ 101 │ VIP │ 258.0 │ +│ 102 │ Standard │ 89.0 │ +└──────────────┴───────────┴────────────────────┘ +``` -消费流数据更新 `user_activity_profiles` 表: +### 步骤 4: 下一批到达后再次运行 ```sql --- 向合并视图表插入数据 -INSERT INTO user_activity_profiles +INSERT INTO customers VALUES (104, 'Standard', 'Guangzhou'); +INSERT INTO orders VALUES (5004, 101, 40.0), (5005, 104, 120.0); + +WITH + orders_delta AS ( + SELECT customer_id, amount + FROM orders_stream WITH CONSUME + ), + customers_delta AS ( + SELECT customer_id, segment + FROM customers_stream WITH CONSUME + ) SELECT - a.user_id, p.username, p.location, a.activity, a.timestamp -FROM - -- 变更数据源表 - activities_stream AS a -JOIN - -- 关联用户档案数据 - user_profiles AS p -ON - a.user_id = p.user_id - --- a.change$action 是标识变更类型的列(当前 Databend 仅支持 INSERT) -WHERE a.change$action = 'INSERT'; + o.customer_id, + c.segment, + SUM(o.amount) AS incremental_sales +FROM orders_delta AS o +JOIN customers_delta AS c + ON o.customer_id = c.customer_id +GROUP BY o.customer_id, c.segment +ORDER BY o.customer_id; ``` -查看更新后的 `user_activity_profiles` 表: - -```sql -SELECT - * -FROM - user_activity_profiles - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ user_id │ username │ location │ activity │ activity_timestamp │ -├─────────────────┼──────────────────┼──────────────────┼──────────────────┼─────────────────────┤ -│ 103 │ Charlie │ Los Angeles │ view_profile │ 2023-12-19 09:15:00 │ -│ 104 │ Dana │ Chicago │ edit_profile │ 2023-12-19 10:00:00 │ -│ 101 │ Alice │ New York │ purchase │ 2023-12-19 10:30:00 │ -│ 102 │ Bob │ San Francisco │ login │ 2023-12-19 11:00:00 │ -│ 102 │ Bob │ San Francisco │ logout │ 2023-12-19 09:00:00 │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ +``` +┌──────────────┬───────────┬────────────────────┐ +│ customer_id │ segment │ incremental_sales │ +├──────────────┼───────────┼────────────────────┤ +│ 101 │ VIP │ 40.0 │ +│ 104 │ Standard │ 120.0 │ +└──────────────┴───────────┴────────────────────┘ ``` -#### 5. 实时数据处理任务更新 +第二次运行自动只消费最新的增量,允许订单和客户信息在不同时间到达。 -为使 `user_activity_profiles` 表保持最新,需定期与 `activities_stream` 流同步数据。同步频率应与 `user_activities` 表更新间隔一致,确保合并视图能准确反映最新的用户活动和档案数据。 +## 使用提示 -可使用 Databend 的 `TASK` 命令(当前为私有预览功能)定义每分钟或每秒更新 `user_activity_profiles` 表的任务。 +**消费语义** +- `INSERT INTO target SELECT ... 
FROM stream` 成功提交后才会清空 Stream,失败则回滚。 +- 同一条 Stream 同时只能被一个语句消费,其余会被回滚。 -```sql --- 在 Databend 中定义定时任务 -CREATE TASK user_activity_task -WAREHOUSE = 'default' -SCHEDULE = 1 MINUTE --- 当 activities_stream 有新数据时触发任务 -WHEN stream_status('activities_stream') AS - -- 向合并视图表插入新记录 - INSERT INTO user_activity_profiles - SELECT - -- 基于 user_id 关联流数据和用户档案 - a.user_id, p.username, p.location, a.activity, a.timestamp - FROM - activities_stream AS a - JOIN user_profiles AS p - ON a.user_id = p.user_id - -- 仅包含 INSERT 类型的变更行 - WHERE a.change$action = 'INSERT'; -``` +**模式选择** +- Append-Only Stream 专注 INSERT,是事件、日志入湖的最佳拍档。 +- Standard Stream 能保留 UPDATE / DELETE 前后的最终状态,适合需要完整变更信息的场景。 + +**隐藏列** +- 查询 Stream 时可使用 `change$action`、`change$is_update`、`change$row_id` 判断每条增量。 +- 基表上还有 `_origin_version`、`_origin_block_id`、`_origin_block_row_num`,方便排查“最终值从何而来”。 + +**联动任务** +- 结合 Task 可以按计划自动消费 Stream,使用 `task_history('', )` 查看执行记录。 +- SQL 中搭配 `WITH CONSUME`,即可在批式作业里只处理“本次新增”的数据。 diff --git a/docs/cn/guides/40-load-data/05-continuous-data-pipelines/02-task.md b/docs/cn/guides/40-load-data/05-continuous-data-pipelines/02-task.md index ccc0528864..9520e6676d 100644 --- a/docs/cn/guides/40-load-data/05-continuous-data-pipelines/02-task.md +++ b/docs/cn/guides/40-load-data/05-continuous-data-pipelines/02-task.md @@ -1,169 +1,154 @@ --- -title: 使用任务自动化数据加载 -sidebar_label: 任务 +title: 使用任务(Task)自动化数据加载 +sidebar_label: 任务(Task) --- -任务封装了特定的 SQL 语句,设计用于按预定间隔执行、由特定事件触发或作为任务序列的一部分执行。在 Databend Cloud 中,任务通常用于定期捕获流中的变更数据(例如新增记录),然后将这些数据同步到指定目标。此外,任务还支持 [Webhook](https://en.wikipedia.org/wiki/Webhook) 等消息系统,可按需发送错误消息和通知。 +Task 是“把 SQL 交给 Databend 代跑”的方式。你可以让它按固定频率运行、在另一任务结束后运行,或者在某个 Stream 报告有增量时再运行。下面先看定义 Task 时需要关注的几个开关,再通过两个动手示例理解它如何和 Stream 配合。 -## 创建任务 +## Task 构建要素 -本主题详细说明在 Databend Cloud 中创建任务的步骤。您可以使用 [CREATE TASK](/sql/sql-commands/ddl/task/ddl-create_task) 命令创建任务。创建任务时,请参照下图设计工作流: +- **名称与计算仓库** – 每个 Task 都需要一个 Warehouse。 + ```sql + CREATE TASK ingest_orders + WAREHOUSE = 'etl_wh' + AS SELECT 1; + ``` +- **触发方式** – `SCHEDULE = 2 MINUTE`、CRON,或 `AFTER `(适用于 DAG)。 +- **执行条件** – `WHEN STREAM_STATUS('mystream') = TRUE` 这类布尔表达式,只有条件满足才运行。 +- **错误策略** – `SUSPEND_TASK_AFTER_NUM_FAILURES`、`ERROR_INTEGRATION` 等参数可在失败多次后暂停并发通知。 +- **SQL 负载** – `AS` 后的内容就是 Task 要执行的语句,可以是一条 INSERT/COPY/MERGE,也可以是 BEGIN...END。 -![alt text](/img/load/task.png) +## 示例 1:定时 COPY -1. 为任务设置名称。 -2. 指定运行任务的计算集群。如需创建计算集群,请参阅 [使用计算集群](/guides/cloud/using-databend-cloud/warehouses)。 -3. 确定任务触发方式。 +持续生成 Parquet 并导入表。记得把 `'etl_wh_small'` 换成你自己的 Warehouse。 - - 可通过指定分钟或秒为间隔来调度任务,或使用 CRON 表达式配合可选时区实现更精确的调度。 +### 步骤 1: 准备演示对象 -```sql title='示例:' --- 此任务每 2 分钟运行一次 -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -SCHEDULE = 2 MINUTE -AS ... +```sql +CREATE DATABASE IF NOT EXISTS task_demo; +USE task_demo; + +CREATE OR REPLACE TABLE sensor_events ( + event_time TIMESTAMP, + sensor_id INT, + temperature DOUBLE, + humidity DOUBLE +); --- 此任务每天午夜(东京时间)在 Asia/Tokyo 时区运行 -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -SCHEDULE = USING CRON '0 0 0 * * *' 'Asia/Tokyo' -AS ... +CREATE OR REPLACE STAGE sensor_events_stage; ``` - - 或者,您可以在任务间建立依赖关系,将任务设置为 [有向无环图](https://en.wikipedia.org/wiki/Directed_acyclic_graph) 中的子任务。 +### 步骤 2: Task 1 —— 生成文件 -```sql title='示例:' --- 此任务依赖于 DAG 中 'task_root' 任务的完成 -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -AFTER task_root -AS ... 
+```sql +CREATE OR REPLACE TASK task_generate_data + WAREHOUSE = 'etl_wh_small' + SCHEDULE = 1 MINUTE +AS +COPY INTO @sensor_events_stage +FROM ( + SELECT + NOW() AS event_time, + number AS sensor_id, + 20 + RAND() * 5 AS temperature, + 60 + RAND() * 10 AS humidity + FROM numbers(100) +) +FILE_FORMAT = (TYPE = PARQUET); ``` -4. 指定任务执行条件,允许基于布尔表达式控制任务执行(可选)。 +### 步骤 3: Task 2 —— 将文件导入表 -```sql title='示例:' --- 此任务每 2 分钟运行一次,仅当 'mystream' 包含数据变更时执行 AS 后的 SQL -CREATE TASK mytask -WAREHOUSE = 'default' -SCHEDULE = 2 MINUTE -// highlight-next-line -WHEN STREAM_STATUS('mystream') = TRUE -AS ... +```sql +CREATE OR REPLACE TASK task_consume_data + WAREHOUSE = 'etl_wh_small' + SCHEDULE = 1 MINUTE +AS +COPY INTO sensor_events +FROM @sensor_events_stage +PATTERN = '.*[.]parquet' +FILE_FORMAT = (TYPE = PARQUET) +PURGE = TRUE; ``` -5. 指定任务出错时的处理方式,包括设置连续失败次数以暂停任务,以及指定错误通知的集成方式。有关设置错误通知的更多信息,请参阅 [配置通知集成](#configuring-notification-integrations)。 - -```sql title='示例:' --- 此任务将在连续失败 3 次后暂停 -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -SUSPEND_TASK_AFTER_NUM_FAILURES = 3 -AS ... - --- 此任务将使用 'my_webhook' 集成发送错误通知 -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -ERROR_INTEGRATION = 'my_webhook' -AS ... +### 步骤 4: 恢复 Task + +```sql +ALTER TASK task_generate_data RESUME; +ALTER TASK task_consume_data RESUME; ``` -6. 指定任务将执行的 SQL 语句。 +### 步骤 5: 观察运行情况 -```sql title='示例:' --- 此任务每年更新 'employees' 表中的 'age' 列,使其递增 1 -CREATE TASK mytask -WAREHOUSE = 'default' -SCHEDULE = USING CRON '0 0 1 1 * *' 'UTC' -// highlight-next-line -AS -UPDATE employees -SET age = age + 1; +```sql +SHOW TASKS LIKE 'task_%'; +LIST @sensor_events_stage; +SELECT * FROM sensor_events ORDER BY event_time DESC LIMIT 5; +SELECT * FROM task_history('task_consume_data', 5); ``` -## 查看已创建的任务 - -要查看组织创建的所有任务,请登录 Databend Cloud 并转到 **数据** > **任务**。您可以查看每个任务的详细信息,包括状态和调度计划。 - -要查看任务运行历史记录,请转到 **监控** > **任务历史**。您可以查看每次任务运行的结果、完成时间等详细信息。 - -## 配置通知集成 - -Databend Cloud 允许您为任务配置错误通知,在任务执行出错时自动发送通知。当前支持 Webhook 集成,可实现错误事件与外部系统或服务的实时通信。 - -### 任务错误负载 - -任务错误负载指任务执行出错时作为错误通知发送的数据或信息。该负载通常包含错误详情,如错误代码、错误消息、时间戳以及其他有助于诊断和解决问题的上下文信息。 - -```json title='任务错误负载示例:' -{ - "version": "1.0", - "messageId": "063e40ab-0b55-439e-9cd2-504c496e1566", - "messageType": "TASK_FAILED", - "timestamp": "2024-03-19T02:37:21.160705788Z", - "tenantId": "tn78p61xz", - "taskName": "my_task", - "taskId": "15", - "rootTaskName": "my_task", - "rootTaskId": "15", - "messages": [ - { - "runId": "unknown", - "scheduledTime": "2024-03-19T02:37:21.157169855Z", - "queryStartTime": "2024-03-19T02:37:21.043090475Z", - "completedTime": "2024-03-19T02:37:21.157169205Z", - "queryId": "88bb9d5d-5d5e-4e52-92cc-b1953406245a", - "errorKind": "UnexpectedError", - "errorCode": "500", - "errorMessage": "query sync failed: All attempts fail:\n#1: query error: code: 1006, message: divided by zero while evaluating function `divide(1, 0)`" - } - ] -} +### 步骤 6: 调整或改写 Task + +```sql +ALTER TASK task_consume_data + SET SCHEDULE = 30 SECOND, + WAREHOUSE = 'etl_wh_medium'; + +ALTER TASK task_consume_data + MODIFY AS +COPY INTO sensor_events +FROM @sensor_events_stage +FILE_FORMAT = (TYPE = PARQUET); + +ALTER TASK task_consume_data RESUME; + +SELECT * +FROM task_history('task_consume_data', 5) +ORDER BY completed_time DESC; ``` -### 使用示例 +## 示例 2:Stream 条件 Task -在为任务配置错误通知前,您需要使用 [CREATE NOTIFICATION INTEGRATION](/sql/sql-commands/ddl/notification/ddl-create-notification) 命令创建通知集成。以下示例展示了如何为任务创建和配置通知集成。该示例使用 [Webhook.site](http://webhook.site) 模拟消息系统,接收来自 Databend Cloud 的负载。 
+只有当 Stream 报告“有增量”时才运行,避免空跑。 -1. 在浏览器中打开 [Webhook.site](http://webhook.site),获取您的 Webhook URL。 +### 步骤 1: 创建 Stream 与结果表 -![alt text](/img/load/webhook-1.png) +```sql +CREATE OR REPLACE STREAM sensor_events_stream + ON TABLE sensor_events + APPEND_ONLY = false; + +CREATE OR REPLACE TABLE sensor_events_latest AS +SELECT * +FROM sensor_events +WHERE 1 = 0; +``` -2. 在 Databend Cloud 中创建通知集成,然后创建带通知集成的任务: +### 步骤 2: 定义条件 Task ```sql --- 创建名为 'my_task' 的任务,每分钟运行一次,错误通知发送至 'my_webhook' --- 故意除以零以生成错误 -CREATE TASK my_task -WAREHOUSE = 'default' -SCHEDULE = 1 MINUTE -ERROR_INTEGRATION = 'my_webhook' +CREATE OR REPLACE TASK task_stream_merge + WAREHOUSE = 'etl_wh_small' + SCHEDULE = 1 MINUTE + WHEN STREAM_STATUS('task_demo.sensor_events_stream') = TRUE AS -SELECT 1 / 0; - --- 创建名为 'my_webhook' 的通知集成,用于发送 webhook 通知 -CREATE NOTIFICATION INTEGRATION my_webhook -TYPE = WEBHOOK -ENABLED = TRUE -WEBHOOK = ( - url = '', - method = 'POST' -); +INSERT INTO sensor_events_latest +SELECT * +FROM sensor_events_stream; --- 创建后恢复任务 -ALTER TASK my_task RESUME; +ALTER TASK task_stream_merge RESUME; ``` -3. 稍等片刻,您将看到 webhook 开始接收来自创建的任务的负载。 +### 步骤 3: 查看增量与历史 -![alt text](/img/load/webhook-2.png) +```sql +SELECT * +FROM sensor_events_latest +ORDER BY event_time DESC +LIMIT 5; -## 使用示例 +SELECT * +FROM task_history('task_stream_merge', 5); +``` -完整演示如何通过流捕获数据变更并使用任务同步,请参阅 [示例:实时跟踪和转换数据](01-stream.md#example-tracking-and-transforming-data-in-real-time)。 \ No newline at end of file +只要 `STREAM_STATUS('.')` 返回 TRUE(例如 `task_demo.sensor_events_stream`),Task 就会运行;否则保持暂停,直到下一批增量到达。 diff --git a/docs/cn/guides/40-load-data/05-continuous-data-pipelines/index.md b/docs/cn/guides/40-load-data/05-continuous-data-pipelines/index.md index 6b09d006d8..4fa7c1a57b 100644 --- a/docs/cn/guides/40-load-data/05-continuous-data-pipelines/index.md +++ b/docs/cn/guides/40-load-data/05-continuous-data-pipelines/index.md @@ -2,26 +2,24 @@ title: 持续数据管道 --- -## 数据管道简介 +在 Databend 中构建 CDC(Change Data Capture)流程只需两种原语: -数据管道能够自动化地将来自不同来源的数据移动并转换至 Databend。它们确保数据流畅传输,对于快速且持续地处理和分析数据至关重要。 +- **Stream**:捕获表的 INSERT/UPDATE/DELETE,一直保留到被消费。 +- **Task**:按计划或在 Stream 有新数据时自动运行 SQL。 -在持续数据管道中,一项名为 **变更数据捕获 (CDC)** 的特殊功能发挥着关键作用。借助 Databend,CDC 变得简单高效,仅需通过 Streams 和 Tasks 执行几条简单命令即可实现。 +## 快速入口 -## 理解变更数据捕获 (CDC) +- [示例 1:仅追加 Stream](./01-stream.md#示例-1仅追加-stream) – 捕获插入并写入目标表。 +- [示例 2:标准 Stream](./01-stream.md#示例-2标准-stream含-update-delete) – 了解更新、删除在 Stream 中的表现。 +- [示例 3:增量 Join](./01-stream.md#示例-3增量-join--计算) – 使用 `WITH CONSUME` 做批式增量聚合。 +- [示例 1:定时 COPY 任务](./02-task.md#示例-1定时-copy) – 两个任务生成并导入 Parquet。 +- [示例 2:Stream 条件任务](./02-task.md#示例-2基于-stream-的条件任务) – 只有 Stream 有增量时才触发。 -CDC 是指流对象捕获应用于数据库表的插入、更新和删除操作的过程。它包含有关每次变更的元数据,从而能够基于修改后的数据执行操作。Databend 中的 CDC 在源表中跟踪行级变更,创建一个"变更表"来反映两个事务时间点之间的数据修改。 +## 为什么选择 Databend CDC -## 使用变更数据捕获 (CDC) 的优势 +- **轻量**:Stream 只保存尚未消费的增量。 +- **事务一致**:消费 Stream 的语句成功才会清空,失败即回滚。 +- **增量友好**:配合 `WITH CONSUME` 能多次运行同一 SQL,每次只处理新数据。 +- **自动化**:Task 让任何 SQL 都能定时/触发执行。 -1. **快速实时数据加载**:优化来自事务数据库的实时数据加载流程,几乎可在秒级完成。 -2. **不影响原始数据**:安全可靠,不会损坏数据或其来源系统。 -3. 
**克服批量 ETL 的局限性**:超越传统的批量 ETL 方法,后者对于持续数据更新而言速度较慢且效率较低。 - -## Databend 持续数据管道的核心特性 - -Databend 通过以下特性增强了持续数据管道: - -- **持续数据跟踪与转换**:支持数据的实时跟踪与转换。[通过 Streams 了解数据跟踪与转换的更多信息](./01-stream.md)。 - -- **循环任务**:支持调度和管理循环数据处理任务,确保数据管道的高效性和可靠性。该功能目前处于私有预览阶段。 \ No newline at end of file +先完成 Stream 示例,再组合 Task,即可搭建自己的持续数据管道。 diff --git a/docs/en/guides/40-load-data/05-continuous-data-pipelines/01-stream.md b/docs/en/guides/40-load-data/05-continuous-data-pipelines/01-stream.md index fa1e8c4e32..bd86099151 100644 --- a/docs/en/guides/40-load-data/05-continuous-data-pipelines/01-stream.md +++ b/docs/en/guides/40-load-data/05-continuous-data-pipelines/01-stream.md @@ -6,447 +6,236 @@ sidebar_label: Stream import StepsWrap from '@site/src/components/StepsWrap'; import StepContent from '@site/src/components/Steps/step-content'; -A stream in Databend is a dynamic and real-time representation of changes to a table. Streams are created to capture and track modifications to the associated table, allowing continuous consumption and analysis of data changes as they occur. +A stream in Databend is an always-on change table: every committed INSERT, UPDATE, or DELETE is captured until you consume it. This page stays lean—first a quick overview, then one lab with real outputs so you can see streams in action. -### How Stream Works +## Stream Overview -A stream can operate in two modes: **Standard** and **Append-Only**. Specify a mode using the `APPEND_ONLY` parameter (defaults to `true`) when you [CREATE STREAM](/sql/sql-commands/ddl/stream/create-stream). +- Streams don’t duplicate table storage; they list the latest change for each affected row until you consume it. +- Consumption (task, INSERT ... SELECT, `WITH CONSUME`, etc.) clears the stream while keeping it ready for new data. +- `APPEND_ONLY` defaults to `true`; set `APPEND_ONLY = false` only when you must capture UPDATE/DELETE events. -- **Standard**: Captures all types of data changes, including insertions, updates, and deletions. -- **Append-Only**: In this mode, the stream exclusively contains data insertion records; data updates or deletions are not captured. +| Mode | Captures | Typical use | +| --- | --- | --- | +| Standard (`APPEND_ONLY = false`) | INSERT + UPDATE + DELETE, collapsed to the latest state per row. | Slowly changing dimensions, compliance audits. | +| Append-Only (`APPEND_ONLY = true`, default) | INSERT only. | Append-only fact/event ingestion. | -The design philosophy of Databend streams is to focus on capturing the final state of the data. For instance, if you insert a value and then update it multiple times, the stream only keeps the most recent state of the value before it is consumed. The following example illustrates what a stream looks like and how it works in both modes. +## Example 1: Append-Only Stream - - +Run the statements below in any Databend deployment (Cloud worksheet or local) to see how the default append-only mode captures and consumes inserts. -#### Create streams to capture changes - -Let's create two tables first, and then create a stream for each table with different modes to capture changes to the tables. +### 1. 
Create table and stream ```sql --- Create a table and insert a value -CREATE TABLE t_standard(a INT); -CREATE TABLE t_append_only(a INT); +CREATE OR REPLACE TABLE sensor_readings ( + sensor_id INT, + temperature DOUBLE +); --- Create two streams with different modes: Standard and Append_Only -CREATE STREAM s_standard ON TABLE t_standard APPEND_ONLY=false; -CREATE STREAM s_append_only ON TABLE t_append_only APPEND_ONLY=true; +-- APPEND_ONLY defaults to true, so no extra clause is required. +CREATE OR REPLACE STREAM sensor_readings_stream + ON TABLE sensor_readings; ``` -You can view the created streams and their mode using the [SHOW FULL STREAMS](/sql/sql-commands/ddl/stream/show-streams) command: +### 2. Insert rows and preview ```sql -SHOW FULL STREAMS; - -┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ created_on │ name │ database │ catalog │ table_on │ owner │ comment │ mode │ invalid_reason │ -├────────────────────────────┼───────────────┼──────────┼─────────┼───────────────────────┼──────────────────┼─────────┼─────────────┼────────────────┤ -│ 2024-02-18 16:39:58.996763 │ s_append_only │ default │ default │ default.t_append_only │ NULL │ │ append_only │ │ -│ 2024-02-18 16:39:58.966942 │ s_standard │ default │ default │ default.t_standard │ NULL │ │ standard │ │ -└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ -``` +INSERT INTO sensor_readings VALUES (1, 21.5), (2, 19.7); -Now, let's insert two values into each table and observe what the streams capture: - -```sql --- Insert two new values -INSERT INTO t_standard VALUES(2), (3); -INSERT INTO t_append_only VALUES(2), (3); - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 2 │ INSERT │ 8cd000827f8140d9921f897016e5a88e000000 │ false │ -│ 3 │ INSERT │ 8cd000827f8140d9921f897016e5a88e000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - -SELECT * FROM s_append_only; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 2 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000000 │ -│ 3 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000001 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ +SELECT sensor_id, temperature, change$action, change$is_update +FROM sensor_readings_stream; ``` -The results above indicate that both streams have successfully captured the new insertions. See [Stream Columns](#stream-columns) for details on the stream columns in the results. Now, let's update and then delete a newly inserted value and examine whether there are differences in the streams' captures. 
+Output: -```sql -UPDATE t_standard SET a = 4 WHERE a = 2; -UPDATE t_append_only SET a = 4 WHERE a = 2; - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -| 4 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000000 │ false | -│ 3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - -SELECT * FROM s_append_only; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 4 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000000 │ -│ 3 │ INSERT │ false │ 63dc9b84fe0a43528808c3304969b317000001 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ - -DELETE FROM t_standard WHERE a = 4; -DELETE FROM t_append_only WHERE a = 4; - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 3 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - -SELECT * FROM s_append_only; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 3 │ INSERT │ false │ bfed6c91f3e4402fa477b6853a2d2b58000001 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ ``` - -Up to this point, we haven't noticed any significant differences between the two modes as we haven't processed the streams yet. All changes have been consolidated and manifested as INSERT actions. **A stream can be consumed by a task, a DML (Data Manipulation Language) operation, or a query with [WITH CONSUME](/sql/sql-commands/query-syntax/with-consume) or [WITH Stream Hints](/sql/sql-commands/query-syntax/with-stream-hints)**. After consumption, the stream contains no data but can continue to capture new changes, if any. To further analyze the distinctions, let's proceed with consuming the streams and examining the output. - - - - -#### Consume streams - -Let's create two new tables and insert into them what the streams have captured. 
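Consuming with a DML statement works the same way as `WITH CONSUME`: the delta is cleared only when the statement commits. Here is a minimal sketch, assuming a hypothetical `sensor_readings_copy` target table:

```sql
-- Hypothetical target table for the consumed delta
CREATE OR REPLACE TABLE sensor_readings_copy (
    sensor_id INT,
    temperature DOUBLE
);

-- INSERT ... SELECT consumes the stream when the transaction commits;
-- if the statement fails, the delta stays in the stream for the next attempt.
INSERT INTO sensor_readings_copy
SELECT sensor_id, temperature
FROM sensor_readings_stream;
```

This mirrors the transactional guarantee described in the workflow notes at the end of this page.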
- -```sql -CREATE TABLE t_consume_standard(b INT); -CREATE TABLE t_consume_append_only(b INT); - -INSERT INTO t_consume_standard SELECT a FROM s_standard; -INSERT INTO t_consume_append_only SELECT a FROM s_append_only; - -SELECT * FROM t_consume_standard; - -┌─────────────────┐ -│ b │ -├─────────────────┤ -│ 3 │ -└─────────────────┘ - -SELECT * FROM t_consume_append_only; - -┌─────────────────┐ -│ b │ -├─────────────────┤ -│ 3 │ -└─────────────────┘ +┌────────────┬───────────────┬───────────────┬──────────────────┐ +│ sensor_id │ temperature │ change$action │ change$is_update │ +├────────────┼───────────────┼───────────────┼──────────────────┤ +│ 1 │ 21.5 │ INSERT │ false │ +│ 2 │ 19.7 │ INSERT │ false │ +└────────────┴───────────────┴───────────────┴──────────────────┘ ``` -If you query the streams now, you'll find them empty because they have been consumed. +### 3. Consume (optional) ```sql --- empty results -SELECT * FROM s_standard; +SELECT sensor_id, temperature +FROM sensor_readings_stream WITH CONSUME; --- empty results -SELECT * FROM s_append_only; +SELECT * FROM sensor_readings_stream; -- now empty ``` - - +`WITH CONSUME` reads the stream once and clears the delta so the next round can capture fresh INSERTs. -#### Capture new changes +## Example 2: Standard Stream (Updates & Deletes) -Now, let's update the value from `3` to `4` in each table, and subsequently, check their streams again: +Switch to Standard mode when you must react to every mutation, including UPDATE or DELETE. -```sql -UPDATE t_standard SET a = 4 WHERE a = 3; -UPDATE t_append_only SET a = 4 WHERE a = 3; - - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 3 │ DELETE │ 1dd5cab0b1b64328a112db89d602ca04000001 │ true │ -│ 4 │ INSERT │ 1dd5cab0b1b64328a112db89d602ca04000001 │ true │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - --- empty results -SELECT * FROM s_append_only; -``` - -The results above show that the Standard stream processes the UPDATE operation as a combination of two actions: a DELETE action that removes the old value (`3`) and an INSERT action that adds the new value (`4`). When updating `3` to `4`, the existing value `3` must first be deleted because it no longer exists in the final state, followed by the insertion of the new value `4`. This behavior reflects how the Standard stream captures only the final changes, representing updates as a sequence of a deletion (removing the old value) and an insertion (adding the new value) for the same row. - -On the other hand, the Append_Only stream does not capture anything because it is designed to log only new data additions (INSERT) and ignores updates or deletions. - -If we delete the value `4` now, we can obtain the following results: +### 1. 
Create a Standard stream ```sql -DELETE FROM t_standard WHERE a = 4; -DELETE FROM t_append_only WHERE a = 4; - -SELECT * FROM s_standard; - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$row_id │ change$is_update │ -│ Nullable(Int32) │ Nullable(String) │ Nullable(String) │ Boolean │ -├─────────────────┼──────────────────┼────────────────────────────────────────┼──────────────────┤ -│ 3 │ DELETE │ 1dd5cab0b1b64328a112db89d602ca04000001 │ false │ -└────────────────────────────────────────────────────────────────────────────────────────────────┘ - --- empty results -SELECT * FROM s_append_only; +CREATE OR REPLACE STREAM sensor_readings_stream_std + ON TABLE sensor_readings + APPEND_ONLY = false; ``` -We can see that both stream modes have the capability to capture insertions, along with any subsequent updates and deletions made to the inserted values before the streams are consumed. However, after consumption, if there are updates or deletions to the previously inserted data, only the standard stream is able to capture these changes, recording them as DELETE and INSERT actions. - - - - -### Transactional Support for Stream Consumption - -In Databend, stream consumption is transactional within single-statement transactions. This means: - -**Successful Transaction**: If a transaction is committed, the stream is consumed. For instance: +### 2. Mutate rows and compare ```sql -INSERT INTO table SELECT * FROM stream; -``` - -If this `INSERT` transaction commits, the stream is consumed. +DELETE FROM sensor_readings WHERE sensor_id = 1; -- remove old reading +INSERT INTO sensor_readings VALUES (1, 22); -- same sensor, new value +DELETE FROM sensor_readings WHERE sensor_id = 2; -- pure deletion +INSERT INTO sensor_readings VALUES (3, 18.5); -- brand-new sensor -**Failed Transaction**: If the transaction fails, the stream remains unchanged and available for future consumption. +SELECT * FROM sensor_readings_stream; -- still empty (Append-Only ignores non-inserts) -**Concurrent Access**: _Only one transaction can successfully consume a stream at a time_. If multiple transactions attempt to consume the same stream, only the first committed transaction succeeds, others fail. - -### Table Metadata for Stream - -**A stream does not store any data for a table**. After creating a stream for a table, Databend introduces specific hidden metadata columns to the table for change tracking purposes. These columns include: - -| Column | Description | -| ---------------------- | --------------------------------------------------------------------------------- | -| \_origin_version | Identifies the table version in which this row was initially created. | -| \_origin_block_id | Identifies the block ID to which this row belonged previously. | -| \_origin_block_row_num | Identifies the row number within the block to which this row belonged previously. | - -The previously documented hidden column `_row_version` has been removed and is no longer available. 
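The hidden provenance columns mentioned in the workflow notes (`_origin_version`, `_origin_block_id`, `_origin_block_row_num`) can be inspected directly on the base table once a stream exists on it. A quick sketch, reusing the demo table from these examples:

```sql
-- Hidden metadata columns used for change tracking; rows that have not been
-- rewritten since the stream was created may show NULL here.
SELECT
    *,
    _origin_version,
    _origin_block_id,
    _origin_block_row_num
FROM sensor_readings;
```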
- -To display the values of these columns, use the SELECT statement: - -```sql title='Example:' -CREATE TABLE t(a int); -INSERT INTO t VALUES (1); -CREATE STREAM s ON TABLE t; -INSERT INTO t VALUES (2); -SELECT - *, - _origin_version, - _origin_block_id, - _origin_block_row_num -FROM - t; - -┌───────────┬──────────────────┬──────────────────────┬───────────────────────┐ -│ a │ _origin_version │ _origin_block_id │ _origin_block_row_num │ -├───────────┼──────────────────┼──────────────────────┼───────────────────────┤ -│ 1 │ NULL │ NULL │ NULL │ -│ 2 │ NULL │ NULL │ NULL │ -└───────────┴──────────────────┴──────────────────────┴───────────────────────┘ - -UPDATE t SET a = 3 WHERE a = 2; -SELECT - *, - _origin_version, - _origin_block_id, - _origin_block_row_num -FROM - t; - -┌───────────┬──────────────────┬─────────────────────────────────────────────┬───────────────────────┐ -│ a │ _origin_version │ _origin_block_id │ _origin_block_row_num │ -├───────────┼──────────────────┼─────────────────────────────────────────────┼───────────────────────┤ -│ 3 │ 2317 │ 132795849016460663684755265365603707394 │ 0 │ -│ 1 │ NULL │ NULL │ NULL │ -└───────────┴──────────────────┴─────────────────────────────────────────────┴───────────────────────┘ +SELECT sensor_id, temperature, change$action, change$is_update +FROM sensor_readings_stream_std +ORDER BY change$row_id; ``` -### Stream Columns - -You can use the SELECT statement to directly query a stream and retrieve the tracked changes. When querying a stream, consider incorporating these hidden columns for additional details about the changes: +Output: -| Column | Description | -| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| change$action | Type of change: INSERT or DELETE. | -| change$is_update | Indicates whether the `change$action` is part of an UPDATE. In a stream, an UPDATE is represented by a combination of DELETE and INSERT operations, with this field set to `true`. | -| change$row_id | Unique identifier for each row to track changes. | - -```sql title='Example:' -CREATE TABLE t(a int); -INSERT INTO t VALUES (1); -CREATE STREAM s ON TABLE t; -INSERT INTO t VALUES (2); - -SELECT * FROM s; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 2 │ INSERT │ false │ a577745c6a404f3384fa95791eb43f22000000 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ - --- If you add a new row and then update it, --- the stream consolidates the changes as an INSERT with your updated value. 
-UPDATE t SET a = 3 WHERE a = 2; -SELECT * FROM s; - -┌─────────────────────────────────────────────────────────────────────────────────────────────┐ -│ a │ change$action │ change$is_update │ change$row_id │ -├─────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤ -│ 3 │ INSERT │ false │ a577745c6a404f3384fa95791eb43f22000000 │ -└─────────────────────────────────────────────────────────────────────────────────────────────┘ +``` +┌────────────┬───────────────┬───────────────┬──────────────────┐ +│ sensor_id │ temperature │ change$action │ change$is_update │ +├────────────┼───────────────┼───────────────┼──────────────────┤ +│ 1 │ 21.5 │ DELETE │ true │ +│ 1 │ 22 │ INSERT │ true │ +│ 2 │ 19.7 │ DELETE │ false │ +│ 3 │ 18.5 │ INSERT │ false │ +└────────────┴───────────────┴───────────────┴──────────────────┘ ``` -### Example: Tracking and Transforming Data in Real-Time - -The following example demonstrates how to use streams to capture and track user activities in real-time. - -#### 1. Creating Tables + Standard streams capture each change with context: updates show up as DELETE+INSERT on the same `sensor_id`, while standalone deletions/insertions appear individually. Append-Only streams stay empty because they track inserts only. -The example uses three tables: +## Example 3: Incremental Stream Join -- `user_activities` table records user activities. -- `user_profiles` table stores user profiles. -- `user_activity_profiles` table is a combined view of the two tables. +Join multiple append-only streams to produce incremental KPIs. Because Databend streams keep new rows until they are consumed, you can run the same query after each load. Every execution drains only the new rows via [`WITH CONSUME`](/sql/sql-commands/query-syntax/with-consume), so updates that arrive at different times are still matched on the next iteration. -The `activities_stream` table is created as a stream to capture real-time changes to the `user_activities` table. The stream is then consumed by a query to update the` user_activity_profiles` table with the latest data. +### 1. Create tables and streams ```sql --- Create a table to record user activities -CREATE TABLE user_activities ( - user_id INT, - activity VARCHAR, - timestamp TIMESTAMP +CREATE OR REPLACE TABLE customers ( + customer_id INT, + segment VARCHAR, + city VARCHAR ); --- Create a table to store user profiles -CREATE TABLE user_profiles ( - user_id INT, - username VARCHAR, - location VARCHAR +CREATE OR REPLACE TABLE orders ( + order_id INT, + customer_id INT, + amount DOUBLE ); --- Insert data into the user_profiles table -INSERT INTO user_profiles VALUES (101, 'Alice', 'New York'); -INSERT INTO user_profiles VALUES (102, 'Bob', 'San Francisco'); -INSERT INTO user_profiles VALUES (103, 'Charlie', 'Los Angeles'); -INSERT INTO user_profiles VALUES (104, 'Dana', 'Chicago'); - --- Create a table for the combined view of user activities and profiles -CREATE TABLE user_activity_profiles ( - user_id INT, - username VARCHAR, - location VARCHAR, - activity VARCHAR, - activity_timestamp TIMESTAMP -); +CREATE OR REPLACE STREAM customers_stream ON TABLE customers; +CREATE OR REPLACE STREAM orders_stream ON TABLE orders; ``` -#### 2. Creating a Stream - -Create a stream on the `user_activities` table to capture real-time changes: +### 2. 
Load the first batch ```sql -CREATE STREAM activities_stream ON TABLE user_activities; -``` +INSERT INTO customers VALUES + (101, 'VIP', 'Seattle'), + (102, 'Standard', 'Austin'), + (103, 'VIP', 'Austin'); -#### 3. Inserting Data into the Source Table +INSERT INTO orders VALUES + (5001, 101, 199.0), + (5002, 101, 59.0), + (5003, 102, 89.0); +``` -Insert data into the `user_activities` table to make some changes: +### 3. Run the first incremental query ```sql -INSERT INTO user_activities VALUES (102, 'logout', '2023-12-19 09:00:00'); -INSERT INTO user_activities VALUES (103, 'view_profile', '2023-12-19 09:15:00'); -INSERT INTO user_activities VALUES (104, 'edit_profile', '2023-12-19 10:00:00'); -INSERT INTO user_activities VALUES (101, 'purchase', '2023-12-19 10:30:00'); -INSERT INTO user_activities VALUES (102, 'login', '2023-12-19 11:00:00'); +WITH + orders_delta AS ( + SELECT customer_id, amount + FROM orders_stream WITH CONSUME + ), + customers_delta AS ( + SELECT customer_id, segment + FROM customers_stream WITH CONSUME + ) +SELECT + o.customer_id, + c.segment, + SUM(o.amount) AS incremental_sales +FROM orders_delta AS o +JOIN customers_delta AS c + ON o.customer_id = c.customer_id +GROUP BY o.customer_id, c.segment +ORDER BY o.customer_id; ``` -#### 4. Consuming the Stream to Update the Target Table +``` +┌──────────────┬───────────┬────────────────────┐ +│ customer_id │ segment │ incremental_sales │ +├──────────────┼───────────┼────────────────────┤ +│ 101 │ VIP │ 258.0 │ +│ 102 │ Standard │ 89.0 │ +└──────────────┴───────────┴────────────────────┘ +``` -Consume the stream to update the `user_activity_profiles` table: +The streams are now empty. When more rows arrive, the same query will capture only the new data. + +### 4. Run again after the next batch ```sql --- Inserting data into the user_activity_profiles table -INSERT INTO user_activity_profiles +-- New data arrives later +INSERT INTO customers VALUES (104, 'Standard', 'Denver'); +INSERT INTO orders VALUES + (5004, 101, 40.0), + (5005, 104, 120.0); + +-- Same incremental query as before +WITH + orders_delta AS ( + SELECT customer_id, amount + FROM orders_stream WITH CONSUME + ), + customers_delta AS ( + SELECT customer_id, segment + FROM customers_stream WITH CONSUME + ) SELECT - a.user_id, p.username, p.location, a.activity, a.timestamp -FROM - -- Source table for changed data - activities_stream AS a -JOIN - -- Joining with user profile data - user_profiles AS p -ON - a.user_id = p.user_id - --- a.change$action is a column indicating the type of change (Databend only supports INSERT for now) -WHERE a.change$action = 'INSERT'; + o.customer_id, + c.segment, + SUM(o.amount) AS incremental_sales +FROM orders_delta AS o +JOIN customers_delta AS c + ON o.customer_id = c.customer_id +GROUP BY o.customer_id, c.segment +ORDER BY o.customer_id; ``` -Then, check the updated `user_activity_profiles` table: - -```sql -SELECT - * -FROM - user_activity_profiles - -┌────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ user_id │ username │ location │ activity │ activity_timestamp │ -├─────────────────┼──────────────────┼──────────────────┼──────────────────┼─────────────────────┤ -│ 103 │ Charlie │ Los Angeles │ view_profile │ 2023-12-19 09:15:00 │ -│ 104 │ Dana │ Chicago │ edit_profile │ 2023-12-19 10:00:00 │ -│ 101 │ Alice │ New York │ purchase │ 2023-12-19 10:30:00 │ -│ 102 │ Bob │ San Francisco │ login │ 2023-12-19 11:00:00 │ -│ 102 │ Bob │ San Francisco │ logout │ 2023-12-19 09:00:00 │ 
-└────────────────────────────────────────────────────────────────────────────────────────────────┘ +``` +┌──────────────┬───────────┬────────────────────┐ +│ customer_id │ segment │ incremental_sales │ +├──────────────┼───────────┼────────────────────┤ +│ 101 │ VIP │ 40.0 │ +│ 104 │ Standard │ 120.0 │ +└──────────────┴───────────┴────────────────────┘ ``` -#### 5. Task Update for Real-Time Data Processing +Rows stay in each stream until `WITH CONSUME` runs, so inserts that arrive at different times are still matched on the next run. Leave the streams unconsumed when you expect more related rows, then rerun the query to pick up the incremental delta. -To keep the `user_activity_profiles` table current, it's important to periodically synchronize it with data from the `activities_stream`. This synchronization should be aligned with the update intervals of the `user_activities` table, ensuring that the user_activity_profiles accurately reflects the latest user activities and profiles for real-time data analysis. +## Stream Workflow Notes -The Databend `TASK` command(currently in private preview), can be utilized to define a task that updates the `user_activity_profiles` table every minute or seconds. +**Consumption** +- Streams are drained inside a transaction: `INSERT INTO target SELECT ... FROM stream` empties the stream only when the statement commits. +- Only one consumer can succeed at a time; other concurrent statements roll back. + +**Modes** +- Append-Only streams capture INSERTs only and are ideal for append-heavy workloads. +- Standard streams emit updates and deletes as long as you consume them; late-arriving updates remain until the next run. + +**Hidden Columns** +- Streams expose `change$action`, `change$is_update`, and `change$row_id`; use them to understand how Databend recorded each row. +- Base tables gain `_origin_version`, `_origin_block_id`, `_origin_block_row_num` for debugging row provenance. + +**Integrations** +- Pair streams with tasks using `task_history('', )` for scheduled incremental loads. +- Use [`WITH CONSUME`](02-task.md) when you want to drain only the latest delta. -```sql --- Define a task in Databend -CREATE TASK user_activity_task -WAREHOUSE = 'default' -SCHEDULE = 1 MINUTE --- Trigger task when new data arrives in activities_stream -WHEN stream_status('activities_stream') AS - -- Insert new records into user_activity_profiles - INSERT INTO user_activity_profiles - SELECT - -- Join activities_stream with user_profiles based on user_id - a.user_id, p.username, p.location, a.activity, a.timestamp - FROM - activities_stream AS a - JOIN user_profiles AS p - ON a.user_id = p.user_id - -- Include only rows where the action is 'INSERT' - WHERE a.change$action = 'INSERT'; -``` diff --git a/docs/en/guides/40-load-data/05-continuous-data-pipelines/02-task.md b/docs/en/guides/40-load-data/05-continuous-data-pipelines/02-task.md index 5892a3011e..7cd212d7c4 100644 --- a/docs/en/guides/40-load-data/05-continuous-data-pipelines/02-task.md +++ b/docs/en/guides/40-load-data/05-continuous-data-pipelines/02-task.md @@ -3,167 +3,213 @@ title: Automating Data Loading with Tasks sidebar_label: Task --- -A task encapsulates specific SQL statements that are designed to be executed either at predetermined intervals, triggered by specific events, or as part of a broader sequence of tasks. Tasks in Databend Cloud are commonly used to regularly capture data changes from streams, such as newly added records, and then synchronize this data with designated target destinations. 
Furthermore, tasks offer support for [Webhook](https://en.wikipedia.org/wiki/Webhook) and other messaging systems, facilitating the delivery of error messages and notifications as needed. +Tasks wrap SQL so Databend can run it for you on a schedule or when a condition is met. Keep the following knobs in mind when you define one with [CREATE TASK](/sql/sql-commands/ddl/task/ddl-create_task): -## Creating a Task +![alt text](/img/load/task.png) -This topic breaks down the procedure of creating a task in Databend Cloud. In Databend Cloud, you create a task using the [CREATE TASK](/sql/sql-commands/ddl/task/ddl-create_task) command. When creating a task, follow the illustration below to design the workflow: +- **Name & warehouse** – every task needs a warehouse. + ```sql + CREATE TASK ingest_orders + WAREHOUSE = 'etl_wh' + AS SELECT 1; + ``` +- **Trigger** – fixed interval, CRON, or `AFTER another_task`. + ```sql + CREATE TASK mytask + WAREHOUSE = 'default' + SCHEDULE = 2 MINUTE + AS ...; + ``` +- **Guards** – only run when a predicate is true. + ```sql + CREATE TASK mytask + WAREHOUSE = 'default' + WHEN STREAM_STATUS('mystream') = TRUE + AS ...; + ``` +- **Error handling** – pause after N failures or send notifications. + ```sql + CREATE TASK mytask + WAREHOUSE = 'default' + SUSPEND_TASK_AFTER_NUM_FAILURES = 3 + AS ...; + ``` +- **SQL payload** – whatever you place after `AS` is what the task executes. + ```sql + CREATE TASK bump_age + WAREHOUSE = 'default' + SCHEDULE = USING CRON '0 0 1 1 * *' 'UTC' + AS UPDATE employees SET age = age + 1; + ``` + +## Example 1: Scheduled Copy + +Continuously generate sensor data, land it as Parquet, and load it into a table. Replace `'etl_wh_small'` with **your** warehouse name in every `CREATE/ALTER TASK` statement. + +### Step 1. Prepare demo objects -![alt text](/img/load/task.png) +```sql +-- Create a playground schema and target table +CREATE DATABASE IF NOT EXISTS task_demo; +USE task_demo; + +CREATE OR REPLACE TABLE sensor_events ( + event_time TIMESTAMP, + sensor_id INT, + temperature DOUBLE, + humidity DOUBLE +); -1. Set a name for the task. -2. Specify a warehouse to run the task. To create a warehouse, see [Work with Warehouses](/guides/cloud/using-databend-cloud/warehouses). -3. Determine how to trigger the task to run. - - - You can schedule the task to run by specifying the interval in minutes or seconds, or by using a CRON expression with an optional time zone for more precise scheduling. - -```sql title='Examples:' --- This task runs every 2 minutes -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -SCHEDULE = 2 MINUTE -AS ... - --- This task runs daily at midnight (local time) in the Asia/Tokyo timezone -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -SCHEDULE = USING CRON '0 0 0 * * *' 'Asia/Tokyo' -AS ... +-- Stage that will store the generated Parquet files +CREATE OR REPLACE STAGE sensor_events_stage; ``` - - Alternatively, you can establish dependencies between tasks, setting the task as a child task in a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph). +### Step 2. Task 1 — Generate files -```sql title='Examples:' --- This task is dependent on the completion of the 'task_root' task in the DAG -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -AFTER task_root -AS ... +`task_generate_data` writes 100 random readings to the stage once per minute. Each execution produces a fresh Parquet file that downstream consumers can ingest. 
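+
+If you want to sanity-check the row shape before scheduling anything, you can run the generator query on its own first; this optional preview reuses the same expressions as the task body below, just with a smaller row count and without writing to the stage:
+
+```sql
+-- Preview a handful of synthetic readings (nothing is written to the stage)
+SELECT
+    NOW() AS event_time,
+    number AS sensor_id,
+    20 + RAND() * 5 AS temperature,
+    60 + RAND() * 10 AS humidity
+FROM numbers(5);
+```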
+ +```sql +CREATE OR REPLACE TASK task_generate_data + WAREHOUSE = 'etl_wh_small' -- replace with your warehouse + SCHEDULE = 1 MINUTE +AS +COPY INTO @sensor_events_stage +FROM ( + SELECT + NOW() AS event_time, + number AS sensor_id, + 20 + RAND() * 5 AS temperature, + 60 + RAND() * 10 AS humidity + FROM numbers(100) +) +FILE_FORMAT = (TYPE = PARQUET); ``` -4. Specify the condition under which the task will execute, allowing you to optionally control task execution based on a boolean expression. +### Step 3. Task 2 — Load the files -```sql title='Examples:' --- This task runs every 2 minutes and executes the SQL after AS only if 'mystream' contains data changes -CREATE TASK mytask -WAREHOUSE = 'default' -SCHEDULE = 2 MINUTE -// highlight-next-line -WHEN STREAM_STATUS('mystream') = TRUE -AS ... +`task_consume_data` scans the stage on the same cadence and copies every newly generated Parquet file into the `sensor_events` table. The `PURGE = TRUE` clause cleans up files that were already ingested. + +```sql +CREATE OR REPLACE TASK task_consume_data + WAREHOUSE = 'etl_wh_small' -- replace with your warehouse + SCHEDULE = 1 MINUTE +AS +COPY INTO sensor_events +FROM @sensor_events_stage +PATTERN = '.*[.]parquet' +FILE_FORMAT = (TYPE = PARQUET) +PURGE = TRUE; ``` -5. Specify what to do if the task results in an error, including options such as setting the number of consecutive failures to suspend the task and specifying the notification integration for error notifications. For more information about setting an error notification, see [Configuring Notification Integrations](#configuring-notification-integrations). - -```sql title='Examples:' --- This task will suspend after 3 consecutive failures -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -SUSPEND_TASK_AFTER_NUM_FAILURES = 3 -AS ... - --- This task will utilize the 'my_webhook' integration for error notifications. -CREATE TASK mytask -WAREHOUSE = 'default' -// highlight-next-line -ERROR_INTEGRATION = 'my_webhook' -AS ... +### Step 4. Resume tasks + +```sql +ALTER TASK task_generate_data RESUME; +ALTER TASK task_consume_data RESUME; ``` -6. Specify the SQL statement the task will execute. +Both tasks start in a suspended state until you resume them. Expect the first files and copies to happen within the next minute. -```sql title='Examples:' --- This task updates the 'age' column in the 'employees' table, incrementing it by 1 every year. -CREATE TASK mytask -WAREHOUSE = 'default' -SCHEDULE = USING CRON '0 0 1 1 * *' 'UTC' -// highlight-next-line -AS -UPDATE employees -SET age = age + 1; +### Step 5. Monitor the pipeline + +```sql +-- Confirm that the tasks are running +SHOW TASKS LIKE 'task_%'; + +-- Inspect files on the stage (should shrink as PURGE removes processed files) +LIST @sensor_events_stage; + +-- Check the ingested rows +SELECT * +FROM sensor_events +ORDER BY event_time DESC +LIMIT 5; + +-- Review recent executions for troubleshooting +SELECT * +FROM task_history('task_consume_data', 5); + +-- Change configuration later if needed +ALTER TASK task_consume_data + SCHEDULE = 30 SECOND, + WAREHOUSE = 'etl_wh_medium'; -- replace with your warehouse ``` -## Viewing Created Tasks - -To view all tasks created by your organization, log in to Databend Cloud and go to **Data** > **Task**. You can see detailed information for each task, including their status and schedules. - -To view the task run history, go to **Monitor** > **Task History**. 
You can see each run of tasks with their result, completion time, and other details. - -## Configuring Notification Integrations - -Databend Cloud allows you to configure error notifications for a task, automating the process of sending notifications when an error occurs during the task execution. It currently supports Webhook integrations, facilitating seamless communication of error events to external systems or services in real-time. - -### Task Error Payload - -A task error payload refers to the data or information that is sent as part of an error notification when a task encounters an error during its execution. This payload typically includes details about the error, such as error codes, error messages, timestamps, and potentially other relevant contextual information that can help in diagnosing and resolving the issue. - -```json title='Task Error Payload Example:' -{ - "version": "1.0", - "messageId": "063e40ab-0b55-439e-9cd2-504c496e1566", - "messageType": "TASK_FAILED", - "timestamp": "2024-03-19T02:37:21.160705788Z", - "tenantId": "tn78p61xz", - "taskName": "my_task", - "taskId": "15", - "rootTaskName": "my_task", - "rootTaskId": "15", - "messages": [ - { - "runId": "unknown", - "scheduledTime": "2024-03-19T02:37:21.157169855Z", - "queryStartTime": "2024-03-19T02:37:21.043090475Z", - "completedTime": "2024-03-19T02:37:21.157169205Z", - "queryId": "88bb9d5d-5d5e-4e52-92cc-b1953406245a", - "errorKind": "UnexpectedError", - "errorCode": "500", - "errorMessage": "query sync failed: All attempts fail:\n#1: query error: code: 1006, message: divided by zero while evaluating function `divide(1, 0)`" - } - ] -} +You can suspend either task with `ALTER TASK ... SUSPEND` when you finish testing. + +### Step 6. Update tasks + +You can change schedules, warehouses, or even the SQL payload without dropping the task: + +```sql +-- Tweak the schedule and warehouse +ALTER TASK task_consume_data + SCHEDULE = 30 SECOND, + WAREHOUSE = 'etl_wh_medium'; -- replace with your warehouse + +-- Update the SQL payload (replace the existing body) +ALTER TASK task_consume_data + AS +COPY INTO sensor_events +FROM @sensor_events_stage +FILE_FORMAT = (TYPE = PARQUET); + +-- Resume after edits (tasks suspend when their SQL changes) +ALTER TASK task_consume_data RESUME; + +-- Review execution history for verification +SELECT * +FROM task_history('task_consume_data', 5) +ORDER BY completed_time DESC; ``` -### Usage Examples +`TASK_HISTORY` returns status, timing, and query IDs, making it easy to double-check changes. -Before configuring error notifications for a task, you must create a notification integration with the [CREATE NOTIFICATION INTEGRATION](/sql/sql-commands/ddl/notification/ddl-create-notification) command. The following is an example of creating and configuring a notification integration for a task. The example utilizes [Webhook.site](http://webhook.site) to simulate the messaging system, receiving payloads from Databend Cloud. +## Example 2: Stream-Triggered Merge -1. Open the [Webhook.site](http://webhook.site) in your web browser, and obtain the URL of your Webhook. +Use `WHEN STREAM_STATUS(...)` to fire only when a stream has new rows. Reuse the `sensor_events` table from Example 1. -![alt text](/img/load/webhook-1.png) +### Step 1. Create stream + latest table -2. 
In Databend Cloud, create a notification integration, and then create a task with the notification integration:
+
+```sql
+-- Create a stream on the sensor table (Standard mode to capture every mutation)
+CREATE OR REPLACE STREAM sensor_events_stream
+  ON TABLE sensor_events
+  APPEND_ONLY = false;
+
+-- Target table that keeps only the latest copy of each row
+CREATE OR REPLACE TABLE sensor_events_latest AS
+SELECT *
+FROM sensor_events
+WHERE 1 = 0;
+```
+
+### Step 2. Create the conditional task
 
 ```sql
--- Create a task named 'my_task' to run every minute, with error notifications sent to 'my_webhook'.
--- Intentionally divide by zero to generate an error.
-CREATE TASK my_task
-WAREHOUSE = 'default'
-SCHEDULE = 1 MINUTE
-ERROR_INTEGRATION = 'my_webhook'
+CREATE OR REPLACE TASK task_stream_merge
+  WAREHOUSE = 'etl_wh_small' -- replace with your warehouse
+  SCHEDULE = 1 MINUTE
+  WHEN STREAM_STATUS('task_demo.sensor_events_stream') = TRUE
 AS
-SELECT 1 / 0;
-
--- Create a notification integration named 'my_webhook' for sending webhook notifications.
-CREATE NOTIFICATION INTEGRATION my_webhook
-TYPE = WEBHOOK
-ENABLED = TRUE
-WEBHOOK = (
-    url = '',
-    method = 'POST'
-);
+INSERT INTO sensor_events_latest
+SELECT *
+FROM sensor_events_stream;
 
--- Resume the task after creation
-ALTER TASK my_task RESUME;
+ALTER TASK task_stream_merge RESUME;
 ```
 
-3. Wait for a moment, and you'll notice that your webhook starts to receive the payload from the created task.
+### Step 3. Verify the behavior
 
-![alt text](/img/load/webhook-2.png)
+```sql
+SELECT *
+FROM sensor_events_latest
+ORDER BY event_time DESC
+LIMIT 5;
+
+SELECT *
+FROM task_history('task_stream_merge', 5);
+```
 
-## Usage Examples
+The task fires only when `STREAM_STATUS('<database>.<stream_name>')` returns `TRUE`. Always prefix the stream with its database (for example `task_demo.sensor_events_stream`) so the task can resolve it regardless of the current schema, and use your own warehouse name in every `CREATE/ALTER TASK` statement.
 
-See [Example: Tracking and Transforming Data in Real-Time](01-stream.md#example-tracking-and-transforming-data-in-real-time) for a complete demo on how to capture data changes with a stream and sync them with a task.
diff --git a/docs/en/guides/40-load-data/05-continuous-data-pipelines/index.md b/docs/en/guides/40-load-data/05-continuous-data-pipelines/index.md
index 21d9fa2309..1b0f4340f9 100644
--- a/docs/en/guides/40-load-data/05-continuous-data-pipelines/index.md
+++ b/docs/en/guides/40-load-data/05-continuous-data-pipelines/index.md
@@ -2,26 +2,24 @@
 title: Continuous Data Pipelines
 ---
 
-## Introduction to Data Pipelines
+Build end-to-end change data capture (CDC) flows in Databend with two primitives:
 
-Data pipelines automate the process of moving and changing data from different sources into Databend. They make sure data flows smoothly and are vital for processing and analyzing data quickly and continuously.
+- **Streams** capture every INSERT/UPDATE/DELETE until you consume them.
+- **Tasks** run SQL on a schedule or when a stream reports new rows.
 
-In Continuous Data Pipelines, a special feature called **Change Data Capture (CDC)** plays a key role. With Databend, CDC becomes easy and efficient, requiring only a few simple commands through Streams and Tasks.
+## Quick Navigation
 
-## Understanding Change Data Capture (CDC)
+- [Example 1: Append-Only Stream Copy](./01-stream.md#example-1-append-only-stream-copy) – capture inserts and consume them into another table. 
+- [Example 2: Standard Stream Updates](./01-stream.md#example-2-standard-stream-updates) – see how updates/deletes appear and why only one consumer can drain a stream. +- [Example 3: Incremental Stream Metrics](./01-stream.md#example-3-incremental-stream-metrics) – join multiple streams with `WITH CONSUME` to compute deltas batch by batch. +- [Example 1: Scheduled Copy Task](./02-task.md#example-1-scheduled-copy) – generate and load files with two recurring tasks. +- [Example 2: Stream-Triggered Merge](./02-task.md#example-2-stream-triggered-merge) – fire a task only when `STREAM_STATUS` is true. -CDC is a process where a stream object captures insertions, updates, and deletions—applied to database tables. It includes metadata about each change, enabling actions based on the modified data. CDC in Databend tracks changes at the row level in a source table, creating a "change table" that reflects modifications between two transactional points in time. +## Why CDC in Databend -## Advantages of Using Change Data Capture (CDC) +- **Lightweight** – streams keep the latest change set without duplicating full tables. +- **Transactional** – stream consumption succeeds or rolls back with your SQL statement. +- **Incremental** – rerun the same query with `WITH CONSUME` to process only new rows. +- **Schedulable** – tasks let you automate the copy, merge, or alert logic you already expressed in SQL. -1. **Fast Real-Time Data Loading**: Streamlines the loading of real-time data from transactional databases, almost in seconds. -2. **Doesn't Affect Original Data**: Safe to use as it doesn’t damage the data or the systems where the data comes from. -3. **Overcoming Limitations of Batch ETL**: Surpasses traditional batch ETL methods, which are slower and less effective for continuous data updates. - -## Key Features of Databend's Continuous Data Pipelines - -Databend enhances continuous data pipelines with the following features: - -- **Continuous Data Tracking and Transformation**: Enables real-time tracking and transformation of data. [Discover more about Tracking and Transforming Data via Streams](./01-stream.md). - -- **Recurring Tasks**: Supports the scheduling and management of recurring data processing tasks to ensure efficiency and reliability of the data pipeline. This feature is currently in private preview. +Dive into the stream examples first, then combine them with tasks to automate your pipeline.
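+
+If you want the whole pattern at a glance before diving in, the sketch below strings the two primitives together; the object names (`events`, `events_copy`, `sync_events`, `my_wh`) are placeholders, the source and target tables are assumed to already exist with matching columns, and the linked pages walk through complete, runnable versions:
+
+```sql
+-- Capture new rows on the source table (placeholder names throughout)
+CREATE STREAM events_stream ON TABLE events;
+
+-- Run once a minute, but only when the stream reports new rows,
+-- and drain the delta into the target table in one transaction
+CREATE TASK sync_events
+  WAREHOUSE = 'my_wh'
+  SCHEDULE = 1 MINUTE
+  WHEN STREAM_STATUS('default.events_stream') = TRUE
+AS
+INSERT INTO events_copy SELECT * FROM events_stream;
+
+ALTER TASK sync_events RESUME;
+```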