Skip to content

feat: Refactoring the after trigger of private task #18443

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ build_exceptions! {
TaskScheduleAndAfterConflict(2615),
/// Task when condition not met
TaskWhenConditionNotMet(2616),
/// Task Running when modifying after
TaskRunningWhenModifyingAfter(2617),
}

// Search and External Service Errors [1901-1903, 1910]
Expand Down
6 changes: 6 additions & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ pub mod procedure_identity;
pub mod procedure_name_ident;
pub mod stage_file_ident;
pub mod task;
pub mod task_dependent_ident;
pub mod task_ident;
pub mod task_message_ident;
pub mod task_state_ident;
pub mod tenant_ownership_object_ident;
pub mod tenant_user_ident;
pub mod user_defined_file_format_ident;
Expand Down Expand Up @@ -89,13 +91,17 @@ pub use role_info::RoleInfo;
pub use role_info::RoleInfoSerdeError;
pub use stage_file_ident::StageFileIdent;
pub use stage_file_path::StageFilePath;
pub use task::DependentType;
pub use task::ScheduleOptions;
pub use task::ScheduleType;
pub use task::State;
pub use task::Status;
pub use task::Task;
pub use task::TaskDependentKey;
pub use task::TaskDependentValue;
pub use task::TaskMessage;
pub use task::TaskRun;
pub use task::TaskSucceededStateValue;
pub use task::WarehouseOptions;
pub use task_ident::TaskIdent;
pub use task_ident::TaskIdentRaw;
Expand Down
95 changes: 95 additions & 0 deletions src/meta/app/src/principal/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::BTreeSet;

use chrono::DateTime;
use chrono::Utc;
Expand Down Expand Up @@ -172,3 +173,97 @@ impl TaskMessage {
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskSucceededStateKey {
pub before_task: String,
pub after_task: String,
}

impl TaskSucceededStateKey {
pub fn new(before_task: String, after_task: String) -> Self {
Self {
before_task,
after_task,
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskSucceededStateValue;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DependentType {
After = 0,
Before = 1,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskDependentKey {
pub ty: DependentType,
pub source: String,
}

impl TaskDependentKey {
pub fn new(ty: DependentType, source: String) -> Self {
Self { ty, source }
}
}

#[derive(Debug, Clone, PartialEq, Default)]
pub struct TaskDependentValue(pub BTreeSet<String>);

mod kvapi_key_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KeyBuilder;
use databend_common_meta_kvapi::kvapi::KeyError;
use databend_common_meta_kvapi::kvapi::KeyParser;

use crate::principal::task::TaskSucceededStateKey;
use crate::principal::DependentType;
use crate::principal::TaskDependentKey;

impl kvapi::KeyCodec for TaskSucceededStateKey {
fn encode_key(&self, b: KeyBuilder) -> KeyBuilder {
b.push_str(self.before_task.as_str())
.push_str(self.after_task.as_str())
}

fn decode_key(parser: &mut KeyParser) -> Result<Self, KeyError>
where Self: Sized {
let before_task = parser.next_str()?;
let after_task = parser.next_str()?;

Ok(Self {
before_task,
after_task,
})
}
}

impl kvapi::KeyCodec for TaskDependentKey {
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
match self.ty {
DependentType::After => b.push_str("After"),
DependentType::Before => b.push_str("Before"),
}
.push_str(self.source.as_str())
}

fn decode_key(parser: &mut kvapi::KeyParser) -> Result<Self, kvapi::KeyError> {
let ty = match parser.next_str()?.as_str() {
"After" => DependentType::After,
"Before" => DependentType::Before,
str => {
return Err(KeyError::InvalidId {
s: str.to_string(),
reason: "Invalid Dependent Type".to_string(),
})
}
};
let source = parser.next_str()?;

Ok(Self { ty, source })
}
}
}
63 changes: 63 additions & 0 deletions src/meta/app/src/principal/task_dependent_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskDependentIdent = TIdent<TaskDependentResource, TaskDependentKey>;

impl TaskDependentIdent {
pub fn new_before(tenant: impl ToTenant, task_name: impl ToString) -> Self {
TaskDependentIdent::new_generic(
tenant,
TaskDependentKey::new(DependentType::Before, task_name.to_string()),
)
}

pub fn new_after(tenant: impl ToTenant, task_name: impl ToString) -> Self {
TaskDependentIdent::new_generic(
tenant,
TaskDependentKey::new(DependentType::After, task_name.to_string()),
)
}
}

pub use kvapi_impl::TaskDependentResource;

use crate::principal::DependentType;
use crate::principal::TaskDependentKey;
use crate::tenant::ToTenant;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task::TaskDependentValue;
use crate::principal::task_dependent_ident::TaskDependentIdent;
use crate::tenant_key::resource::TenantResource;

pub struct TaskDependentResource;
impl TenantResource for TaskDependentResource {
const PREFIX: &'static str = "__fd_task_dependents";
const TYPE: &'static str = "TaskDependentIdent";
const HAS_TENANT: bool = true;
type ValueType = TaskDependentValue;
}

impl kvapi::Value for TaskDependentValue {
type KeyType = TaskDependentIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
59 changes: 59 additions & 0 deletions src/meta/app/src/principal/task_state_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskSucceededStateIdent = TIdent<Resource, TaskSucceededStateKey>;

impl TaskSucceededStateIdent {
pub fn new(
tenant: impl ToTenant,
before_task: impl ToString,
after_task: impl ToString,
) -> Self {
TaskSucceededStateIdent::new_generic(
tenant,
TaskSucceededStateKey::new(before_task.to_string(), after_task.to_string()),
)
}
}

pub use kvapi_impl::Resource;

use crate::principal::task::TaskSucceededStateKey;
use crate::tenant::ToTenant;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task_state_ident::TaskSucceededStateIdent;
use crate::principal::TaskSucceededStateValue;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_task_states";
const TYPE: &'static str = "TaskSucceededStateIdent";
const HAS_TENANT: bool = true;
type ValueType = TaskSucceededStateValue;
}

impl kvapi::Value for TaskSucceededStateValue {
type KeyType = TaskSucceededStateIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
43 changes: 43 additions & 0 deletions src/meta/proto-conv/src/task_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;

use chrono::DateTime;
use chrono::Utc;
use databend_common_meta_app::principal as mt;
Expand Down Expand Up @@ -195,3 +197,44 @@ impl FromToProto for mt::TaskMessage {
})
}
}

impl FromToProto for mt::TaskDependentValue {
type PB = pb::TaskDependentValue;

fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}

fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
where Self: Sized {
Ok(Self(BTreeSet::from_iter(p.names)))
}

fn to_pb(&self) -> Result<Self::PB, Incompatible> {
Ok(pb::TaskDependentValue {
ver: VER,
min_reader_ver: MIN_READER_VER,
names: Vec::from_iter(self.0.iter().cloned()),
})
}
}

impl FromToProto for mt::TaskSucceededStateValue {
type PB = pb::TaskStateValue;

fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}

fn from_pb(_p: Self::PB) -> Result<Self, Incompatible>
where Self: Sized {
Ok(Self)
}

fn to_pb(&self) -> Result<Self::PB, Incompatible> {
Ok(pb::TaskStateValue {
ver: VER,
min_reader_ver: MIN_READER_VER,
})
}
}
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(138, "2025-07-23: Add: TableStatistics add index size"),
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
(141, "2025-07-27: Add: TaskState and TaskDependent"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ mod v137_add_grant_object_connection;
mod v138_table_statistics;
mod v139_add_grant_ownership_object_sequence;
mod v140_task_message;
mod v141_task_state;
50 changes: 50 additions & 0 deletions src/meta/proto-conv/tests/it/v141_task_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;

use databend_common_meta_app::principal as mt;
use fastrace::func_name;

use crate::common;

#[test]
fn test_decode_v141_task_state() -> anyhow::Result<()> {
let task_state_v141 = vec![160, 6, 141, 1, 168, 6, 24];

let want = || mt::TaskSucceededStateValue;
common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), task_state_v141.as_slice(), 141, want())?;

Ok(())
}

#[test]
fn test_decode_v141_task_dependent() -> anyhow::Result<()> {
let task_dependent_value_v141 = vec![10, 1, 97, 10, 1, 98, 160, 6, 141, 1, 168, 6, 24];
let want = || mt::TaskDependentValue(BTreeSet::from([s("a"), s("b")]));
common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(
func_name!(),
task_dependent_value_v141.as_slice(),
141,
want(),
)?;

Ok(())
}

fn s(ss: impl ToString) -> String {
ss.to_string()
}
Loading