12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: collections:: BTreeSet ;
16
15
use std:: str:: FromStr ;
17
16
use std:: sync:: Arc ;
18
17
@@ -23,6 +22,7 @@ use databend_common_ast::ast::AlterTaskOptions;
23
22
use databend_common_ast:: ast:: ScheduleOptions ;
24
23
use databend_common_meta_api:: kv_pb_api:: KVPbApi ;
25
24
use databend_common_meta_api:: kv_pb_api:: UpsertPB ;
25
+ use databend_common_meta_api:: util:: txn_put_pb;
26
26
use databend_common_meta_api:: SchemaApi ;
27
27
use databend_common_meta_app:: principal:: task;
28
28
use databend_common_meta_app:: principal:: task:: TaskMessage ;
@@ -35,7 +35,6 @@ use databend_common_meta_app::principal::ScheduleType;
35
35
use databend_common_meta_app:: principal:: Status ;
36
36
use databend_common_meta_app:: principal:: Task ;
37
37
use databend_common_meta_app:: principal:: TaskDependentKey ;
38
- use databend_common_meta_app:: principal:: TaskDependentValue ;
39
38
use databend_common_meta_app:: principal:: TaskIdent ;
40
39
use databend_common_meta_app:: schema:: CreateOption ;
41
40
use databend_common_meta_app:: tenant:: Tenant ;
@@ -51,6 +50,7 @@ use databend_common_meta_types::With;
51
50
use databend_common_proto_conv:: FromToProto ;
52
51
use futures:: TryStreamExt ;
53
52
use prost:: Message ;
53
+ use seq_marked:: SeqValue ;
54
54
55
55
use crate :: task:: errors:: TaskApiError ;
56
56
use crate :: task:: errors:: TaskError ;
@@ -275,55 +275,25 @@ impl TaskMgr {
275
275
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
276
276
let after_key = after_dependent_ident. to_string_key ( ) ;
277
277
278
- let seq = match self . kv_api . get_pb ( & after_dependent_ident) . await ? {
279
- None => {
280
- update_ops. push ( TxnOp :: put (
281
- after_key. clone ( ) ,
282
- TaskDependentValue ( BTreeSet :: from_iter ( new_afters. iter ( ) . cloned ( ) ) )
283
- . to_pb ( ) ?
284
- . encode_to_vec ( ) ,
285
- ) ) ;
286
- 0
287
- }
288
- Some ( mut old_dependent) => {
289
- old_dependent. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
290
-
291
- update_ops. push ( TxnOp :: put (
292
- after_key. clone ( ) ,
293
- old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
294
- ) ) ;
295
- old_dependent. seq
296
- }
297
- } ;
298
- check_ops. push ( TxnCondition :: eq_seq ( after_key, seq) ) ;
278
+ let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
279
+ check_ops. push ( TxnCondition :: eq_seq ( after_key, after_seq_deps. seq ( ) ) ) ;
280
+
281
+ let mut after_deps = after_seq_deps. unwrap_or_default ( ) ;
282
+ after_deps. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
283
+ update_ops. push ( txn_put_pb ( & after_dependent_ident, & after_deps) ?) ;
299
284
300
285
for after in new_afters {
301
286
let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
302
287
let before_dependent_ident =
303
288
TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
304
289
let before_key = before_dependent_ident. to_string_key ( ) ;
305
290
306
- let seq = match self . kv_api . get_pb ( & before_dependent_ident) . await ? {
307
- None => {
308
- update_ops. push ( TxnOp :: put (
309
- before_key. clone ( ) ,
310
- TaskDependentValue ( BTreeSet :: from_iter ( [ task_name. to_string ( ) ] ) )
311
- . to_pb ( ) ?
312
- . encode_to_vec ( ) ,
313
- ) ) ;
314
- 0
315
- }
316
- Some ( mut old_dependent) => {
317
- old_dependent. 0 . insert ( task_name. to_string ( ) ) ;
291
+ let before_seq_deps = self . kv_api . get_pb ( & before_dependent_ident) . await ?;
292
+ check_ops. push ( TxnCondition :: eq_seq ( before_key, before_seq_deps. seq ( ) ) ) ;
318
293
319
- update_ops. push ( TxnOp :: put (
320
- before_key. clone ( ) ,
321
- old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
322
- ) ) ;
323
- old_dependent. seq
324
- }
325
- } ;
326
- check_ops. push ( TxnCondition :: eq_seq ( before_key, seq) ) ;
294
+ let mut deps = before_seq_deps. unwrap_or_default ( ) ;
295
+ deps. 0 . insert ( task_name. to_string ( ) ) ;
296
+ update_ops. push ( txn_put_pb ( & before_dependent_ident, & deps) ?) ;
327
297
}
328
298
let request = TxnRequest :: new ( check_ops, update_ops) ;
329
299
let reply = self . kv_api . transaction ( request) . await ?;
@@ -353,41 +323,29 @@ impl TaskMgr {
353
323
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
354
324
let after_key = after_dependent_ident. to_string_key ( ) ;
355
325
356
- let seq = match self . kv_api . get_pb ( & after_dependent_ident) . await ? {
357
- None => 0 ,
358
- Some ( mut old_dependent) => {
359
- for remove_after in remove_afters {
360
- old_dependent. 0 . remove ( remove_after) ;
361
- }
362
- update_ops. push ( TxnOp :: put (
363
- after_key. clone ( ) ,
364
- old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
365
- ) ) ;
326
+ let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
327
+ check_ops. push ( TxnCondition :: eq_seq ( after_key, after_seq_deps. seq ( ) ) ) ;
366
328
367
- old_dependent. seq
329
+ if let Some ( mut deps) = after_seq_deps {
330
+ for remove_after in remove_afters {
331
+ deps. 0 . remove ( remove_after) ;
368
332
}
369
- } ;
370
- check_ops . push ( TxnCondition :: eq_seq ( after_key , seq ) ) ;
333
+ update_ops . push ( txn_put_pb ( & after_dependent_ident , & deps ) ? ) ;
334
+ }
371
335
372
336
for after in remove_afters {
373
337
let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
374
338
let before_dependent_ident =
375
339
TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
376
340
let before_key = before_dependent_ident. to_string_key ( ) ;
377
341
378
- let seq = match self . kv_api . get_pb ( & before_dependent_ident) . await ? {
379
- None => 0 ,
380
- Some ( mut old_dependent) => {
381
- old_dependent. 0 . remove ( task_name) ;
342
+ let before_seq_deps = self . kv_api . get_pb ( & before_dependent_ident) . await ?;
343
+ check_ops. push ( TxnCondition :: eq_seq ( before_key, before_seq_deps. seq ( ) ) ) ;
382
344
383
- update_ops. push ( TxnOp :: put (
384
- before_key. clone ( ) ,
385
- old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
386
- ) ) ;
387
- old_dependent. seq
388
- }
389
- } ;
390
- check_ops. push ( TxnCondition :: eq_seq ( before_key, seq) ) ;
345
+ if let Some ( mut deps) = before_seq_deps {
346
+ deps. 0 . remove ( task_name) ;
347
+ update_ops. push ( txn_put_pb ( & before_dependent_ident, & deps) ?) ;
348
+ }
391
349
}
392
350
let request = TxnRequest :: new ( check_ops, update_ops) ;
393
351
let reply = self . kv_api . transaction ( request) . await ?;
@@ -452,12 +410,10 @@ impl TaskMgr {
452
410
TaskDependentKey :: new ( DependentType :: Before , task_name. to_string ( ) ) ,
453
411
) ;
454
412
let task_state_key = TaskStateIdent :: new ( & self . tenant , task_name) ;
455
- let succeeded_value = TaskState { is_succeeded : true } . to_pb ( ) ? . encode_to_vec ( ) ;
413
+ let succeeded_value = TaskState { is_succeeded : true } ;
456
414
let not_succeeded_value = TaskState {
457
415
is_succeeded : false ,
458
- }
459
- . to_pb ( ) ?
460
- . encode_to_vec ( ) ;
416
+ } ;
461
417
462
418
let Some ( task_before_dependent) = self . kv_api . get_pb ( & task_before_ident) . await ? else {
463
419
return Ok ( Ok ( Vec :: new ( ) ) ) ;
@@ -482,18 +438,13 @@ impl TaskMgr {
482
438
if target_after_task != task_name {
483
439
conditions. push ( TxnCondition :: eq_value (
484
440
task_ident. to_string_key ( ) ,
485
- succeeded_value. clone ( ) ,
441
+ succeeded_value. to_pb ( ) ? . encode_to_vec ( ) ,
486
442
) ) ;
487
443
}
488
- if_ops. push ( TxnOp :: put (
489
- task_ident. to_string_key ( ) ,
490
- not_succeeded_value. clone ( ) ,
491
- ) ) ;
444
+ if_ops. push ( txn_put_pb ( & task_ident, & not_succeeded_value) ?) ;
492
445
}
493
- let request = TxnRequest :: new ( conditions, if_ops) . with_else ( vec ! [ TxnOp :: put(
494
- task_state_key. to_string_key( ) ,
495
- succeeded_value. clone( ) ,
496
- ) ] ) ;
446
+ let request = TxnRequest :: new ( conditions, if_ops)
447
+ . with_else ( vec ! [ txn_put_pb( & task_state_key, & succeeded_value) ?] ) ;
497
448
let reply = self . kv_api . transaction ( request) . await ?;
498
449
499
450
if reply. success {
@@ -595,27 +546,13 @@ impl TaskMgr {
595
546
TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) ) ;
596
547
let target_key = target_ident. to_string_key ( ) ;
597
548
598
- let seq = match self . kv_api . get_pb ( & target_ident) . await ? {
599
- None => {
600
- update_ops. push ( TxnOp :: put (
601
- target_key. clone ( ) ,
602
- TaskDependentValue ( BTreeSet :: from_iter ( [ task_name. to_string ( ) ] ) )
603
- . to_pb ( ) ?
604
- . encode_to_vec ( ) ,
605
- ) ) ;
606
- 0
607
- }
608
- Some ( mut old_dependent) => {
609
- old_dependent. 0 . remove ( task_name) ;
610
-
611
- update_ops. push ( TxnOp :: put (
612
- target_key. clone ( ) ,
613
- old_dependent. to_pb ( ) ?. encode_to_vec ( ) ,
614
- ) ) ;
615
- old_dependent. seq
616
- }
617
- } ;
618
- check_ops. push ( TxnCondition :: eq_seq ( target_key, seq) ) ;
549
+ let seq_dep = self . kv_api . get_pb ( & target_ident) . await ?;
550
+ check_ops. push ( TxnCondition :: eq_seq ( target_key, seq_dep. seq ( ) ) ) ;
551
+
552
+ if let Some ( mut deps) = seq_dep {
553
+ deps. 0 . remove ( task_name) ;
554
+ update_ops. push ( txn_put_pb ( & target_ident, & deps) ?) ;
555
+ }
619
556
}
620
557
}
621
558
Ok ( ( ) )
0 commit comments