Skip to content

Commit 73bfbb9

Browse files
authored
feat: impl WarehouseOptions for Private Task (#18404)
* feat: impl `WarehouseOptions` for Private Task * test: add test-private-task-warehouse.sh * chore: rebase * chore: rebase
1 parent cf97c5e commit 73bfbb9

File tree

12 files changed

+331
-23
lines changed

12 files changed

+331
-23
lines changed

src/meta/app/src/principal/task.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub enum TaskMessage {
110110
// Schedule Task will try to spawn a thread in Query to continue running according to the time set in schedule
111111
ScheduleTask(Task),
112112
// Delete the task information and try to cancel the scheduled task in the query.
113-
DeleteTask(String),
113+
DeleteTask(String, Option<WarehouseOptions>),
114114
// After Task will bind Task to the tasks in Task.afters.
115115
// When Execute Task is executed, after all the after tasks of Task are completed,
116116
// the execution will continue.
@@ -123,15 +123,15 @@ impl TaskMessage {
123123
TaskMessage::ExecuteTask(task)
124124
| TaskMessage::ScheduleTask(task)
125125
| TaskMessage::AfterTask(task) => task.task_name.as_str(),
126-
TaskMessage::DeleteTask(task_name) => task_name.as_str(),
126+
TaskMessage::DeleteTask(task_name, _) => task_name.as_str(),
127127
}
128128
}
129129

130130
pub fn ty(&self) -> TaskMessageType {
131131
match self {
132132
TaskMessage::ExecuteTask(_) => TaskMessageType::Execute,
133133
TaskMessage::ScheduleTask(_) => TaskMessageType::Schedule,
134-
TaskMessage::DeleteTask(_) => TaskMessageType::Delete,
134+
TaskMessage::DeleteTask(_, _) => TaskMessageType::Delete,
135135
TaskMessage::AfterTask(_) => TaskMessageType::After,
136136
}
137137
}
@@ -162,4 +162,13 @@ impl TaskMessage {
162162
pub fn prefix_range() -> (i64, i64) {
163163
(0, 1)
164164
}
165+
166+
pub fn warehouse_options(&self) -> Option<&WarehouseOptions> {
167+
match self {
168+
TaskMessage::ExecuteTask(task)
169+
| TaskMessage::ScheduleTask(task)
170+
| TaskMessage::AfterTask(task) => task.warehouse_options.as_ref(),
171+
TaskMessage::DeleteTask(_, warehouse_options) => warehouse_options.as_ref(),
172+
}
173+
}
165174
}

src/meta/proto-conv/src/task_from_to_protobuf_impl.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use chrono::Utc;
1717
use databend_common_meta_app::principal as mt;
1818
use databend_common_meta_app::principal::task::Status;
1919
use databend_common_protos::pb;
20+
use databend_common_protos::pb::task_message::DeleteTask;
2021
use databend_common_protos::pb::task_message::Message;
2122

2223
use crate::reader_check_msg;
@@ -155,7 +156,17 @@ impl FromToProto for mt::TaskMessage {
155156
Message::ScheduleTask(task) => {
156157
mt::TaskMessage::ScheduleTask(mt::Task::from_pb(task)?)
157158
}
158-
Message::DeleteTask(task_name) => mt::TaskMessage::DeleteTask(task_name),
159+
Message::DeleteTask(task_name) => mt::TaskMessage::DeleteTask(task_name, None),
160+
Message::DeleteTaskV2(DeleteTask {
161+
task_name,
162+
warehouse_options,
163+
}) => {
164+
let warehouse = warehouse_options.as_ref().map(|w| mt::WarehouseOptions {
165+
warehouse: w.warehouse.clone(),
166+
using_warehouse_size: w.using_warehouse_size.clone(),
167+
});
168+
mt::TaskMessage::DeleteTask(task_name, warehouse)
169+
}
159170
Message::AfterTask(task) => mt::TaskMessage::AfterTask(mt::Task::from_pb(task)?),
160171
},
161172
})
@@ -165,7 +176,15 @@ impl FromToProto for mt::TaskMessage {
165176
let message = match self {
166177
mt::TaskMessage::ExecuteTask(task) => Message::ExecuteTask(task.to_pb()?),
167178
mt::TaskMessage::ScheduleTask(task) => Message::ScheduleTask(task.to_pb()?),
168-
mt::TaskMessage::DeleteTask(task_name) => Message::DeleteTask(task_name.clone()),
179+
mt::TaskMessage::DeleteTask(task_name, warehouse_options) => {
180+
Message::DeleteTaskV2(DeleteTask {
181+
task_name: task_name.clone(),
182+
warehouse_options: warehouse_options.as_ref().map(|w| pb::WarehouseOptions {
183+
warehouse: w.warehouse.clone(),
184+
using_warehouse_size: w.using_warehouse_size.clone(),
185+
}),
186+
})
187+
}
169188
mt::TaskMessage::AfterTask(task) => Message::AfterTask(task.to_pb()?),
170189
};
171190

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
169169
(137, "2025-07-22: Add: GrantConnectionObject and UserPrivilegeType AccessConnection, AccessConnection"),
170170
(138, "2025-07-23: Add: TableStatistics add index size"),
171171
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
172+
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
172173
// Dear developer:
173174
// If you're gonna add a new metadata version, you'll have to add a test for it.
174175
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,4 @@ mod v136_add_task;
131131
mod v137_add_grant_object_connection;
132132
mod v138_table_statistics;
133133
mod v139_add_grant_ownership_object_sequence;
134+
mod v140_task_message;

src/meta/proto-conv/tests/it/v136_add_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ fn test_decode_v136_task_message() -> anyhow::Result<()> {
185185
vec![26, 6, 116, 97, 115, 107, 95, 99, 160, 6, 136, 1, 168, 6, 24];
186186
let want_delete = || {
187187
let task = want_task();
188-
mt::TaskMessage::DeleteTask(task.task_name)
188+
mt::TaskMessage::DeleteTask(task.task_name, None)
189189
};
190190

191191
common::test_pb_from_to(func_name!(), want_delete())?;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use chrono::DateTime;
16+
use databend_common_meta_app::principal as mt;
17+
use databend_common_meta_app::principal::ScheduleOptions;
18+
use databend_common_meta_app::principal::ScheduleType;
19+
use databend_common_meta_app::principal::WarehouseOptions;
20+
use fastrace::func_name;
21+
use maplit::btreemap;
22+
23+
use crate::common;
24+
25+
#[test]
26+
fn test_decode_v140_task_message() -> anyhow::Result<()> {
27+
let want_task = || mt::Task {
28+
task_id: 11,
29+
task_name: "task_c".to_string(),
30+
query_text: "SELECT * FROM t1".to_string(),
31+
when_condition: Some("c1 > 1".to_string()),
32+
after: vec!["task_a".to_string(), "task_b".to_string()],
33+
comment: Some("comment".to_string()),
34+
owner: "public".to_string(),
35+
owner_user: "me".to_string(),
36+
schedule_options: Some(ScheduleOptions {
37+
interval: Some(11),
38+
cron: Some("30 12 * * *".to_string()),
39+
time_zone: Some("UTC".to_string()),
40+
schedule_type: ScheduleType::IntervalType,
41+
milliseconds_interval: Some(11),
42+
}),
43+
warehouse_options: Some(WarehouseOptions {
44+
warehouse: Some("warehouse_a".to_string()),
45+
using_warehouse_size: Some("10".to_string()),
46+
}),
47+
next_scheduled_at: Some(DateTime::from_timestamp(10, 0).unwrap()),
48+
suspend_task_after_num_failures: Some(10),
49+
error_integration: None,
50+
status: mt::Status::Suspended,
51+
created_at: DateTime::from_timestamp(11, 0).unwrap(),
52+
updated_at: DateTime::from_timestamp(12, 0).unwrap(),
53+
last_suspended_at: Some(DateTime::from_timestamp(13, 0).unwrap()),
54+
session_params: btreemap! { s("a") => s("b") },
55+
};
56+
57+
{
58+
let task_message_execute_v140 = vec![
59+
10, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
60+
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
61+
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
62+
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
63+
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
64+
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
65+
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
66+
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
67+
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
68+
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
69+
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
70+
109, 101, 160, 6, 140, 1, 168, 6, 24, 160, 6, 140, 1, 168, 6, 24,
71+
];
72+
let want_execute = || mt::TaskMessage::ExecuteTask(want_task());
73+
74+
common::test_pb_from_to(func_name!(), want_execute())?;
75+
common::test_load_old(
76+
func_name!(),
77+
task_message_execute_v140.as_slice(),
78+
140,
79+
want_execute(),
80+
)?;
81+
}
82+
{
83+
let task_message_schedule_v140 = vec![
84+
18, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
85+
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
86+
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
87+
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
88+
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
89+
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
90+
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
91+
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
92+
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
93+
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
94+
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
95+
109, 101, 160, 6, 140, 1, 168, 6, 24, 160, 6, 140, 1, 168, 6, 24,
96+
];
97+
let want_schedule = || mt::TaskMessage::ScheduleTask(want_task());
98+
99+
common::test_pb_from_to(func_name!(), want_schedule())?;
100+
common::test_load_old(
101+
func_name!(),
102+
task_message_schedule_v140.as_slice(),
103+
140,
104+
want_schedule(),
105+
)?;
106+
}
107+
{
108+
let task_message_after_v140 = vec![
109+
34, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
110+
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
111+
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
112+
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
113+
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
114+
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
115+
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
116+
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
117+
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
118+
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
119+
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
120+
109, 101, 160, 6, 140, 1, 168, 6, 24, 160, 6, 140, 1, 168, 6, 24,
121+
];
122+
let want_after = || mt::TaskMessage::AfterTask(want_task());
123+
124+
common::test_pb_from_to(func_name!(), want_after())?;
125+
common::test_load_old(
126+
func_name!(),
127+
task_message_after_v140.as_slice(),
128+
140,
129+
want_after(),
130+
)?;
131+
}
132+
{
133+
let task_message_delete_v140 = vec![
134+
42, 27, 10, 6, 116, 97, 115, 107, 95, 99, 18, 17, 10, 11, 119, 97, 114, 101, 104, 111,
135+
117, 115, 101, 95, 97, 18, 2, 49, 48, 160, 6, 140, 1, 168, 6, 24,
136+
];
137+
let want_delete = || {
138+
let task = want_task();
139+
mt::TaskMessage::DeleteTask(task.task_name, task.warehouse_options)
140+
};
141+
142+
common::test_pb_from_to(func_name!(), want_delete())?;
143+
common::test_load_old(
144+
func_name!(),
145+
task_message_delete_v140.as_slice(),
146+
140,
147+
want_delete(),
148+
)?;
149+
}
150+
151+
Ok(())
152+
}
153+
154+
fn s(ss: impl ToString) -> String {
155+
ss.to_string()
156+
}

src/meta/protos/proto/task.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,17 @@ message TaskMessage {
3737
uint64 ver = 100;
3838
uint64 min_reader_ver = 101;
3939

40+
message DeleteTask {
41+
string task_name = 1;
42+
WarehouseOptions warehouse_options = 2;
43+
}
44+
4045
oneof message {
4146
Task execute_task = 1;
4247
Task schedule_task = 2;
4348
string delete_task = 3;
4449
Task after_task = 4;
50+
DeleteTask delete_task_v2 = 5;
4551
}
4652
}
4753

src/query/management/src/task/task_mgr.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,17 @@ impl TaskMgr {
204204
let req = UpsertPB::delete(key).with(MatchSeq::GE(1));
205205
let res = self.kv_api.upsert_pb(&req).await?;
206206

207-
self.send(TaskMessage::DeleteTask(task_name.to_string()))
208-
.await?;
209207
if res.is_changed() {
210-
Ok(res.prev.as_ref().map(|prev| Task::clone(prev)))
208+
let Some(task) = res.prev.as_ref().map(|prev| Task::clone(prev)) else {
209+
return Ok(None);
210+
};
211+
self.send(TaskMessage::DeleteTask(
212+
task_name.to_string(),
213+
task.warehouse_options.clone(),
214+
))
215+
.await?;
216+
217+
Ok(Some(task))
211218
} else {
212219
Ok(None)
213220
}

src/query/service/src/task/service.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use databend_common_ast::ast::AlterTaskOptions;
3030
use databend_common_base::base::GlobalInstance;
3131
use databend_common_base::runtime::Runtime;
3232
use databend_common_base::runtime::TrySpawn;
33+
use databend_common_catalog::table_context::TableContext;
3334
use databend_common_config::GlobalConfig;
3435
use databend_common_config::InnerConfig;
3536
use databend_common_exception::ErrorCode;
@@ -202,6 +203,22 @@ impl TaskService {
202203
while let Some(result) = steam.next().await {
203204
let (_, task_message) = result?;
204205
let task_key = TaskMessageIdent::new(tenant, task_message.key());
206+
207+
if let Some(WarehouseOptions {
208+
warehouse: Some(warehouse),
209+
..
210+
}) = task_message.warehouse_options()
211+
{
212+
if warehouse
213+
!= &self
214+
.create_context(None)
215+
.await?
216+
.get_cluster()
217+
.get_warehouse_id()?
218+
{
219+
continue;
220+
}
221+
}
205222
match task_message {
206223
// ScheduleTask is always monitored by all Query nodes, and ExecuteTask is sent serially to avoid repeated sending.
207224
TaskMessage::ScheduleTask(mut task) => {
@@ -271,7 +288,7 @@ impl TaskService {
271288
let Some(_guard) = fn_lock(&task_service, &task_key, duration.as_millis() as u64).await? else {
272289
continue;
273290
};
274-
if !Self::check_when(&task, &owner, &task_service).await.unwrap() {
291+
if !Self::check_when(&task, &owner, &task_service).await? {
275292
continue;
276293
}
277294
fn_new_task_run(&task_service, &task).await?;
@@ -438,7 +455,7 @@ impl TaskService {
438455
None,
439456
)?;
440457
}
441-
TaskMessage::DeleteTask(task_name) => {
458+
TaskMessage::DeleteTask(task_name, _) => {
442459
if let Some(token) = scheduled_tasks.remove(&task_name) {
443460
token.cancel();
444461
}
@@ -529,8 +546,7 @@ impl TaskService {
529546
};
530547
let result = task_service
531548
.execute_sql(Some(user.clone()), &format!("SELECT {when_condition}"))
532-
.await
533-
.unwrap();
549+
.await?;
534550
Ok(result
535551
.first()
536552
.and_then(|block| block.get_by_offset(0).index(0))

src/query/sql/src/planner/binder/ddl/task.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,7 @@ impl Binder {
123123
sql,
124124
session_parameters,
125125
} = stmt;
126-
if (schedule_opts.is_none() && after.is_empty())
127-
|| (schedule_opts.is_some() && !after.is_empty())
128-
{
126+
if schedule_opts.is_some() && !after.is_empty() {
129127
return Err(ErrorCode::SyntaxException(
130128
"task must be defined with either given time schedule as a root task or run after other task as a DAG".to_string(),
131129
));

0 commit comments

Comments
 (0)