Skip to content

Commit 1c58cee

Browse files
committed
chore: update_after is divided into add_afters & remove_afters & clean_task_state_and_dependents
1 parent 975fba5 commit 1c58cee

File tree

2 files changed

+80
-41
lines changed

2 files changed

+80
-41
lines changed

โ€Žsrc/query/management/src/task/task_mgr.rs

Lines changed: 79 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashSet;
1615
use std::str::FromStr;
1716
use std::sync::Arc;
1817

@@ -168,12 +167,7 @@ impl TaskMgr {
168167
name: task_name.to_string(),
169168
}));
170169
}
171-
for after in afters {
172-
if task.after.contains(after) {
173-
continue;
174-
}
175-
task.after.push(after.clone());
176-
}
170+
return self.add_after(&task.task_name, afters).await;
177171
}
178172
AlterTaskOptions::RemoveAfter(afters) => {
179173
if task.schedule_options.is_some() {
@@ -182,7 +176,7 @@ impl TaskMgr {
182176
name: task_name.to_string(),
183177
}));
184178
}
185-
task.after.retain(|task| !afters.contains(task));
179+
return self.remove_after(&task.task_name, afters).await;
186180
}
187181
}
188182
if let Err(e) = self
@@ -268,32 +262,14 @@ impl TaskMgr {
268262

269263
#[async_backtrace::framed]
270264
#[fastrace::trace]
271-
pub async fn update_after(
265+
pub async fn add_after(
272266
&self,
273267
task_name: &str,
274-
task_after: &[String],
268+
new_afters: &[String],
275269
) -> Result<Result<(), TaskError>, TaskApiError> {
276-
let task_after_ident = DirName::new(TaskDependentIdent::new_generic(
277-
&self.tenant,
278-
TaskDependent::new(DependentType::After, task_name.to_string(), "".to_string()),
279-
));
280-
281270
let mut update_ops = Vec::new();
282-
283-
let mut new_afters: HashSet<&String> = task_after.iter().collect();
284-
let mut remove_afters: Vec<String> = Vec::new();
285-
let mut task_after_stream = self.kv_api.list_pb_values(&task_after_ident).await?;
286-
287-
while let Some(after_task_dependent) = task_after_stream.next().await {
288-
let after_task_dependent = after_task_dependent?;
289-
290-
debug_assert_eq!(after_task_dependent.ty, DependentType::After);
291-
292-
if !new_afters.remove(&after_task_dependent.target) {
293-
remove_afters.push(after_task_dependent.target.clone());
294-
}
295-
}
296271
let mut check_ops = Vec::with_capacity(new_afters.len());
272+
297273
for after in new_afters {
298274
let after_dependent =
299275
TaskDependent::new(DependentType::After, task_name.to_string(), after.clone());
@@ -320,27 +296,91 @@ impl TaskMgr {
320296
target: Some(Target::Seq(0)),
321297
});
322298
}
323-
for after in remove_afters {
324-
let before_dependent_ident = TaskDependentIdent::new_generic(
325-
&self.tenant,
326-
TaskDependent::new(DependentType::Before, after.clone(), task_name.to_string()),
327-
);
328-
329-
update_ops.push(TxnOp::delete(before_dependent_ident.to_string_key()));
330-
}
331299
let request = TxnRequest::new(check_ops, update_ops);
332300
let reply = self.kv_api.transaction(request).await?;
333301

334302
if reply.success {
335303
return Err(TaskApiError::SimultaneousUpdateTaskAfter {
336304
task_name: task_name.to_string(),
337-
after: task_after.join(","),
305+
after: new_afters.join(","),
338306
});
339307
}
340308

341309
Ok(Ok(()))
342310
}
343311

312+
#[async_backtrace::framed]
313+
#[fastrace::trace]
314+
pub async fn remove_after(
315+
&self,
316+
task_name: &str,
317+
remove_afters: &[String],
318+
) -> Result<Result<(), TaskError>, TaskApiError> {
319+
let task_after_dir_ident = DirName::new(TaskDependentIdent::new_generic(
320+
&self.tenant,
321+
TaskDependent::new(DependentType::After, task_name.to_string(), "".to_string()),
322+
));
323+
let mut task_after_stream = self.kv_api.list_pb_values(&task_after_dir_ident).await?;
324+
let mut update_ops = Vec::new();
325+
326+
while let Some(task_after_dependent) = task_after_stream.next().await {
327+
let task_after_dependent = task_after_dependent?;
328+
329+
debug_assert_eq!(task_after_dependent.ty, DependentType::After);
330+
331+
if !remove_afters.contains(&task_after_dependent.target) {
332+
continue;
333+
}
334+
let task_after_ident =
335+
TaskDependentIdent::new_generic(&self.tenant, task_after_dependent);
336+
update_ops.push(TxnOp::delete(task_after_ident.to_string_key()));
337+
}
338+
let request = TxnRequest::new(vec![], update_ops);
339+
let _ = self.kv_api.transaction(request).await?;
340+
341+
Ok(Ok(()))
342+
}
343+
344+
// Tips: Task Before only cleans up when dropping a task
345+
#[async_backtrace::framed]
346+
#[fastrace::trace]
347+
pub async fn clean_task_state_and_dependents(
348+
&self,
349+
task_name: &str,
350+
) -> Result<Result<(), TaskError>, TaskApiError> {
351+
let task_before_dir_ident = DirName::new(TaskDependentIdent::new_generic(
352+
&self.tenant,
353+
TaskDependent::new(DependentType::Before, task_name.to_string(), "".to_string()),
354+
));
355+
let mut task_before_stream = self.kv_api.list_pb_values(&task_before_dir_ident).await?;
356+
let mut update_ops = Vec::new();
357+
358+
while let Some(task_before_dependent) = task_before_stream.next().await {
359+
let task_before_dependent = task_before_dependent?;
360+
debug_assert_eq!(task_before_dependent.ty, DependentType::Before);
361+
362+
let task_after_dependent = TaskDependent::new(
363+
DependentType::After,
364+
task_before_dependent.target.to_string(),
365+
task_before_dependent.source.to_string(),
366+
);
367+
368+
let task_before_ident =
369+
TaskDependentIdent::new_generic(&self.tenant, task_before_dependent);
370+
update_ops.push(TxnOp::delete(task_before_ident.to_string_key()));
371+
372+
let task_after_ident =
373+
TaskDependentIdent::new_generic(&self.tenant, task_after_dependent);
374+
update_ops.push(TxnOp::delete(task_after_ident.to_string_key()));
375+
}
376+
update_ops.push(TxnOp::delete(TaskStateIdent::new(&self.tenant, task_name).to_string_key()));
377+
378+
let request = TxnRequest::new(vec![], update_ops);
379+
let _ = self.kv_api.transaction(request).await?;
380+
381+
Ok(Ok(()))
382+
}
383+
344384
#[async_backtrace::framed]
345385
#[fastrace::trace]
346386
pub async fn task_succeeded(
@@ -466,7 +506,7 @@ impl TaskMgr {
466506
}
467507
}
468508
if !task.after.is_empty() {
469-
if let Err(err) = self.update_after(&task.task_name, &task.after).await? {
509+
if let Err(err) = self.add_after(&task.task_name, &task.after).await? {
470510
return Ok(Err(err));
471511
}
472512
} else if task.schedule_options.is_some() && !without_schedule {

โ€Žsrc/query/service/src/task/service.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,8 +449,7 @@ impl TaskService {
449449
token.cancel();
450450
}
451451
if task_mgr.accept(&task_key).await? {
452-
task_mgr.clean_task_state(&task_name).await?;
453-
task_mgr.update_after(&task_name, &[]).await??;
452+
task_mgr.clean_task_state_and_dependents(&task_name).await??;
454453
}
455454
task_mgr
456455
.accept(&TaskMessageIdent::new(

0 commit comments

Comments
ย (0)