From 1677197cd252bb0f223f51dd6067a633132f64e8 Mon Sep 17 00:00:00 2001
From: wudi
Date: Mon, 3 Nov 2025 11:21:58 +0800
Subject: [PATCH 1/5] add streaming job doc

---
 docs/data-operate/import/streaming-job.md     | 263 ++++++++++++++++++
 .../data-operate/import/streaming-job.md      | 262 +++++++++++++++++
 sidebars.json                                 |   3 +-
 3 files changed, 527 insertions(+), 1 deletion(-)
 create mode 100644 docs/data-operate/import/streaming-job.md
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md

diff --git a/docs/data-operate/import/streaming-job.md b/docs/data-operate/import/streaming-job.md
new file mode 100644
index 0000000000000..8103dfe430e3f
--- /dev/null
+++ b/docs/data-operate/import/streaming-job.md
@@ -0,0 +1,263 @@
---
{
    "title": "Continuous Load",
    "language": "en"
}
---

## Overview

Doris allows you to create a continuous import task using a Job + TVF approach. After submitting the Job, Doris continuously runs the import job, querying the TVF in real time and writing the data into the Doris table.

## Supported TVFs

[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF

## Basic Principles

### S3

Doris iterates over the files under the specified S3 directory, splits them into file lists, and writes each small batch of files to the Doris table.

**Incremental Read Method**

After the task is created, Doris continuously reads data from the specified path and polls for new files at a fixed frequency.

Note: The name of a new file must be lexicographically greater than the name of the last imported file; otherwise, Doris will not treat it as a new file. For example, if files are named file1, file2, and file3, they are imported in order; if a file named file0 is added later, Doris will not import it, because it sorts lexicographically before the last imported file, file3.
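
To see which file a running job has most recently consumed (and therefore which name a newly added file must exceed), you can inspect the `CurrentOffset` column of the `job()` TVF shown in the next section. This is a minimal sketch; filtering on the `Name` output column is an assumption based on the columns that TVF returns:

```SQL
-- The file recorded in CurrentOffset is the lexicographic lower bound
-- that any newly added file name must exceed in order to be imported.
select Name, CurrentOffset from job(type=insert) where Name = "my_job";
```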

## Quick Start

### Creating an Import Job

Assume that files ending in .csv are periodically generated in the S3 directory. You can then create a Job:

```SQL
CREATE JOB my_job
ON STREAMING
DO
INSERT INTO db1.tbl1
select * from S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "",
    "s3.secret_key" = "",
    "s3.region" = "",
    "s3.endpoint" = "",
    "format" = ""
)
```

### Check import status

```SQL
select * from job(type=insert) where ExecuteType = "streaming"
               Id: 1758538737484
             Name: my_job1
          Definer: root
      ExecuteType: STREAMING
RecurringStrategy: \N
           Status: RUNNING
       ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
  "uri" = "s3://bucket/s3/demo/*.csv",
  "format" = "csv",
  "column_separator" = ",",
  "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
  "s3.region" = "ap-southeast-1",
  "s3.access_key" = "",
  "s3.secret_key" = ""
)
       CreateTime: 2025-09-22 19:24:51
 SucceedTaskCount: 1
  FailedTaskCount: 0
CanceledTaskCount: 0
          Comment: \N
       Properties: \N
    CurrentOffset: {"endFile":"s3/demo/test/1.csv"}
        EndOffset: {"endFile":"s3/demo/test/1.csv"}
    LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
         ErrorMsg: \N
    JobRuntimeMsg: \N
```

### Pause import job

```SQL
PAUSE JOB WHERE jobname = <job_name>;
```

### Resume import job

```SQL
RESUME JOB where jobName = <job_name>;
```

### Modify import job

```SQL
-- Supports modifying Job properties and insert statements
Alter Job jobName
PROPERTIES(
    "session.insert_max_filter_ratio"="0.5"
)
INSERT INTO db1.tbl1
select * from S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "",
    "s3.secret_key" = "",
    "s3.region" = "",
    "s3.endpoint" = "",
    "format" = ""
)
```

### Delete import job

```SQL
DROP JOB where jobName = <job_name>;
```

## Reference

### Import command

The syntax for creating a resident Job + TVF import job is as follows:

```SQL
CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <insert_command>
```

The module description is as follows:

| Module | Description |
| -------------- | ------------------------------------------------------------ |
| job_name | Task name |
| job_properties | General import parameters used to specify the Job |
| comment | Remarks used to describe the Job |
| Insert_Command | SQL to execute; currently only `INSERT INTO <table> SELECT * FROM S3(...)` is supported |

### Importing Parameters

#### FE Configuration Parameters

| Parameter | Default Value | Description |
| ------------------------------------ | ------ | ------------------------------------------- |
| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks |
| max_streaming_task_show_count | 100 | Maximum number of task execution records kept in memory for a StreamingTask |

#### Import Configuration Parameters

| Parameter | Default Value | Description |
| ------------------ | ------ | ------------------------------------------------------------ |
| session.* | None | Supports configuring all session variables in job_properties. For import-related variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#import-configuration-parameters) |
| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. |
| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. |
| max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. |
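
As an illustration, the parameters above can be supplied through `job_properties` when the job is created. The sketch below assumes that `job_properties` are written as a `PROPERTIES(...)` clause in the position given by the syntax above, as in the ALTER JOB example earlier; the thresholds are illustrative, and the empty credential strings must be filled in:

```SQL
CREATE JOB my_tuned_job
ON STREAMING
PROPERTIES(
    -- flush a batch after 64 accumulated files instead of the default 256
    "s3.max_batch_files" = "64",
    -- session variable passed through to each INSERT execution
    "session.insert_max_filter_ratio" = "0.5"
)
DO
INSERT INTO db1.tbl1
select * from S3(
    "uri" = "s3://bucket/*.csv",
    "s3.access_key" = "",
    "s3.secret_key" = "",
    "s3.region" = "",
    "s3.endpoint" = "",
    "format" = "csv"
)
```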

### Import Status

#### Job

After a job is successfully submitted, you can execute `select * from job(type=insert) where ExecuteType = "streaming"` to check the current status of the job.

```SQL
select * from job(type=insert) where ExecuteType = "streaming"
               Id: 1758538737484
             Name: my_job1
          Definer: root
      ExecuteType: STREAMING
RecurringStrategy: \N
           Status: RUNNING
       ExecuteSql: INSERT INTO test.`student1`
SELECT * FROM S3
(
  "uri" = "s3://wd-test123/s3/demo/*.csv",
  "format" = "csv",
  "column_separator" = ",",
  "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
  "s3.region" = "ap-southeast-1",
  "s3.access_key" = "",
  "s3.secret_key" = ""
)
       CreateTime: 2025-09-22 19:24:51
 SucceedTaskCount: 5
  FailedTaskCount: 0
CanceledTaskCount: 0
          Comment:
       Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"}
    CurrentOffset: {"endFile":"s3/demo/test/1.csv"}
        EndOffset: {"endFile":"s3/demo/test/1.csv"}
    LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0}
         ErrorMsg: \N
```

The specific parameter results are displayed as follows:

| Result Columns | Description |
| ----------------- | ------------------------------------------------------------ |
| ID | Job ID |
| NAME | Job Name |
| Definer | Job Definer |
| ExecuteType | Job scheduling type: *ONE_TIME/RECURRING/STREAMING/MANUAL* |
| RecurringStrategy | Recurring strategy. Used in normal Insert operations; empty when ExecuteType=Streaming |
| Status | Job status |
| ExecuteSql | Job's Insert SQL statement |
| CreateTime | Job creation time |
| SucceedTaskCount | Number of successful tasks |
| FailedTaskCount | Number of failed tasks |
| CanceledTaskCount | Number of canceled tasks |
| Comment | Job comment |
| Properties | Job properties |
| CurrentOffset | Job's current completion offset. Only `ExecuteType=Streaming` has a value. |
| EndOffset | The maximum EndOffset obtained by the Job from the data source. Only `ExecuteType=Streaming` has a value. |
| LoadStatistic | Job statistics. |
| ErrorMsg | Error messages during Job execution. |
| JobRuntimeMsg | Some runtime information for the Job. |

#### Task

You can execute `select * from tasks(type='insert') where jobId='1758534452459'` to view the running status of each Task.

Note: Only the latest Task information will be retained.

```SQL
mysql> select * from tasks(type='insert') where jobId='1758534452459'\G
*************************** 1. 
row *************************** + TaskId: 1758534723330 + JobId: 1758534452459 + JobName: test_streaming_insert_job_name + Label: 1758534452459_1758534723330 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-09-22 17:52:55 + StartTime: \N + FinishTime: \N + TrackingUrl: \N +LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} + User: root +FirstErrorMsg: \N +RunningOffset: {"endFile": "s3/demo/test/1.csv"} +``` + +| Results Columns | Description | +| ------------- | ---------------------------------------------------- | +| TaskId | Task ID | +| JobID | JobID | +| JobName | Job Name | +| Label | Label of Insert | +| Status | Status of Task | +| ErrorMsg | Task failure information | +| CreateTime | Task creation time | +| StartTime | Task start time | +| FinishTime | Task completion time | +| TrackingUrl | Error URL of Insert | +| LoadStatistic | Task statistics | +| User | Executor of task | +| FirstErrorMsg | Information about the first data quality error in a normal InsertTask | +| RunningOffset | Offset information of the current Task synchronization. Only has a value if Job.ExecuteType=Streaming | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md new file mode 100644 index 0000000000000..26787c26452d9 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md @@ -0,0 +1,262 @@ +--- +{ + "title": "持续导入", + "language": "zh-CN" +} +--- + +## 概述 + +Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 + +## 支持的 TVF + +[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF + +## 基本原理 + +### S3 + +遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。 + +**增量读取方式** + +创建任务后,Doris 会持续从指定路径中读取数据,并以固定频率轮询是否有新文件。 + +注意:新文件的名称必须按字典序大于上一次已导入的文件名,否则 Doris 不会将其作为新文件处理。比如,文件命名为 file1、file2、file3 时会按顺序导入;如果随后新增一个 file0,由于它在字典序上小于最后已导入的文件 file3,Doris 将不会导入该文件。 + +## 快速上手 + +### 创建导入作业 + +假设 S3 的目录下,会定期的产生以 CSV 结尾的文件。此时可以创建 Job + +```SQL +CREATE JOB my_job +ON STREAMING +DO +INSERT INTO db1.tbl1 +select * from S3( + "uri" = "s3://bucket/*.csv", + "s3.access_key" = "", + "s3.secret_key" = "", + "s3.region" = "", + "s3.endpoint" = "", + "format" = "" +) +``` + +### 查看导入状态 + +```SQL +select * from job(type=insert) where ExecuteType = "streaming" + Id: 1758538737484 + Name: my_job1 + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: INSERT INTO test.`student1` +SELECT * FROM S3 +( + "uri" = "s3://bucket/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" +) + CreateTime: 2025-09-22 19:24:51 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: \N + Properties: \N + CurrentOffset: {"endFile":"s3/demo/test/1.csv"} + EndOffset: {"endFile":"s3/demo/test/1.csv"} + LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} + ErrorMsg: \N + JobRuntimeMsg: \N +``` + +### 暂停导入作业 + +```SQL +PAUSE JOB WHERE jobname = ; +``` + +### 恢复导入作业 + +```SQL +RESUME JOB where jobName = ; +``` + +### 修改导入作业 + +```SQL +-- 支持修改Job的properties和insert语句 +Alter Job jobName +PROPERTIES( + "session.insert_max_filter_ratio"="0.5" +) +INSERT INTO db1.tbl1 +select * from S3( + "uri" = "s3://bucket/*.csv", + "s3.access_key" 
= "", + "s3.secret_key" = "", + "s3.region" = "", + "s3.endpoint" = "", + "format" = "" +) +``` + +### 删除导入作业 + +```SQL +DROP JOB where jobName = ; +``` + +## 参考手册 + +### 导入命令 + +创建一个 Job + TVF 常驻导入作业语法如下: + +```SQL +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +DO +``` + +创建的模块说明如下: + +| 模块 | 说明 | +| -------------- | ------------------------------------------------------------ | +| job_name | 任务名 | +| job_properties | 用于指定 Job 的通用导入参数 | +| comment | 用于描述 Job 作业的备注信息 | +| Insert_Command | 用于执行的 SQL,目前只支持 Insert into table select * from s3() | + +### 导入参数 + +#### FE 配置参数 + +| 参数 | 默认值 | | +| ------------------------------------ | ------ | ------------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | + +#### 导入配置参数 + +| 参数 | 默认值 | 说明 | +| ------------------ | ------ | ------------------------------------------------------------ | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | +| s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | +| s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | +| max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | + +### 导入状态 + +#### Job + +Job 提交成功后,可以执行 **select \* from job("insert") where ExecuteType = 'Streaming'** 来查看 Job 当前的状态 + +```SQL +select * from job(type=insert) where ExecuteType = "streaming" + Id: 1758538737484 + Name: my_job1 + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: INSERT INTO test.`student1` +SELECT * FROM S3 +( + "uri" = "s3://wd-test123/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" +) + CreateTime: 2025-09-22 19:24:51 + SucceedTaskCount: 5 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"} + CurrentOffset: {"endFile":"s3/demo/test/1.csv"} + EndOffset: {"endFile":"s3/demo/test/1.csv"} + LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +具体显示参数结果如下 + +| 结果列 | 说明 | +| ----------------- | ------------------------------------------------------------ | +| ID | Job ID | +| NAME | Job 名称 | +| Definer | job 定义者 | +| ExecuteType | Job 调度的类型:*ONE_TIME/RECURRING/STREAMING/MANUAL* | +| RecurringStrategy | 循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空 | +| Status | Job 状态 | +| ExecuteSql | Job 的 Insert SQL 语句 | +| CreateTime | job 创建时间 | +| SucceedTaskCount | 成功任务数量 | +| FailedTaskCount | 失败任务数量 | +| CanceledTaskCount | 取消任务数量 | +| Comment | job 注释 | +| Properties | job 的属性 | +| CurrentOffset | Job 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值 | +| EndOffset | Job 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值 | +| LoadStatistic | Job 的统计信息 | +| ErrorMsg | Job 执行的错误信息 | +| JobRuntimeMsg | Job 运行时的一些提示信息 | + +#### Task + +可以执行**select \* from tasks(type='insert') where jobId='1758534452459'** 来查看每次 Task 的运行状态。 + +注:只会保留当前最新的一次 Task 信息。 + +```SQL +mysql> select * from tasks(type='insert') where jobId='1758534452459'\G +*************************** 1. 
row *************************** + TaskId: 1758534723330 + JobId: 1758534452459 + JobName: test_streaming_insert_job_name + Label: 1758534452459_1758534723330 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-09-22 17:52:55 + StartTime: \N + FinishTime: \N + TrackingUrl: \N +LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} + User: root +FirstErrorMsg: \N +RunningOffset: {"endFile": "s3/demo/test/1.csv"} +``` + +| 结果列 | 说明 | +| ------------- | ---------------------------------------------------- | +| TaskId | 任务 ID | +| JobID | JobID | +| JobName | Job 名称 | +| Label | Insert 的 Label | +| Status | Task 的状态 | +| ErrorMsg | task 失败信息 | +| CreateTime | Task 的创建时间 | +| StartTime | Task 的开始时间 | +| FinishTime | Task 的完成时间 | +| TrackingUrl | Insert 的错误 URL | +| LoadStatistic | Task 的统计信息 | +| User | task 的执行者 | +| FirstErrorMsg | 普通的 InsertTask 第一次数据质量错误的信息 | +| RunningOffset | 当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值 | \ No newline at end of file diff --git a/sidebars.json b/sidebars.json index 042e024b66e6f..fb3ae27b0fffe 100644 --- a/sidebars.json +++ b/sidebars.json @@ -228,7 +228,8 @@ "data-operate/import/load-internals/load-internals", "data-operate/import/load-internals/routine-load-internals" ] - } + }, + "data-operate/import/streaming-job" ] }, { From 11e6e39e8b4d1f45ff07cfb8c10473dacfdbcdd2 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 10 Nov 2025 11:53:22 +0800 Subject: [PATCH 2/5] chang key --- docs/data-operate/import/streaming-job.md | 10 +++++----- .../current/data-operate/import/streaming-job.md | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/data-operate/import/streaming-job.md b/docs/data-operate/import/streaming-job.md index 8103dfe430e3f..3ce9b036943eb 100644 --- a/docs/data-operate/import/streaming-job.md +++ b/docs/data-operate/import/streaming-job.md @@ -73,8 +73,8 @@ SELECT * FROM S3 CanceledTaskCount: 0 Comment: \N Properties: \N - CurrentOffset: {"endFile":"s3/demo/test/1.csv"} - EndOffset: {"endFile":"s3/demo/test/1.csv"} + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} ErrorMsg: \N JobRuntimeMsg: \N @@ -191,8 +191,8 @@ SELECT * FROM S3 CanceledTaskCount: 0 Comment: Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"} - CurrentOffset: {"endFile":"s3/demo/test/1.csv"} - EndOffset: {"endFile":"s3/demo/test/1.csv"} + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0} ErrorMsg: \N ``` @@ -242,7 +242,7 @@ mysql> select * from tasks(type='insert') where jobId='1758534452459'\G LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} User: root FirstErrorMsg: \N -RunningOffset: {"endFile": "s3/demo/test/1.csv"} +RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"} ``` | Results Columns | Description | diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md index 26787c26452d9..257aba4b1f2d4 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data-operate/import/streaming-job.md @@ -73,8 +73,8 @@ SELECT * FROM S3 CanceledTaskCount: 0 Comment: \N 
Properties: \N - CurrentOffset: {"endFile":"s3/demo/test/1.csv"} - EndOffset: {"endFile":"s3/demo/test/1.csv"} + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} ErrorMsg: \N JobRuntimeMsg: \N @@ -190,8 +190,8 @@ SELECT * FROM S3 CanceledTaskCount: 0 Comment: Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"} - CurrentOffset: {"endFile":"s3/demo/test/1.csv"} - EndOffset: {"endFile":"s3/demo/test/1.csv"} + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0} ErrorMsg: \N ``` @@ -241,7 +241,7 @@ mysql> select * from tasks(type='insert') where jobId='1758534452459'\G LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} User: root FirstErrorMsg: \N -RunningOffset: {"endFile": "s3/demo/test/1.csv"} +RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"} ``` | 结果列 | 说明 | From 97172002fa4ebebab4438f80a69ce796a4b1a414 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 10 Nov 2025 12:08:17 +0800 Subject: [PATCH 3/5] fix --- .../data-operate/import/streaming-job.md | 262 +++++++++++++++++ sidebars.ts | 1 + .../data-operate/import/streaming-job.md | 263 ++++++++++++++++++ 3 files changed, 526 insertions(+) create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md create mode 100644 versioned_docs/version-4.x/data-operate/import/streaming-job.md diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md new file mode 100644 index 0000000000000..257aba4b1f2d4 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/data-operate/import/streaming-job.md @@ -0,0 +1,262 @@ +--- +{ + "title": "持续导入", + "language": "zh-CN" +} +--- + +## 概述 + +Doris 可以通过 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 + +## 支持的 TVF + +[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF + +## 基本原理 + +### S3 + +遍历 S3 指定目录的文件,对文件进行拆分成文件列表,以小批次的文件列表的方式写入到 Doris 表中。 + +**增量读取方式** + +创建任务后,Doris 会持续从指定路径中读取数据,并以固定频率轮询是否有新文件。 + +注意:新文件的名称必须按字典序大于上一次已导入的文件名,否则 Doris 不会将其作为新文件处理。比如,文件命名为 file1、file2、file3 时会按顺序导入;如果随后新增一个 file0,由于它在字典序上小于最后已导入的文件 file3,Doris 将不会导入该文件。 + +## 快速上手 + +### 创建导入作业 + +假设 S3 的目录下,会定期的产生以 CSV 结尾的文件。此时可以创建 Job + +```SQL +CREATE JOB my_job +ON STREAMING +DO +INSERT INTO db1.tbl1 +select * from S3( + "uri" = "s3://bucket/*.csv", + "s3.access_key" = "", + "s3.secret_key" = "", + "s3.region" = "", + "s3.endpoint" = "", + "format" = "" +) +``` + +### 查看导入状态 + +```SQL +select * from job(type=insert) where ExecuteType = "streaming" + Id: 1758538737484 + Name: my_job1 + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: INSERT INTO test.`student1` +SELECT * FROM S3 +( + "uri" = "s3://bucket/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" +) + CreateTime: 2025-09-22 19:24:51 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: \N + Properties: \N + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: 
{"fileName":"s3/demo/test/1.csv"} + LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} + ErrorMsg: \N + JobRuntimeMsg: \N +``` + +### 暂停导入作业 + +```SQL +PAUSE JOB WHERE jobname = ; +``` + +### 恢复导入作业 + +```SQL +RESUME JOB where jobName = ; +``` + +### 修改导入作业 + +```SQL +-- 支持修改Job的properties和insert语句 +Alter Job jobName +PROPERTIES( + "session.insert_max_filter_ratio"="0.5" +) +INSERT INTO db1.tbl1 +select * from S3( + "uri" = "s3://bucket/*.csv", + "s3.access_key" = "", + "s3.secret_key" = "", + "s3.region" = "", + "s3.endpoint" = "", + "format" = "" +) +``` + +### 删除导入作业 + +```SQL +DROP JOB where jobName = ; +``` + +## 参考手册 + +### 导入命令 + +创建一个 Job + TVF 常驻导入作业语法如下: + +```SQL +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +DO +``` + +创建的模块说明如下: + +| 模块 | 说明 | +| -------------- | ------------------------------------------------------------ | +| job_name | 任务名 | +| job_properties | 用于指定 Job 的通用导入参数 | +| comment | 用于描述 Job 作业的备注信息 | +| Insert_Command | 用于执行的 SQL,目前只支持 Insert into table select * from s3() | + +### 导入参数 + +#### FE 配置参数 + +| 参数 | 默认值 | | +| ------------------------------------ | ------ | ------------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | + +#### 导入配置参数 + +| 参数 | 默认值 | 说明 | +| ------------------ | ------ | ------------------------------------------------------------ | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量,导入变量可参考 [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#导入配置参数) | +| s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | +| s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | +| max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | + +### 导入状态 + +#### Job + +Job 提交成功后,可以执行 **select \* from job("insert") where ExecuteType = 'Streaming'** 来查看 Job 当前的状态 + +```SQL +select * from job(type=insert) where ExecuteType = "streaming" + Id: 1758538737484 + Name: my_job1 + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: INSERT INTO test.`student1` +SELECT * FROM S3 +( + "uri" = "s3://wd-test123/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" +) + CreateTime: 2025-09-22 19:24:51 + SucceedTaskCount: 5 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"} + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} + LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +具体显示参数结果如下 + +| 结果列 | 说明 | +| ----------------- | ------------------------------------------------------------ | +| ID | Job ID | +| NAME | Job 名称 | +| Definer | job 定义者 | +| ExecuteType | Job 调度的类型:*ONE_TIME/RECURRING/STREAMING/MANUAL* | +| RecurringStrategy | 循环策略。普通的 Insert 会用到,ExecuteType=Streaming 时为空 | +| Status | Job 状态 | +| ExecuteSql | Job 的 Insert SQL 语句 | +| CreateTime | job 创建时间 | +| SucceedTaskCount | 成功任务数量 | +| FailedTaskCount | 失败任务数量 | +| CanceledTaskCount | 取消任务数量 | +| Comment | job 注释 | +| Properties | job 的属性 | +| CurrentOffset | Job 当前处理完成的 Offset。只有 ExecuteType=Streaming 才有值 | +| EndOffset | Job 获取到数据源端最大的 EndOffset。只有 ExecuteType=Streaming 才有值 | +| LoadStatistic | Job 的统计信息 | +| ErrorMsg 
| Job 执行的错误信息 | +| JobRuntimeMsg | Job 运行时的一些提示信息 | + +#### Task + +可以执行**select \* from tasks(type='insert') where jobId='1758534452459'** 来查看每次 Task 的运行状态。 + +注:只会保留当前最新的一次 Task 信息。 + +```SQL +mysql> select * from tasks(type='insert') where jobId='1758534452459'\G +*************************** 1. row *************************** + TaskId: 1758534723330 + JobId: 1758534452459 + JobName: test_streaming_insert_job_name + Label: 1758534452459_1758534723330 + Status: SUCCESS + ErrorMsg: \N + CreateTime: 2025-09-22 17:52:55 + StartTime: \N + FinishTime: \N + TrackingUrl: \N +LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} + User: root +FirstErrorMsg: \N +RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"} +``` + +| 结果列 | 说明 | +| ------------- | ---------------------------------------------------- | +| TaskId | 任务 ID | +| JobID | JobID | +| JobName | Job 名称 | +| Label | Insert 的 Label | +| Status | Task 的状态 | +| ErrorMsg | task 失败信息 | +| CreateTime | Task 的创建时间 | +| StartTime | Task 的开始时间 | +| FinishTime | Task 的完成时间 | +| TrackingUrl | Insert 的错误 URL | +| LoadStatistic | Task 的统计信息 | +| User | task 的执行者 | +| FirstErrorMsg | 普通的 InsertTask 第一次数据质量错误的信息 | +| RunningOffset | 当前 Task 同步的 Offset 信息。只有 Job.ExecuteType=Streaming 才有值 | \ No newline at end of file diff --git a/sidebars.ts b/sidebars.ts index ffda36e7fb042..2ef54e4ab9d79 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -224,6 +224,7 @@ const sidebars: SidebarsConfig = { 'data-operate/import/load-internals/routine-load-internals', ], }, + "data-operate/import/streaming-job" ], }, { diff --git a/versioned_docs/version-4.x/data-operate/import/streaming-job.md b/versioned_docs/version-4.x/data-operate/import/streaming-job.md new file mode 100644 index 0000000000000..3ce9b036943eb --- /dev/null +++ b/versioned_docs/version-4.x/data-operate/import/streaming-job.md @@ -0,0 +1,263 @@ +--- +{ + "title": "Continuous Load", + "language": "en" +} +--- + +## Overview + +Doris allows you to create a continuous import task using a Job + TVF approach. After submitting the Job, Doris continuously runs the import job, querying the TVF in real time and writing the data into the Doris table. + +## Supported TVFs + +[S3](../../sql-manual/sql-functions/table-valued-functions/s3.md) TVF + +## Basic Principles + +### S3 + +Iterates through the files in the specified directory of S3, splitting each file into a list and writing it to the Doris table in small batches. + +**Incremental Read Method** + +After creating the task, Doris continuously reads data from the specified path and polls for new files at a fixed frequency. + +Note: The name of a new file must be lexicographically greater than the name of the last imported file; otherwise, Doris will not treat it as a new file. For example, if files are named file1, file2, and file3, they will be imported sequentially; if a new file named file0 is added later, Doris will not import it because it is lexicographically less than the last imported file, file3. + +## Quick Start + +### Creating an Import Job + +Assume that files ending in CSV are periodically generated in the S3 directory. You can then create a Job. 
+ +```SQL +CREATE JOB my_job +ON STREAMING +DO +INSERT INTO db1.tbl1 +select * from S3( + "uri" = "s3://bucket/*.csv", + "s3.access_key" = "", + "s3.secret_key" = "", + "s3.region" = "", + "s3.endpoint" = "", + "format" = "" +) +``` + +### Check import status + +```SQL +select * from job(type=insert) where ExecuteType = "streaming" + Id: 1758538737484 + Name: my_job1 + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: INSERT INTO test.`student1` +SELECT * FROM S3 +( + "uri" = "s3://bucket/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" +) + CreateTime: 2025-09-22 19:24:51 + SucceedTaskCount: 1 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: \N + Properties: \N + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} + LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256} + ErrorMsg: \N + JobRuntimeMsg: \N +``` + +### Pause import job + +```SQL +PAUSE JOB WHERE jobname = ; +``` + +### Resume import job + +```SQL +RESUME JOB where jobName = ; +``` + +### Modify import job + +```SQL +-- -- Supports modifying Job properties and insert statements +Alter Job jobName +PROPERTIES( + "session.insert_max_filter_ratio"="0.5" +) +INSERT INTO db1.tbl1 +select * from S3( + "uri" = "s3://bucket/*.csv", + "s3.access_key" = "", + "s3.secret_key" = "", + "s3.region" = "", + "s3.endpoint" = "", + "format" = "" +) +``` + +### Delete imported jobs + +```SQL +DROP JOB where jobName = ; +``` + +## Reference + +### Import command + +创建一个 Job + TVF 常驻导入作业语法如下: + +```SQL +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +DO +``` + +The module description is as follows: + +| Module | Description | + +| -------------- | ------------------------------------------------------------ | +| job_name | Task name | +| job_properties | General import parameters used to specify the Job | +| comment | Remarks used to describe the Job | +| Insert_Command | SQL to execute; currently only Insert into table select * from s3() is supported | + +### Importing Parameters + +#### FE Configuration Parameters + +| Parameter | Default Value | | +| ------------------------------------ | ------ | ------------------------------------------- | +| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | +| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks | +| max_streaming_task_show_count | 100 | Maximum number of task execution records kept in memory for a StreamingTask | + +#### Import Configuration Parameters + +| Parameter | Default Value | Description | +| ------------------ | ------ | ------------------------------------------------------------ | +| session.* | None | Supports configuring all session variables in job_properties. For importing variables, please refer to [Insert Into Select](../../data-operate/import/import-way/insert-into-manual.md#Import Configuration Parameters) | +| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value. | +| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value. | +| max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. 
| + +### Import Status + +#### Job + +After a job is successfully submitted, you can execute **select \* from job("insert") where ExecuteType = 'Streaming'** to check the current status of the job. + +```SQL +select * from job(type=insert) where ExecuteType = "streaming" + Id: 1758538737484 + Name: my_job1 + Definer: root + ExecuteType: STREAMING +RecurringStrategy: \N + Status: RUNNING + ExecuteSql: INSERT INTO test.`student1` +SELECT * FROM S3 +( + "uri" = "s3://wd-test123/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" +) + CreateTime: 2025-09-22 19:24:51 + SucceedTaskCount: 5 + FailedTaskCount: 0 +CanceledTaskCount: 0 + Comment: + Properties: {"s3.max_batch_files":"2","session.insert_max_filter_ratio":"0.5"} + CurrentOffset: {"fileName":"s3/demo/test/1.csv"} + EndOffset: {"fileName":"s3/demo/test/1.csv"} + LoadStatistic: {"scannedRows":0,"loadBytes":0,"fileNumber":0,"fileSize":0} + ErrorMsg: \N +``` + +The specific parameter results are displayed as follows: + +| Result Columns | Description | +| ----------------- | ------------------------------------------------------------ | +| ID | Job ID | +| NAME | Job Name | +| Definer | Job Definer | +| ExecuteType | Job scheduling type: *ONE_TIME/RECURRING/STREAMING/MANUAL* | +| RecurringStrategy | Recurring strategy. Used in normal Insert operations; empty when ExecuteType=Streaming | +| Status | Job status | +| ExecuteSql | Job's Insert SQL statement | +| CreateTime | Job creation time | +| SucceedTaskCount | Number of successful tasks | +| FailedTaskCount | Number of failed tasks | +| CanceledTaskCount | Number of canceled tasks | +| Comment | Job comment | +| Properties | Job properties | +| CurrentOffset | Job's current completion offset. Only `ExecuteType=Streaming` has a value. | +| EndOffset | The maximum EndOffset obtained by the Job from the data source. Only `ExecuteType=Streaming` has a value. | +| LoadStatistic | Job statistics. | +| ErrorMsg | Error messages during Job execution. | +| JobRuntimeMsg | Some runtime information for the Job. + +#### Task + +You can execute `select \* from tasks(type='insert') where jobId='1758534452459'` to view the running status of each Task. + +Note: Only the latest Task information will be retained. + +```SQL +mysql> select * from tasks(type='insert') where jobId='1758534452459'\G +*************************** 1. 
row ***************************
      TaskId: 1758534723330
       JobId: 1758534452459
     JobName: test_streaming_insert_job_name
       Label: 1758534452459_1758534723330
      Status: SUCCESS
    ErrorMsg: \N
  CreateTime: 2025-09-22 17:52:55
   StartTime: \N
  FinishTime: \N
 TrackingUrl: \N
LoadStatistic: {"scannedRows":20,"loadBytes":425,"fileNumber":2,"fileSize":256}
        User: root
FirstErrorMsg: \N
RunningOffset: {"startFileName":"s3/demo/1.csv","endFileName":"s3/demo/8.csv"}
```

| Result Columns | Description |
| ------------- | ---------------------------------------------------- |
| TaskId | Task ID |
| JobId | Job ID |
| JobName | Job Name |
| Label | Label of the Insert |
| Status | Status of the Task |
| ErrorMsg | Task failure information |
| CreateTime | Task creation time |
| StartTime | Task start time |
| FinishTime | Task completion time |
| TrackingUrl | Error URL of the Insert |
| LoadStatistic | Task statistics |
| User | Executor of the task |
| FirstErrorMsg | Information about the first data quality error in a normal InsertTask |
| RunningOffset | Offset information of the current Task synchronization. Only has a value if Job.ExecuteType=Streaming |
\ No newline at end of file

From 97172002fa4ebebab4438f80a69ce796a4b1a414 Mon Sep 17 00:00:00 2001
From: wudi
Date: Wed, 19 Nov 2025 16:49:24 +0800
Subject: [PATCH 4/5] add sql manual

---
 .../job/CREATE-STREAMING-JOB.md               | 84 ++++++++++++++++++
 .../sql-manual/sql-statements/job/DROP-JOB.md |  8 +-
 .../sql-statements/job/PAUSE-JOB.md           |  7 +-
 .../sql-statements/job/RESUME-JOB.md          |  7 +-
 .../job/CREATE-STREAMING-JOB.md               | 86 +++++++++++++++++++
 .../sql-manual/sql-statements/job/DROP-JOB.md |  7 +-
 .../sql-statements/job/PAUSE-JOB.md           |  8 +-
 .../sql-statements/job/RESUME-JOB.md          |  7 +-
 .../job/CREATE-STREAMING-JOB.md               | 86 +++++++++++++++++++
 .../sql-manual/sql-statements/job/DROP-JOB.md |  7 +-
 .../sql-statements/job/PAUSE-JOB.md           |  7 +-
 .../sql-statements/job/RESUME-JOB.md          |  7 +-
 sidebars.ts                                   |  1 +
 .../job/CREATE-STREAMING-JOB.md               | 84 ++++++++++++++++++
 .../sql-manual/sql-statements/job/DROP-JOB.md |  7 +-
 .../sql-statements/job/PAUSE-JOB.md           |  7 +-
 .../sql-statements/job/RESUME-JOB.md          |  7 +-
 versioned_sidebars/version-4.x-sidebars.json  |  1 +
 18 files changed, 392 insertions(+), 36 deletions(-)
 create mode 100644 docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
 create mode 100644 versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md

diff --git a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
new file mode 100644
index 0000000000000..6dd5874861dfd
--- /dev/null
+++ b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -0,0 +1,84 @@
---
{
"title": "CREATE STREAMING JOB",
"language": "en"
}
---

## Description

Doris Streaming Job is a continuous import task based on the Job + TVF approach. After the Job is submitted, Doris will continuously run the import job, querying the data in the TVF and writing it into the Doris table in real time.

## Syntax

```SQL
CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <insert_command>
```

## Required parameters

**1. `<job_name>`**
> The job name uniquely identifies an event within a database. The job name must be globally unique; an error will occur if a job with the same name already exists.

**2. `<insert_command>`**
> The DO clause specifies the operation to be executed when the job is triggered, i.e., an SQL statement. Currently, it only supports S3 TVF.

## Optional parameters

**1. `job_properties`**
| Parameters | Default Values | Description |
| ------------------ | ------ | ------------------------------------------------------------ |
| session.* | None | Supports configuring all session variables in job_properties |
| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value |
| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value |
| max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream |
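
For instance, a job that polls less aggressively and tolerates more filtered rows could set these properties at creation time. This is a sketch only; the property values, table, and URI are illustrative, and the empty credential strings must be filled in:

```SQL
CREATE JOB my_job
ON STREAMING
PROPERTIES(
    -- wait 30s between polls when no new files are found
    "max_interval" = "30s",
    -- session variable applied to each INSERT execution
    "session.insert_max_filter_ratio" = "0.2"
)
DO
INSERT INTO db1.`tbl1`
SELECT * FROM S3
(
    "uri" = "s3://bucket/s3/demo/*.csv",
    "format" = "csv",
    "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
    "s3.region" = "ap-southeast-1",
    "s3.access_key" = "",
    "s3.secret_key" = ""
);
```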

## Access Control Requirements

The user executing this SQL command must have at least the following privileges:

| Privilege | Object | Notes |
|:--------------|:-----------|:------------------------|
| LOAD_PRIV | Database (DB) | Currently, only the **LOAD** privilege is supported to perform this operation |

## Usage Notes

- The TASK only retains the latest 100 records.
- Currently, only **INSERT INTO an internal table ... SELECT * FROM S3(...)** is supported; more operations will be supported in the future.

## Examples

- Create a job named my_job that continuously monitors files in a specified directory on S3 and imports data from files ending in .csv into db1.tbl1.

  ```sql
  CREATE JOB my_job
  ON STREAMING
  DO
  INSERT INTO db1.`tbl1`
  SELECT * FROM S3
  (
      "uri" = "s3://bucket/s3/demo/*.csv",
      "format" = "csv",
      "column_separator" = ",",
      "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
      "s3.region" = "ap-southeast-1",
      "s3.access_key" = "",
      "s3.secret_key" = ""
  );
  ```

## CONFIG

**fe.conf**

| Parameters | Default Values | Description |
| ------------------------------------ | ------ | ------------------------------------------- |
| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks |
| max_streaming_task_show_count | 100 | Maximum number of task execution records that a StreamingTask keeps in memory |
\ No newline at end of file
diff --git a/docs/sql-manual/sql-statements/job/DROP-JOB.md b/docs/sql-manual/sql-statements/job/DROP-JOB.md
index 4be326d61c0e1..aedea6caa1a76 100644
--- a/docs/sql-manual/sql-statements/job/DROP-JOB.md
+++ b/docs/sql-manual/sql-statements/job/DROP-JOB.md
@@ -24,9 +24,11 @@ DROP JOB where jobName = <job_name>;
 
 The user who executes this SQL command must have at least the following permissions:
 
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation |
+| Privilege | Object | ExecuteType | Notes |
+|:--------------|:-----------|:------------------------|:------------------------|
+| ADMIN_PRIV | Database | Non-Streaming | Currently only supports **ADMIN** permissions to perform this operation |
+| LOAD_PRIV | Database | Streaming | Supports **LOAD** permissions to perform this operation |
 
 ## Examples

diff --git a/docs/sql-manual/sql-statements/job/PAUSE-JOB.md b/docs/sql-manual/sql-statements/job/PAUSE-JOB.md
index 91e3208cfc3ce..19bb0c21aee69 100644
--- a/docs/sql-manual/sql-statements/job/PAUSE-JOB.md
+++ b/docs/sql-manual/sql-statements/job/PAUSE-JOB.md
@@ -24,9 +24,10 @@ PAUSE JOB WHERE jobname = <job_name>;
 
 The user who executes this SQL command must have at least the following permissions:
 
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation |
+| Privilege | Object | ExecuteType | Notes |
+|:--------------|:-----------|:------------------------|:------------------------|
+| ADMIN_PRIV | Database | Non-Streaming | Currently only supports **ADMIN** permissions to perform this operation |
+| LOAD_PRIV | Database | Streaming | Supports **LOAD** permissions to perform this operation |
 
 ## Examples

diff --git a/docs/sql-manual/sql-statements/job/RESUME-JOB.md b/docs/sql-manual/sql-statements/job/RESUME-JOB.md
index 6854b98e02df6..00452de27276a 100644
--- a/docs/sql-manual/sql-statements/job/RESUME-JOB.md
+++ b/docs/sql-manual/sql-statements/job/RESUME-JOB.md
@@ -23,9 +23,10 @@ RESUME JOB where jobName = <job_name>;
 
 The user who executes this SQL command must have at least the following permissions:
 
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation |
+| Privilege | Object | ExecuteType | Notes |
+|:--------------|:-----------|:------------------------|:------------------------|
+| ADMIN_PRIV | Database | Non-Streaming | Currently only supports **ADMIN** permissions to perform this operation |
+| LOAD_PRIV | Database | Streaming | Supports **LOAD** permissions to perform this operation |
 
 ## Example

diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
new file mode 100644
index 0000000000000..b2698b888ca55
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -0,0 +1,86 @@
---
{
"title": "CREATE STREAMING JOB",
"language": "zh-CN"
}
---

## 描述

Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。

## 语法

```SQL
CREATE JOB <job_name>
ON STREAMING
[job_properties]
[ COMMENT <comment> ]
DO <insert_command>
```

## 必选参数

**1. `<job_name>`**
> 作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。

**2. `<insert_command>`**
> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF

## 可选参数

**1. 
`job_properties`** +| 参数 | 默认值 | 说明 | +| ------------------ | ------ | ------------------------------------------------------------ | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | +| s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | +| s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | +| max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | + +## 权限控制 + +执行此 SQL 命令的用户必须至少具有以下权限: + +| 权限(Privilege) | 对象(Object) | 说明(Notes) | +|:--------------|:-----------|:------------------------| +| LOAD_PRIV | 数据库(DB) | 目前仅支持 **LOAD** 权限执行此操作 | + +## 注意事项 + +- TASK 只保留最新的 100 条记录。 + +- 目前仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 + +## 示例 + +- 创建一个名为 my_job 的作业,持续监听 S3 上的指定目录的文件,执行的操作是将 csv 结尾的文件中的数据导入到 db1.tbl1 中。 + + ```sql + CREATE JOB my_job + ON STREAMING + DO + INSERT INTO db1.`tbl1` + SELECT * FROM S3 + ( + "uri" = "s3://bucket/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" + ); + ``` + +## CONFIG + +**fe.conf** + +| 参数 | 默认值 | | +| ------------------------------------ | ------ | ------------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/DROP-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/DROP-JOB.md index be3099a0a7223..ce887dc09bf0e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/DROP-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/DROP-JOB.md @@ -24,9 +24,10 @@ DROP JOB where jobName = ; 执行此 SQL 命令的用户必须至少具有以下权限: -| 权限(Privilege) | 对象(Object) | 说明(Notes) | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | 数据库(DB) | 目前仅支持 **ADMIN** 权限执行此操作 | +| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | 数据库(DB) | 非 Streaming | 目前仅支持 **ADMIN** 权限执行此操作 | +| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 | ## 示例 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/PAUSE-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/PAUSE-JOB.md index 3d2147123dbac..d33c63b5a5fb8 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/PAUSE-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/PAUSE-JOB.md @@ -24,9 +24,11 @@ PAUSE JOB WHERE jobname = ; 执行此 SQL 命令的用户必须至少具有以下权限: -| 权限(Privilege) | 对象(Object) | 说明(Notes) | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | 数据库(DB) | 目前仅支持 **ADMIN** 权限执行此操作 | +| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | 数据库(DB) | 非 Streaming | 目前仅支持 **ADMIN** 权限执行此操作 | +| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 | + ## 示例 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/RESUME-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/RESUME-JOB.md index 
421bf06115249..24b5090564fd5 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/RESUME-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/RESUME-JOB.md @@ -23,9 +23,10 @@ RESUME JOB where jobName = ; 执行此 SQL 命令的用户必须至少具有以下权限: -| 权限(Privilege) | 对象(Object) | 说明(Notes) | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | 数据库(DB) | 目前仅支持 **ADMIN** 权限执行此操作 | +| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | 数据库(DB) | 非 Streaming | 目前仅支持 **ADMIN** 权限执行此操作 | +| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 | ## 示例 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md new file mode 100644 index 0000000000000..8d5ac2c6c7091 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -0,0 +1,86 @@ +--- +{ +"title": "CREATE STREAMING JOB", +"language": "zh-CN" +} + +--- + +## 描述 + +Doris Streaming Job 是基于 Job + TVF 的方式,创建一个持续导入任务。在提交 Job 作业后,Doris 会持续运行该导入作业,实时的查询 TVF 中的数据写入到 Doris 表中。 + +## 语法 + + +```SQL +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +DO +``` + + +## 必选参数 + +**1. ``** +> 作业名称,它在一个 db 中标识唯一事件。JOB 名称必须是全局唯一的,如果已经存在同名的 JOB,则会报错。 + +**3. ``** +> DO 子句,它指定了 Job 作业触发时需要执行的操作,即一条 SQL 语句,目前只支持 S3 TVF + +## 可选参数 + +**1. `job_properties`** +| 参数 | 默认值 | 说明 | +| ------------------ | ------ | ------------------------------------------------------------ | +| session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 | +| s3.max_batch_files | 256 | 当累计文件数达到该值时触发一次导入写入 | +| s3.max_batch_bytes | 10G | 当累计数据量达到该值时触发一次导入写入 | +| max_interval | 10s | 当上游没有新增文件或数据时,空闲的调度间隔。 | + +## 权限控制 + +执行此 SQL 命令的用户必须至少具有以下权限: + +| 权限(Privilege) | 对象(Object) | 说明(Notes) | +|:--------------|:-----------|:------------------------| +| LOAD_PRIV | 数据库(DB) | 目前仅支持 **LOAD** 权限执行此操作 | + +## 注意事项 + +- TASK 只保留最新的 100 条记录。 + +- 目前仅支持 **INSERT 内表 Select * From S3(...)** 操作,后续会支持更多的操作。 + +## 示例 + +- 创建一个名为 my_job 的作业,持续监听 S3 上的指定目录的文件,执行的操作是将 csv 结尾的文件中的数据导入到 db1.tbl1 中。 + + ```sql + CREATE JOB my_job + ON STREAMING + DO + INSERT INTO db1.`tbl1` + SELECT * FROM S3 + ( + "uri" = "s3://bucket/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" + ); + ``` + +## CONFIG + +**fe.conf** + +| 参数 | 默认值 | | +| ------------------------------------ | ------ | ------------------------------------------- | +| max_streaming_job_num | 1024 | 最大的 Streaming 作业数量 | +| job_streaming_task_exec_thread_num | 10 | 用于执行 StreamingTask 的线程数 | +| max_streaming_task_show_count | 100 | StreamingTask 在内存中最多保留的 task 执行记录 | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md index be3099a0a7223..ce887dc09bf0e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md @@ -24,9 +24,10 @@ DROP JOB where jobName = ; 
执行此 SQL 命令的用户必须至少具有以下权限: -| 权限(Privilege) | 对象(Object) | 说明(Notes) | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | 数据库(DB) | 目前仅支持 **ADMIN** 权限执行此操作 | +| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | 数据库(DB) | 非 Streaming | 目前仅支持 **ADMIN** 权限执行此操作 | +| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 | ## 示例 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md index 3d2147123dbac..d22d14522d290 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md @@ -24,9 +24,10 @@ PAUSE JOB WHERE jobname = ; 执行此 SQL 命令的用户必须至少具有以下权限: -| 权限(Privilege) | 对象(Object) | 说明(Notes) | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | 数据库(DB) | 目前仅支持 **ADMIN** 权限执行此操作 | +| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | 数据库(DB) | 非 Streaming | 目前仅支持 **ADMIN** 权限执行此操作 | +| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 | ## 示例 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md index 421bf06115249..24b5090564fd5 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md @@ -23,9 +23,10 @@ RESUME JOB where jobName = ; 执行此 SQL 命令的用户必须至少具有以下权限: -| 权限(Privilege) | 对象(Object) | 说明(Notes) | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | 数据库(DB) | 目前仅支持 **ADMIN** 权限执行此操作 | +| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | 数据库(DB) | 非 Streaming | 目前仅支持 **ADMIN** 权限执行此操作 | +| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 | ## 示例 diff --git a/sidebars.ts b/sidebars.ts index 2ef54e4ab9d79..94829c8f876df 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -2411,6 +2411,7 @@ const sidebars: SidebarsConfig = { label: 'Job', items: [ 'sql-manual/sql-statements/job/CREATE-JOB', + 'sql-manual/sql-statements/job/CREATE-STREAMING-JOB', 'sql-manual/sql-statements/job/PAUSE-JOB', 'sql-manual/sql-statements/job/DROP-JOB', 'sql-manual/sql-statements/job/RESUME-JOB', diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md new file mode 100644 index 0000000000000..6dd5874861dfd --- /dev/null +++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md @@ -0,0 +1,84 @@ +--- +{ +"title": "CREATE STREAMING JOB", +"language": "en" +} + +--- + +## Description + +Doris Streaming Job is a continuous import task based on the Job + TVF approach. After the Job is submitted, Doris will continuously run the import job, querying the data in TVF and writing it into the Doris table in real time. 
+ +## Syntax + + +```SQL +CREATE JOB +ON STREAMING +[job_properties] +[ COMMENT ] +DO +``` + + +## Required parameters + +**1. ``** +> The job name is used to uniquely identify an event within a database. The job name must be globally unique; an error will occur if a job with the same name already exists. + +**3. ``** +> The DO clause specifies the operation to be executed when the job is triggered, i.e., an SQL statement. Currently, it only supports S3 TVF. + +## Optional parameters + +**1. `job_properties`** +| Parameters | Default Values ​​| Description | +| ------------------ | ------ | ------------------------------------------------------------ | +| session.* | None | Supports configuring all session variables in job_properties | +| s3.max_batch_files | 256 | Triggers an import write when the cumulative number of files reaches this value | +| s3.max_batch_bytes | 10G | Triggers an import write when the cumulative data volume reaches this value | +| max_interval | 10s | The idle scheduling interval when there are no new files or data added upstream. + +## Access Control Requirements + +The user executing this SQL command must have at least the following privileges: +| Privilege | Object | Notes | +|:--------------|:-----------|:------------------------| +| LOAD_PRIV | Database (DB) | Currently, only the **LOAD** privilege is supported to perform this operation | + +## Usage Notes + +- The TASK only retains the latest 100 records. +- Currently, only the **INSERT internal table Select * From S3(...)** operation is supported; more operations will be supported in the future. + +## Examples + +- Create a job named my_job that continuously monitors files in a specified directory on S3 and imports data from files ending in .csv into db1.tbl1. + + ```sql + CREATE JOB my_job + ON STREAMING + DO + INSERT INTO db1.`tbl1` + SELECT * FROM S3 + ( + "uri" = "s3://bucket/s3/demo/*.csv", + "format" = "csv", + "column_separator" = ",", + "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com", + "s3.region" = "ap-southeast-1", + "s3.access_key" = "", + "s3.secret_key" = "" + ); + ``` + +## CONFIG + +**fe.conf** + +| Parameters | Default Values ​​| | +| ------------------------------------ | ------ | ------------------------------------------- | +| max_streaming_job_num | 1024 | Maximum number of Streaming jobs | +| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks | +| max_streaming_task_show_count | 100 | Maximum number of task execution records that a StreamingTask keeps in memory | \ No newline at end of file diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md index 4be326d61c0e1..77e005a9fb715 100644 --- a/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md +++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md @@ -24,9 +24,10 @@ DROP JOB where jobName = ; The user who executes this SQL command must have at least the following permissions: -| Privilege | Object | Notes | -|:--------------|:-----------|:------------------------| -| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation | +| Privilege | Object | ExecuteType | Notes | +|:--------------|:-----------|:------------------------|:------------------------| +| ADMIN_PRIV | Database | NO Streaming | Currently only supports **ADMIN** permissions to perform this operation | +| LOAD_PRIV | Database | Streaming |Supports **LOAD** 
## Usage Notes

- Each job keeps only the latest 100 task execution records.
- Currently, only **INSERT INTO <internal table> SELECT * FROM S3(...)** is supported; more operations will be supported in the future.

## Examples

- Create a job named my_job that continuously monitors files in a specified directory on S3 and imports data from files ending in .csv into db1.tbl1.

  ```sql
  CREATE JOB my_job
  ON STREAMING
  DO
  INSERT INTO db1.`tbl1`
  SELECT * FROM S3
  (
      "uri" = "s3://bucket/s3/demo/*.csv",
      "format" = "csv",
      "column_separator" = ",",
      "s3.endpoint" = "s3.ap-southeast-1.amazonaws.com",
      "s3.region" = "ap-southeast-1",
      "s3.access_key" = "",
      "s3.secret_key" = ""
  );
  ```

## CONFIG

**fe.conf**

| Parameter | Default Value | Description |
| ------------------------------------ | ------ | ------------------------------------------- |
| max_streaming_job_num | 1024 | Maximum number of Streaming jobs |
| job_streaming_task_exec_thread_num | 10 | Number of threads used to execute StreamingTasks |
| max_streaming_task_show_count | 100 | Maximum number of task execution records that a StreamingTask keeps in memory |
\ No newline at end of file
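These are FE configuration items set in fe.conf. A hedged sketch for inspecting and adjusting them at runtime follows; `ADMIN SHOW/SET FRONTEND CONFIG` is standard Doris syntax, but whether these particular items are runtime-mutable is an assumption.

```SQL
-- Inspect the current streaming-related FE configuration.
ADMIN SHOW FRONTEND CONFIG LIKE "%streaming%";

-- Raise the job limit (illustrative value). This only takes effect at runtime
-- if the item is mutable; otherwise set it in fe.conf and restart the FE.
ADMIN SET FRONTEND CONFIG ("max_streaming_job_num" = "2048");
```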
diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md
index 4be326d61c0e1..77e005a9fb715 100644
--- a/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md
+++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/DROP-JOB.md
@@ -24,9 +24,10 @@ DROP JOB where jobName = <job_name>;
 The user who executes this SQL command must have at least the following permissions:
 
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation |
+| Privilege | Object | ExecuteType | Notes |
+|:--------------|:-----------|:------------------------|:------------------------|
+| ADMIN_PRIV | Database | Non-Streaming | Currently only supports **ADMIN** permissions to perform this operation |
+| LOAD_PRIV | Database | Streaming | Supports **LOAD** permissions to perform this operation |
 
 ## Examples
 
diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md
index 91e3208cfc3ce..19bb0c21aee69 100644
--- a/versioned_docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md
+++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/PAUSE-JOB.md
@@ -24,9 +24,10 @@ PAUSE JOB WHERE jobname = <job_name>;
 The user who executes this SQL command must have at least the following permissions:
 
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation |
+| Privilege | Object | ExecuteType | Notes |
+|:--------------|:-----------|:------------------------|:------------------------|
+| ADMIN_PRIV | Database | Non-Streaming | Currently only supports **ADMIN** permissions to perform this operation |
+| LOAD_PRIV | Database | Streaming | Supports **LOAD** permissions to perform this operation |
 
 ## Examples
 
diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md
index 6854b98e02df6..00452de27276a 100644
--- a/versioned_docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md
+++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/RESUME-JOB.md
@@ -23,9 +23,10 @@ RESUME JOB where jobName = <job_name>;
 The user who executes this SQL command must have at least the following permissions:
 
-| Privilege | Object | Notes |
-|:--------------|:-----------|:------------------------|
-| ADMIN_PRIV | Database | Currently only supports **ADMIN** permissions to perform this operation |
+| Privilege | Object | ExecuteType | Notes |
+|:--------------|:-----------|:------------------------|:------------------------|
+| ADMIN_PRIV | Database | Non-Streaming | Currently only supports **ADMIN** permissions to perform this operation |
+| LOAD_PRIV | Database | Streaming | Supports **LOAD** permissions to perform this operation |
 
 ## Example
 
diff --git a/versioned_sidebars/version-4.x-sidebars.json b/versioned_sidebars/version-4.x-sidebars.json
index c9f5e1e4cf54c..1175cbc2f83f3 100644
--- a/versioned_sidebars/version-4.x-sidebars.json
+++ b/versioned_sidebars/version-4.x-sidebars.json
@@ -2434,6 +2434,7 @@
                 "label": "Job",
                 "items": [
                     "sql-manual/sql-statements/job/CREATE-JOB",
+                    "sql-manual/sql-statements/job/CREATE-STREAMING-JOB",
                     "sql-manual/sql-statements/job/PAUSE-JOB",
                     "sql-manual/sql-statements/job/DROP-JOB",
                     "sql-manual/sql-statements/job/RESUME-JOB",

From 2bd4d1b170c0411ed2007d3c66332f879d967b9e Mon Sep 17 00:00:00 2001
From: wudi
Date: Wed, 19 Nov 2025 17:07:59 +0800
Subject: [PATCH 5/5] add alter job

---
 .../sql-statements/job/ALTER-JOB.md           | 93 +++++++++++++++++++
 .../job/CREATE-STREAMING-JOB.md               |  2 +-
 .../sql-statements/job/ALTER-JOB.md           | 92 ++++++++++++++++++
 .../job/CREATE-STREAMING-JOB.md               |  2 +-
 .../sql-statements/job/ALTER-JOB.md           | 93 +++++++++++++++++++
 .../job/CREATE-STREAMING-JOB.md               |  2 +-
 sidebars.ts                                   |  1 +
 .../sql-statements/job/ALTER-JOB.md           | 93 +++++++++++++++++++
 .../job/CREATE-STREAMING-JOB.md               |  2 +-
 versioned_sidebars/version-4.x-sidebars.json  |  1 +
 10 files changed, 377 insertions(+), 4 deletions(-)
 create mode 100644 docs/sql-manual/sql-statements/job/ALTER-JOB.md
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/ALTER-JOB.md
 create mode 100644 i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md
 create mode 100644 versioned_docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md

diff --git a/docs/sql-manual/sql-statements/job/ALTER-JOB.md b/docs/sql-manual/sql-statements/job/ALTER-JOB.md
new file mode 100644
index 0000000000000..dd6d13eda31b1
--- /dev/null
+++ b/docs/sql-manual/sql-statements/job/ALTER-JOB.md
@@ -0,0 +1,93 @@
---
{
"title": "ALTER JOB",
"language": "en"
}
---

## Description

Modifies an existing JOB. Only jobs in the PAUSED state can be modified, and currently only Streaming jobs support modification.
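Since only paused jobs can be altered, a modification is in practice a pause-alter-resume cycle. A minimal sketch against a hypothetical job named my_job:

```SQL
-- 1. A running streaming job must be paused before it can be altered.
PAUSE JOB WHERE jobname = 'my_job';

-- 2. Apply the change while the job is paused.
Alter Job my_job
PROPERTIES(
    "session.insert_max_filter_ratio" = "0.5"
);

-- 3. Resume; the job continues from its recorded offset.
RESUME JOB WHERE jobName = 'my_job';
```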
## Syntax

```SQL
Alter Job <job_name>
[job_properties]
DO <sql_stmt>
```

## Required Parameters

**1. `<job_name>`**
> The name of the job to modify.

## Optional parameters

**1. `<job_properties>`**
> Modify the job's properties.

**2. `<sql_stmt>`**
> Modify the SQL executed by the job.

## Access Control Requirements

The user executing this SQL command must have at least the following privileges:

| Privilege | Object | Job Type | Notes |
|:--------------|:-----------|:------------------------|:------------------------|
| LOAD_PRIV | Database (DB) | Streaming | Supports **LOAD** privileges to perform this operation |

## Examples

- Modify the session variables of my_job

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  ```

- Modify the SQL statement of my_job

  ```SQL
  Alter Job my_job
  INSERT INTO db1.tbl1
  SELECT * FROM S3
  (
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  );
  ```

- Modify both the Properties and the SQL statement of my_job

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  INSERT INTO db1.tbl1
  SELECT * FROM S3(
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  )
  ```

- Modify the synchronization progress (offset) of my_job

  ```sql
  Alter JOB my_job
  PROPERTIES(
      'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
  )
  ```
\ No newline at end of file
diff --git a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
index 6dd5874861dfd..5d208e7700ddc 100644
--- a/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/docs/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -32,7 +32,7 @@ DO <sql_stmt>
 
 ## Optional parameters
 
-**1. `job_properties`**
+**1. `<job_properties>`**
 | Parameter | Default Value | Description |
 | ------------------ | ------ | ------------------------------------------------------------ |
 | session.* | None | Supports configuring all session variables in job_properties |
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/ALTER-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/ALTER-JOB.md
new file mode 100644
index 0000000000000..a313d14bc5b92
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/ALTER-JOB.md
@@ -0,0 +1,92 @@
---
{
"title": "ALTER JOB",
"language": "zh-CN"
}
---

## 描述

用户修改一个 JOB 作业。只能修改 PAUSED 状态下的 Job,并且只支持修改 Streaming 类型的 Job。

## 语法

```SQL
Alter Job <job_name>
[job_properties]
DO <sql_stmt>
```

## 必选参数

**1. `<job_name>`**
> 要修改的作业名称。

## 可选参数

**1. `<job_properties>`**
> 修改任务的属性。

**2. `<sql_stmt>`**
> 修改任务执行的 SQL。

## 权限控制

执行此 SQL 命令的用户必须至少具有以下权限:

| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) |
|:--------------|:-----------|:------------------------|:------------------------|
| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 |

## 示例

- 修改 my_job 的 session 变量

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  ```

- 修改 my_job 的 SQL 语句

  ```SQL
  Alter Job my_job
  INSERT INTO db1.tbl1
  SELECT * FROM S3
  (
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  );
  ```

- 同时修改 my_job 的 Properties 和 SQL 语句

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  INSERT INTO db1.tbl1
  SELECT * FROM S3(
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  )
  ```

- 修改 my_job 同步的进度

  ```sql
  Alter JOB my_job
  PROPERTIES(
      'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
  )
  ```
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
index b2698b888ca55..836ff85b4996d 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -32,7 +32,7 @@ DO <sql_stmt>
 
 ## 可选参数
 
-**1. `job_properties`**
+**1. `<job_properties>`**
 | 参数 | 默认值 | 说明 |
 | ------------------ | ------ | ------------------------------------------------------------ |
 | session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 |
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md
new file mode 100644
index 0000000000000..babe6ed8ee381
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md
@@ -0,0 +1,93 @@
---
{
"title": "ALTER JOB",
"language": "zh-CN"
}
---

## 描述

用户修改一个 JOB 作业。只能修改 PAUSED 状态下的 Job,并且只支持修改 Streaming 类型的 Job。

## 语法

```SQL
Alter Job <job_name>
[job_properties]
DO <sql_stmt>
```

## 必选参数

**1. `<job_name>`**
> 要修改的作业名称。

## 可选参数

**1. `<job_properties>`**
> 修改任务的属性。

**2. `<sql_stmt>`**
> 修改任务执行的 SQL。

## 权限控制

执行此 SQL 命令的用户必须至少具有以下权限:

| 权限(Privilege) | 对象(Object) | Job 类型(ExecuteType)| 说明(Notes) |
|:--------------|:-----------|:------------------------|:------------------------|
| LOAD_PRIV | 数据库(DB) | Streaming | 支持 **LOAD** 权限执行此操作 |

## 示例

- 修改 my_job 的 session 变量

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  ```

- 修改 my_job 的 SQL 语句

  ```SQL
  Alter Job my_job
  INSERT INTO db1.tbl1
  SELECT * FROM S3
  (
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  );
  ```

- 同时修改 my_job 的 Properties 和 SQL 语句

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  INSERT INTO db1.tbl1
  SELECT * FROM S3(
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  )
  ```

- 修改 my_job 同步的进度

  ```sql
  Alter JOB my_job
  PROPERTIES(
      'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
  )
  ```
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
index 8d5ac2c6c7091..947426dd22b3a 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -32,7 +32,7 @@ DO <sql_stmt>
 
 ## 可选参数
 
-**1. `job_properties`**
+**1. `<job_properties>`**
 | 参数 | 默认值 | 说明 |
 | ------------------ | ------ | ------------------------------------------------------------ |
 | session.* | 无 | 支持在 job_properties 上配置所有的 session 变量 |
diff --git a/sidebars.ts b/sidebars.ts
index 94829c8f876df..d655bc14c679c 100644
--- a/sidebars.ts
+++ b/sidebars.ts
@@ -2412,6 +2412,7 @@ const sidebars: SidebarsConfig = {
                 items: [
                     'sql-manual/sql-statements/job/CREATE-JOB',
                     'sql-manual/sql-statements/job/CREATE-STREAMING-JOB',
+                    'sql-manual/sql-statements/job/ALTER-JOB',
                     'sql-manual/sql-statements/job/PAUSE-JOB',
                     'sql-manual/sql-statements/job/DROP-JOB',
                     'sql-manual/sql-statements/job/RESUME-JOB',
diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md
new file mode 100644
index 0000000000000..dd6d13eda31b1
--- /dev/null
+++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/ALTER-JOB.md
@@ -0,0 +1,93 @@
---
{
"title": "ALTER JOB",
"language": "en"
}
---

## Description

Modifies an existing JOB. Only jobs in the PAUSED state can be modified, and currently only Streaming jobs support modification.

## Syntax

```SQL
Alter Job <job_name>
[job_properties]
DO <sql_stmt>
```

## Required Parameters

**1. `<job_name>`**
> The name of the job to modify.

## Optional parameters

**1. `<job_properties>`**
> Modify the job's properties.

**2. `<sql_stmt>`**
> Modify the SQL executed by the job.
## Access Control Requirements

The user executing this SQL command must have at least the following privileges:

| Privilege | Object | Job Type | Notes |
|:--------------|:-----------|:------------------------|:------------------------|
| LOAD_PRIV | Database (DB) | Streaming | Supports **LOAD** privileges to perform this operation |

## Examples

- Modify the session variables of my_job

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  ```

- Modify the SQL statement of my_job

  ```SQL
  Alter Job my_job
  INSERT INTO db1.tbl1
  SELECT * FROM S3
  (
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  );
  ```

- Modify both the Properties and the SQL statement of my_job

  ```SQL
  Alter Job my_job
  PROPERTIES(
      "session.insert_max_filter_ratio"="0.5"
  )
  INSERT INTO db1.tbl1
  SELECT * FROM S3(
      "uri" = "s3://bucket/*.csv",
      "s3.access_key" = "",
      "s3.secret_key" = "",
      "s3.region" = "",
      "s3.endpoint" = "",
      "format" = ""
  )
  ```

- Modify the synchronization progress (offset) of my_job

  ```sql
  Alter JOB my_job
  PROPERTIES(
      'offset' = '{"fileName":"regression/load/data/example_0.csv"}'
  )
  ```
\ No newline at end of file
diff --git a/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md b/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
index 6dd5874861dfd..5d208e7700ddc 100644
--- a/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
+++ b/versioned_docs/version-4.x/sql-manual/sql-statements/job/CREATE-STREAMING-JOB.md
@@ -32,7 +32,7 @@ DO <sql_stmt>
 
 ## Optional parameters
 
-**1. `job_properties`**
+**1. `<job_properties>`**
 | Parameter | Default Value | Description |
 | ------------------ | ------ | ------------------------------------------------------------ |
 | session.* | None | Supports configuring all session variables in job_properties |
diff --git a/versioned_sidebars/version-4.x-sidebars.json b/versioned_sidebars/version-4.x-sidebars.json
index 1175cbc2f83f3..9196f55b095a4 100644
--- a/versioned_sidebars/version-4.x-sidebars.json
+++ b/versioned_sidebars/version-4.x-sidebars.json
@@ -2435,6 +2435,7 @@
                 "items": [
                     "sql-manual/sql-statements/job/CREATE-JOB",
                     "sql-manual/sql-statements/job/CREATE-STREAMING-JOB",
+                    "sql-manual/sql-statements/job/ALTER-JOB",
                     "sql-manual/sql-statements/job/PAUSE-JOB",
                     "sql-manual/sql-statements/job/DROP-JOB",
                     "sql-manual/sql-statements/job/RESUME-JOB",