diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 28a9d88e4b921..7e7028dfc9d6d 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -405,6 +405,8 @@ build_exceptions! { TaskScheduleAndAfterConflict(2615), /// Task when condition not met TaskWhenConditionNotMet(2616), + /// Task Running when modifying after + TaskRunningWhenModifyingAfter(2617), } // Search and External Service Errors [1901-1903, 1910] diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index b0402f5eefe27..d349b99190c8b 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -49,8 +49,10 @@ pub mod procedure_identity; pub mod procedure_name_ident; pub mod stage_file_ident; pub mod task; +pub mod task_dependent_ident; pub mod task_ident; pub mod task_message_ident; +pub mod task_state_ident; pub mod tenant_ownership_object_ident; pub mod tenant_user_ident; pub mod user_defined_file_format_ident; @@ -89,13 +91,17 @@ pub use role_info::RoleInfo; pub use role_info::RoleInfoSerdeError; pub use stage_file_ident::StageFileIdent; pub use stage_file_path::StageFilePath; +pub use task::DependentType; pub use task::ScheduleOptions; pub use task::ScheduleType; pub use task::State; pub use task::Status; pub use task::Task; +pub use task::TaskDependentKey; +pub use task::TaskDependentValue; pub use task::TaskMessage; pub use task::TaskRun; +pub use task::TaskSucceededStateValue; pub use task::WarehouseOptions; pub use task_ident::TaskIdent; pub use task_ident::TaskIdentRaw; diff --git a/src/meta/app/src/principal/task.rs b/src/meta/app/src/principal/task.rs index 4b45c6ab3f118..43e358d4a1387 100644 --- a/src/meta/app/src/principal/task.rs +++ b/src/meta/app/src/principal/task.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::collections::BTreeMap; +use std::collections::BTreeSet; use chrono::DateTime; use chrono::Utc; @@ -172,3 +173,97 @@ impl TaskMessage { } } } + +#[derive(Debug, Clone, PartialEq)] +pub struct TaskSucceededStateKey { + pub before_task: String, + pub after_task: String, +} + +impl TaskSucceededStateKey { + pub fn new(before_task: String, after_task: String) -> Self { + Self { + before_task, + after_task, + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TaskSucceededStateValue; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum DependentType { + After = 0, + Before = 1, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TaskDependentKey { + pub ty: DependentType, + pub source: String, +} + +impl TaskDependentKey { + pub fn new(ty: DependentType, source: String) -> Self { + Self { ty, source } + } +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct TaskDependentValue(pub BTreeSet); + +mod kvapi_key_impl { + use databend_common_meta_kvapi::kvapi; + use databend_common_meta_kvapi::kvapi::KeyBuilder; + use databend_common_meta_kvapi::kvapi::KeyError; + use databend_common_meta_kvapi::kvapi::KeyParser; + + use crate::principal::task::TaskSucceededStateKey; + use crate::principal::DependentType; + use crate::principal::TaskDependentKey; + + impl kvapi::KeyCodec for TaskSucceededStateKey { + fn encode_key(&self, b: KeyBuilder) -> KeyBuilder { + b.push_str(self.before_task.as_str()) + .push_str(self.after_task.as_str()) + } + + fn decode_key(parser: &mut KeyParser) -> Result + where Self: Sized { + let before_task = parser.next_str()?; + let after_task = parser.next_str()?; + + Ok(Self { + before_task, + after_task, + }) + } + } + + impl kvapi::KeyCodec for TaskDependentKey { + fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { + match self.ty { + DependentType::After => b.push_str("After"), + DependentType::Before => b.push_str("Before"), + } + .push_str(self.source.as_str()) + } + + fn decode_key(parser: &mut 
kvapi::KeyParser) -> Result { + let ty = match parser.next_str()?.as_str() { + "After" => DependentType::After, + "Before" => DependentType::Before, + str => { + return Err(KeyError::InvalidId { + s: str.to_string(), + reason: "Invalid Dependent Type".to_string(), + }) + } + }; + let source = parser.next_str()?; + + Ok(Self { ty, source }) + } + } +} diff --git a/src/meta/app/src/principal/task_dependent_ident.rs b/src/meta/app/src/principal/task_dependent_ident.rs new file mode 100644 index 0000000000000..010123fd6e467 --- /dev/null +++ b/src/meta/app/src/principal/task_dependent_ident.rs @@ -0,0 +1,63 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::tenant_key::ident::TIdent; + +pub type TaskDependentIdent = TIdent; + +impl TaskDependentIdent { + pub fn new_before(tenant: impl ToTenant, task_name: impl ToString) -> Self { + TaskDependentIdent::new_generic( + tenant, + TaskDependentKey::new(DependentType::Before, task_name.to_string()), + ) + } + + pub fn new_after(tenant: impl ToTenant, task_name: impl ToString) -> Self { + TaskDependentIdent::new_generic( + tenant, + TaskDependentKey::new(DependentType::After, task_name.to_string()), + ) + } +} + +pub use kvapi_impl::TaskDependentResource; + +use crate::principal::DependentType; +use crate::principal::TaskDependentKey; +use crate::tenant::ToTenant; + +mod kvapi_impl { + use databend_common_meta_kvapi::kvapi; + + use crate::principal::task::TaskDependentValue; + use crate::principal::task_dependent_ident::TaskDependentIdent; + use crate::tenant_key::resource::TenantResource; + + pub struct TaskDependentResource; + impl TenantResource for TaskDependentResource { + const PREFIX: &'static str = "__fd_task_dependents"; + const TYPE: &'static str = "TaskDependentIdent"; + const HAS_TENANT: bool = true; + type ValueType = TaskDependentValue; + } + + impl kvapi::Value for TaskDependentValue { + type KeyType = TaskDependentIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/app/src/principal/task_state_ident.rs b/src/meta/app/src/principal/task_state_ident.rs new file mode 100644 index 0000000000000..b4b0abf75367c --- /dev/null +++ b/src/meta/app/src/principal/task_state_ident.rs @@ -0,0 +1,59 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant_key::ident::TIdent; + +pub type TaskSucceededStateIdent = TIdent; + +impl TaskSucceededStateIdent { + pub fn new( + tenant: impl ToTenant, + before_task: impl ToString, + after_task: impl ToString, + ) -> Self { + TaskSucceededStateIdent::new_generic( + tenant, + TaskSucceededStateKey::new(before_task.to_string(), after_task.to_string()), + ) + } +} + +pub use kvapi_impl::Resource; + +use crate::principal::task::TaskSucceededStateKey; +use crate::tenant::ToTenant; + +mod kvapi_impl { + use databend_common_meta_kvapi::kvapi; + + use crate::principal::task_state_ident::TaskSucceededStateIdent; + use crate::principal::TaskSucceededStateValue; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_task_states"; + const TYPE: &'static str = "TaskSucceededStateIdent"; + const HAS_TENANT: bool = true; + type ValueType = TaskSucceededStateValue; + } + + impl kvapi::Value for TaskSucceededStateValue { + type KeyType = TaskSucceededStateIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs index 381aa50a8712f..a98a97e19371d 100644 --- a/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/task_from_to_protobuf_impl.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::BTreeSet; + use chrono::DateTime; use chrono::Utc; use databend_common_meta_app::principal as mt; @@ -195,3 +197,44 @@ impl FromToProto for mt::TaskMessage { }) } } + +impl FromToProto for mt::TaskDependentValue { + type PB = pb::TaskDependentValue; + + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + + fn from_pb(p: Self::PB) -> Result + where Self: Sized { + Ok(Self(BTreeSet::from_iter(p.names))) + } + + fn to_pb(&self) -> Result { + Ok(pb::TaskDependentValue { + ver: VER, + min_reader_ver: MIN_READER_VER, + names: Vec::from_iter(self.0.iter().cloned()), + }) + } +} + +impl FromToProto for mt::TaskSucceededStateValue { + type PB = pb::TaskStateValue; + + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + + fn from_pb(_p: Self::PB) -> Result + where Self: Sized { + Ok(Self) + } + + fn to_pb(&self) -> Result { + Ok(pb::TaskStateValue { + ver: VER, + min_reader_ver: MIN_READER_VER, + }) + } +} diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index 6b21d5d4fe020..a54e20f65b718 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -170,6 +170,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (138, "2025-07-23: Add: TableStatistics add index size"), (139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"), (140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"), + (141, "2025-07-27: Add: TaskState and TaskDependent"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. 
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 8581d843ac051..1eda7d6426ce9 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -132,3 +132,4 @@ mod v137_add_grant_object_connection; mod v138_table_statistics; mod v139_add_grant_ownership_object_sequence; mod v140_task_message; +mod v141_task_state; diff --git a/src/meta/proto-conv/tests/it/v141_task_state.rs b/src/meta/proto-conv/tests/it/v141_task_state.rs new file mode 100644 index 0000000000000..a4f8db05cd091 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v141_task_state.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeSet; + +use databend_common_meta_app::principal as mt; +use fastrace::func_name; + +use crate::common; + +#[test] +fn test_decode_v141_task_state() -> anyhow::Result<()> { + let task_state_v141 = vec![160, 6, 141, 1, 168, 6, 24]; + + let want = || mt::TaskSucceededStateValue; + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), task_state_v141.as_slice(), 141, want())?; + + Ok(()) +} + +#[test] +fn test_decode_v141_task_dependent() -> anyhow::Result<()> { + let task_dependent_value_v141 = vec![10, 1, 97, 10, 1, 98, 160, 6, 141, 1, 168, 6, 24]; + let want = || mt::TaskDependentValue(BTreeSet::from([s("a"), s("b")])); + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old( + func_name!(), + task_dependent_value_v141.as_slice(), + 141, + want(), + )?; + + Ok(()) +} + +fn s(ss: impl ToString) -> String { + ss.to_string() +} diff --git a/src/meta/protos/proto/task.proto b/src/meta/protos/proto/task.proto index 27c3e8f99aa8c..c6caa8bfe0090 100644 --- a/src/meta/protos/proto/task.proto +++ b/src/meta/protos/proto/task.proto @@ -78,3 +78,15 @@ message Task { optional string error_integration = 20; string owner_user = 21; } + +message TaskStateValue { + uint64 ver = 100; + uint64 min_reader_ver = 101; +} + +message TaskDependentValue { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + repeated string names = 1; +} diff --git a/src/query/management/src/task/errors.rs b/src/query/management/src/task/errors.rs index 703728729ad1c..d76b8c3be5f8b 100644 --- a/src/query/management/src/task/errors.rs +++ b/src/query/management/src/task/errors.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::fmt::Display; - use databend_common_exception::ErrorCode; +use databend_common_meta_types::InvalidArgument; use databend_common_meta_types::MetaError; +use databend_common_proto_conv::Incompatible; use crate::errors::TenantError; @@ -84,6 +84,15 @@ pub enum TaskApiError { meta_err: MetaError, context: String, }, + + #[error("Incompatible error: {inner}")] + Incompatible { inner: Incompatible }, + + #[error("There are simultaneous update to task: {task_name} afters: {after}")] + SimultaneousUpdateTaskAfter { task_name: String, after: String }, + + #[error("InvalidArgument error: {inner}")] + InvalidArgument { inner: InvalidArgument }, } impl From for TaskApiError { @@ -95,6 +104,18 @@ impl From for TaskApiError { } } +impl From for TaskApiError { + fn from(err: InvalidArgument) -> Self { + TaskApiError::InvalidArgument { inner: err } + } +} + +impl From for TaskApiError { + fn from(value: Incompatible) -> Self { + TaskApiError::Incompatible { inner: value } + } +} + impl From for ErrorCode { fn from(value: TaskApiError) -> Self { match value { @@ -102,21 +123,11 @@ impl From for ErrorCode { TaskApiError::MetaError { meta_err, context } => { ErrorCode::from(meta_err).add_message_back(context) } - } - } -} - -impl TaskApiError { - pub fn append_context(self, context: impl Display) -> Self { - match self { - TaskApiError::TenantError(e) => TaskApiError::TenantError(e.append_context(context)), - TaskApiError::MetaError { - meta_err, - context: old, - } => TaskApiError::MetaError { - meta_err, - context: format!("{}; {}", old, context), - }, + TaskApiError::Incompatible { inner } => ErrorCode::from_std_error(inner), + TaskApiError::SimultaneousUpdateTaskAfter { .. 
} => { + ErrorCode::from_string(value.to_string()) + } + TaskApiError::InvalidArgument { inner } => ErrorCode::from_std_error(inner), } } } diff --git a/src/query/management/src/task/task_mgr.rs b/src/query/management/src/task/task_mgr.rs index 65949a6d83935..d7872a885ec4a 100644 --- a/src/query/management/src/task/task_mgr.rs +++ b/src/query/management/src/task/task_mgr.rs @@ -22,9 +22,16 @@ use databend_common_ast::ast::AlterTaskOptions; use databend_common_ast::ast::ScheduleOptions; use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_api::kv_pb_api::UpsertPB; +use databend_common_meta_api::txn_cond_eq_seq; +use databend_common_meta_api::txn_op_del; +use databend_common_meta_api::util::txn_put_pb; +use databend_common_meta_api::SchemaApi; use databend_common_meta_app::principal::task; use databend_common_meta_app::principal::task::TaskMessage; +use databend_common_meta_app::principal::task::TaskSucceededStateValue; +use databend_common_meta_app::principal::task_dependent_ident::TaskDependentIdent; use databend_common_meta_app::principal::task_message_ident::TaskMessageIdent; +use databend_common_meta_app::principal::task_state_ident::TaskSucceededStateIdent; use databend_common_meta_app::principal::ScheduleType; use databend_common_meta_app::principal::Status; use databend_common_meta_app::principal::Task; @@ -33,10 +40,18 @@ use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::txn_condition::Target; +use databend_common_meta_types::ConditionResult; use databend_common_meta_types::MatchSeq; use databend_common_meta_types::MetaError; +use databend_common_meta_types::TxnCondition; +use databend_common_meta_types::TxnOp; +use databend_common_meta_types::TxnRequest; use databend_common_meta_types::With; +use futures::StreamExt; use 
futures::TryStreamExt; +use seq_marked::SeqValue; use crate::task::errors::TaskApiError; use crate::task::errors::TaskError; @@ -153,12 +168,7 @@ impl TaskMgr { name: task_name.to_string(), })); } - for after in afters { - if task.after.contains(after) { - continue; - } - task.after.push(after.clone()); - } + return self.add_after(&task.task_name, afters).await; } AlterTaskOptions::RemoveAfter(afters) => { if task.schedule_options.is_some() { @@ -167,7 +177,7 @@ impl TaskMgr { name: task_name.to_string(), })); } - task.after.retain(|task| !afters.contains(task)); + return self.remove_after(&task.task_name, afters).await; } } if let Err(e) = self @@ -251,6 +261,248 @@ impl TaskMgr { Ok(change.is_changed()) } + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn add_after( + &self, + task_name: &str, + new_afters: &[String], + ) -> Result, TaskApiError> { + let mut request = TxnRequest::new(vec![], vec![]); + let after_dependent_ident = TaskDependentIdent::new_after(&self.tenant, task_name); + + let after_seq_deps = self.kv_api.get_pb(&after_dependent_ident).await?; + request.condition.push(txn_cond_eq_seq( + &after_dependent_ident, + after_seq_deps.seq(), + )); + + let mut after_deps = after_seq_deps.unwrap_or_default(); + after_deps.0.extend(new_afters.iter().cloned()); + request + .if_then + .push(txn_put_pb(&after_dependent_ident, &after_deps)?); + + let before_dependent_idents = new_afters + .iter() + .map(|after| TaskDependentIdent::new_before(&self.tenant, after)); + for (before_dependent_ident, before_seq_deps) in + self.kv_api.get_pb_vec(before_dependent_idents).await? 
+ { + request.condition.push(txn_cond_eq_seq( + &before_dependent_ident, + before_seq_deps.seq(), + )); + + let mut deps = before_seq_deps.unwrap_or_default(); + deps.0.insert(task_name.to_string()); + request + .if_then + .push(txn_put_pb(&before_dependent_ident, &deps)?); + } + let reply = self.kv_api.transaction(request).await?; + + if !reply.success { + return Err(TaskApiError::SimultaneousUpdateTaskAfter { + task_name: task_name.to_string(), + after: new_afters.join(","), + }); + } + + Ok(Ok(())) + } + + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn remove_after( + &self, + task_name: &str, + remove_afters: &[String], + ) -> Result, TaskApiError> { + let mut request = TxnRequest::new(vec![], vec![]); + + let after_dependent_ident = TaskDependentIdent::new_after(&self.tenant, task_name); + let after_seq_deps = self.kv_api.get_pb(&after_dependent_ident).await?; + request.condition.push(txn_cond_eq_seq( + &after_dependent_ident, + after_seq_deps.seq(), + )); + + if let Some(mut deps) = after_seq_deps { + for remove_after in remove_afters { + deps.0.remove(remove_after); + } + request + .if_then + .push(txn_put_pb(&after_dependent_ident, &deps)?); + } + + let before_dependent_idents = remove_afters + .iter() + .map(|after| TaskDependentIdent::new_before(&self.tenant, after)); + for (before_dependent_ident, before_seq_deps) in + self.kv_api.get_pb_vec(before_dependent_idents).await? 
+ { + request.condition.push(txn_cond_eq_seq( + &before_dependent_ident, + before_seq_deps.seq(), + )); + + if let Some(mut deps) = before_seq_deps { + deps.0.remove(task_name); + request + .if_then + .push(txn_put_pb(&before_dependent_ident, &deps)?); + } + } + let reply = self.kv_api.transaction(request).await?; + + if !reply.success { + return Err(TaskApiError::SimultaneousUpdateTaskAfter { + task_name: task_name.to_string(), + after: remove_afters.join(","), + }); + } + + Ok(Ok(())) + } + + // Note: a task's `Before` dependents are only cleaned up when the task is dropped + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn clean_task_state_and_dependents( + &self, + task_name: &str, + ) -> Result, TaskApiError> { + let mut request = TxnRequest::new(vec![], vec![]); + + let task_after_ident = TaskDependentIdent::new_after(&self.tenant, task_name); + let task_before_ident = TaskDependentIdent::new_before(&self.tenant, task_name); + + let mut target_idents = Vec::new(); + if let Some(task_after_dependent) = self.kv_api.get(&task_after_ident).await? { + target_idents.extend(task_after_dependent.0.into_iter().map(|dependent_target| { + TaskDependentIdent::new_before(&self.tenant, dependent_target) + })); + } + if let Some(task_before_dependent) = self.kv_api.get(&task_before_ident).await? { + target_idents.extend(task_before_dependent.0.into_iter().map(|dependent_target| { + TaskDependentIdent::new_after(&self.tenant, dependent_target) + })); + } + for (target_ident, seq_dep) in self.kv_api.get_pb_vec(target_idents).await? 
{ + request + .condition + .push(txn_cond_eq_seq(&target_ident, seq_dep.seq())); + + if let Some(mut deps) = seq_dep { + deps.0.remove(task_name); + request.if_then.push(txn_put_pb(&target_ident, &deps)?); + } + } + request.if_then.push(TxnOp::delete( + TaskDependentIdent::new_before(&self.tenant, task_name).to_string_key(), + )); + request.if_then.push(TxnOp::delete( + TaskDependentIdent::new_after(&self.tenant, task_name).to_string_key(), + )); + let mut stream = self + .kv_api + .list_pb_keys(&DirName::new(TaskSucceededStateIdent::new( + &self.tenant, + task_name, + "", + ))) + .await?; + + while let Some(result) = stream.next().await { + request.if_then.push(TxnOp::delete(result?.to_string_key())) + } + let _ = self.kv_api.transaction(request).await?; + + Ok(Ok(())) + } + + /// Marks the given task as succeeded, and checks all tasks that depend on it. + /// + /// For each task that depends on the completed task (`task_name`), we check if all its + /// other predecessor tasks have also succeeded. If so, the dependent task is reported as *ready* + /// and the recorded success states of its predecessors are cleared for the next round. Otherwise, + /// only the success of `task_name` for that dependent task is recorded. + /// + /// # Arguments + /// - `task_name`: The name of the task that has just completed successfully. + /// + /// # Returns + /// - `Vec<String>`: A list of dependent task names that are ready to proceed. + /// + /// # Behavior + /// Assume: + /// - `a` = given `task_name` (the completed task) + /// - `b` = a task that has `a` in its `AFTER` list (i.e., depends on `a`) + /// - `c` = other tasks in `b`'s `AFTER` list (other dependencies of `b`) + /// + /// 1. Retrieves all tasks (`b`) that have `a` in their `AFTER` list. + /// 2. For each `b`: + /// - Check whether all its dependencies (`a/b` + `c/b`) have succeeded. + /// - If all dependencies are complete: + /// - Add `b` to the ready list. 
+ /// - Set the status of its dependencies (`a/b` + `c/b`) to `not succeeded`. 
+ /// - If not all dependencies of `b` are complete: + /// - Only mark `a/b` as succeeded. + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn get_next_ready_tasks( + &self, + task_name: &str, + ) -> Result, TaskError>, TaskApiError> { + let task_before_ident = TaskDependentIdent::new_before(&self.tenant, task_name); + + let Some(task_before_dependent) = self.kv_api.get_pb(&task_before_ident).await? else { + return Ok(Ok(Vec::new())); + }; + let mut ready_tasks = Vec::new(); + + let target_after_idents = task_before_dependent + .0 + .iter() + .map(|before| TaskDependentIdent::new_after(&self.tenant, before)); + for (target_after_ident, target_after_dependent) in + self.kv_api.get_pb_vec(target_after_idents).await? + { + let Some(target_after_dependent) = target_after_dependent else { + continue; + }; + let target_after = &target_after_ident.name().source; + let this_task_to_target_state = + TaskSucceededStateIdent::new(&self.tenant, task_name, target_after); + let mut request = TxnRequest::new(vec![], vec![]).with_else(vec![txn_put_pb( + &this_task_to_target_state, + &TaskSucceededStateValue, + )?]); + + for before_target_after in target_after_dependent.0.iter() { + let task_ident = + TaskSucceededStateIdent::new(&self.tenant, before_target_after, target_after); + // Only care about the predecessors of this task's successor tasks, excluding this task itself. 
+ if before_target_after != task_name { + request.condition.push(TxnCondition { + key: task_ident.to_string_key(), + expected: ConditionResult::Gt as i32, + target: Some(Target::Seq(0)), + }); + } + request.if_then.push(txn_op_del(&task_ident)); + } + let reply = self.kv_api.transaction(request).await?; + + if reply.success { + ready_tasks.push(target_after_ident.name().source.clone()); + } + } + Ok(Ok(ready_tasks)) + } + async fn create_task_inner( &self, task: Task, @@ -300,7 +552,9 @@ impl TaskMgr { } } if !task.after.is_empty() { - self.send(TaskMessage::AfterTask(task)).await?; + if let Err(err) = self.add_after(&task.task_name, &task.after).await? { + return Ok(Err(err)); + } } else if task.schedule_options.is_some() && !without_schedule { self.send(TaskMessage::ScheduleTask(task)).await?; } diff --git a/src/query/service/src/interpreters/task/private.rs b/src/query/service/src/interpreters/task/private.rs index 50ed7c43da273..9a5201d3ec217 100644 --- a/src/query/service/src/interpreters/task/private.rs +++ b/src/query/service/src/interpreters/task/private.rs @@ -18,6 +18,7 @@ use chrono::Utc; use databend_common_ast::ast::TaskSql; use databend_common_catalog::table_context::TableContext; use databend_common_cloud_control::task_utils; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_management::task::TaskMgr; use databend_common_meta_app::principal::task::EMPTY_TASK_ID; @@ -33,6 +34,7 @@ use databend_common_users::UserApiProvider; use crate::interpreters::task::TaskInterpreter; use crate::sessions::QueryContext; +use crate::task::TaskService; pub(crate) struct PrivateTaskInterpreter; @@ -87,6 +89,14 @@ impl TaskInterpreter for PrivateTaskInterpreter { } async fn alter_task(&self, _ctx: &Arc, plan: &AlterTaskPlan) -> Result<()> { + if TaskService::instance() + .is_task_executing(&plan.task_name) + .await? 
+ { + return Err(ErrorCode::TaskRunningWhenModifyingAfter( + "Task Running when modifying after", + )); + } UserApiProvider::instance() .task_api(&plan.tenant) .alter_task(&plan.task_name, &plan.alter_options) @@ -108,6 +118,14 @@ impl TaskInterpreter for PrivateTaskInterpreter { } async fn drop_task(&self, _ctx: &Arc, plan: &DropTaskPlan) -> Result<()> { + if TaskService::instance() + .is_task_executing(&plan.task_name) + .await? + { + return Err(ErrorCode::TaskRunningWhenModifyingAfter( + "Task Running when modifying after", + )); + } UserApiProvider::instance() .task_api(&plan.tenant) .drop_task(&plan.task_name) diff --git a/src/query/service/src/task/service.rs b/src/query/service/src/task/service.rs index af9cb60e5201b..1f42442d19e60 100644 --- a/src/query/service/src/task/service.rs +++ b/src/query/service/src/task/service.rs @@ -21,7 +21,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use async_stream::stream; use chrono::DateTime; use chrono::Utc; use chrono_tz::Tz; @@ -59,10 +58,8 @@ use databend_common_meta_types::MetaError; use databend_common_sql::Planner; use databend_common_users::UserApiProvider; use databend_common_users::BUILTIN_ROLE_ACCOUNT_ADMIN; -use futures::Stream; use futures_util::stream::BoxStream; use futures_util::TryStreamExt; -use itertools::Itertools; use log::error; use tokio::time::sleep; use tokio_stream::StreamExt; @@ -136,12 +133,6 @@ impl TaskService { );"; self.execute_sql(None, create_task_run_table).await?; - let create_task_after_table = "CREATE TABLE IF NOT EXISTS system_task.task_after(\ - task_name STRING NOT NULL,\ - next_task STRING NOT NULL\ - );"; - self.execute_sql(None, create_task_after_table).await?; - Ok(()) } @@ -401,11 +392,9 @@ impl TaskService { .update_or_create_task_run(&task_run) .await?; - let mut stream = - Box::pin(task_service.check_next_tasks(&task_name)); - - while let Some(next_task) = stream.next().await { - let next_task = next_task?; + for next_task in + 
task_mgr.get_next_ready_tasks(&task_name).await?? + { let next_task = task_mgr .describe_task(&next_task) .await?? @@ -460,7 +449,9 @@ impl TaskService { token.cancel(); } if task_mgr.accept(&task_key).await? { - self.clean_task_afters(&task_name).await?; + task_mgr + .clean_task_state_and_dependents(&task_name) + .await??; } task_mgr .accept(&TaskMessageIdent::new( @@ -469,16 +460,8 @@ impl TaskService { )) .await?; } - TaskMessage::AfterTask(task) => { - if !task_mgr.accept(&task_key).await? { - continue; - } - match task.status { - Status::Suspended => continue, - Status::Started => (), - } - self.update_task_afters(&task).await?; - } + // deprecated + TaskMessage::AfterTask(_) => (), } } Ok(()) @@ -505,6 +488,18 @@ impl TaskService { }))) } + pub async fn is_task_executing(&self, task_name: &str) -> Result { + let blocks = self.execute_sql(None, &format!("SELECT state = 'EXECUTING' from system_task.task_run WHERE task_name = '{task_name}' ORDER BY run_id DESC LIMIT 1")).await?; + Ok(blocks + .first() + .and_then(|block| { + block.columns()[0] + .index(0) + .and_then(|scalar| scalar.as_boolean().cloned()) + }) + .unwrap_or(false)) + } + fn decode(resp: WatchResponse) -> Result> { let Some((key, _, Some(value))) = resp.unpack() else { return Ok(None); @@ -673,89 +668,6 @@ impl TaskService { Ok(()) } - pub fn check_next_tasks<'a>( - &'a self, - task_name: &'a str, - ) -> impl Stream> + '_ { - stream! 
{ - let check = format!(" - WITH latest_task_run AS ( - SELECT - task_name, - state, - completed_at - FROM ( - SELECT - task_name, - state, - completed_at, - ROW_NUMBER() OVER (PARTITION BY task_name ORDER BY completed_at DESC) AS rn - FROM system_task.task_run - ) ranked - WHERE rn = 1 -), -next_task_time AS ( - SELECT - task_name AS next_task, - completed_at - FROM latest_task_run -) -SELECT DISTINCT ta.next_task -FROM system_task.task_after ta -WHERE ta.task_name = '{task_name}' - AND NOT EXISTS ( - SELECT 1 - FROM system_task.task_after ta_dep - LEFT JOIN latest_task_run tr - ON ta_dep.task_name = tr.task_name - LEFT JOIN next_task_time nt - ON ta_dep.next_task = nt.next_task - WHERE ta_dep.next_task = ta.next_task - AND ( - tr.task_name IS NULL - OR tr.state != 'SUCCEEDED' - OR tr.completed_at IS NULL - OR (nt.completed_at IS NOT NULL AND tr.completed_at <= nt.completed_at) - ) - );"); - if let Some(next_task) = self.execute_sql(None, &check).await?.first().and_then(|block| block.columns()[0].index(0).and_then(|scalar| { scalar.as_string().map(|s| s.to_string()) })) { - yield Result::Ok(next_task); - } - } - } - - pub async fn clean_task_afters(&self, task_name: &str) -> Result<()> { - self.execute_sql( - None, - &format!( - "DELETE FROM system_task.task_after WHERE next_task = '{}'", - task_name - ), - ) - .await?; - - Ok(()) - } - - pub async fn update_task_afters(&self, task: &Task) -> Result<()> { - self.clean_task_afters(&task.task_name).await?; - let values = task - .after - .iter() - .map(|after| format!("('{}', '{}')", after, task.task_name)) - .join(", "); - self.execute_sql( - None, - &format!( - "INSERT INTO system_task.task_after (task_name, next_task) VALUES {}", - values - ), - ) - .await?; - - Ok(()) - } - fn task_run2insert(task_run: &TaskRun) -> Result { let task = &task_run.task; diff --git a/tests/task/test-private-task.sh b/tests/task/test-private-task.sh index 0c31108312969..4deaeeb00d187 100644 --- a/tests/task/test-private-task.sh +++ 
b/tests/task/test-private-task.sh @@ -149,6 +149,48 @@ else exit 1 fi +response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"ALTER TASK my_task_3 REMOVE AFTER 'my_task_2'\"}") +alter_task_query_id=$(echo $response | jq -r '.id') +echo "ALTER Task 3 ID: $alter_task_query_id" + +sleep 5 + +response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"EXECUTE TASK my_task_1\"}") +execute_task_1_query_id=$(echo $response | jq -r '.id') +echo "Execute Task 1 ID: $execute_task_1_query_id" + +sleep 15 + +response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1 FROM t1 ORDER BY c1\"}") + +actual=$(echo "$response" | jq -c '.data') +expected='[["0"],["0"],["0"],["1"],["1"],["1"],["2"],["2"],["2"]]' + +if [ "$actual" = "$expected" ]; then + echo "✅ Query result matches expected" +else + echo "❌ Mismatch" + echo "Expected: $expected" + echo "Actual : $actual" + exit 1 +fi + +sleep 30 + +response=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1 FROM t1 ORDER BY c1\"}") + +actual=$(echo "$response" | jq -c '.data') +expected='[["0"],["0"],["0"],["1"],["1"],["1"],["1"],["2"],["2"],["2"]]' + +if [ "$actual" = "$expected" ]; then + echo "✅ Query result matches expected" +else + echo "❌ Mismatch" + echo "Expected: $expected" + echo "Actual : $actual" + exit 1 +fi + # Test whether the schedule can be restored after restart killall -9 databend-query || true @@ -172,7 +214,7 @@ sleep 45 response9=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1 FROM t1 ORDER BY c1\"}") actual=$(echo "$response9" | jq -c '.data') -expected='[["0"],["0"],["1"],["1"],["1"],["2"],["2"]]' +expected='[["0"],["0"],["0"],["1"],["1"],["1"],["1"],["1"],["2"],["2"],["2"]]' if [ 
"$actual" = "$expected" ]; then echo "✅ Query result matches expected"