Skip to content

Commit 6132b93

Browse files
committed
chore: codefmt
1 parent ee04c4a commit 6132b93

File tree

3 files changed

+80
-79
lines changed

3 files changed

+80
-79
lines changed

src/meta/app/src/principal/task_dependent_ident.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,27 @@ use crate::tenant_key::ident::TIdent;
1616

1717
pub type TaskDependentIdent = TIdent<TaskDependentResource, TaskDependentKey>;
1818

19+
impl TaskDependentIdent {
20+
pub fn new_before(tenant: impl ToTenant, task_name: impl ToString) -> Self {
21+
TaskDependentIdent::new_generic(
22+
tenant,
23+
TaskDependentKey::new(DependentType::Before, task_name.to_string()),
24+
)
25+
}
26+
27+
pub fn new_after(tenant: impl ToTenant, task_name: impl ToString) -> Self {
28+
TaskDependentIdent::new_generic(
29+
tenant,
30+
TaskDependentKey::new(DependentType::After, task_name.to_string()),
31+
)
32+
}
33+
}
34+
1935
pub use kvapi_impl::TaskDependentResource;
2036

37+
use crate::principal::DependentType;
2138
use crate::principal::TaskDependentKey;
39+
use crate::tenant::ToTenant;
2240

2341
mod kvapi_impl {
2442
use databend_common_meta_kvapi::kvapi;

src/query/management/src/task/task_mgr.rs

Lines changed: 61 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,9 @@ use databend_common_meta_app::principal::task::TaskState;
3131
use databend_common_meta_app::principal::task_dependent_ident::TaskDependentIdent;
3232
use databend_common_meta_app::principal::task_message_ident::TaskMessageIdent;
3333
use databend_common_meta_app::principal::task_state_ident::TaskStateIdent;
34-
use databend_common_meta_app::principal::DependentType;
3534
use databend_common_meta_app::principal::ScheduleType;
3635
use databend_common_meta_app::principal::Status;
3736
use databend_common_meta_app::principal::Task;
38-
use databend_common_meta_app::principal::TaskDependentKey;
3937
use databend_common_meta_app::principal::TaskIdent;
4038
use databend_common_meta_app::schema::CreateOption;
4139
use databend_common_meta_app::tenant::Tenant;
@@ -271,9 +269,7 @@ impl TaskMgr {
271269
let mut update_ops = Vec::new();
272270
let mut check_ops = Vec::with_capacity(new_afters.len());
273271

274-
let after_dependent = TaskDependentKey::new(DependentType::After, task_name.to_string());
275-
let after_dependent_ident =
276-
TaskDependentIdent::new_generic(&self.tenant, after_dependent.clone());
272+
let after_dependent_ident = TaskDependentIdent::new_after(&self.tenant, task_name);
277273

278274
let after_seq_deps = self.kv_api.get_pb(&after_dependent_ident).await?;
279275
check_ops.push(txn_cond_eq_seq(
@@ -285,10 +281,9 @@ impl TaskMgr {
285281
after_deps.0.extend(new_afters.iter().cloned());
286282
update_ops.push(txn_put_pb(&after_dependent_ident, &after_deps)?);
287283

288-
let before_dependent_idents = new_afters.iter().map(|after| {
289-
let before_dependent = TaskDependentKey::new(DependentType::Before, after.clone());
290-
TaskDependentIdent::new_generic(&self.tenant, before_dependent)
291-
});
284+
let before_dependent_idents = new_afters
285+
.iter()
286+
.map(|after| TaskDependentIdent::new_before(&self.tenant, after));
292287
for (before_dependent_ident, before_seq_deps) in
293288
self.kv_api.get_pb_vec(before_dependent_idents).await?
294289
{
@@ -324,10 +319,7 @@ impl TaskMgr {
324319
let mut update_ops = Vec::new();
325320
let mut check_ops = Vec::with_capacity(remove_afters.len());
326321

327-
let after_dependent = TaskDependentKey::new(DependentType::After, task_name.to_string());
328-
let after_dependent_ident =
329-
TaskDependentIdent::new_generic(&self.tenant, after_dependent.clone());
330-
322+
let after_dependent_ident = TaskDependentIdent::new_after(&self.tenant, task_name);
331323
let after_seq_deps = self.kv_api.get_pb(&after_dependent_ident).await?;
332324
check_ops.push(txn_cond_eq_seq(
333325
&after_dependent_ident,
@@ -341,10 +333,9 @@ impl TaskMgr {
341333
update_ops.push(txn_put_pb(&after_dependent_ident, &deps)?);
342334
}
343335

344-
let before_dependent_idents = remove_afters.iter().map(|after| {
345-
let before_dependent = TaskDependentKey::new(DependentType::Before, after.clone());
346-
TaskDependentIdent::new_generic(&self.tenant, before_dependent)
347-
});
336+
let before_dependent_idents = remove_afters
337+
.iter()
338+
.map(|after| TaskDependentIdent::new_before(&self.tenant, after));
348339
for (before_dependent_ident, before_seq_deps) in
349340
self.kv_api.get_pb_vec(before_dependent_idents).await?
350341
{
@@ -381,57 +372,33 @@ impl TaskMgr {
381372
let mut check_ops = Vec::new();
382373
let mut update_ops = Vec::new();
383374

384-
let task_after_ident = TaskDependentIdent::new_generic(
385-
&self.tenant,
386-
TaskDependentKey::new(DependentType::After, task_name.to_string()),
387-
);
388-
if let Some(task_after_dependent) = self.kv_api.get(&task_after_ident).await? {
389-
let target_idents = task_after_dependent.0.into_iter().map(|dependent_target| {
390-
let target_key =
391-
TaskDependentKey::new(DependentType::Before, dependent_target.clone());
392-
TaskDependentIdent::new_generic(&self.tenant, target_key.clone())
393-
});
394-
for (target_ident, seq_dep) in self.kv_api.get_pb_vec(target_idents).await? {
395-
check_ops.push(txn_cond_eq_seq(&target_ident, seq_dep.seq()));
375+
let task_after_ident = TaskDependentIdent::new_after(&self.tenant, task_name);
376+
let task_before_ident = TaskDependentIdent::new_before(&self.tenant, task_name);
396377

397-
if let Some(mut deps) = seq_dep {
398-
deps.0.remove(task_name);
399-
update_ops.push(txn_put_pb(&target_ident, &deps)?);
400-
}
401-
}
378+
let mut target_idents = Vec::new();
379+
if let Some(task_after_dependent) = self.kv_api.get(&task_after_ident).await? {
380+
target_idents.extend(task_after_dependent.0.into_iter().map(|dependent_target| {
381+
TaskDependentIdent::new_before(&self.tenant, dependent_target)
382+
}));
402383
}
403-
let task_before_ident = TaskDependentIdent::new_generic(
404-
&self.tenant,
405-
TaskDependentKey::new(DependentType::Before, task_name.to_string()),
406-
);
407384
if let Some(task_before_dependent) = self.kv_api.get(&task_before_ident).await? {
408-
let target_idents = task_before_dependent.0.into_iter().map(|dependent_target| {
409-
let target_key =
410-
TaskDependentKey::new(DependentType::After, dependent_target.clone());
411-
TaskDependentIdent::new_generic(&self.tenant, target_key.clone())
412-
});
413-
for (target_ident, seq_dep) in self.kv_api.get_pb_vec(target_idents).await? {
414-
check_ops.push(txn_cond_eq_seq(&target_ident, seq_dep.seq()));
385+
target_idents.extend(task_before_dependent.0.into_iter().map(|dependent_target| {
386+
TaskDependentIdent::new_after(&self.tenant, dependent_target)
387+
}));
388+
}
389+
for (target_ident, seq_dep) in self.kv_api.get_pb_vec(target_idents).await? {
390+
check_ops.push(txn_cond_eq_seq(&target_ident, seq_dep.seq()));
415391

416-
if let Some(mut deps) = seq_dep {
417-
deps.0.remove(task_name);
418-
update_ops.push(txn_put_pb(&target_ident, &deps)?);
419-
}
392+
if let Some(mut deps) = seq_dep {
393+
deps.0.remove(task_name);
394+
update_ops.push(txn_put_pb(&target_ident, &deps)?);
420395
}
421396
}
422397
update_ops.push(TxnOp::delete(
423-
TaskDependentIdent::new_generic(
424-
&self.tenant,
425-
TaskDependentKey::new(DependentType::Before, task_name.to_string()),
426-
)
427-
.to_string_key(),
398+
TaskDependentIdent::new_before(&self.tenant, task_name).to_string_key(),
428399
));
429400
update_ops.push(TxnOp::delete(
430-
TaskDependentIdent::new_generic(
431-
&self.tenant,
432-
TaskDependentKey::new(DependentType::After, task_name.to_string()),
433-
)
434-
.to_string_key(),
401+
TaskDependentIdent::new_after(&self.tenant, task_name).to_string_key(),
435402
));
436403
update_ops.push(TxnOp::delete(
437404
TaskStateIdent::new(&self.tenant, task_name).to_string_key(),
@@ -443,16 +410,36 @@ impl TaskMgr {
443410
Ok(Ok(()))
444411
}
445412

413+
/// Marks the given task as succeeded, and checks all tasks that depend on it.
414+
///
415+
/// For each task that depends on the completed task (`task_name`), we check if all its
416+
/// predecessor tasks are also succeeded. If so, we mark the dependent task as *not succeeded*
417+
/// to prevent premature execution. Otherwise, we record the dependent task as *ready*
418+
/// for further processing.
419+
///
420+
/// # Arguments
421+
/// - `task_name`: The name of the task that has just completed successfully.
422+
///
423+
/// # Returns
424+
/// - `Vec<String>`: A list of dependent task names that are ready to proceed.
425+
///
426+
/// # Behavior
427+
/// 1. Retrieves all tasks that must be executed *before* the given `task_name`.
428+
/// 2. For each such task, find the tasks that depend on it (`after` tasks).
429+
/// 3. For each `after` task:
430+
/// - If all its dependencies (excluding the current task) are succeeded:
431+
/// - Mark that task as **not succeeded**.
432+
/// - Also mark the current task as succeeded.
433+
/// - Record it as ready for further processing.
434+
/// - Otherwise:
435+
/// - Still mark the current task as succeeded.
446436
#[async_backtrace::framed]
447437
#[fastrace::trace]
448-
pub async fn task_succeeded(
438+
pub async fn get_next_ready_tasks(
449439
&self,
450440
task_name: &str,
451441
) -> Result<Result<Vec<String>, TaskError>, TaskApiError> {
452-
let task_before_ident = TaskDependentIdent::new_generic(
453-
&self.tenant,
454-
TaskDependentKey::new(DependentType::Before, task_name.to_string()),
455-
);
442+
let task_before_ident = TaskDependentIdent::new_before(&self.tenant, task_name);
456443
let task_state_key = TaskStateIdent::new(&self.tenant, task_name);
457444
let succeeded_value = TaskState { is_succeeded: true };
458445
let not_succeeded_value = TaskState {
@@ -464,35 +451,31 @@ impl TaskMgr {
464451
};
465452
let mut ready_tasks = Vec::new();
466453

467-
let target_after_idents = task_before_dependent.0.iter().map(|before| {
468-
let after_dependent = TaskDependentKey::new(DependentType::After, before.clone());
469-
TaskDependentIdent::new_generic(&self.tenant, after_dependent)
470-
});
454+
let target_after_idents = task_before_dependent
455+
.0
456+
.iter()
457+
.map(|before| TaskDependentIdent::new_after(&self.tenant, before));
471458
for (target_after_ident, target_after_dependent) in
472459
self.kv_api.get_pb_vec(target_after_idents).await?
473460
{
474461
let Some(target_after_dependent) = target_after_dependent else {
475462
continue;
476463
};
477-
478464
let mut request = TxnRequest::new(vec![], vec![])
479465
.with_else(vec![txn_put_pb(&task_state_key, &succeeded_value)?]);
480466

481467
for target_after_task in target_after_dependent.0.iter() {
482468
let task_ident = TaskStateIdent::new(&self.tenant, target_after_task);
483469
// Only care about the predecessors of this task's successor tasks, excluding this task itself.
484470
if target_after_task != task_name {
485-
request =
486-
request.push_if_then([], [txn_put_pb(&task_ident, &not_succeeded_value)?]);
487-
continue;
488-
}
489-
request = request.push_if_then(
490-
[TxnCondition::eq_value(
471+
request.condition.push(TxnCondition::eq_value(
491472
task_ident.to_string_key(),
492473
succeeded_value.to_pb()?.encode_to_vec(),
493-
)],
494-
[txn_put_pb(&task_ident, &not_succeeded_value)?],
495-
);
474+
));
475+
}
476+
request
477+
.if_then
478+
.push(txn_put_pb(&task_ident, &not_succeeded_value)?);
496479
}
497480
let reply = self.kv_api.transaction(request).await?;
498481

src/query/service/src/task/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ impl TaskService {
393393
.await?;
394394

395395
for next_task in
396-
task_mgr.task_succeeded(&task_name).await??
396+
task_mgr.get_next_ready_tasks(&task_name).await??
397397
{
398398
let next_task = task_mgr
399399
.describe_task(&next_task)

0 commit comments

Comments
 (0)