Skip to content

Commit 77c97c4

Browse files
committed
support delete local tasks
1 parent 75b2ade commit 77c97c4

File tree

7 files changed

+52
-1
lines changed

7 files changed

+52
-1
lines changed

core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
4747
*/
4848
private final String stepsTopic = "/topic/steps";
4949

50+
/**
51+
* To subscribe task update for job
52+
* Ex: /topic/tasks/{job id}
53+
*/
54+
private final String tasksTopic = "/topic/tasks";
55+
5056
/**
5157
* To subscribe real time logging for all jobs.
5258
* Ex: /topic/logs
@@ -89,6 +95,11 @@ public String topicForSteps() {
8995
return stepsTopic;
9096
}
9197

98+
@Bean("topicForTasks")
99+
public String topicForTasks() {
100+
return tasksTopic;
101+
}
102+
92103
@Bean("topicForLogs")
93104
public String topicForLogs() {
94105
return logsTopic;

core/src/main/java/com/flowci/core/job/dao/ExecutedLocalTaskDao.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,8 @@ public interface ExecutedLocalTaskDao extends MongoRepository<ExecutedLocalTask,
1313
List<ExecutedLocalTask> findAllByJobId(String jobId);
1414

1515
Optional<ExecutedLocalTask> findByJobIdAndAndName(String jobId, String name);
16+
17+
Long deleteAllByJobId(String jobId);
18+
19+
Long deleteAllByFlowId(String flowId);
1620
}

core/src/main/java/com/flowci/core/job/domain/ExecutedLocalTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ public final class ExecutedLocalTask implements Executed {
2727
@Indexed(name = "index_task_job_id")
2828
private String jobId;
2929

30+
@NonNull
31+
@Indexed(name = "index_task_flow_id")
32+
private String flowId;
33+
3034
@NonNull
3135
private String name;
3236

core/src/main/java/com/flowci/core/job/service/JobServiceImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ public class JobServiceImpl implements JobService {
110110
@Autowired
111111
private StepService stepService;
112112

113+
@Autowired
114+
private LocalTaskService localTaskService;
115+
113116
//====================================================================
114117
// %% Public functions
115118
//====================================================================
@@ -209,6 +212,7 @@ public Job rerun(Flow flow, Job job) {
209212

210213
// cleanup
211214
stepService.delete(job);
215+
localTaskService.delete(job);
212216
ymlManager.delete(job);
213217

214218
jobActionService.toCreated(job, yml.getRaw());
@@ -228,6 +232,9 @@ public void delete(Flow flow) {
228232
Long numOfStepDeleted = stepService.delete(flow);
229233
log.info("Deleted: {} steps of flow {}", numOfStepDeleted, flow.getName());
230234

235+
Long numOfTaskDeleted = localTaskService.delete(flow);
236+
log.info("Deleted: {} tasks of flow {}", numOfTaskDeleted, flow.getName());
237+
231238
eventManager.publish(new JobDeletedEvent(this, flow, numOfJobDeleted));
232239
});
233240
}

core/src/main/java/com/flowci/core/job/service/LocalTaskService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.flowci.core.job.service;
22

3+
import com.flowci.core.flow.domain.Flow;
34
import com.flowci.core.job.domain.ExecutedLocalTask;
45
import com.flowci.core.job.domain.Job;
56
import com.flowci.domain.LocalTask;
@@ -12,6 +13,10 @@ public interface LocalTaskService {
1213

1314
List<ExecutedLocalTask> list(Job job);
1415

16+
Long delete(Job job);
17+
18+
Long delete(Flow flow);
19+
1520
void executeAsync(Job job);
1621

1722
ExecutedLocalTask execute(Job job, LocalTask task);

core/src/main/java/com/flowci/core/job/service/LocalTaskServiceImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.flowci.core.job.service;
22

33
import com.flowci.core.common.manager.SpringEventManager;
4+
import com.flowci.core.flow.domain.Flow;
45
import com.flowci.core.job.dao.ExecutedLocalTaskDao;
56
import com.flowci.core.job.domain.Executed;
67
import com.flowci.core.job.domain.ExecutedLocalTask;
@@ -62,6 +63,7 @@ public void init(Job job) {
6263
ExecutedLocalTask t = new ExecutedLocalTask();
6364
t.setName(n.getPlugin()); // name is plugin name
6465
t.setJobId(job.getId());
66+
t.setFlowId(job.getFlowId());
6567
tasks.add(t);
6668
}
6769

@@ -74,6 +76,16 @@ public List<ExecutedLocalTask> list(Job job) {
7476
return executedLocalTaskDao.findAllByJobId(job.getId());
7577
}
7678

79+
@Override
80+
public Long delete(Job job) {
81+
return executedLocalTaskDao.deleteAllByJobId(job.getId());
82+
}
83+
84+
@Override
85+
public Long delete(Flow flow) {
86+
return executedLocalTaskDao.deleteAllByFlowId(flow.getId());
87+
}
88+
7789
@Override
7890
public void executeAsync(Job job) {
7991
localTaskExecutor.execute(() -> {

core/src/main/java/com/flowci/core/job/service/PushServiceImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public class PushServiceImpl implements PushService {
4141
@Autowired
4242
private String topicForSteps;
4343

44+
@Autowired
45+
private String topicForTasks;
46+
4447
@Autowired
4548
private SocketPushManager socketPushManager;
4649

@@ -74,6 +77,11 @@ public void onStepStatusChange(StepUpdateEvent event) {
7477
@Override
7578
@EventListener
7679
public void onTaskStatusChange(TaskUpdateEvent event) {
77-
80+
String topic = topicForTasks + "/" + event.getJobId();
81+
if (event.isInit()) {
82+
socketPushManager.push(topic, PushEvent.NEW_CREATED, event.getItems());
83+
return;
84+
}
85+
socketPushManager.push(topic, PushEvent.STATUS_CHANGE, event.getItems());
7886
}
7987
}

0 commit comments

Comments
 (0)