Skip to content

Commit 8ce6eb0

Browse files
committed
feat: impl WarehouseOptions for Private Task
1 parent cf97c5e commit 8ce6eb0

File tree

8 files changed

+218
-10
lines changed

8 files changed

+218
-10
lines changed

src/meta/app/src/principal/task.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub enum TaskMessage {
110110
// Schedule Task will try to spawn a thread in Query to continue running according to the time set in schedule
111111
ScheduleTask(Task),
112112
// Delete the task information and try to cancel the scheduled task in the query.
113-
DeleteTask(String),
113+
DeleteTask(String, Option<WarehouseOptions>),
114114
// After Task will bind Task to the tasks in Task.afters.
115115
// When Execute Task is executed, after all the after tasks of Task are completed,
116116
// the execution will continue.
@@ -123,15 +123,15 @@ impl TaskMessage {
123123
TaskMessage::ExecuteTask(task)
124124
| TaskMessage::ScheduleTask(task)
125125
| TaskMessage::AfterTask(task) => task.task_name.as_str(),
126-
TaskMessage::DeleteTask(task_name) => task_name.as_str(),
126+
TaskMessage::DeleteTask(task_name, _) => task_name.as_str(),
127127
}
128128
}
129129

130130
pub fn ty(&self) -> TaskMessageType {
131131
match self {
132132
TaskMessage::ExecuteTask(_) => TaskMessageType::Execute,
133133
TaskMessage::ScheduleTask(_) => TaskMessageType::Schedule,
134-
TaskMessage::DeleteTask(_) => TaskMessageType::Delete,
134+
TaskMessage::DeleteTask(_, _) => TaskMessageType::Delete,
135135
TaskMessage::AfterTask(_) => TaskMessageType::After,
136136
}
137137
}
@@ -162,4 +162,13 @@ impl TaskMessage {
162162
pub fn prefix_range() -> (i64, i64) {
163163
(0, 1)
164164
}
165+
166+
pub fn warehouse_options(&self) -> Option<&WarehouseOptions> {
167+
match self {
168+
TaskMessage::ExecuteTask(task)
169+
| TaskMessage::ScheduleTask(task)
170+
| TaskMessage::AfterTask(task) => task.warehouse_options.as_ref(),
171+
TaskMessage::DeleteTask(_, warehouse_options) => warehouse_options.as_ref(),
172+
}
173+
}
165174
}

src/meta/proto-conv/src/task_from_to_protobuf_impl.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use chrono::Utc;
1717
use databend_common_meta_app::principal as mt;
1818
use databend_common_meta_app::principal::task::Status;
1919
use databend_common_protos::pb;
20+
use databend_common_protos::pb::task_message::DeleteTask;
2021
use databend_common_protos::pb::task_message::Message;
2122

2223
use crate::reader_check_msg;
@@ -155,7 +156,16 @@ impl FromToProto for mt::TaskMessage {
155156
Message::ScheduleTask(task) => {
156157
mt::TaskMessage::ScheduleTask(mt::Task::from_pb(task)?)
157158
}
158-
Message::DeleteTask(task_name) => mt::TaskMessage::DeleteTask(task_name),
159+
Message::DeleteTask(DeleteTask {
160+
task_name,
161+
warehouse_options,
162+
}) => {
163+
let warehouse = warehouse_options.as_ref().map(|w| mt::WarehouseOptions {
164+
warehouse: w.warehouse.clone(),
165+
using_warehouse_size: w.using_warehouse_size.clone(),
166+
});
167+
mt::TaskMessage::DeleteTask(task_name, warehouse)
168+
}
159169
Message::AfterTask(task) => mt::TaskMessage::AfterTask(mt::Task::from_pb(task)?),
160170
},
161171
})
@@ -165,7 +175,15 @@ impl FromToProto for mt::TaskMessage {
165175
let message = match self {
166176
mt::TaskMessage::ExecuteTask(task) => Message::ExecuteTask(task.to_pb()?),
167177
mt::TaskMessage::ScheduleTask(task) => Message::ScheduleTask(task.to_pb()?),
168-
mt::TaskMessage::DeleteTask(task_name) => Message::DeleteTask(task_name.clone()),
178+
mt::TaskMessage::DeleteTask(task_name, warehouse_options) => {
179+
Message::DeleteTask(DeleteTask {
180+
task_name: task_name.clone(),
181+
warehouse_options: warehouse_options.as_ref().map(|w| pb::WarehouseOptions {
182+
warehouse: w.warehouse.clone(),
183+
using_warehouse_size: w.using_warehouse_size.clone(),
184+
}),
185+
})
186+
}
169187
mt::TaskMessage::AfterTask(task) => Message::AfterTask(task.to_pb()?),
170188
};
171189

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
169169
(137, "2025-07-22: Add: GrantConnectionObject and UserPrivilegeType AccessConnection, AccessConnection"),
170170
(138, "2025-07-23: Add: TableStatistics add index size"),
171171
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
172+
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
172173
// Dear developer:
173174
// If you're gonna add a new metadata version, you'll have to add a test for it.
174175
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,4 @@ mod v136_add_task;
131131
mod v137_add_grant_object_connection;
132132
mod v138_table_statistics;
133133
mod v139_add_grant_ownership_object_sequence;
134+
mod v140_task_message;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use chrono::DateTime;
16+
use databend_common_meta_app::principal as mt;
17+
use databend_common_meta_app::principal::ScheduleOptions;
18+
use databend_common_meta_app::principal::ScheduleType;
19+
use databend_common_meta_app::principal::WarehouseOptions;
20+
use fastrace::func_name;
21+
use maplit::btreemap;
22+
23+
use crate::common;
24+
25+
#[test]
26+
fn test_decode_v137_task_message() -> anyhow::Result<()> {
27+
let want_task = || mt::Task {
28+
task_id: 11,
29+
task_name: "task_c".to_string(),
30+
query_text: "SELECT * FROM t1".to_string(),
31+
when_condition: Some("c1 > 1".to_string()),
32+
after: vec!["task_a".to_string(), "task_b".to_string()],
33+
comment: Some("comment".to_string()),
34+
owner: "public".to_string(),
35+
owner_user: "me".to_string(),
36+
schedule_options: Some(ScheduleOptions {
37+
interval: Some(11),
38+
cron: Some("30 12 * * *".to_string()),
39+
time_zone: Some("UTC".to_string()),
40+
schedule_type: ScheduleType::IntervalType,
41+
milliseconds_interval: Some(11),
42+
}),
43+
warehouse_options: Some(WarehouseOptions {
44+
warehouse: Some("warehouse_a".to_string()),
45+
using_warehouse_size: Some("10".to_string()),
46+
}),
47+
next_scheduled_at: Some(DateTime::from_timestamp(10, 0).unwrap()),
48+
suspend_task_after_num_failures: Some(10),
49+
error_integration: None,
50+
status: mt::Status::Suspended,
51+
created_at: DateTime::from_timestamp(11, 0).unwrap(),
52+
updated_at: DateTime::from_timestamp(12, 0).unwrap(),
53+
last_suspended_at: Some(DateTime::from_timestamp(13, 0).unwrap()),
54+
session_params: btreemap! { s("a") => s("b") },
55+
};
56+
57+
{
58+
let task_message_execute_v137 = vec![
59+
10, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
60+
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
61+
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
62+
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
63+
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
64+
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
65+
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
66+
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
67+
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
68+
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
69+
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
70+
109, 101, 160, 6, 137, 1, 168, 6, 24, 160, 6, 137, 1, 168, 6, 24,
71+
];
72+
let want_execute = || mt::TaskMessage::ExecuteTask(want_task());
73+
74+
common::test_pb_from_to(func_name!(), want_execute())?;
75+
common::test_load_old(
76+
func_name!(),
77+
task_message_execute_v137.as_slice(),
78+
137,
79+
want_execute(),
80+
)?;
81+
}
82+
{
83+
let task_message_schedule_v137 = vec![
84+
18, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
85+
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
86+
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
87+
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
88+
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
89+
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
90+
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
91+
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
92+
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
93+
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
94+
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
95+
109, 101, 160, 6, 137, 1, 168, 6, 24, 160, 6, 137, 1, 168, 6, 24,
96+
];
97+
let want_schedule = || mt::TaskMessage::ScheduleTask(want_task());
98+
99+
common::test_pb_from_to(func_name!(), want_schedule())?;
100+
common::test_load_old(
101+
func_name!(),
102+
task_message_schedule_v137.as_slice(),
103+
137,
104+
want_schedule(),
105+
)?;
106+
}
107+
{
108+
let task_message_after_v137 = vec![
109+
34, 239, 1, 8, 11, 18, 6, 116, 97, 115, 107, 95, 99, 34, 16, 83, 69, 76, 69, 67, 84,
110+
32, 42, 32, 70, 82, 79, 77, 32, 116, 49, 42, 7, 99, 111, 109, 109, 101, 110, 116, 50,
111+
6, 112, 117, 98, 108, 105, 99, 58, 22, 8, 11, 18, 11, 51, 48, 32, 49, 50, 32, 42, 32,
112+
42, 32, 42, 26, 3, 85, 84, 67, 40, 11, 66, 17, 10, 11, 119, 97, 114, 101, 104, 111,
113+
117, 115, 101, 95, 97, 18, 2, 49, 48, 74, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49,
114+
32, 48, 48, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 80, 10, 114, 23, 49, 57, 55, 48,
115+
45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 49, 32, 85, 84, 67, 122, 23,
116+
49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49, 50, 32, 85, 84,
117+
67, 130, 1, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 49,
118+
51, 32, 85, 84, 67, 138, 1, 6, 116, 97, 115, 107, 95, 97, 138, 1, 6, 116, 97, 115, 107,
119+
95, 98, 146, 1, 6, 99, 49, 32, 62, 32, 49, 154, 1, 6, 10, 1, 97, 18, 1, 98, 170, 1, 2,
120+
109, 101, 160, 6, 137, 1, 168, 6, 24, 160, 6, 137, 1, 168, 6, 24,
121+
];
122+
let want_after = || mt::TaskMessage::AfterTask(want_task());
123+
124+
common::test_pb_from_to(func_name!(), want_after())?;
125+
common::test_load_old(
126+
func_name!(),
127+
task_message_after_v137.as_slice(),
128+
137,
129+
want_after(),
130+
)?;
131+
}
132+
{
133+
let task_message_delete_v137 = vec![
134+
26, 27, 10, 6, 116, 97, 115, 107, 95, 99, 18, 17, 10, 11, 119, 97, 114, 101, 104, 111,
135+
117, 115, 101, 95, 97, 18, 2, 49, 48, 160, 6, 137, 1, 168, 6, 24,
136+
];
137+
let want_delete = || {
138+
let task = want_task();
139+
mt::TaskMessage::DeleteTask(task.task_name, task.warehouse_options)
140+
};
141+
142+
common::test_pb_from_to(func_name!(), want_delete())?;
143+
common::test_load_old(
144+
func_name!(),
145+
task_message_delete_v137.as_slice(),
146+
137,
147+
want_delete(),
148+
)?;
149+
}
150+
151+
Ok(())
152+
}
153+
154+
fn s(ss: impl ToString) -> String {
155+
ss.to_string()
156+
}

src/meta/protos/proto/task.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,15 @@ message TaskMessage {
3737
uint64 ver = 100;
3838
uint64 min_reader_ver = 101;
3939

40+
message DeleteTask {
41+
string task_name = 1;
42+
WarehouseOptions warehouse_options = 2;
43+
}
44+
4045
oneof message {
4146
Task execute_task = 1;
4247
Task schedule_task = 2;
43-
string delete_task = 3;
48+
DeleteTask delete_task = 3;
4449
Task after_task = 4;
4550
}
4651
}

src/query/management/src/task/task_mgr.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,17 @@ impl TaskMgr {
204204
let req = UpsertPB::delete(key).with(MatchSeq::GE(1));
205205
let res = self.kv_api.upsert_pb(&req).await?;
206206

207-
self.send(TaskMessage::DeleteTask(task_name.to_string()))
208-
.await?;
209207
if res.is_changed() {
210-
Ok(res.prev.as_ref().map(|prev| Task::clone(prev)))
208+
let Some(task) = res.prev.as_ref().map(|prev| Task::clone(prev)) else {
209+
return Ok(None);
210+
};
211+
self.send(TaskMessage::DeleteTask(
212+
task_name.to_string(),
213+
task.warehouse_options.clone(),
214+
))
215+
.await?;
216+
217+
Ok(Some(task))
211218
} else {
212219
Ok(None)
213220
}

src/query/service/src/task/service.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,17 @@ impl TaskService {
202202
while let Some(result) = steam.next().await {
203203
let (_, task_message) = result?;
204204
let task_key = TaskMessageIdent::new(tenant, task_message.key());
205+
206+
if let Some(WarehouseOptions {
207+
warehouse: Some(warehouse),
208+
..
209+
}) = task_message.warehouse_options()
210+
{
211+
// [WarehouseInfo::SelfManaged] uses ClusterId as warehouse
212+
if warehouse != &self.cluster_id {
213+
continue;
214+
}
215+
}
205216
match task_message {
206217
// ScheduleTask is always monitored by all Query nodes, and ExecuteTask is sent serially to avoid repeated sending.
207218
TaskMessage::ScheduleTask(mut task) => {
@@ -438,7 +449,7 @@ impl TaskService {
438449
None,
439450
)?;
440451
}
441-
TaskMessage::DeleteTask(task_name) => {
452+
TaskMessage::DeleteTask(task_name, _) => {
442453
if let Some(token) = scheduled_tasks.remove(&task_name) {
443454
token.cancel();
444455
}

0 commit comments

Comments
 (0)