diff --git a/examples/fel-example/05-retrieval/README.md b/examples/fel-example/05-retrieval/README.md index 63e72a4ed..336923106 100644 --- a/examples/fel-example/05-retrieval/README.md +++ b/examples/fel-example/05-retrieval/README.md @@ -226,6 +226,8 @@ node0-->node1{{=}} ## 验证 +- 在IDEA中运行`DemoApplication` + - 在浏览器栏输入:`http://localhost:8080/ai/example/chat?query=请介绍一下黑神话悟空` ```json diff --git a/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java b/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java index 2444ddfd7..099c3f814 100644 --- a/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java +++ b/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java @@ -41,6 +41,7 @@ import modelengine.fitframework.annotation.Component; import modelengine.fitframework.annotation.Fit; import modelengine.fitframework.annotation.Value; +import modelengine.fitframework.log.Logger; import modelengine.fitframework.serialization.ObjectSerializer; import modelengine.fitframework.util.FileUtils; @@ -57,6 +58,7 @@ @Component @RequestMapping("/ai/example") public class RetrievalExampleController { + private static final Logger log = Logger.get(RetrievalExampleController.class); private static final String REWRITE_PROMPT = "作为一个向量检索助手,你的任务是结合历史记录,为”原问题“生成”检索词“," + "生成的问题要求指向对象清晰明确,并与“原问题语言相同。\n\n" + "历史记录:\n---\n" + DEFAULT_HISTORY_KEY + "---\n原问题:{{query}}\n检索词:"; @@ -85,7 +87,7 @@ public RetrievalExampleController(ChatModel chatModel, EmbedModel embedModel, .others(node -> node.map(tip -> tip.freeze().get("query").text())) .retrieve(new DefaultVectorRetriever(vectorStore, SearchOption.custom().topK(1).build())) .synthesize(docs -> Content.from(docs.stream().map(Document::text).collect(Collectors.joining("\n\n")))) - .close(); + .close(__ -> log.info("Retrieve flow completed.")); AiProcessFlow> indexFlow = AiFlows.create() .load(new JsonFileSource(serializer, StringTemplate.create("{{question}}: {{answer}}"))) @@ -93,14 +95,19 @@ public RetrievalExampleController(ChatModel chatModel, EmbedModel embedModel, .close(); File file = FileUtils.file(this.getClass().getClassLoader().getResource("data.json")); notNull(file, "The data cannot be null."); - indexFlow.converse().offer(file); + indexFlow.converse() + .doOnError(e -> log.info("Index build error. [error={}]", e.getMessage(), e)) + .doOnFinally(() -> log.info("Index build successfully.")) + .offer(file); this.ragFlow = AiFlows.create() + .just(query -> log.info("RAG flow start. [query={}]", query)) .map(query -> Tip.from("query", query)) .runnableParallel(value("context", retrieveFlow), passThrough()) .prompt(Prompts.history(), Prompts.human(CHAT_PROMPT)) + .just(__ -> log.info("LLM start generation.")) .generate(chatFlowModel) - .close(); + .close(__ -> log.info("RAG flow completed.")); } /** diff --git a/framework/waterflow/java/waterflow-core/README.md b/framework/waterflow/java/waterflow-core/README.md index 17fb5c20f..ae310ad25 100644 --- a/framework/waterflow/java/waterflow-core/README.md +++ b/framework/waterflow/java/waterflow-core/README.md @@ -271,7 +271,57 @@ Flows.create() 向流中投递数据。这里需要注意,流的运行是异步的,offer返回的是这次运行的实例ID。 -## 使用限制 +## 核心机制 + +### FlowSessionCache 会话缓存管理 + +FlowSessionCache 是 waterflow 的核心资源管理机制,负责统一管理流程执行过程中产生的所有会话相关资源,确保同一批数据的正确汇聚和资源的自动释放。 + +#### 缓存结构 + +FlowSessionCache 按照 `flowId -> sessionId -> FlowSessionCache` 的层级结构组织,每个流程会话维护独立的缓存实例,内部管理以下资源: + +1. **Session 流转缓存(nextToSessions)** + - 记录每个节点向下游节点流转数据时使用的 session + - 以当前窗口的唯一标识(UUID)为索引 + - 确保同一批数据在节点间流转时使用相同的 session 进行汇聚 + +2. **Emitter 处理缓存(nextEmitterHandleSessions)** + - 专门用于处理 emitter 操作的 session 缓存 + - 为发射器操作提供独立的会话上下文 + +3. **FlatMap 窗口缓存(flatMapSourceWindows)** + - 记录 flatMap 节点产生的源窗口信息 + - 以窗口唯一标识为索引存储 `FlatMapSourceWindow` 实例 + - 用于将 flatMap 操作产生的多个输出数据与原始输入关联 + +4. **Match 窗口缓存(matchWindows)** + - 记录条件匹配节点(`conditions`)产生的窗口信息 + - 以 MatchWindow 的唯一标识为索引 + - 用于将条件分支产生的数据进行汇聚 + +5. **累加器顺序缓存(accOrders)** + - 记录每个节点的累加操作顺序编号 + - 以节点 ID 为索引,存储递增的序号 + +#### 资源管理机制 + +**自动创建与复用** +- 首次访问某个流程会话时,FlowSessionCache 自动创建并初始化 +- 相同窗口/会话标识的资源会被复用,避免重复创建 +- 通过 `FlowSessionRepo` 提供的静态方法访问各类缓存资源 + +**会话隔离** +- 不同流程(flowId)的缓存完全隔离 +- 同一流程的不同会话(sessionId)也拥有独立的缓存空间 +- 避免跨会话或跨流程的数据污染 + +**生命周期管理** +- 会话结束时调用 `FlowSessionRepo.release(flowId, session)` 自动释放所有关联资源 +- 当某个流程的所有会话都释放后,自动清理该流程的缓存映射 +- 无需在各个窗口或会话类中实现清理逻辑,避免内存泄漏 + +## 使用限制 1. 在编排流程时需要保证节点流转上没有死循环,否则处于死循环的数据将一致在这些节点上循环流转。 2. 数据流转的线程池最大是100个,每个节点最大同时处理16个批次的数据,每个批次的数据在每个节点上串行执行。超过限制的数据将排队等待执行。 diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java index 261ff5f50..1ea34270b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java @@ -6,12 +6,13 @@ package modelengine.fit.waterflow.domain.context; +import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo; + import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** @@ -21,8 +22,6 @@ * @since 1.0 */ public class MatchWindow extends Window { - private static final Map all = new ConcurrentHashMap<>(); - private final Set arms = new HashSet<>(); /** @@ -41,22 +40,26 @@ public MatchWindow(Window source, UUID id, Object data) { /** * 创建一个MatchWindow * + * @param flowId 流程ID * @param source 源窗口 * @param id 窗口ID * @param data 窗口数据 * @return 返回创建的MatchWindow对象 */ - public static synchronized MatchWindow from(Window source, UUID id, Object data) { - MatchWindow window = all.get(id.toString()); + public static synchronized MatchWindow from(String flowId, Window source, UUID id, Object data) { + // 从 FlowSessionRepo 获取缓存 + Map cache = FlowSessionRepo.getMatchWindowCache(flowId, source.getSession()); + + MatchWindow window = cache.get(id); if (window == null) { window = new MatchWindow(source, id, data); FlowSession session = new FlowSession(source.getSession()); session.setWindow(window); - all.put(id.toString(), window); + cache.put(id, window); } WindowToken token = window.createToken(); token.beginConsume(); - List arms = all.values().stream().filter(t -> t.from == source).collect(Collectors.toList()); + List arms = cache.values().stream().filter(t -> t.from == source).collect(Collectors.toList()); for (MatchWindow a : arms) { a.setArms(arms); } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java index be71e73f8..1c822ce56 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java @@ -8,6 +8,7 @@ import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow; import modelengine.fit.waterflow.domain.context.FlowSession; +import modelengine.fit.waterflow.domain.context.MatchWindow; import modelengine.fit.waterflow.domain.context.Window; import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fitframework.inspection.Validation; @@ -90,6 +91,19 @@ public static FlatMapSourceWindow getFlatMapSource(String flowId, Window window, .getFlatMapSourceWindow(window, repo); } + /** + * 获取 MatchWindow 缓存 Map,用于存储和检索 MatchWindow 实例 + * + * @param flowId The unique identifier of the flow. + * @param session The current session context. + * @return MatchWindow 缓存 Map + */ + public static Map getMatchWindowCache(String flowId, FlowSession session) { + Validation.notNull(flowId, "Flow id cannot be null."); + Validation.notNull(session, "Session cannot be null."); + return getFlowSessionCache(flowId, session).getMatchWindowCache(); + } + /** * Releases all resources associated with a specific flow session. * @@ -137,6 +151,12 @@ private static class FlowSessionCache { */ private final Map flatMapSourceWindows = new ConcurrentHashMap<>(); + /** + * 记录流程中条件匹配节点产生的窗口信息,用于将同一批数据汇聚。 + * 其中索引为 match window 的唯一标识。 + */ + private final Map matchWindows = new ConcurrentHashMap<>(); + private final Map accOrders = new ConcurrentHashMap<>(); private FlowSession getNextToSession(FlowSession session) { @@ -165,6 +185,10 @@ private FlatMapSourceWindow getFlatMapSourceWindow(Window window, FlowContextRep }); } + private Map getMatchWindowCache() { + return this.matchWindows; + } + private int getNextAccOrder(String nodeId) { return this.accOrders.compute(nodeId, (key, value) -> { if (value == null) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java index cec671902..c3bc6bdc8 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java @@ -50,8 +50,8 @@ public MatchHappen match(Operators.Whether whether, Operators.BranchProcessor processor) { UUID id = UUID.randomUUID(); State branchStart = new State<>(this.node.publisher() - .just(input -> input.setSession( - MatchWindow.from(input.getWindow(), id, input.getData()).getSession()), whether) + .just(input -> input.setSession(MatchWindow.from(this.node.processor.getStreamId(), + input.getWindow(), id, input.getData()).getSession()), whether) .displayAs(SpecialDisplayNode.BRANCH.name()), this.node.getFlow()); State branch = processor.process(branchStart); this.branches.add(branch);