12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: collections:: HashSet ;
15
+ use std:: collections:: BTreeSet ;
16
16
use std:: str:: FromStr ;
17
17
use std:: sync:: Arc ;
18
18
@@ -28,7 +28,6 @@ use databend_common_meta_app::principal::task;
28
28
use databend_common_meta_app:: principal:: task:: TaskMessage ;
29
29
use databend_common_meta_app:: principal:: task:: TaskState ;
30
30
use databend_common_meta_app:: principal:: task_dependent_ident:: TaskDependentIdent ;
31
- use databend_common_meta_app:: principal:: task_dependent_ident:: TaskDependentResource ;
32
31
use databend_common_meta_app:: principal:: task_message_ident:: TaskMessageIdent ;
33
32
use databend_common_meta_app:: principal:: task_state_ident:: TaskStateIdent ;
34
33
use databend_common_meta_app:: principal:: DependentType ;
@@ -40,7 +39,6 @@ use databend_common_meta_app::principal::TaskDependentValue;
40
39
use databend_common_meta_app:: principal:: TaskIdent ;
41
40
use databend_common_meta_app:: schema:: CreateOption ;
42
41
use databend_common_meta_app:: tenant:: Tenant ;
43
- use databend_common_meta_app:: tenant_key:: ident:: TIdent ;
44
42
use databend_common_meta_kvapi:: kvapi;
45
43
use databend_common_meta_kvapi:: kvapi:: DirName ;
46
44
use databend_common_meta_kvapi:: kvapi:: Key ;
@@ -278,12 +276,11 @@ impl TaskMgr {
278
276
let after_dependent_ident =
279
277
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
280
278
281
- self . check_and_set (
279
+ self . check_and_add (
282
280
& mut check_ops,
283
281
& mut update_ops,
284
282
new_afters,
285
283
& after_dependent_ident,
286
- true ,
287
284
)
288
285
. await ?;
289
286
@@ -292,12 +289,11 @@ impl TaskMgr {
292
289
let before_dependent_ident =
293
290
TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
294
291
295
- self . check_and_set (
292
+ self . check_and_add (
296
293
& mut check_ops,
297
294
& mut update_ops,
298
295
& [ task_name. to_string ( ) ] ,
299
296
& before_dependent_ident,
300
- true ,
301
297
)
302
298
. await ?;
303
299
}
@@ -328,12 +324,11 @@ impl TaskMgr {
328
324
let after_dependent_ident =
329
325
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
330
326
331
- self . check_and_set (
327
+ self . check_and_remove (
332
328
& mut check_ops,
333
329
& mut update_ops,
334
330
remove_afters,
335
331
& after_dependent_ident,
336
- false ,
337
332
)
338
333
. await ?;
339
334
@@ -342,12 +337,11 @@ impl TaskMgr {
342
337
let before_dependent_ident =
343
338
TaskDependentIdent :: new_generic ( & self . tenant , before_dependent. clone ( ) ) ;
344
339
345
- self . check_and_set (
340
+ self . check_and_remove (
346
341
& mut check_ops,
347
342
& mut update_ops,
348
343
& [ task_name. to_string ( ) ] ,
349
344
& before_dependent_ident,
350
- false ,
351
345
)
352
346
. await ?;
353
347
}
@@ -556,55 +550,79 @@ impl TaskMgr {
556
550
let target_ident =
557
551
TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) ) ;
558
552
559
- self . check_and_set (
553
+ self . check_and_remove (
560
554
check_ops,
561
555
update_ops,
562
556
& [ task_name. to_string ( ) ] ,
563
557
& target_ident,
564
- false ,
565
558
)
566
559
. await ?;
567
560
}
568
561
}
569
562
Ok ( ( ) )
570
563
}
571
564
572
- async fn check_and_set (
565
+ async fn check_and_add (
573
566
& self ,
574
567
check_ops : & mut Vec < TxnCondition > ,
575
568
update_ops : & mut Vec < TxnOp > ,
576
569
task_names : & [ String ] ,
577
- dependent_ident : & TIdent < TaskDependentResource , TaskDependentKey > ,
578
- is_add : bool ,
570
+ dependent_ident : & TaskDependentIdent ,
579
571
) -> Result < ( ) , TaskApiError > {
580
- match self . kv_api . get ( dependent_ident) . await ? {
572
+ match self . kv_api . get_pb ( dependent_ident) . await ? {
581
573
None => {
582
574
check_ops. push ( TxnCondition {
583
575
key : dependent_ident. to_string_key ( ) ,
584
576
expected : ConditionResult :: Eq as i32 ,
585
577
target : Some ( Target :: Seq ( 0 ) ) ,
586
578
} ) ;
587
579
588
- if is_add {
589
- update_ops. push ( TxnOp :: put (
590
- dependent_ident. to_string_key ( ) ,
591
- TaskDependentValue ( HashSet :: from_iter ( task_names. iter ( ) . cloned ( ) ) )
592
- . to_pb ( ) ?
593
- . encode_to_vec ( ) ,
594
- ) ) ;
595
- }
580
+ update_ops. push ( TxnOp :: put (
581
+ dependent_ident. to_string_key ( ) ,
582
+ TaskDependentValue ( BTreeSet :: from_iter ( task_names. iter ( ) . cloned ( ) ) )
583
+ . to_pb ( ) ?
584
+ . encode_to_vec ( ) ,
585
+ ) ) ;
596
586
}
597
587
Some ( mut old_dependent) => {
598
- check_ops. push ( TxnCondition :: eq_value (
588
+ check_ops. push ( TxnCondition :: eq_seq (
589
+ dependent_ident. to_string_key ( ) ,
590
+ old_dependent. seq ,
591
+ ) ) ;
592
+
593
+ old_dependent. 0 . extend ( task_names. iter ( ) . cloned ( ) ) ;
594
+
595
+ update_ops. push ( TxnOp :: put (
599
596
dependent_ident. to_string_key ( ) ,
600
597
old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
601
598
) ) ;
599
+ }
600
+ }
601
+ Ok ( ( ) )
602
+ }
602
603
603
- if is_add {
604
- old_dependent. 0 . extend ( task_names. iter ( ) . cloned ( ) ) ;
605
- } else {
606
- old_dependent. 0 . retain ( |name| !task_names. contains ( name) ) ;
607
- }
604
+ async fn check_and_remove (
605
+ & self ,
606
+ check_ops : & mut Vec < TxnCondition > ,
607
+ update_ops : & mut Vec < TxnOp > ,
608
+ task_names : & [ String ] ,
609
+ dependent_ident : & TaskDependentIdent ,
610
+ ) -> Result < ( ) , TaskApiError > {
611
+ match self . kv_api . get_pb ( dependent_ident) . await ? {
612
+ None => {
613
+ check_ops. push ( TxnCondition {
614
+ key : dependent_ident. to_string_key ( ) ,
615
+ expected : ConditionResult :: Eq as i32 ,
616
+ target : Some ( Target :: Seq ( 0 ) ) ,
617
+ } ) ;
618
+ }
619
+ Some ( mut old_dependent) => {
620
+ check_ops. push ( TxnCondition :: eq_seq (
621
+ dependent_ident. to_string_key ( ) ,
622
+ old_dependent. seq ,
623
+ ) ) ;
624
+
625
+ old_dependent. 0 . retain ( |name| !task_names. contains ( name) ) ;
608
626
609
627
update_ops. push ( TxnOp :: put (
610
628
dependent_ident. to_string_key ( ) ,
0 commit comments