Skip to content

Commit cc35ed4

Browse files
committed
[waterflow] 添加流程数据定期清理机制
1 parent 4f5c8b6 commit cc35ed4

File tree

12 files changed

+166
-0
lines changed

12 files changed

+166
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fit.waterflow.flowsengine.biz.service.scheduletasks;
8+
9+
import modelengine.fit.waterflow.common.utils.SleepUtil;
10+
import modelengine.fit.waterflow.flowsengine.domain.flows.context.repo.flowcontext.FlowContextRepo;
11+
import modelengine.fit.waterflow.flowsengine.domain.flows.context.repo.flowtrace.FlowTraceRepo;
12+
import modelengine.fitframework.annotation.Component;
13+
import modelengine.fitframework.annotation.Fit;
14+
import modelengine.fitframework.annotation.Value;
15+
import modelengine.fitframework.log.Logger;
16+
import modelengine.fitframework.schedule.annotation.Scheduled;
17+
import modelengine.fitframework.transaction.Transactional;
18+
19+
import java.util.List;
20+
21+
/**
22+
* 定时清理流程中已完成的context
23+
* 包括成功、失败、终止的流程数据
24+
*
25+
* @author 杨祥宇
26+
* @since 2025/04/02
27+
*/
28+
@Component
29+
public class CleanContextSchedule {
30+
private static final Logger log = Logger.get(CleanContextSchedule.class);
31+
private static final int LIMIT = 1000;
32+
private final FlowTraceRepo flowTraceRepo;
33+
private final FlowContextRepo flowContextRepo;
34+
private final int expiredDays;
35+
36+
public CleanContextSchedule(FlowTraceRepo flowTraceRepo, @Fit(alias = "flowContextPersistRepo") FlowContextRepo
37+
flowContextRepo, @Value("${jane.flowsEngine.contextExpiredDays}") int expiredDays) {
38+
this.flowTraceRepo = flowTraceRepo;
39+
this.flowContextRepo = flowContextRepo;
40+
this.expiredDays = expiredDays;
41+
}
42+
43+
/**
44+
* 每天凌晨3点定时清理超期EXPIRED_DAYS天的流程运行数据
45+
*/
46+
@Scheduled(strategy = Scheduled.Strategy.CRON, value = "0 0 3 * * ?")
47+
@Transactional
48+
public void cleanContextSchedule() {
49+
log.info("Start clean flow expired contexts");
50+
try {
51+
List<String> traceIds = flowTraceRepo.getExpiredTrace(expiredDays, LIMIT);
52+
while (!traceIds.isEmpty()) {
53+
flowContextRepo.deleteByTraceIdList(traceIds);
54+
flowTraceRepo.deleteByIdList(traceIds);
55+
traceIds = flowTraceRepo.getExpiredTrace(expiredDays, LIMIT);
56+
SleepUtil.sleep(60000);
57+
}
58+
} catch (Exception ex) {
59+
log.error("Clean context error, error message: {}" + ex.getMessage());
60+
}
61+
}
62+
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextPersistRepo.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,4 +736,12 @@ public boolean hasContextWithStatusAtPosition(List<String> statusList, String tr
736736
public String getTransIdByTrace(String traceId) {
737737
return contextMapper.getTransIdByTrace(traceId);
738738
}
739+
740+
@Override
741+
public void deleteByTraceIdList(List<String> traceIdList) {
742+
if (traceIdList.isEmpty()) {
743+
return;
744+
}
745+
contextMapper.deleteByTraceIdList(traceIdList);
746+
}
739747
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowcontext/FlowContextRepo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,5 +671,14 @@ default List<FlowContext<String>> getWithoutFlowDataByToBatch(List<String> toBat
671671
* @return trans id
672672
*/
673673
String getTransIdByTrace(String traceId);
674+
675+
/**
676+
* 根据trace列表删除对应的context数据
677+
*
678+
* @param traceIdList trace id列表
679+
*/
680+
default void deleteByTraceIdList(List<String> traceIdList) {
681+
throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTraceIdList");
682+
}
674683
}
675684

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/DefaultFlowTraceRepo.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ public void deleteByIdList(List<String> traceIds) {
101101
flowTraceMapper.deleteByIdList(traceIds);
102102
}
103103

104+
@Override
105+
public List<String> getExpiredTrace(int expiredDays, int limit) {
106+
LocalDateTime now = LocalDateTime.now();
107+
LocalDateTime expired = now.minusDays(expiredDays);
108+
return flowTraceMapper.getExpiredTrace(expired, limit);
109+
}
110+
104111
private FlowTracePO serializer(FlowTrace flowTrace) {
105112
String contextPool = String.join(", ", flowTrace.getContextPool());
106113
return FlowTracePO.builder()

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/domain/flows/context/repo/flowtrace/FlowTraceRepo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,13 @@ public interface FlowTraceRepo {
100100
* @param traceIds traceId列表
101101
*/
102102
void deleteByIdList(List<String> traceIds);
103+
104+
/**
105+
* 查询超期并且已完成的trace id
106+
*
107+
* @param expiredDays 超期天数
108+
* @param limit 查询限制
109+
* @return trace id列表
110+
*/
111+
List<String> getExpiredTrace(int expiredDays, int limit);
103112
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowContextMapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,4 +523,11 @@ List<FlowContextPO> findFinishedContextsPagedByTraceId(String traceId, String en
523523
* @return trans id
524524
*/
525525
String getTransIdByTrace(String traceId);
526+
527+
/**
528+
* 根据trace列表删除对应的context
529+
*
530+
* @param traceIds trace id列表
531+
*/
532+
void deleteByTraceIdList(List<String> traceIds);
526533
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/persist/mapper/FlowTraceMapper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,13 @@ public interface FlowTraceMapper {
113113
* @return trace列表
114114
*/
115115
List<String> findRunningTrace(List<String> applications);
116+
117+
/**
118+
* 查询超期并且已完成的trace id
119+
*
120+
* @param expiredDays 超期天数
121+
* @param limit 查询数量
122+
* @return trace id列表
123+
*/
124+
List<String> getExpiredTrace(LocalDateTime expiredDays, int limit);
116125
}

app-builder/waterflow/java/waterflow-service/src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ jane:
1616
scheduleRate: 10000
1717
maxCount: 0
1818
isNeedFlowCallbackAdapt: false
19+
contextExpiredDays: 7
1920

2021
distributed-lock-provider: databaseDistributedLockProvider

app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowContextMapper.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,4 +765,14 @@
765765
trace_id = #{traceId}
766766
LIMIT 1
767767
</select>
768+
769+
<delete id="deleteByTraceIdList">
770+
DELETE FROM
771+
<include refid="table"/>
772+
WHERE
773+
trace_id IN
774+
<foreach item="id" collection="traceIds" open="(" separator="," close=")">
775+
#{id}
776+
</foreach>
777+
</delete>
768778
</mapper>

app-builder/waterflow/java/waterflow-service/src/main/resources/mapper/FlowTraceMapper.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,15 @@
163163
#{traceId}
164164
</foreach>
165165
</delete>
166+
167+
<select id="getExpiredTrace" resultType="string">
168+
SELECT
169+
trace_id
170+
FROM
171+
<include refid="table"/>
172+
WHERE
173+
status != 'RUNNING'
174+
AND end_time &lt; #{expiredDays}
175+
LIMIT #{limit};
176+
</select>
166177
</mapper>

0 commit comments

Comments
 (0)