Skip to content

Commit b1ea18e

Browse files
committed
chore: codefmt
1 parent ea3f1bf commit b1ea18e

File tree

1 file changed

+127
-110
lines changed

1 file changed

+127
-110
lines changed

src/query/management/src/task/task_mgr.rs

Lines changed: 127 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -275,27 +275,67 @@ impl TaskMgr {
275275
let after_dependent = TaskDependentKey::new(DependentType::After, task_name.to_string());
276276
let after_dependent_ident =
277277
TaskDependentIdent::new_generic(&self.tenant, after_dependent.clone());
278+
let after_key = after_dependent_ident.to_string_key();
278279

279-
self.check_and_add(
280-
&mut check_ops,
281-
&mut update_ops,
282-
new_afters,
283-
&after_dependent_ident,
284-
)
285-
.await?;
280+
match self.kv_api.get_pb(&after_dependent_ident).await? {
281+
None => {
282+
check_ops.push(TxnCondition {
283+
key: after_key.clone(),
284+
expected: ConditionResult::Eq as i32,
285+
target: Some(Target::Seq(0)),
286+
});
287+
288+
update_ops.push(TxnOp::put(
289+
after_key,
290+
TaskDependentValue(BTreeSet::from_iter(new_afters.iter().cloned()))
291+
.to_pb()?
292+
.encode_to_vec(),
293+
));
294+
}
295+
Some(mut old_dependent) => {
296+
check_ops.push(TxnCondition::eq_seq(after_key.clone(), old_dependent.seq));
297+
298+
old_dependent.0.extend(new_afters.iter().cloned());
299+
300+
update_ops.push(TxnOp::put(
301+
after_key,
302+
old_dependent.to_pb()?.encode_to_vec(),
303+
));
304+
}
305+
}
286306

287307
for after in new_afters {
288308
let before_dependent = TaskDependentKey::new(DependentType::Before, after.clone());
289309
let before_dependent_ident =
290310
TaskDependentIdent::new_generic(&self.tenant, before_dependent);
311+
let before_key = before_dependent_ident.to_string_key();
312+
313+
match self.kv_api.get_pb(&before_dependent_ident).await? {
314+
None => {
315+
check_ops.push(TxnCondition {
316+
key: before_key.clone(),
317+
expected: ConditionResult::Eq as i32,
318+
target: Some(Target::Seq(0)),
319+
});
320+
321+
update_ops.push(TxnOp::put(
322+
before_key,
323+
TaskDependentValue(BTreeSet::from_iter([task_name.to_string()]))
324+
.to_pb()?
325+
.encode_to_vec(),
326+
));
327+
}
328+
Some(mut old_dependent) => {
329+
check_ops.push(TxnCondition::eq_seq(before_key.clone(), old_dependent.seq));
291330

292-
self.check_and_add(
293-
&mut check_ops,
294-
&mut update_ops,
295-
&[task_name.to_string()],
296-
&before_dependent_ident,
297-
)
298-
.await?;
331+
old_dependent.0.insert(task_name.to_string());
332+
333+
update_ops.push(TxnOp::put(
334+
before_key,
335+
old_dependent.to_pb()?.encode_to_vec(),
336+
));
337+
}
338+
}
299339
}
300340
let request = TxnRequest::new(check_ops, update_ops);
301341
let reply = self.kv_api.transaction(request).await?;
@@ -323,27 +363,53 @@ impl TaskMgr {
323363
let after_dependent = TaskDependentKey::new(DependentType::After, task_name.to_string());
324364
let after_dependent_ident =
325365
TaskDependentIdent::new_generic(&self.tenant, after_dependent.clone());
366+
let after_key = after_dependent_ident.to_string_key();
326367

327-
self.check_and_remove(
328-
&mut check_ops,
329-
&mut update_ops,
330-
remove_afters,
331-
&after_dependent_ident,
332-
)
333-
.await?;
368+
match self.kv_api.get_pb(&after_dependent_ident).await? {
369+
None => {
370+
check_ops.push(TxnCondition {
371+
key: after_key.clone(),
372+
expected: ConditionResult::Eq as i32,
373+
target: Some(Target::Seq(0)),
374+
});
375+
}
376+
Some(mut old_dependent) => {
377+
check_ops.push(TxnCondition::eq_seq(after_key.clone(), old_dependent.seq));
378+
379+
old_dependent.0.retain(|name| !remove_afters.contains(name));
380+
381+
update_ops.push(TxnOp::put(
382+
after_key,
383+
old_dependent.to_pb()?.encode_to_vec(),
384+
));
385+
}
386+
}
334387

335388
for after in remove_afters {
336389
let before_dependent = TaskDependentKey::new(DependentType::Before, after.clone());
337390
let before_dependent_ident =
338-
TaskDependentIdent::new_generic(&self.tenant, before_dependent.clone());
391+
TaskDependentIdent::new_generic(&self.tenant, before_dependent);
392+
let before_key = before_dependent_ident.to_string_key();
393+
394+
match self.kv_api.get_pb(&before_dependent_ident).await? {
395+
None => {
396+
check_ops.push(TxnCondition {
397+
key: before_key.clone(),
398+
expected: ConditionResult::Eq as i32,
399+
target: Some(Target::Seq(0)),
400+
});
401+
}
402+
Some(mut old_dependent) => {
403+
check_ops.push(TxnCondition::eq_seq(before_key.clone(), old_dependent.seq));
339404

340-
self.check_and_remove(
341-
&mut check_ops,
342-
&mut update_ops,
343-
&[task_name.to_string()],
344-
&before_dependent_ident,
345-
)
346-
.await?;
405+
old_dependent.0.remove(task_name);
406+
407+
update_ops.push(TxnOp::put(
408+
before_key,
409+
old_dependent.to_pb()?.encode_to_vec(),
410+
));
411+
}
412+
}
347413
}
348414
let request = TxnRequest::new(check_ops, update_ops);
349415
let reply = self.kv_api.transaction(request).await?;
@@ -368,9 +434,9 @@ impl TaskMgr {
368434
let mut check_ops = Vec::new();
369435
let mut update_ops = Vec::new();
370436

371-
self.clean_related_dependent(&task_name, &mut check_ops, &mut update_ops, true)
437+
self.clean_related_dependent(task_name, &mut check_ops, &mut update_ops, true)
372438
.await?;
373-
self.clean_related_dependent(&task_name, &mut check_ops, &mut update_ops, false)
439+
self.clean_related_dependent(task_name, &mut check_ops, &mut update_ops, false)
374440
.await?;
375441

376442
update_ops.push(TxnOp::delete(
@@ -529,7 +595,7 @@ impl TaskMgr {
529595

530596
async fn clean_related_dependent(
531597
&self,
532-
task_name: &&str,
598+
task_name: &str,
533599
check_ops: &mut Vec<TxnCondition>,
534600
update_ops: &mut Vec<TxnOp>,
535601
is_after: bool,
@@ -550,84 +616,35 @@ impl TaskMgr {
550616
let target_ident =
551617
TaskDependentIdent::new_generic(&self.tenant, target_key.clone());
552618

553-
self.check_and_remove(
554-
check_ops,
555-
update_ops,
556-
&[task_name.to_string()],
557-
&target_ident,
558-
)
559-
.await?;
560-
}
561-
}
562-
Ok(())
563-
}
564-
565-
async fn check_and_add(
566-
&self,
567-
check_ops: &mut Vec<TxnCondition>,
568-
update_ops: &mut Vec<TxnOp>,
569-
task_names: &[String],
570-
dependent_ident: &TaskDependentIdent,
571-
) -> Result<(), TaskApiError> {
572-
match self.kv_api.get_pb(dependent_ident).await? {
573-
None => {
574-
check_ops.push(TxnCondition {
575-
key: dependent_ident.to_string_key(),
576-
expected: ConditionResult::Eq as i32,
577-
target: Some(Target::Seq(0)),
578-
});
579-
580-
update_ops.push(TxnOp::put(
581-
dependent_ident.to_string_key(),
582-
TaskDependentValue(BTreeSet::from_iter(task_names.iter().cloned()))
583-
.to_pb()?
584-
.encode_to_vec(),
585-
));
586-
}
587-
Some(mut old_dependent) => {
588-
check_ops.push(TxnCondition::eq_seq(
589-
dependent_ident.to_string_key(),
590-
old_dependent.seq,
591-
));
592-
593-
old_dependent.0.extend(task_names.iter().cloned());
594-
595-
update_ops.push(TxnOp::put(
596-
dependent_ident.to_string_key(),
597-
old_dependent.to_pb()?.encode_to_vec(),
598-
));
599-
}
600-
}
601-
Ok(())
602-
}
603-
604-
async fn check_and_remove(
605-
&self,
606-
check_ops: &mut Vec<TxnCondition>,
607-
update_ops: &mut Vec<TxnOp>,
608-
task_names: &[String],
609-
dependent_ident: &TaskDependentIdent,
610-
) -> Result<(), TaskApiError> {
611-
match self.kv_api.get_pb(dependent_ident).await? {
612-
None => {
613-
check_ops.push(TxnCondition {
614-
key: dependent_ident.to_string_key(),
615-
expected: ConditionResult::Eq as i32,
616-
target: Some(Target::Seq(0)),
617-
});
618-
}
619-
Some(mut old_dependent) => {
620-
check_ops.push(TxnCondition::eq_seq(
621-
dependent_ident.to_string_key(),
622-
old_dependent.seq,
623-
));
624-
625-
old_dependent.0.retain(|name| !task_names.contains(name));
626-
627-
update_ops.push(TxnOp::put(
628-
dependent_ident.to_string_key(),
629-
old_dependent.to_pb()?.encode_to_vec(),
630-
));
619+
match self.kv_api.get_pb(&target_ident).await? {
620+
None => {
621+
check_ops.push(TxnCondition {
622+
key: target_ident.to_string_key(),
623+
expected: ConditionResult::Eq as i32,
624+
target: Some(Target::Seq(0)),
625+
});
626+
627+
update_ops.push(TxnOp::put(
628+
target_ident.to_string_key(),
629+
TaskDependentValue(BTreeSet::from_iter([task_name.to_string()]))
630+
.to_pb()?
631+
.encode_to_vec(),
632+
));
633+
}
634+
Some(mut old_dependent) => {
635+
check_ops.push(TxnCondition::eq_seq(
636+
target_ident.to_string_key(),
637+
old_dependent.seq,
638+
));
639+
640+
old_dependent.0.remove(task_name);
641+
642+
update_ops.push(TxnOp::put(
643+
target_ident.to_string_key(),
644+
old_dependent.to_pb()?.encode_to_vec(),
645+
));
646+
}
647+
}
631648
}
632649
}
633650
Ok(())

0 commit comments

Comments
 (0)