Skip to content

Commit 8730c10

Browse files
committed
chore: 添加完整的诊断输出以追踪 NPE 根本原因
在关键调用链的每个层级添加 System.err 诊断输出: 1. Fork.java:96 - processor 调用前后 - 记录 input.getData(), acc, branchCount 等关键状态 2. AiStart.java:605 - merge 调用前后 - 记录 acc, data 参数和 merge 结果 3. Tip.merge() - merge 内部 - 记录 this, other 参数和 null 处理逻辑 这将帮助我们理解: - 哪个分支的数据为 null - null 是在哪个环节产生的 - Fork 的聚合逻辑是如何执行的 - 完整的数据流动路径 相关 issue: #247
1 parent cdf4393 commit 8730c10

File tree

3 files changed

+53
-10
lines changed
  • framework
    • fel/java
      • fel-core/src/main/java/modelengine/fel/core/util
      • fel-flow/src/main/java/modelengine/fel/engine/activities
    • waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states

3 files changed

+53
-10
lines changed

framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,31 @@ public Tip addAll(Map<String, Content> args) {
118118
* @return 表示当前的 {@link Tip}。
119119
*/
120120
public Tip merge(Tip other) {
121+
// === DIAGNOSTIC #5: Tip.merge() 开始 ===
122+
System.err.println(String.format(
123+
"[DIAG-Tip.merge-START] thread=%s, this=%s, other=%s, other_is_null=%b",
124+
Thread.currentThread().getName(), this, other, (other == null)
125+
));
126+
121127
// 防御性处理:在并发场景下,Fork.join() 可能传入 null
122128
// 参考:https://github.com/ModelEngine-Group/fit-framework/issues/247
123129
if (other == null) {
130+
System.err.println(String.format(
131+
"[DIAG-Tip.merge-NULL] thread=%s, other is null, returning this=%s",
132+
Thread.currentThread().getName(), this
133+
));
124134
return this;
125135
}
126-
return this.addAll(other.values);
136+
137+
Tip result = this.addAll(other.values);
138+
139+
// === DIAGNOSTIC #6: Tip.merge() 结束 ===
140+
System.err.println(String.format(
141+
"[DIAG-Tip.merge-END] thread=%s, result=%s",
142+
Thread.currentThread().getName(), result
143+
));
144+
145+
return result;
127146
}
128147

129148
/**

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -595,14 +595,21 @@ public final AiState<Tip, D, Tip, RF, F> runnableParallel(Pattern<O, Tip>... pat
595595
}
596596

597597
AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> {
598-
// 防御性处理:Fork 的某些分支可能返回 null(特别是并发场景下的竞态条件)
599-
// Tip.merge() 会处理 null 情况,参考:https://github.com/ModelEngine-Group/fit-framework/issues/247
600-
if (data == null) {
601-
log.warn("Fork.join reducer received null data in iteration, this may indicate a race condition. " +
602-
"Tip.merge() will handle this defensively. acc={}, thread={}",
603-
acc, Thread.currentThread().getName());
604-
}
605-
return acc.merge(data); // Tip.merge() 内部会处理 null,返回 this
598+
// === DIAGNOSTIC #3: AiStart reducer 调用 merge 之前 ===
599+
System.err.println(String.format(
600+
"[DIAG-AiStart:605-BEFORE] thread=%s, acc=%s, data=%s, data_is_null=%b",
601+
Thread.currentThread().getName(), acc, data, (data == null)
602+
));
603+
604+
Tip mergeResult = acc.merge(data); // Tip.merge() 内部会处理 null
605+
606+
// === DIAGNOSTIC #4: AiStart reducer 调用 merge 之后 ===
607+
System.err.println(String.format(
608+
"[DIAG-AiStart:605-AFTER] thread=%s, mergeResult=%s",
609+
Thread.currentThread().getName(), mergeResult
610+
));
611+
612+
return mergeResult;
606613
});
607614
((Processor<?, ?>) state.publisher()).displayAs("runnableParallel");
608615
return state;

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,24 @@ public synchronized R process(FlowContext<O> input) {
9393
acc = Tuple.from((R) "", 0);
9494
}
9595
}
96-
acc = Tuple.from(processor.process(acc.first(), input.getData()), acc.second() + 1);
96+
97+
// === DIAGNOSTIC #1: Fork.join wrapper 调用 processor 之前 ===
98+
Object inputData = input.getData();
99+
System.err.println(String.format(
100+
"[DIAG-Fork:96-BEFORE] key=%s, thread=%s, branchCount=%d/%d, acc.first=%s, input.getData=%s, input.getData_is_null=%b",
101+
key, Thread.currentThread().getName(), acc.second(), forkNumber.get(),
102+
acc.first(), inputData, (inputData == null)
103+
));
104+
105+
R processedResult = processor.process(acc.first(), inputData);
106+
107+
// === DIAGNOSTIC #2: Fork.join wrapper 调用 processor 之后 ===
108+
System.err.println(String.format(
109+
"[DIAG-Fork:96-AFTER] key=%s, thread=%s, processedResult=%s, processedResult_is_null=%b",
110+
key, Thread.currentThread().getName(), processedResult, (processedResult == null)
111+
));
112+
113+
acc = Tuple.from(processedResult, acc.second() + 1);
97114
accs.put(key, acc);
98115

99116
if (acc.second() == forkNumber.get()) {

0 commit comments

Comments
 (0)