@@ -42,8 +42,6 @@ use databend_common_meta_app::tenant::Tenant;
42
42
use databend_common_meta_kvapi:: kvapi;
43
43
use databend_common_meta_kvapi:: kvapi:: DirName ;
44
44
use databend_common_meta_kvapi:: kvapi:: Key ;
45
- use databend_common_meta_types:: txn_condition:: Target ;
46
- use databend_common_meta_types:: ConditionResult ;
47
45
use databend_common_meta_types:: MatchSeq ;
48
46
use databend_common_meta_types:: MetaError ;
49
47
use databend_common_meta_types:: TxnCondition ;
@@ -277,65 +275,55 @@ impl TaskMgr {
277
275
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
278
276
let after_key = after_dependent_ident. to_string_key ( ) ;
279
277
280
- match self . kv_api . get_pb ( & after_dependent_ident) . await ? {
278
+ let seq = match self . kv_api . get_pb ( & after_dependent_ident) . await ? {
281
279
None => {
282
- check_ops. push ( TxnCondition {
283
- key : after_key. clone ( ) ,
284
- expected : ConditionResult :: Eq as i32 ,
285
- target : Some ( Target :: Seq ( 0 ) ) ,
286
- } ) ;
287
-
288
280
update_ops. push ( TxnOp :: put (
289
- after_key,
281
+ after_key. clone ( ) ,
290
282
TaskDependentValue ( BTreeSet :: from_iter ( new_afters. iter ( ) . cloned ( ) ) )
291
283
. to_pb ( ) ?
292
284
. encode_to_vec ( ) ,
293
285
) ) ;
286
+ 0
294
287
}
295
288
Some ( mut old_dependent) => {
296
- check_ops. push ( TxnCondition :: eq_seq ( after_key. clone ( ) , old_dependent. seq ) ) ;
297
-
298
289
old_dependent. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
299
290
300
291
update_ops. push ( TxnOp :: put (
301
- after_key,
292
+ after_key. clone ( ) ,
302
293
old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
303
294
) ) ;
295
+ old_dependent. seq
304
296
}
305
- }
297
+ } ;
298
+ check_ops. push ( TxnCondition :: eq_seq ( after_key, seq) ) ;
306
299
307
300
for after in new_afters {
308
301
let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
309
302
let before_dependent_ident =
310
303
TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
311
304
let before_key = before_dependent_ident. to_string_key ( ) ;
312
305
313
- match self . kv_api . get_pb ( & before_dependent_ident) . await ? {
306
+ let seq = match self . kv_api . get_pb ( & before_dependent_ident) . await ? {
314
307
None => {
315
- check_ops. push ( TxnCondition {
316
- key : before_key. clone ( ) ,
317
- expected : ConditionResult :: Eq as i32 ,
318
- target : Some ( Target :: Seq ( 0 ) ) ,
319
- } ) ;
320
-
321
308
update_ops. push ( TxnOp :: put (
322
- before_key,
309
+ before_key. clone ( ) ,
323
310
TaskDependentValue ( BTreeSet :: from_iter ( [ task_name. to_string ( ) ] ) )
324
311
. to_pb ( ) ?
325
312
. encode_to_vec ( ) ,
326
313
) ) ;
314
+ 0
327
315
}
328
316
Some ( mut old_dependent) => {
329
- check_ops. push ( TxnCondition :: eq_seq ( before_key. clone ( ) , old_dependent. seq ) ) ;
330
-
331
317
old_dependent. 0 . insert ( task_name. to_string ( ) ) ;
332
318
333
319
update_ops. push ( TxnOp :: put (
334
- before_key,
320
+ before_key. clone ( ) ,
335
321
old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
336
322
) ) ;
323
+ old_dependent. seq
337
324
}
338
- }
325
+ } ;
326
+ check_ops. push ( TxnCondition :: eq_seq ( before_key, seq) ) ;
339
327
}
340
328
let request = TxnRequest :: new ( check_ops, update_ops) ;
341
329
let reply = self . kv_api . transaction ( request) . await ?;
@@ -365,52 +353,41 @@ impl TaskMgr {
365
353
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
366
354
let after_key = after_dependent_ident. to_string_key ( ) ;
367
355
368
- match self . kv_api . get_pb ( & after_dependent_ident) . await ? {
369
- None => {
370
- check_ops. push ( TxnCondition {
371
- key : after_key. clone ( ) ,
372
- expected : ConditionResult :: Eq as i32 ,
373
- target : Some ( Target :: Seq ( 0 ) ) ,
374
- } ) ;
375
- }
356
+ let seq = match self . kv_api . get_pb ( & after_dependent_ident) . await ? {
357
+ None => 0 ,
376
358
Some ( mut old_dependent) => {
377
- check_ops. push ( TxnCondition :: eq_seq ( after_key. clone ( ) , old_dependent. seq ) ) ;
378
-
379
359
for remove_after in remove_afters {
380
360
old_dependent. 0 . remove ( remove_after) ;
381
361
}
382
362
update_ops. push ( TxnOp :: put (
383
- after_key,
363
+ after_key. clone ( ) ,
384
364
old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
385
365
) ) ;
366
+
367
+ old_dependent. seq
386
368
}
387
- }
369
+ } ;
370
+ check_ops. push ( TxnCondition :: eq_seq ( after_key, seq) ) ;
388
371
389
372
for after in remove_afters {
390
373
let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
391
374
let before_dependent_ident =
392
375
TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
393
376
let before_key = before_dependent_ident. to_string_key ( ) ;
394
377
395
- match self . kv_api . get_pb ( & before_dependent_ident) . await ? {
396
- None => {
397
- check_ops. push ( TxnCondition {
398
- key : before_key. clone ( ) ,
399
- expected : ConditionResult :: Eq as i32 ,
400
- target : Some ( Target :: Seq ( 0 ) ) ,
401
- } ) ;
402
- }
378
+ let seq = match self . kv_api . get_pb ( & before_dependent_ident) . await ? {
379
+ None => 0 ,
403
380
Some ( mut old_dependent) => {
404
- check_ops. push ( TxnCondition :: eq_seq ( before_key. clone ( ) , old_dependent. seq ) ) ;
405
-
406
381
old_dependent. 0 . remove ( task_name) ;
407
382
408
383
update_ops. push ( TxnOp :: put (
409
- before_key,
384
+ before_key. clone ( ) ,
410
385
old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
411
386
) ) ;
387
+ old_dependent. seq
412
388
}
413
- }
389
+ } ;
390
+ check_ops. push ( TxnCondition :: eq_seq ( before_key, seq) ) ;
414
391
}
415
392
let request = TxnRequest :: new ( check_ops, update_ops) ;
416
393
let reply = self . kv_api . transaction ( request) . await ?;
@@ -616,36 +593,29 @@ impl TaskMgr {
616
593
let target_key = TaskDependentKey :: new ( other_dependent, dependent_target. clone ( ) ) ;
617
594
let target_ident =
618
595
TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) ) ;
596
+ let target_key = target_ident. to_string_key ( ) ;
619
597
620
- match self . kv_api . get_pb ( & target_ident) . await ? {
598
+ let seq = match self . kv_api . get_pb ( & target_ident) . await ? {
621
599
None => {
622
- check_ops. push ( TxnCondition {
623
- key : target_ident. to_string_key ( ) ,
624
- expected : ConditionResult :: Eq as i32 ,
625
- target : Some ( Target :: Seq ( 0 ) ) ,
626
- } ) ;
627
-
628
600
update_ops. push ( TxnOp :: put (
629
- target_ident . to_string_key ( ) ,
601
+ target_key . clone ( ) ,
630
602
TaskDependentValue ( BTreeSet :: from_iter ( [ task_name. to_string ( ) ] ) )
631
603
. to_pb ( ) ?
632
604
. encode_to_vec ( ) ,
633
605
) ) ;
606
+ 0
634
607
}
635
608
Some ( mut old_dependent) => {
636
- check_ops. push ( TxnCondition :: eq_seq (
637
- target_ident. to_string_key ( ) ,
638
- old_dependent. seq ,
639
- ) ) ;
640
-
641
609
old_dependent. 0 . remove ( task_name) ;
642
610
643
611
update_ops. push ( TxnOp :: put (
644
- target_ident . to_string_key ( ) ,
612
+ target_key . clone ( ) ,
645
613
old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
646
614
) ) ;
615
+ old_dependent. seq
647
616
}
648
- }
617
+ } ;
618
+ check_ops. push ( TxnCondition :: eq_seq ( target_key, seq) ) ;
649
619
}
650
620
}
651
621
Ok ( ( ) )
0 commit comments