Skip to content

Commit cdf4393

Browse files
committed
fix: 改进 null 数据处理逻辑,始终调用 Tip.merge() 以避免逻辑不一致
- 移除 AiStart 中的提前返回,始终调用 acc.merge(data) - 依赖 Tip.merge() 内部的 null 检查来处理 null 情况 - 保留警告日志以便追踪并发竞态条件 - 这样可以保证处理逻辑的一致性,避免数据丢失 相关 issue: #247
1 parent 6b88002 commit cdf4393

File tree

1 file changed

+4
-5
lines changed
  • framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities

1 file changed

+4
-5
lines changed

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -596,14 +596,13 @@ public final AiState<Tip, D, Tip, RF, F> runnableParallel(Pattern<O, Tip>... pat
596596

597597
AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> {
598598
// 防御性处理:Fork 的某些分支可能返回 null(特别是并发场景下的竞态条件)
599-
// 参考:https://github.com/ModelEngine-Group/fit-framework/issues/247
599+
// Tip.merge() 会处理 null 情况,参考:https://github.com/ModelEngine-Group/fit-framework/issues/247
600600
if (data == null) {
601-
log.warn("Fork.join reducer received null data, this may indicate a race condition. " +
602-
"Keeping accumulator unchanged. acc={}, thread={}",
601+
log.warn("Fork.join reducer received null data in iteration, this may indicate a race condition. " +
602+
"Tip.merge() will handle this defensively. acc={}, thread={}",
603603
acc, Thread.currentThread().getName());
604-
return acc; // 保持累加器不变,避免 NPE
605604
}
606-
return acc.merge(data);
605+
return acc.merge(data); // Tip.merge() 内部会处理 null,返回 this
607606
});
608607
((Processor<?, ?>) state.publisher()).displayAs("runnableParallel");
609608
return state;

0 commit comments

Comments
 (0)