-
Notifications
You must be signed in to change notification settings - Fork 797
feat: Refactoring the after trigger of private task #18443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
22c4674
to
e1282c7
Compare
9b2ec1a
to
6d8a7fd
Compare
…& `clean_task_state_and_dependents`
6d8a7fd
to
d1b1d96
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 10 of 15 files at r1, 6 of 8 files at r4, all commit messages.
Reviewable status: 11 of 15 files reviewed, 6 unresolved discussions (waiting on @zhang2014)
src/meta/app/src/principal/task.rs
line 201 at r4 (raw file):
#[derive(Debug, Clone, PartialEq)] pub struct TaskDependentValue(pub HashSet<String>);
If it is not a big data set and there is no performance concern, use ordered container BTreeSet
instead of HashSet
. HashSet yields arbitrary key orders that might mess up some tests.
src/query/management/src/task/task_mgr.rs
line 577 at r4 (raw file):
update_ops: &mut Vec<TxnOp>, task_names: &[String], dependent_ident: &TIdent<TaskDependentResource, TaskDependentKey>,
there is already an alias: TaskDependentIdent
src/query/management/src/task/task_mgr.rs
line 578 at r4 (raw file):
task_names: &[String], dependent_ident: &TIdent<TaskDependentResource, TaskDependentKey>, is_add: bool,
is_add
is a little bit confusing. I kind of think of removing this method may simplify the logic and structure.
There are some shortcut utils you can use to build txn, such as TxnCondition::eq_seq
:
src/query/management/src/task/task_mgr.rs
line 601 at r4 (raw file):
dependent_ident.to_string_key(), old_dependent.to_pb()?.encode_to_vec(), ));
eq_value does not guarantee the data is still the same. For example the value could have changed from a -> b -> a
. In this case, asserting value == a
does not guarantee anything and there is potential consistency issues.
Always check against seq
if possible. And in fact the condition to assert the seq
can be extracted out of these two match branches, which is more explicit: does not update unless it is still the same.
Code quote:
check_ops.push(TxnCondition::eq_value(
dependent_ident.to_string_key(),
old_dependent.to_pb()?.encode_to_vec(),
));
src/meta/protos/proto/task.proto
line 98 at r4 (raw file):
} string source = 1; DependentType ty = 3;
Keep the fields in the order that provides alphabetic order for the struct
Suggestion:
DependentType ty = 1;
string source = 2;
src/meta/proto-conv/src/task_from_to_protobuf_impl.rs
line 200 at r1 (raw file):
} impl FromToProto for mt::TaskDependent {
If TaskDependent won't be used as value, it does not need to implement FromToProto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 4 files at r5, 1 of 1 files at r6, 2 of 3 files at r9, all commit messages.
Reviewable status: 11 of 15 files reviewed, 5 unresolved discussions (waiting on @zhang2014)
src/query/management/src/task/task_mgr.rs
line 384 at r10 (raw file):
) -> Result<Result<(), TaskError>, TaskApiError> { let mut check_ops = Vec::new(); let mut update_ops = Vec::new();
Create a txn
: let txn=TxnRequest::new()
then use &mut txn
as the parameter in clean_related_dependent
.
Code quote:
let mut check_ops = Vec::new();
let mut update_ops = Vec::new();
src/query/management/src/task/task_mgr.rs
line 389 at r10 (raw file):
.await?; self.clean_related_dependent(task_name, &mut check_ops, &mut update_ops, false) .await?;
it seems like clean_related_dependent
it self should remove with after/before edges in one call.
There is no need to do it in two step.
Code quote:
self.clean_related_dependent(task_name, &mut check_ops, &mut update_ops, true)
.await?;
self.clean_related_dependent(task_name, &mut check_ops, &mut update_ops, false)
.await?;
src/query/management/src/task/task_mgr.rs
line 442 at r10 (raw file):
TaskDependentIdent::new_generic(&self.tenant, after_dependent) })) .await?
This line is too long. Break it down into several statements to improve readability.
Code quote:
for (target_after_ident, target_after_dependent) in self
.kv_api
.get_pb_vec(task_before_dependent.0.iter().map(|before| {
let after_dependent = TaskDependentKey::new(DependentType::After, before.clone());
TaskDependentIdent::new_generic(&self.tenant, after_dependent)
}))
.await?
src/query/management/src/task/task_mgr.rs
line 449 at r10 (raw file):
let mut conditions = Vec::new(); let mut if_ops = Vec::new();
create a txn let txn = TxnRequest::new()
instead of creating its fields
Code quote:
let mut conditions = Vec::new();
let mut if_ops = Vec::new();
src/query/management/src/task/task_mgr.rs
line 564 at r10 (raw file):
TaskDependentIdent::new_generic(&self.tenant, target_key.clone()); let seq_dep = self.kv_api.get_pb(&target_ident).await?;
use get_pb_vec
to batch the get.
968bc46
to
be933c3
Compare
I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/
Summary
PrivateTask originally used SQL to query the task_after and task_run tables to determine whether the task was the last completed task among other tasks to drive the execution of the next task.
Using SQL is relatively complex and can lead to low performance and even difficult to maintain. This PR moves this part of the
after check
to themeta
module, simplifying the logic and making it easier to maintain.TaskSucceededStateIdent => current task/ next task -> empty
(If the key exists, it means that the before task and after task are succeeded.)TaskDependentIdent => /DependentType (Before or After)/task_name 1
When task A is completed, check if any task C — which comes after a task B that comes before task A — is also completed.
If such a task C is completed, set both task A and task C’s is_successed to false; otherwise, set task A’s is_successed to true.
Tests
Type of change
This change is