@@ -285,13 +285,12 @@ impl TaskMgr {
285
285
after_deps. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
286
286
update_ops. push ( txn_put_pb ( & after_dependent_ident, & after_deps) ?) ;
287
287
288
- for ( before_dependent_ident, before_seq_deps) in self
289
- . kv_api
290
- . get_pb_vec ( new_afters. iter ( ) . map ( |after| {
291
- let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
292
- TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
293
- } ) )
294
- . await ?
288
+ let before_dependent_idents = new_afters. iter ( ) . map ( |after| {
289
+ let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
290
+ TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
291
+ } ) ;
292
+ for ( before_dependent_ident, before_seq_deps) in
293
+ self . kv_api . get_pb_vec ( before_dependent_idents) . await ?
295
294
{
296
295
check_ops. push ( txn_cond_eq_seq (
297
296
& before_dependent_ident,
@@ -342,13 +341,12 @@ impl TaskMgr {
342
341
update_ops. push ( txn_put_pb ( & after_dependent_ident, & deps) ?) ;
343
342
}
344
343
345
- for ( before_dependent_ident, before_seq_deps) in self
346
- . kv_api
347
- . get_pb_vec ( remove_afters. iter ( ) . map ( |after| {
348
- let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
349
- TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
350
- } ) )
351
- . await ?
344
+ let before_dependent_idents = remove_afters. iter ( ) . map ( |after| {
345
+ let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
346
+ TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
347
+ } ) ;
348
+ for ( before_dependent_ident, before_seq_deps) in
349
+ self . kv_api . get_pb_vec ( before_dependent_idents) . await ?
352
350
{
353
351
check_ops. push ( txn_cond_eq_seq (
354
352
& before_dependent_ident,
@@ -383,11 +381,44 @@ impl TaskMgr {
383
381
let mut check_ops = Vec :: new ( ) ;
384
382
let mut update_ops = Vec :: new ( ) ;
385
383
386
- self . clean_related_dependent ( task_name, & mut check_ops, & mut update_ops, true )
387
- . await ?;
388
- self . clean_related_dependent ( task_name, & mut check_ops, & mut update_ops, false )
389
- . await ?;
384
+ let task_after_ident = TaskDependentIdent :: new_generic (
385
+ & self . tenant ,
386
+ TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ,
387
+ ) ;
388
+ if let Some ( task_after_dependent) = self . kv_api . get ( & task_after_ident) . await ? {
389
+ let target_idents = task_after_dependent. 0 . into_iter ( ) . map ( |dependent_target| {
390
+ let target_key =
391
+ TaskDependentKey :: new ( DependentType :: Before , dependent_target. clone ( ) ) ;
392
+ TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) )
393
+ } ) ;
394
+ for ( target_ident, seq_dep) in self . kv_api . get_pb_vec ( target_idents) . await ? {
395
+ check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
396
+
397
+ if let Some ( mut deps) = seq_dep {
398
+ deps. 0 . remove ( task_name) ;
399
+ update_ops. push ( txn_put_pb ( & target_ident, & deps) ?) ;
400
+ }
401
+ }
402
+ }
403
+ let task_before_ident = TaskDependentIdent :: new_generic (
404
+ & self . tenant ,
405
+ TaskDependentKey :: new ( DependentType :: Before , task_name. to_string ( ) ) ,
406
+ ) ;
407
+ if let Some ( task_before_dependent) = self . kv_api . get ( & task_before_ident) . await ? {
408
+ let target_idents = task_before_dependent. 0 . into_iter ( ) . map ( |dependent_target| {
409
+ let target_key =
410
+ TaskDependentKey :: new ( DependentType :: After , dependent_target. clone ( ) ) ;
411
+ TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) )
412
+ } ) ;
413
+ for ( target_ident, seq_dep) in self . kv_api . get_pb_vec ( target_idents) . await ? {
414
+ check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
390
415
416
+ if let Some ( mut deps) = seq_dep {
417
+ deps. 0 . remove ( task_name) ;
418
+ update_ops. push ( txn_put_pb ( & target_ident, & deps) ?) ;
419
+ }
420
+ }
421
+ }
391
422
update_ops. push ( TxnOp :: delete (
392
423
TaskDependentIdent :: new_generic (
393
424
& self . tenant ,
@@ -406,7 +437,7 @@ impl TaskMgr {
406
437
TaskStateIdent :: new ( & self . tenant , task_name) . to_string_key ( ) ,
407
438
) ) ;
408
439
409
- let request = TxnRequest :: new ( vec ! [ ] , update_ops) ;
440
+ let request = TxnRequest :: new ( check_ops , update_ops) ;
410
441
let _ = self . kv_api . transaction ( request) . await ?;
411
442
412
443
Ok ( Ok ( ( ) ) )
@@ -433,34 +464,36 @@ impl TaskMgr {
433
464
} ;
434
465
let mut ready_tasks = Vec :: new ( ) ;
435
466
436
- for ( target_after_ident, target_after_dependent) in self
437
- . kv_api
438
- . get_pb_vec ( task_before_dependent. 0 . iter ( ) . map ( |before| {
439
- let after_dependent = TaskDependentKey :: new ( DependentType :: After , before. clone ( ) ) ;
440
- TaskDependentIdent :: new_generic ( & self . tenant , after_dependent)
441
- } ) )
442
- . await ?
467
+ let target_after_idents = task_before_dependent. 0 . iter ( ) . map ( |before| {
468
+ let after_dependent = TaskDependentKey :: new ( DependentType :: After , before. clone ( ) ) ;
469
+ TaskDependentIdent :: new_generic ( & self . tenant , after_dependent)
470
+ } ) ;
471
+ for ( target_after_ident, target_after_dependent) in
472
+ self . kv_api . get_pb_vec ( target_after_idents) . await ?
443
473
{
444
474
let Some ( target_after_dependent) = target_after_dependent else {
445
475
continue ;
446
476
} ;
447
477
448
- let mut conditions = Vec :: new ( ) ;
449
- let mut if_ops = Vec :: new ( ) ;
478
+ let mut request = TxnRequest :: new ( vec ! [ ] , vec ! [ ] )
479
+ . with_else ( vec ! [ txn_put_pb ( & task_state_key , & succeeded_value ) ? ] ) ;
450
480
451
481
for target_after_task in target_after_dependent. 0 . iter ( ) {
452
482
let task_ident = TaskStateIdent :: new ( & self . tenant , target_after_task) ;
453
483
// Only care about the predecessors of this task's successor tasks, excluding this task itself.
454
484
if target_after_task != task_name {
455
- conditions. push ( TxnCondition :: eq_value (
485
+ request =
486
+ request. push_if_then ( [ ] , [ txn_put_pb ( & task_ident, & not_succeeded_value) ?] ) ;
487
+ continue ;
488
+ }
489
+ request = request. push_if_then (
490
+ [ TxnCondition :: eq_value (
456
491
task_ident. to_string_key ( ) ,
457
492
succeeded_value. to_pb ( ) ?. encode_to_vec ( ) ,
458
- ) ) ;
459
- }
460
- if_ops . push ( txn_put_pb ( & task_ident , & not_succeeded_value ) ? ) ;
493
+ ) ] ,
494
+ [ txn_put_pb ( & task_ident , & not_succeeded_value ) ? ] ,
495
+ ) ;
461
496
}
462
- let request = TxnRequest :: new ( conditions, if_ops)
463
- . with_else ( vec ! [ txn_put_pb( & task_state_key, & succeeded_value) ?] ) ;
464
497
let reply = self . kv_api . transaction ( request) . await ?;
465
498
466
499
if reply. success {
@@ -538,41 +571,6 @@ impl TaskMgr {
538
571
Ok ( Ok ( ( ) ) )
539
572
}
540
573
541
- async fn clean_related_dependent (
542
- & self ,
543
- task_name : & str ,
544
- check_ops : & mut Vec < TxnCondition > ,
545
- update_ops : & mut Vec < TxnOp > ,
546
- is_after : bool ,
547
- ) -> Result < ( ) , TaskApiError > {
548
- let ( self_dependent, other_dependent) = if is_after {
549
- ( DependentType :: After , DependentType :: Before )
550
- } else {
551
- ( DependentType :: Before , DependentType :: After )
552
- } ;
553
-
554
- let task_ident = TaskDependentIdent :: new_generic (
555
- & self . tenant ,
556
- TaskDependentKey :: new ( self_dependent, task_name. to_string ( ) ) ,
557
- ) ;
558
- if let Some ( task_dependent) = self . kv_api . get ( & task_ident) . await ? {
559
- for dependent_target in task_dependent. 0 . iter ( ) {
560
- let target_key = TaskDependentKey :: new ( other_dependent, dependent_target. clone ( ) ) ;
561
- let target_ident =
562
- TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) ) ;
563
-
564
- let seq_dep = self . kv_api . get_pb ( & target_ident) . await ?;
565
- check_ops. push ( txn_cond_eq_seq ( & target_ident, seq_dep. seq ( ) ) ) ;
566
-
567
- if let Some ( mut deps) = seq_dep {
568
- deps. 0 . remove ( task_name) ;
569
- update_ops. push ( txn_put_pb ( & target_ident, & deps) ?) ;
570
- }
571
- }
572
- }
573
- Ok ( ( ) )
574
- }
575
-
576
574
pub fn make_schedule_options ( opt : ScheduleOptions ) -> task:: ScheduleOptions {
577
575
match opt {
578
576
ScheduleOptions :: IntervalSecs ( secs, ms) => {
0 commit comments