Skip to content

Commit af8b4b7

Browse files
CodeCasterXclaude
andcommitted
feat: 添加诊断日志以定位 Fork.join() NPE 根本原因
添加详细的日志记录以追踪: 1. Fork.join() reducer 接收的每个数据值 2. Pattern 执行的输入和输出 3. null 值出现的完整堆栈跟踪 4. 线程信息以分析并发行为 这些日志将帮助我们理解为什么 input.getData() 会返回 null, 从而找到 NPE 的真正根源。 相关 issue: #247 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent b990cbf commit af8b4b7

File tree

1 file changed

+37
-1
lines changed
  • framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities

1 file changed

+37
-1
lines changed

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package modelengine.fel.engine.activities;
88

9+
import lombok.extern.slf4j.Slf4j;
910
import modelengine.fel.core.chat.ChatMessage;
1011
import modelengine.fel.core.chat.Prompt;
1112
import modelengine.fel.core.chat.support.ChatMessages;
@@ -65,6 +66,7 @@
6566
* @author 易文渊
6667
* @since 2024-04-28
6768
*/
69+
@Slf4j
6870
public class AiStart<O, D, I, RF extends Flow<D>, F extends AiFlow<D, RF>> extends AiActivity<D, RF, F> {
6971
private final Start<O, D, I, RF> start;
7072

@@ -593,7 +595,20 @@ public final AiState<Tip, D, Tip, RF, F> runnableParallel(Pattern<O, Tip>... pat
593595
}
594596

595597
AiState<Tip, D, Tip, RF, F> state = aiFork.join(Tip::new, (acc, data) -> {
598+
// 诊断日志:记录每次reducer调用的详细信息
599+
log.debug("[Fork.join reducer] Thread={}, acc={}, data={}, data_is_null={}",
600+
Thread.currentThread().getName(),
601+
acc,
602+
data,
603+
data == null);
604+
596605
// Tip.merge() 内部会处理 data 为 null 的情况
606+
if (data == null) {
607+
log.warn("[Fork.join reducer] DIAGNOSTIC: Received null data in reducer! acc={}, thread={}",
608+
acc, Thread.currentThread().getName());
609+
// 打印堆栈跟踪以了解调用路径
610+
log.warn("[Fork.join reducer] Stack trace:", new RuntimeException("null data diagnostic"));
611+
}
597612
acc.merge(data);
598613
return acc;
599614
});
@@ -603,6 +618,27 @@ public final AiState<Tip, D, Tip, RF, F> runnableParallel(Pattern<O, Tip>... pat
603618

604619
private Processor<O, Tip> getPatternProcessor(Pattern<O, Tip> pattern, AiState<O, D, O, RF, F> node) {
605620
return node.publisher()
606-
.map(input -> AiFlowSession.applyPattern(pattern, input.getData(), input.getSession()), null);
621+
.map(input -> {
622+
O inputData = input.getData();
623+
log.debug("[getPatternProcessor] Executing pattern={}, inputData={}, thread={}",
624+
pattern.getClass().getSimpleName(),
625+
inputData,
626+
Thread.currentThread().getName());
627+
628+
Tip result = AiFlowSession.applyPattern(pattern, inputData, input.getSession());
629+
630+
log.debug("[getPatternProcessor] Pattern result={}, result_is_null={}, thread={}",
631+
result,
632+
result == null,
633+
Thread.currentThread().getName());
634+
635+
if (result == null) {
636+
log.error("[getPatternProcessor] CRITICAL: Pattern returned null! pattern={}, inputData={}",
637+
pattern.getClass().getSimpleName(),
638+
inputData);
639+
}
640+
641+
return result;
642+
}, null);
607643
}
608644
}

0 commit comments

Comments
 (0)