Skip to content

Commit f10608f

Browse files
committed
fix: task state will be affected by get_next_ready_tasks
1 parent 1b6d063 commit f10608f

File tree

7 files changed

+105
-69
lines changed

7 files changed

+105
-69
lines changed

src/meta/app/src/principal/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub use task::TaskDependentKey;
101101
pub use task::TaskDependentValue;
102102
pub use task::TaskMessage;
103103
pub use task::TaskRun;
104-
pub use task::TaskState;
104+
pub use task::TaskStateValue;
105105
pub use task::WarehouseOptions;
106106
pub use task_ident::TaskIdent;
107107
pub use task_ident::TaskIdentRaw;

src/meta/app/src/principal/task.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,19 @@ impl TaskMessage {
175175
}
176176

177177
#[derive(Debug, Clone, PartialEq)]
178-
pub struct TaskState {
178+
pub struct TaskStateKey {
179+
pub current: String,
180+
pub next: String,
181+
}
182+
183+
impl TaskStateKey {
184+
pub fn new(current: String, next: String) -> Self {
185+
Self { current, next }
186+
}
187+
}
188+
189+
#[derive(Debug, Clone, PartialEq)]
190+
pub struct TaskStateValue {
179191
pub is_succeeded: bool,
180192
}
181193

@@ -202,11 +214,29 @@ pub struct TaskDependentValue(pub BTreeSet<String>);
202214

203215
mod kvapi_key_impl {
204216
use databend_common_meta_kvapi::kvapi;
217+
use databend_common_meta_kvapi::kvapi::KeyBuilder;
205218
use databend_common_meta_kvapi::kvapi::KeyError;
219+
use databend_common_meta_kvapi::kvapi::KeyParser;
206220

221+
use crate::principal::task::TaskStateKey;
207222
use crate::principal::DependentType;
208223
use crate::principal::TaskDependentKey;
209224

225+
impl kvapi::KeyCodec for TaskStateKey {
226+
fn encode_key(&self, b: KeyBuilder) -> KeyBuilder {
227+
b.push_str(self.current.as_str())
228+
.push_str(self.next.as_str())
229+
}
230+
231+
fn decode_key(parser: &mut KeyParser) -> Result<Self, KeyError>
232+
where Self: Sized {
233+
let current = parser.next_str()?;
234+
let next = parser.next_str()?;
235+
236+
Ok(Self { current, next })
237+
}
238+
}
239+
210240
impl kvapi::KeyCodec for TaskDependentKey {
211241
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
212242
match self.ty {

src/meta/app/src/principal/task_state_ident.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,26 @@
1414

1515
use crate::tenant_key::ident::TIdent;
1616

17-
pub type TaskStateIdent = TIdent<Resource>;
18-
19-
pub type TaskStateIdentRaw = TIdent<Resource>;
17+
pub type TaskStateIdent = TIdent<Resource, TaskStateKey>;
18+
19+
impl TaskStateIdent {
20+
pub fn new(tenant: impl ToTenant, current: impl ToString, next: impl ToString) -> Self {
21+
TaskStateIdent::new_generic(
22+
tenant,
23+
TaskStateKey::new(current.to_string(), next.to_string()),
24+
)
25+
}
26+
}
2027

2128
pub use kvapi_impl::Resource;
2229

30+
use crate::principal::task::TaskStateKey;
31+
use crate::tenant::ToTenant;
32+
2333
mod kvapi_impl {
2434
use databend_common_meta_kvapi::kvapi;
2535

26-
use crate::principal::task::TaskState;
36+
use crate::principal::task::TaskStateValue;
2737
use crate::principal::task_state_ident::TaskStateIdent;
2838
use crate::tenant_key::resource::TenantResource;
2939

@@ -32,10 +42,10 @@ mod kvapi_impl {
3242
const PREFIX: &'static str = "__fd_task_states";
3343
const TYPE: &'static str = "TaskStateIdent";
3444
const HAS_TENANT: bool = true;
35-
type ValueType = TaskState;
45+
type ValueType = TaskStateValue;
3646
}
3747

38-
impl kvapi::Value for TaskState {
48+
impl kvapi::Value for TaskStateValue {
3949
type KeyType = TaskStateIdent;
4050

4151
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {

src/meta/proto-conv/src/task_from_to_protobuf_impl.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ impl FromToProto for mt::TaskDependentValue {
219219
}
220220
}
221221

222-
impl FromToProto for mt::TaskState {
223-
type PB = pb::TaskState;
222+
impl FromToProto for mt::TaskStateValue {
223+
type PB = pb::TaskStateValue;
224224

225225
fn get_pb_ver(p: &Self::PB) -> u64 {
226226
p.ver
@@ -234,7 +234,7 @@ impl FromToProto for mt::TaskState {
234234
}
235235

236236
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
237-
Ok(pb::TaskState {
237+
Ok(pb::TaskStateValue {
238238
ver: VER,
239239
min_reader_ver: MIN_READER_VER,
240240
is_succeeded: self.is_succeeded,

src/meta/proto-conv/tests/it/v141_task_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::common;
2323
fn test_decode_v141_task_state() -> anyhow::Result<()> {
2424
let task_state_v141 = vec![8, 1, 160, 6, 141, 1, 168, 6, 24];
2525

26-
let want = || mt::TaskState { is_succeeded: true };
26+
let want = || mt::TaskStateValue { is_succeeded: true };
2727
common::test_pb_from_to(func_name!(), want())?;
2828
common::test_load_old(func_name!(), task_state_v141.as_slice(), 141, want())?;
2929

src/meta/protos/proto/task.proto

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -79,25 +79,13 @@ message Task {
7979
string owner_user = 21;
8080
}
8181

82-
message TaskState {
82+
message TaskStateValue {
8383
uint64 ver = 100;
8484
uint64 min_reader_ver = 101;
8585

8686
bool is_succeeded = 1;
8787
}
8888

89-
message TaskDependentKey {
90-
uint64 ver = 100;
91-
uint64 min_reader_ver = 101;
92-
93-
enum DependentType {
94-
After = 0;
95-
Before = 1;
96-
}
97-
DependentType ty = 1;
98-
string source = 2;
99-
}
100-
10189
message TaskDependentValue {
10290
uint64 ver = 100;
10391
uint64 min_reader_ver = 101;

src/query/management/src/task/task_mgr.rs

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use databend_common_meta_api::util::txn_put_pb;
2727
use databend_common_meta_api::SchemaApi;
2828
use databend_common_meta_app::principal::task;
2929
use databend_common_meta_app::principal::task::TaskMessage;
30-
use databend_common_meta_app::principal::task::TaskState;
30+
use databend_common_meta_app::principal::task::TaskStateValue;
3131
use databend_common_meta_app::principal::task_dependent_ident::TaskDependentIdent;
3232
use databend_common_meta_app::principal::task_message_ident::TaskMessageIdent;
3333
use databend_common_meta_app::principal::task_state_ident::TaskStateIdent;
@@ -47,6 +47,7 @@ use databend_common_meta_types::TxnOp;
4747
use databend_common_meta_types::TxnRequest;
4848
use databend_common_meta_types::With;
4949
use databend_common_proto_conv::FromToProto;
50+
use futures::StreamExt;
5051
use futures::TryStreamExt;
5152
use prost::Message;
5253
use seq_marked::SeqValue;
@@ -266,37 +267,38 @@ impl TaskMgr {
266267
task_name: &str,
267268
new_afters: &[String],
268269
) -> Result<Result<(), TaskError>, TaskApiError> {
269-
let mut update_ops = Vec::new();
270-
let mut check_ops = Vec::with_capacity(new_afters.len());
271-
270+
let mut request = TxnRequest::new(vec![], vec![]);
272271
let after_dependent_ident = TaskDependentIdent::new_after(&self.tenant, task_name);
273272

274273
let after_seq_deps = self.kv_api.get_pb(&after_dependent_ident).await?;
275-
check_ops.push(txn_cond_eq_seq(
274+
request.condition.push(txn_cond_eq_seq(
276275
&after_dependent_ident,
277276
after_seq_deps.seq(),
278277
));
279278

280279
let mut after_deps = after_seq_deps.unwrap_or_default();
281280
after_deps.0.extend(new_afters.iter().cloned());
282-
update_ops.push(txn_put_pb(&after_dependent_ident, &after_deps)?);
281+
request
282+
.if_then
283+
.push(txn_put_pb(&after_dependent_ident, &after_deps)?);
283284

284285
let before_dependent_idents = new_afters
285286
.iter()
286287
.map(|after| TaskDependentIdent::new_before(&self.tenant, after));
287288
for (before_dependent_ident, before_seq_deps) in
288289
self.kv_api.get_pb_vec(before_dependent_idents).await?
289290
{
290-
check_ops.push(txn_cond_eq_seq(
291+
request.condition.push(txn_cond_eq_seq(
291292
&before_dependent_ident,
292293
before_seq_deps.seq(),
293294
));
294295

295296
let mut deps = before_seq_deps.unwrap_or_default();
296297
deps.0.insert(task_name.to_string());
297-
update_ops.push(txn_put_pb(&before_dependent_ident, &deps)?);
298+
request
299+
.if_then
300+
.push(txn_put_pb(&before_dependent_ident, &deps)?);
298301
}
299-
let request = TxnRequest::new(check_ops, update_ops);
300302
let reply = self.kv_api.transaction(request).await?;
301303

302304
if !reply.success {
@@ -316,12 +318,11 @@ impl TaskMgr {
316318
task_name: &str,
317319
remove_afters: &[String],
318320
) -> Result<Result<(), TaskError>, TaskApiError> {
319-
let mut update_ops = Vec::new();
320-
let mut check_ops = Vec::with_capacity(remove_afters.len());
321+
let mut request = TxnRequest::new(vec![], vec![]);
321322

322323
let after_dependent_ident = TaskDependentIdent::new_after(&self.tenant, task_name);
323324
let after_seq_deps = self.kv_api.get_pb(&after_dependent_ident).await?;
324-
check_ops.push(txn_cond_eq_seq(
325+
request.condition.push(txn_cond_eq_seq(
325326
&after_dependent_ident,
326327
after_seq_deps.seq(),
327328
));
@@ -330,7 +331,9 @@ impl TaskMgr {
330331
for remove_after in remove_afters {
331332
deps.0.remove(remove_after);
332333
}
333-
update_ops.push(txn_put_pb(&after_dependent_ident, &deps)?);
334+
request
335+
.if_then
336+
.push(txn_put_pb(&after_dependent_ident, &deps)?);
334337
}
335338

336339
let before_dependent_idents = remove_afters
@@ -339,17 +342,18 @@ impl TaskMgr {
339342
for (before_dependent_ident, before_seq_deps) in
340343
self.kv_api.get_pb_vec(before_dependent_idents).await?
341344
{
342-
check_ops.push(txn_cond_eq_seq(
345+
request.condition.push(txn_cond_eq_seq(
343346
&before_dependent_ident,
344347
before_seq_deps.seq(),
345348
));
346349

347350
if let Some(mut deps) = before_seq_deps {
348351
deps.0.remove(task_name);
349-
update_ops.push(txn_put_pb(&before_dependent_ident, &deps)?);
352+
request
353+
.if_then
354+
.push(txn_put_pb(&before_dependent_ident, &deps)?);
350355
}
351356
}
352-
let request = TxnRequest::new(check_ops, update_ops);
353357
let reply = self.kv_api.transaction(request).await?;
354358

355359
if !reply.success {
@@ -369,8 +373,7 @@ impl TaskMgr {
369373
&self,
370374
task_name: &str,
371375
) -> Result<Result<(), TaskError>, TaskApiError> {
372-
let mut check_ops = Vec::new();
373-
let mut update_ops = Vec::new();
376+
let mut request = TxnRequest::new(vec![], vec![]);
374377

375378
let task_after_ident = TaskDependentIdent::new_after(&self.tenant, task_name);
376379
let task_before_ident = TaskDependentIdent::new_before(&self.tenant, task_name);
@@ -387,24 +390,33 @@ impl TaskMgr {
387390
}));
388391
}
389392
for (target_ident, seq_dep) in self.kv_api.get_pb_vec(target_idents).await? {
390-
check_ops.push(txn_cond_eq_seq(&target_ident, seq_dep.seq()));
393+
request
394+
.condition
395+
.push(txn_cond_eq_seq(&target_ident, seq_dep.seq()));
391396

392397
if let Some(mut deps) = seq_dep {
393398
deps.0.remove(task_name);
394-
update_ops.push(txn_put_pb(&target_ident, &deps)?);
399+
request.if_then.push(txn_put_pb(&target_ident, &deps)?);
395400
}
396401
}
397-
update_ops.push(TxnOp::delete(
402+
request.if_then.push(TxnOp::delete(
398403
TaskDependentIdent::new_before(&self.tenant, task_name).to_string_key(),
399404
));
400-
update_ops.push(TxnOp::delete(
405+
request.if_then.push(TxnOp::delete(
401406
TaskDependentIdent::new_after(&self.tenant, task_name).to_string_key(),
402407
));
403-
update_ops.push(TxnOp::delete(
404-
TaskStateIdent::new(&self.tenant, task_name).to_string_key(),
405-
));
408+
let mut stream = self
409+
.kv_api
410+
.list_pb_keys(&DirName::new(TaskStateIdent::new(
411+
&self.tenant,
412+
task_name,
413+
"",
414+
)))
415+
.await?;
406416

407-
let request = TxnRequest::new(check_ops, update_ops);
417+
while let Some(result) = stream.next().await {
418+
request.if_then.push(TxnOp::delete(result?.to_string_key()))
419+
}
408420
let _ = self.kv_api.transaction(request).await?;
409421

410422
Ok(Ok(()))
@@ -439,9 +451,8 @@ impl TaskMgr {
439451
task_name: &str,
440452
) -> Result<Result<Vec<String>, TaskError>, TaskApiError> {
441453
let task_before_ident = TaskDependentIdent::new_before(&self.tenant, task_name);
442-
let task_state_key = TaskStateIdent::new(&self.tenant, task_name);
443-
let succeeded_value = TaskState { is_succeeded: true };
444-
let not_succeeded_value = TaskState {
454+
let succeeded_value = TaskStateValue { is_succeeded: true };
455+
let not_succeeded_value = TaskStateValue {
445456
is_succeeded: false,
446457
};
447458

@@ -460,13 +471,19 @@ impl TaskMgr {
460471
let Some(target_after_dependent) = target_after_dependent else {
461472
continue;
462473
};
463-
let mut request = TxnRequest::new(vec![], vec![])
464-
.with_else(vec![txn_put_pb(&task_state_key, &succeeded_value)?]);
465-
466-
for target_after_task in target_after_dependent.0.iter() {
467-
let task_ident = TaskStateIdent::new(&self.tenant, target_after_task);
474+
let target_after = &target_after_ident.name().source;
475+
let this_task_to_target_state =
476+
TaskStateIdent::new(&self.tenant, task_name, target_after);
477+
let mut request = TxnRequest::new(vec![], vec![]).with_else(vec![txn_put_pb(
478+
&this_task_to_target_state,
479+
&succeeded_value,
480+
)?]);
481+
482+
for before_target_after in target_after_dependent.0.iter() {
483+
let task_ident =
484+
TaskStateIdent::new(&self.tenant, before_target_after, target_after);
468485
// Only care about the predecessors of this task's successor tasks, excluding this task itself.
469-
if target_after_task != task_name {
486+
if before_target_after != task_name {
470487
request.condition.push(TxnCondition::eq_value(
471488
task_ident.to_string_key(),
472489
succeeded_value.to_pb()?.encode_to_vec(),
@@ -485,15 +502,6 @@ impl TaskMgr {
485502
Ok(Ok(ready_tasks))
486503
}
487504

488-
#[async_backtrace::framed]
489-
#[fastrace::trace]
490-
pub async fn clean_task_state(&self, task_name: &str) -> Result<(), TaskApiError> {
491-
let key = TaskStateIdent::new(&self.tenant, task_name);
492-
let req = UpsertPB::delete(key).with(MatchSeq::GE(1));
493-
let _ = self.kv_api.upsert_pb(&req).await?;
494-
Ok(())
495-
}
496-
497505
async fn create_task_inner(
498506
&self,
499507
task: Task,

0 commit comments

Comments
 (0)