
Commit 5dcd6dc

[fel] add key scenario cases (#165)

* [waterflow] support configuring node concurrency
* [fel] subflows inherit ordering flag from parent flow
* [fel] add key examples for backpressure and concurrency
* [waterflow] support branch syntax for 'when' and 'then' clauses
* [fel] demonstrate data anonymization with example
* [fel] adapter flow case
* [fel] clean code
* [fel] clean code

---------

Co-authored-by: songyongtan <[email protected]>

1 parent 0f2602a commit 5dcd6dc

File tree

17 files changed (+579, -4 lines)

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiConditions.java

Lines changed: 17 additions & 0 deletions
@@ -57,6 +57,23 @@ public <O> AiMatchHappen<O, D, I, RF, F> match(Operators.Whether<I> whether,
                 node -> processor.process(new AiState<>(node, this.flow())).state), this.flow());
     }

+    /**
+     * Creates a conditional branch with the specified condition and handler.
+     *
+     * @param whether The match condition, an {@link Operators.Whether}{@code <}{@link I}{@code >}.
+     * @param processor The branch processor, an {@link Operators.Then}{@code <}{@link I}{@code , }{@link O}{@code >}.
+     * @param <O> The return type specified by the first conditional branch.
+     * @return The conditional branch, an {@link AiWhenHappen}{@code <}{@link O}{@code , }{@link D}{@code ,
+     * }{@link I}{@code , }{@link RF}{@code , }{@link F}{@code >}.
+     * @throws IllegalArgumentException If {@code processor} is {@code null}.
+     */
+    public <O> AiWhenHappen<O, D, I, RF, F> when(Operators.Whether<I> whether,
+            Operators.Then<I, O> processor) {
+        Validation.notNull(processor, "Ai branch processor cannot be null.");
+        return new AiWhenHappen<>(this.conditions.when(whether, processor), this.flow());
+    }
+
     /**
      * Creates a conditional jump branch with the specified condition and its handler.
      *
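
For orientation, a minimal usage sketch of the new branch syntax, hedged and adapted from the DesensitizeCase test added in this same commit (`model` stands for any streaming ChatFlowModel; the mapping and masking steps are illustrative):

AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
        .prompt(Prompts.human("{{0}}"))
        .generate(model)
        .map(msg -> msg.text())                                // ChatMessage -> String chunk
        .conditions()
        .when(chunk -> chunk.startsWith("<"), chunk -> chunk)  // matched chunks pass through unchanged
        .others(chunk -> chunk.replace("3", "*"))              // default branch ends the conditional node
        .close();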

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java

Lines changed: 12 additions & 0 deletions
@@ -82,6 +82,18 @@ public AiState<O, D, I, RF, F> id(String id) {
         return this;
     }

+    /**
+     * Sets the maximum concurrency level for this state's processing pipeline.
+     *
+     * @param concurrency The maximum number of concurrent operations allowed (must be positive).
+     * @return The current state instance for method chaining.
+     * @throws IllegalArgumentException If the concurrency value is zero or negative.
+     */
+    public AiState<O, D, I, RF, F> concurrency(int concurrency) {
+        this.state.concurrency(concurrency);
+        return this;
+    }
+
     /**
      * Gets the data subscriber of the current node.
      *
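
A hedged sketch of the new concurrency knob in use, mirroring the BackPressureCase test later in this commit (`model` again stands for any streaming ChatFlowModel; the masking step is illustrative):

AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
        .prompt(Prompts.human("{{0}}"))
        .generate(model)
        // At most one element is processed by this step at a time, so faster
        // upstream generation queues up instead of fanning out across threads.
        .map(msg -> msg.text().replace("3", "*")).concurrency(1)
        .close();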
framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java

Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+/*---------------------------------------------------------------------------------------------
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * This file is a part of the ModelEngine Project.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ *--------------------------------------------------------------------------------------------*/
+
+package modelengine.fel.engine.activities;
+
+import modelengine.fel.engine.flows.AiFlow;
+import modelengine.fit.waterflow.domain.flow.Flow;
+import modelengine.fit.waterflow.domain.states.WhenHappen;
+import modelengine.fit.waterflow.domain.stream.operators.Operators;
+import modelengine.fitframework.inspection.Validation;
+
+/**
+ * Represents a conditional branch that matches "when" conditions in an AI processing flow.
+ * This class handles the branching logic when specific conditions are met in the workflow.
+ *
+ * @param <O> The output data type of the current node.
+ * @param <D> The initial data type of the containing flow.
+ * @param <I> The input parameter data type.
+ * @param <RF> The internal flow type, extending {@link Flow}{@code <D>}.
+ * @param <F> The AI flow type, extending {@link AiFlow}{@code <D, RF>}.
+ *
+ * @author 宋永坦
+ * @since 2025-06-12
+ */
+public class AiWhenHappen<O, D, I, RF extends Flow<D>, F extends AiFlow<D, RF>> {
+    private final WhenHappen<O, D, I, RF> matchHappen;
+
+    private final F flow;
+
+    /**
+     * Creates a new AI flow matching generator that handles conditional branching.
+     * This constructor initializes a stateful processor for when/then style pattern matching
+     * within AI workflows.
+     *
+     * @param matchHappen The core matching generator that evaluates conditions.
+     * @param flow The parent AI flow.
+     * @throws IllegalArgumentException If either parameter is null.
+     */
+    public AiWhenHappen(WhenHappen<O, D, I, RF> matchHappen, F flow) {
+        this.matchHappen = Validation.notNull(matchHappen, "WhenHappen cannot be null.");
+        this.flow = Validation.notNull(flow, "Flow cannot be null.");
+    }
+
+    /**
+     * Creates a conditional branch with the specified predicate and handler.
+     *
+     * @param whether The condition predicate that determines branch activation.
+     * @param processor The transformation handler to execute when the condition is met.
+     * @return A new {@link AiWhenHappen} instance representing the conditional branch.
+     * @throws IllegalArgumentException If {@code processor} is null.
+     */
+    public AiWhenHappen<O, D, I, RF, F> when(Operators.Whether<I> whether, Operators.Then<I, O> processor) {
+        Validation.notNull(processor, "Ai branch processor cannot be null.");
+        return new AiWhenHappen<>(this.matchHappen.when(whether, processor), this.flow);
+    }
+
+    /**
+     * Provides the default processing logic and terminates the conditional node.
+     *
+     * @param processor The handler that processes unmatched inputs.
+     * @return An {@link AiState} representing the terminal node of the conditional flow.
+     * @throws IllegalArgumentException If {@code processor} is null.
+     */
+    public AiState<O, D, O, RF, F> others(Operators.Then<I, O> processor) {
+        Validation.notNull(processor, "Ai branch processor cannot be null.");
+        return new AiState<>(this.matchHappen.others(processor), this.flow);
+    }
+}
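
A hedged sketch of chaining several branches on the returned AiWhenHappen; others(...) supplies the default handler, terminates the conditional node, and hands back an AiState so the flow can continue (types follow the test flows added in this commit; the branch handlers are illustrative):

AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
        .prompt(Prompts.human("{{0}}"))
        .generate(model)
        .map(msg -> msg.text())
        .conditions()
        .when(chunk -> chunk.startsWith("<"), chunk -> chunk)  // e.g. markup chunks
        .when(chunk -> chunk.isEmpty(), chunk -> "[empty]")    // e.g. empty chunks
        .others(chunk -> chunk.replace("3", "*"))              // default branch closes the conditional node
        .map(chunk -> chunk.replace("4", "*"))
        .close();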

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/Conversation.java

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,9 @@ public class Conversation<D, R> {
      */
     public Conversation(AiProcessFlow<D, R> flow, FlowSession session) {
         this.flow = Validation.notNull(flow, "Flow cannot be null.");
+        if (session != null) {
+            session.begin();
+        }
         this.session =
                 (session == null) ? this.setConverseListener(new FlowSession(true)) : this.setSubConverseListener(session);
         this.session.begin();

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java

Lines changed: 2 additions & 2 deletions
@@ -89,7 +89,7 @@ public FlowEmitter<O> invoke(I data) {
     public Pattern<I, O> sync() {
         return new SimplePattern<>(data -> {
             FlowSession require = AiFlowSession.require();
-            FlowSession session = new FlowSession(true);
+            FlowSession session = new FlowSession(require.preserved());
             Window window = session.begin();
             session.copySessionState(require);
             ConverseLatch<O> conversation = this.getFlow().converse(session).offer(data);
@@ -116,7 +116,7 @@ public Flow<I> origin() {
      */
     protected static <O> FlowSession buildFlowSession(FlowEmitter<O> emitter) {
         FlowSession mainSession = AiFlowSession.require();
-        FlowSession flowSession = FlowSession.newRootSession(mainSession, true);
+        FlowSession flowSession = FlowSession.newRootSession(mainSession, mainSession.preserved());
         flowSession.setInnerState(PARENT_SESSION_ID_KEY, mainSession.getId());
         ResultAction<O> resultAction = emitter::emit;
         flowSession.setInnerState(RESULT_ACTION_KEY, resultAction);
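
The two hunks above implement the "[fel] subflows inherit ordering flag from parent flow" item from the commit message: sub-sessions created in sync() and buildFlowSession() now reuse the parent session's preserved() flag instead of always forcing ordered delivery. A hedged sketch of the observable effect, assuming `flow` is an AiProcessFlow<Tip, String> like the ones in the tests below:

// Ordered conversation: sub-flows spawned inside patterns now stay ordered as well.
flow.converse(new FlowSession(true)).offer(Tip.fromArray("hi")).await();

// Unordered conversation: sub-flows no longer force ordering, so chunks may arrive interleaved.
flow.converse(new FlowSession(false)).offer(Tip.fromArray("hi")).await();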
framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java

Lines changed: 226 additions & 0 deletions
@@ -0,0 +1,226 @@
+/*---------------------------------------------------------------------------------------------
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * This file is a part of the ModelEngine Project.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ *--------------------------------------------------------------------------------------------*/
+
+package modelengine.fel.engine;
+
+import modelengine.fel.core.chat.ChatMessage;
+import modelengine.fel.core.chat.ChatOption;
+import modelengine.fel.core.chat.support.AiMessage;
+import modelengine.fel.core.util.Tip;
+import modelengine.fel.engine.flows.AiFlows;
+import modelengine.fel.engine.flows.AiProcessFlow;
+import modelengine.fel.engine.flows.ConverseLatch;
+import modelengine.fel.engine.operators.models.ChatFlowModel;
+import modelengine.fel.engine.operators.prompts.Prompts;
+import modelengine.fit.waterflow.domain.context.FlowSession;
+import modelengine.fit.waterflow.domain.context.StateContext;
+import modelengine.fit.waterflow.domain.utils.SleepUtil;
+import modelengine.fitframework.flowable.Choir;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test cases demonstrating different flow control scenarios in AI processing pipelines.
+ * Contains nested test classes for specific flow control mechanisms.
+ *
+ * @author 宋永坦
+ * @since 2025-06-11
+ */
+public class AiFlowCaseTest {
+    private static final int SPEED = 1;
+
+    @Nested
+    class DesensitizeCase {
+        private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
+            emitter.emit(new AiMessage("<think>"));
+            int takeTime = 10 * SPEED;
+            SleepUtil.sleep(takeTime);
+            for (int i = 0; i < 48; i++) {
+                emitter.emit(new AiMessage(String.valueOf(i)));
+                SleepUtil.sleep(takeTime);
+            }
+            emitter.emit(new AiMessage("</think>"));
+            SleepUtil.sleep(takeTime);
+            for (int i = 100; i < 150; i++) {
+                emitter.emit(new AiMessage(String.valueOf(i)));
+                SleepUtil.sleep(takeTime);
+            }
+            emitter.complete();
+        }), ChatOption.custom().model("modelName").stream(true).build());
+
+        private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
+                .prompt(Prompts.human("{{0}}"))
+                .generate(model)
+                .map(this::classic)
+                .conditions()
+                .when(chunk -> chunk.isThinkContent, input -> input)
+                .others(input -> {
+                    this.log(input);
+                    return input;
+                })
+                .map(this::mockDesensitize1)
+                .map(this::mockDesensitize2)
+                .close();
+
+        @Test
+        @DisplayName("DesensitizeCase")
+        void run() {
+            AtomicInteger counter = new AtomicInteger(0);
+            long startTime = System.currentTimeMillis();
+            System.out.printf("time:%s, start.\n", startTime);
+            ConverseLatch<String> result = flow.converse(new FlowSession(true)).doOnConsume(answer -> {
+                System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer);
+                counter.incrementAndGet();
+            }).offer(Tip.fromArray("hi"));
+            result.await();
+            System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
+            Assertions.assertEquals(100, counter.get());
+        }
+
+        private Chunk classic(ChatMessage message, StateContext ctx) {
+            if (message.text().trim().equals("<think>")) {
+                ctx.setState("isThinking", true);
+                return new Chunk(true, message.text());
+            }
+            if (message.text().trim().equals("</think>")) {
+                ctx.setState("isThinking", false);
+                return new Chunk(true, message.text());
+            }
+            if (Boolean.TRUE.equals(ctx.getState("isThinking"))) {
+                return new Chunk(true, message.text());
+            }
+            return new Chunk(false, message.text());
+        }
+
+        private String mockDesensitize1(Chunk chunk) {
+            SleepUtil.sleep(10 * SPEED);
+            return chunk.content.replace("3", "*");
+        }
+
+        private String mockDesensitize2(String chunk) {
+            SleepUtil.sleep(10 * SPEED);
+            return chunk.replace("4", "*");
+        }
+
+        private void log(Chunk chunk) {
+            System.out.println("log content:" + chunk.content);
+        }
+
+        private static class Chunk {
+            private final boolean isThinkContent;
+            private final String content;
+
+            private Chunk(boolean isThinkContent, String content) {
+                this.isThinkContent = isThinkContent;
+                this.content = content;
+            }
+        }
+    }
+
+    /**
+     * Simulates a backpressure scenario where:
+     * <ol>
+     * <li>The LLM generates data faster than the TTS can process it.</li>
+     * <li>TTS processing is constrained to a single thread.</li>
+     * <li>TTS processing speed is artificially slowed.</li>
+     * </ol>
+     */
+    @Nested
+    class BackPressureCase {
+        private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
+            for (int i = 0; i < 100; i++) {
+                emitter.emit(new AiMessage(String.valueOf(i)));
+                SleepUtil.sleep(5 * SPEED);
+            }
+            emitter.complete();
+            System.out.printf("time:%s, generate completed.\n", System.currentTimeMillis());
+        }), ChatOption.custom().model("modelName").stream(true).build());
+
+        private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
+                .prompt(Prompts.human("{{0}}"))
+                .generate(model)
+                .map(this::mockDesensitize).concurrency(1) // Limit processing to 1 concurrent thread
+                .map(this::mockTTS).concurrency(1) // Limit processing to 1 concurrent thread
+                .close();
+
+        @Test
+        @DisplayName("BackPressureCase")
+        void run() {
+            AtomicInteger counter = new AtomicInteger(0);
+            long startTime = System.currentTimeMillis();
+            System.out.printf("time:%s, start.\n", startTime);
+            ConverseLatch<String> result = flow.converse(new FlowSession(false)).doOnConsume(answer -> {
+                System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer);
+                counter.incrementAndGet();
+            }).offer(Tip.fromArray("hi"));
+            result.await();
+            System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
+            Assertions.assertEquals(100, counter.get());
+        }
+
+        private String mockDesensitize(ChatMessage chunk) {
+            // Simulate a time-consuming operation with a delay.
+            SleepUtil.sleep(10 * SPEED);
+            return chunk.text().replace("3", "*");
+        }
+
+        private String mockTTS(String chunk) {
+            // Simulate a time-consuming operation with a delay.
+            SleepUtil.sleep(10 * SPEED);
+            return chunk;
+        }
+    }
+
+    /**
+     * Demonstrates concurrent processing with balanced throughput where:
+     * <ol>
+     * <li>The LLM generates data at a moderate pace.</li>
+     * <li>Downstream processing runs with 3 concurrent threads.</li>
+     * <li>Per-item processing is three times slower than generation (3 : 1), which the three threads offset.</li>
+     * </ol>
+     */
+    @Nested
+    class ConcurrencyCase {
+        private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
+            for (int i = 0; i < 100; i++) {
+                emitter.emit(new AiMessage(String.valueOf(i)));
+                SleepUtil.sleep(10 * SPEED);
+            }
+            emitter.complete();
+        }), ChatOption.custom().model("modelName").stream(true).build());
+
+        private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
+                .prompt(Prompts.human("{{0}}"))
+                .generate(model)
+                .map(this::mockDesensitize).concurrency(3) // Allow up to 3 concurrent threads for this step
+                .close();
+
+        @Test
+        @DisplayName("ConcurrencyCase")
+        void run() {
+            AtomicInteger counter = new AtomicInteger(0);
+            long startTime = System.currentTimeMillis();
+            System.out.printf("time:%s, start.\n", startTime);
+            ConverseLatch<String> result = flow.converse(new FlowSession(false)).doOnConsume(answer -> {
+                System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer);
+                counter.incrementAndGet();
+            }).offer(Tip.fromArray("hi"));
+            result.await();
+            System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
+            Assertions.assertEquals(100, counter.get());
+        }
+
+        private String mockDesensitize(ChatMessage chunk) {
+            // Simulate slower processing at 1/3 the speed of LLM generation.
+            SleepUtil.sleep(30 * SPEED);
+            return chunk.text().replace("3", "*");
+        }
+    }
+}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java

Lines changed: 14 additions & 0 deletions
@@ -54,4 +54,18 @@ default <I> void send(ProcessType type, Subscriber<I, ?> subscriber, List<FlowCo
      * @param <I> The input data type when the flow instance is executed, used for generic type inference.
      */
     <I> void sendCallback(List<FlowContext<I>> contexts);
+
+    /**
+     * Directly processes a list of flow contexts through the specified subscriber.
+     * This method serves as a default implementation for immediate processing without
+     * any intermediate transformations or routing.
+     *
+     * @param <I> The type of input data contained in the flow contexts.
+     * @param type The type of processing to be performed.
+     * @param subscriber The subscriber that will handle the processing.
+     * @param context The list of flow contexts to be processed.
+     */
+    default <I> void directProcess(ProcessType type, Subscriber<I, ?> subscriber, List<FlowContext<I>> context) {
+        subscriber.process(type, context);
+    }
 }
