@@ -22,6 +22,7 @@ use databend_common_ast::ast::AlterTaskOptions;
22
22
use databend_common_ast:: ast:: ScheduleOptions ;
23
23
use databend_common_meta_api:: kv_pb_api:: KVPbApi ;
24
24
use databend_common_meta_api:: kv_pb_api:: UpsertPB ;
25
+ use databend_common_meta_api:: txn_cond_eq_seq;
25
26
use databend_common_meta_api:: util:: txn_put_pb;
26
27
use databend_common_meta_api:: SchemaApi ;
27
28
use databend_common_meta_app:: principal:: task;
@@ -273,23 +274,29 @@ impl TaskMgr {
273
274
let after_dependent = TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ;
274
275
let after_dependent_ident =
275
276
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
276
- let after_key = after_dependent_ident. to_string_key ( ) ;
277
277
278
278
let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
279
- check_ops. push ( TxnCondition :: eq_seq ( after_key, after_seq_deps. seq ( ) ) ) ;
279
+ check_ops. push ( txn_cond_eq_seq (
280
+ & after_dependent_ident,
281
+ after_seq_deps. seq ( ) ,
282
+ ) ) ;
280
283
281
284
let mut after_deps = after_seq_deps. unwrap_or_default ( ) ;
282
285
after_deps. 0 . extend ( new_afters. iter ( ) . cloned ( ) ) ;
283
286
update_ops. push ( txn_put_pb ( & after_dependent_ident, & after_deps) ?) ;
284
287
285
- for after in new_afters {
286
- let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
287
- let before_dependent_ident =
288
- TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
289
- let before_key = before_dependent_ident. to_string_key ( ) ;
290
-
291
- let before_seq_deps = self . kv_api . get_pb ( & before_dependent_ident) . await ?;
292
- check_ops. push ( TxnCondition :: eq_seq ( before_key, before_seq_deps. seq ( ) ) ) ;
288
+ for ( before_dependent_ident, before_seq_deps) in self
289
+ . kv_api
290
+ . get_pb_vec ( new_afters. iter ( ) . map ( |after| {
291
+ let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
292
+ TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
293
+ } ) )
294
+ . await ?
295
+ {
296
+ check_ops. push ( txn_cond_eq_seq (
297
+ & before_dependent_ident,
298
+ before_seq_deps. seq ( ) ,
299
+ ) ) ;
293
300
294
301
let mut deps = before_seq_deps. unwrap_or_default ( ) ;
295
302
deps. 0 . insert ( task_name. to_string ( ) ) ;
@@ -321,10 +328,12 @@ impl TaskMgr {
321
328
let after_dependent = TaskDependentKey :: new ( DependentType :: After , task_name. to_string ( ) ) ;
322
329
let after_dependent_ident =
323
330
TaskDependentIdent :: new_generic ( & self . tenant , after_dependent. clone ( ) ) ;
324
- let after_key = after_dependent_ident. to_string_key ( ) ;
325
331
326
332
let after_seq_deps = self . kv_api . get_pb ( & after_dependent_ident) . await ?;
327
- check_ops. push ( TxnCondition :: eq_seq ( after_key, after_seq_deps. seq ( ) ) ) ;
333
+ check_ops. push ( txn_cond_eq_seq (
334
+ & after_dependent_ident,
335
+ after_seq_deps. seq ( ) ,
336
+ ) ) ;
328
337
329
338
if let Some ( mut deps) = after_seq_deps {
330
339
for remove_after in remove_afters {
@@ -333,14 +342,18 @@ impl TaskMgr {
333
342
update_ops. push ( txn_put_pb ( & after_dependent_ident, & deps) ?) ;
334
343
}
335
344
336
- for after in remove_afters {
337
- let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
338
- let before_dependent_ident =
339
- TaskDependentIdent :: new_generic ( & self . tenant , before_dependent) ;
340
- let before_key = before_dependent_ident. to_string_key ( ) ;
341
-
342
- let before_seq_deps = self . kv_api . get_pb ( & before_dependent_ident) . await ?;
343
- check_ops. push ( TxnCondition :: eq_seq ( before_key, before_seq_deps. seq ( ) ) ) ;
345
+ for ( before_dependent_ident, before_seq_deps) in self
346
+ . kv_api
347
+ . get_pb_vec ( remove_afters. iter ( ) . map ( |after| {
348
+ let before_dependent = TaskDependentKey :: new ( DependentType :: Before , after. clone ( ) ) ;
349
+ TaskDependentIdent :: new_generic ( & self . tenant , before_dependent)
350
+ } ) )
351
+ . await ?
352
+ {
353
+ check_ops. push ( txn_cond_eq_seq (
354
+ & before_dependent_ident,
355
+ before_seq_deps. seq ( ) ,
356
+ ) ) ;
344
357
345
358
if let Some ( mut deps) = before_seq_deps {
346
359
deps. 0 . remove ( task_name) ;
@@ -420,12 +433,15 @@ impl TaskMgr {
420
433
} ;
421
434
let mut ready_tasks = Vec :: new ( ) ;
422
435
423
- for task_before_target in task_before_dependent. 0 . iter ( ) {
424
- let target_after_ident = TaskDependentIdent :: new_generic (
425
- & self . tenant ,
426
- TaskDependentKey :: new ( DependentType :: After , task_before_target. to_string ( ) ) ,
427
- ) ;
428
- let Some ( target_after_dependent) = self . kv_api . get ( & target_after_ident) . await ? else {
436
+ for ( target_after_ident, target_after_dependent) in self
437
+ . kv_api
438
+ . get_pb_vec ( task_before_dependent. 0 . iter ( ) . map ( |before| {
439
+ let after_dependent = TaskDependentKey :: new ( DependentType :: After , before. clone ( ) ) ;
440
+ TaskDependentIdent :: new_generic ( & self . tenant , after_dependent)
441
+ } ) )
442
+ . await ?
443
+ {
444
+ let Some ( target_after_dependent) = target_after_dependent else {
429
445
continue ;
430
446
} ;
431
447
@@ -448,7 +464,7 @@ impl TaskMgr {
448
464
let reply = self . kv_api . transaction ( request) . await ?;
449
465
450
466
if reply. success {
451
- ready_tasks. push ( task_before_target . clone ( ) )
467
+ ready_tasks. push ( target_after_ident . name ( ) . source . clone ( ) ) ;
452
468
}
453
469
}
454
470
Ok ( Ok ( ready_tasks) )
@@ -544,10 +560,9 @@ impl TaskMgr {
544
560
let target_key = TaskDependentKey :: new ( other_dependent, dependent_target. clone ( ) ) ;
545
561
let target_ident =
546
562
TaskDependentIdent :: new_generic ( & self . tenant , target_key. clone ( ) ) ;
547
- let target_key = target_ident. to_string_key ( ) ;
548
563
549
564
let seq_dep = self . kv_api . get_pb ( & target_ident) . await ?;
550
- check_ops. push ( TxnCondition :: eq_seq ( target_key , seq_dep. seq ( ) ) ) ;
565
+ check_ops. push ( txn_cond_eq_seq ( & target_ident , seq_dep. seq ( ) ) ) ;
551
566
552
567
if let Some ( mut deps) = seq_dep {
553
568
deps. 0 . remove ( task_name) ;
0 commit comments