Skip to content

Commit 22c4674

Browse files
committed
refactor: transfer the after check sql to meta
1 parent 959d837 commit 22c4674

File tree

15 files changed

+507
-127
lines changed

15 files changed

+507
-127
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,8 @@ build_exceptions! {
405405
TaskScheduleAndAfterConflict(2615),
406406
/// Task when condition not met
407407
TaskWhenConditionNotMet(2616),
408+
/// Task Running when modifying after
409+
TaskRunningWhenModifyingAfter(2617),
408410
}
409411

410412
// Search and External Service Errors [1901-1903, 1910]

src/meta/app/src/principal/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ pub mod procedure_identity;
4949
pub mod procedure_name_ident;
5050
pub mod stage_file_ident;
5151
pub mod task;
52+
pub mod task_dependent_ident;
5253
pub mod task_ident;
5354
pub mod task_message_ident;
55+
pub mod task_state_ident;
5456
pub mod tenant_ownership_object_ident;
5557
pub mod tenant_user_ident;
5658
pub mod user_defined_file_format_ident;
@@ -89,13 +91,16 @@ pub use role_info::RoleInfo;
8991
pub use role_info::RoleInfoSerdeError;
9092
pub use stage_file_ident::StageFileIdent;
9193
pub use stage_file_path::StageFilePath;
94+
pub use task::DependentType;
9295
pub use task::ScheduleOptions;
9396
pub use task::ScheduleType;
9497
pub use task::State;
9598
pub use task::Status;
9699
pub use task::Task;
100+
pub use task::TaskDependent;
97101
pub use task::TaskMessage;
98102
pub use task::TaskRun;
103+
pub use task::TaskState;
99104
pub use task::WarehouseOptions;
100105
pub use task_ident::TaskIdent;
101106
pub use task_ident::TaskIdentRaw;

src/meta/app/src/principal/task.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,63 @@ impl TaskMessage {
172172
}
173173
}
174174
}
175+
176+
#[derive(Debug, Clone, PartialEq)]
177+
pub struct TaskState {
178+
pub is_succeeded: bool,
179+
}
180+
181+
#[derive(Debug, Clone, Copy, PartialEq)]
182+
pub enum DependentType {
183+
After = 0,
184+
Before = 1,
185+
}
186+
187+
#[derive(Debug, Clone, PartialEq)]
188+
pub struct TaskDependent {
189+
pub ty: DependentType,
190+
pub source: String,
191+
pub target: String,
192+
}
193+
194+
impl TaskDependent {
195+
pub fn new(ty: DependentType, source: String, target: String) -> Self {
196+
Self { ty, source, target }
197+
}
198+
}
199+
200+
mod kvapi_key_impl {
201+
use databend_common_meta_kvapi::kvapi;
202+
use databend_common_meta_kvapi::kvapi::KeyError;
203+
204+
use crate::principal::DependentType;
205+
use crate::principal::TaskDependent;
206+
207+
impl kvapi::KeyCodec for TaskDependent {
208+
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
209+
match self.ty {
210+
DependentType::After => b.push_str("After"),
211+
DependentType::Before => b.push_str("Before"),
212+
}
213+
.push_str(self.source.as_str())
214+
.push_str(self.target.as_str())
215+
}
216+
217+
fn decode_key(parser: &mut kvapi::KeyParser) -> Result<Self, kvapi::KeyError> {
218+
let ty = match parser.next_str()?.as_str() {
219+
"After" => DependentType::After,
220+
"Before" => DependentType::Before,
221+
str => {
222+
return Err(KeyError::InvalidId {
223+
s: str.to_string(),
224+
reason: "Invalid Dependent Type".to_string(),
225+
})
226+
}
227+
};
228+
let source = parser.next_str()?;
229+
let target = parser.next_str()?;
230+
231+
Ok(Self { ty, source, target })
232+
}
233+
}
234+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::tenant_key::ident::TIdent;
16+
17+
pub type TaskDependentIdent = TIdent<TaskDependentResource, TaskDependent>;
18+
19+
pub use kvapi_impl::TaskDependentResource;
20+
21+
use crate::principal::TaskDependent;
22+
23+
mod kvapi_impl {
24+
use databend_common_meta_kvapi::kvapi;
25+
26+
use crate::principal::task::TaskDependent;
27+
use crate::principal::task_dependent_ident::TaskDependentIdent;
28+
use crate::tenant_key::resource::TenantResource;
29+
30+
pub struct TaskDependentResource;
31+
impl TenantResource for TaskDependentResource {
32+
const PREFIX: &'static str = "__fd_task_dependents";
33+
const TYPE: &'static str = "TaskDependentIdent";
34+
const HAS_TENANT: bool = true;
35+
type ValueType = TaskDependent;
36+
}
37+
38+
impl kvapi::Value for TaskDependent {
39+
type KeyType = TaskDependentIdent;
40+
41+
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
42+
[]
43+
}
44+
}
45+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::tenant_key::ident::TIdent;
16+
17+
pub type TaskStateIdent = TIdent<Resource>;
18+
19+
pub type TaskStateIdentRaw = TIdent<Resource>;
20+
21+
pub use kvapi_impl::Resource;
22+
23+
mod kvapi_impl {
24+
use databend_common_meta_kvapi::kvapi;
25+
26+
use crate::principal::task::TaskState;
27+
use crate::principal::task_state_ident::TaskStateIdent;
28+
use crate::tenant_key::resource::TenantResource;
29+
30+
pub struct Resource;
31+
impl TenantResource for Resource {
32+
const PREFIX: &'static str = "__fd_task_states";
33+
const TYPE: &'static str = "TaskStateIdent";
34+
const HAS_TENANT: bool = true;
35+
type ValueType = TaskState;
36+
}
37+
38+
impl kvapi::Value for TaskState {
39+
type KeyType = TaskStateIdent;
40+
41+
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
42+
[]
43+
}
44+
}
45+
}

src/meta/proto-conv/src/task_from_to_protobuf_impl.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use chrono::DateTime;
1616
use chrono::Utc;
1717
use databend_common_meta_app::principal as mt;
1818
use databend_common_meta_app::principal::task::Status;
19+
use databend_common_meta_app::principal::DependentType;
1920
use databend_common_protos::pb;
2021
use databend_common_protos::pb::task_message::DeleteTask;
2122
use databend_common_protos::pb::task_message::Message;
@@ -195,3 +196,58 @@ impl FromToProto for mt::TaskMessage {
195196
})
196197
}
197198
}
199+
200+
impl FromToProto for mt::TaskDependent {
201+
type PB = pb::TaskDependent;
202+
203+
fn get_pb_ver(p: &Self::PB) -> u64 {
204+
p.ver
205+
}
206+
207+
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
208+
where Self: Sized {
209+
Ok(Self {
210+
ty: match p.ty {
211+
0 => DependentType::After,
212+
1 => DependentType::Before,
213+
_ => return Err(Incompatible::new(format!("invalid task type {}", p.ty))),
214+
},
215+
source: p.source,
216+
target: p.target,
217+
})
218+
}
219+
220+
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
221+
Ok(pb::TaskDependent {
222+
ver: VER,
223+
min_reader_ver: MIN_READER_VER,
224+
225+
source: self.source.clone(),
226+
target: self.target.clone(),
227+
ty: self.ty as i32,
228+
})
229+
}
230+
}
231+
232+
impl FromToProto for mt::TaskState {
233+
type PB = pb::TaskState;
234+
235+
fn get_pb_ver(p: &Self::PB) -> u64 {
236+
p.ver
237+
}
238+
239+
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
240+
where Self: Sized {
241+
Ok(Self {
242+
is_succeeded: p.is_succeeded,
243+
})
244+
}
245+
246+
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
247+
Ok(pb::TaskState {
248+
ver: VER,
249+
min_reader_ver: MIN_READER_VER,
250+
is_succeeded: self.is_succeeded,
251+
})
252+
}
253+
}

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
170170
(138, "2025-07-23: Add: TableStatistics add index size"),
171171
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
172172
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
173+
(141, "2025-07-27: Add: TaskState and TaskDependent"),
173174
// Dear developer:
174175
// If you're gonna add a new metadata version, you'll have to add a test for it.
175176
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,4 @@ mod v137_add_grant_object_connection;
132132
mod v138_table_statistics;
133133
mod v139_add_grant_ownership_object_sequence;
134134
mod v140_task_message;
135+
mod v141_task_state;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_meta_app::principal as mt;
16+
use fastrace::func_name;
17+
18+
use crate::common;
19+
20+
#[test]
21+
fn test_decode_v141_task_state() -> anyhow::Result<()> {
22+
let task_state_v141 = vec![];
23+
24+
let want = || mt::TaskState { is_succeeded: true };
25+
common::test_pb_from_to(func_name!(), want())?;
26+
common::test_load_old(func_name!(), task_state_v141.as_slice(), 141, want())?;
27+
28+
Ok(())
29+
}
30+
31+
#[test]
32+
fn test_decode_v141_task_dependent() -> anyhow::Result<()> {
33+
let task_dependent_v141 = vec![];
34+
35+
let want = || mt::TaskDependent {
36+
ty: mt::DependentType::After,
37+
source: s("a"),
38+
target: s("c"),
39+
};
40+
common::test_pb_from_to(func_name!(), want())?;
41+
common::test_load_old(func_name!(), task_dependent_v141.as_slice(), 141, want())?;
42+
43+
Ok(())
44+
}
45+
46+
fn s(ss: impl ToString) -> String {
47+
ss.to_string()
48+
}

src/meta/protos/proto/task.proto

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,23 @@ message Task {
7878
optional string error_integration = 20;
7979
string owner_user = 21;
8080
}
81+
82+
message TaskState {
83+
uint64 ver = 100;
84+
uint64 min_reader_ver = 101;
85+
86+
bool is_succeeded = 1;
87+
}
88+
89+
message TaskDependent {
90+
uint64 ver = 100;
91+
uint64 min_reader_ver = 101;
92+
93+
enum DependentType {
94+
After = 0;
95+
Before = 1;
96+
}
97+
string source = 1;
98+
string target = 2;
99+
DependentType ty = 3;
100+
}

0 commit comments

Comments
 (0)