Skip to content

Commit 496210e

Browse files
authored
refactor(stream-node): 优化流式响应处理和编码配置 (#422)
- 将 application.yml 中添加 servlet 编码配置,强制使用 UTF-8 编码 - 精简 ExpanderNode 中基于 chatClient 的响应流逻辑,去除多余转换 - 修改 GraphProcess 中流处理方法,使用泛型封装业务消息 ChatMessage - 替换原 JSON 序列化为 Jackson 注解支持的 ChatMessage 记录类型 - 更新 GraphStreamController,调整 expand 接口返回值类型和流调用逻辑 - 统一流处理使用 Flux<ServerSentEvent<ChatMessage>>,改进类型安全与扩展性
1 parent d705f32 commit 496210e

File tree

4 files changed

+27
-32
lines changed

4 files changed

+27
-32
lines changed

spring-ai-alibaba-graph-example/stream-node/src/main/java/com/alibaba/cloud/ai/graph/controller/GraphProcess/GraphProcess.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,13 @@
1919
import com.alibaba.cloud.ai.graph.CompiledGraph;
2020
import com.alibaba.cloud.ai.graph.NodeOutput;
2121
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
22-
import com.alibaba.fastjson.JSON;
23-
import com.alibaba.fastjson.JSONObject;
22+
import com.fasterxml.jackson.annotation.JsonProperty;
2423
import org.slf4j.Logger;
2524
import org.slf4j.LoggerFactory;
2625
import org.springframework.http.codec.ServerSentEvent;
2726
import reactor.core.publisher.Flux;
2827
import reactor.core.publisher.Sinks;
2928

30-
import java.util.Map;
31-
3229
/**
3330
* @author yingzi
3431
* @since 2025/6/13
@@ -44,21 +41,21 @@ public GraphProcess(CompiledGraph compiledGraph) {
4441
this.compiledGraph = compiledGraph;
4542
}
4643

47-
public void processStream(Flux<NodeOutput> nodeOutputFlux, Sinks.Many<ServerSentEvent<String>> sink) {
44+
public void processStream(Flux<NodeOutput> nodeOutputFlux, Sinks.Many<ServerSentEvent<ChatMessage>> sink) {
4845
nodeOutputFlux
4946
.doOnNext(output -> {
5047
logger.info("output = {}", output);
5148
String nodeName = output.node();
52-
String content;
53-
if (output instanceof StreamingOutput streamingOutput) {
54-
content = JSON.toJSONString(Map.of(nodeName, streamingOutput.chunk()));
49+
ChatMessage chatMessage = null;
50+
if (output instanceof StreamingOutput<?> streamingOutput) {
51+
String chunk = streamingOutput.chunk();
52+
if (chunk != null && !chunk.isEmpty()) {
53+
chatMessage = new ChatMessage(nodeName, chunk);
54+
}
5555
} else {
56-
JSONObject nodeOutput = new JSONObject();
57-
nodeOutput.put("data", output.state().data());
58-
nodeOutput.put("node", nodeName);
59-
content = JSON.toJSONString(nodeOutput);
56+
chatMessage = new ChatMessage(nodeName, output.state().data());
6057
}
61-
sink.tryEmitNext(ServerSentEvent.builder(content).build());
58+
sink.tryEmitNext(ServerSentEvent.builder(chatMessage).build());
6259
})
6360
.doOnComplete(() -> {
6461
// 正常完成
@@ -70,4 +67,7 @@ public void processStream(Flux<NodeOutput> nodeOutputFlux, Sinks.Many<ServerSent
7067
})
7168
.subscribe();
7269
}
70+
71+
public record ChatMessage(@JsonProperty("node_name") String nodeName, @JsonProperty("type") Object data) {
72+
}
7373
}

spring-ai-alibaba-graph-example/stream-node/src/main/java/com/alibaba/cloud/ai/graph/controller/GraphStreamController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public GraphStreamController(@Qualifier("streamGraph")StateGraph stateGraph) thr
5555
}
5656

5757
@GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
58-
public Flux<ServerSentEvent<String>> expand(@RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,
58+
public Flux<ServerSentEvent<GraphProcess.ChatMessage>> expand(@RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,
5959
@RequestParam(value = "expander_number", defaultValue = "3", required = false) Integer expanderNumber,
6060
@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId) throws GraphRunnerException {
6161
RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();
@@ -64,8 +64,8 @@ public Flux<ServerSentEvent<String>> expand(@RequestParam(value = "query", defau
6464
objectMap.put("expander_number", expanderNumber);
6565

6666
GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
67-
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
68-
Flux<NodeOutput> nodeOutputFlux = compiledGraph.fluxStream(objectMap, runnableConfig);
67+
Sinks.Many<ServerSentEvent<GraphProcess.ChatMessage>> sink = Sinks.many().unicast().onBackpressureBuffer();
68+
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(objectMap, runnableConfig);
6969
graphProcess.processStream(nodeOutputFlux, sink);
7070

7171
return sink.asFlux()

spring-ai-alibaba-graph-example/stream-node/src/main/java/com/alibaba/cloud/ai/graph/node/ExpanderNode.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616

1717
package com.alibaba.cloud.ai.graph.node;
1818

19-
import com.alibaba.cloud.ai.graph.GraphResponse;
2019
import com.alibaba.cloud.ai.graph.OverAllState;
2120
import com.alibaba.cloud.ai.graph.action.NodeAction;
22-
import com.alibaba.cloud.ai.graph.streaming.FluxConverter;
23-
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
2421
import org.springframework.ai.chat.client.ChatClient;
2522
import org.springframework.ai.chat.model.ChatResponse;
2623
import org.springframework.ai.chat.prompt.PromptTemplate;
2724
import reactor.core.publisher.Flux;
2825

29-
import java.util.Arrays;
30-
import java.util.List;
3126
import java.util.Map;
3227

3328
/**
@@ -52,16 +47,11 @@ public Map<String, Object> apply(OverAllState state) throws Exception {
5247
String query = state.value("query", "");
5348
Integer expanderNumber = state.value("expander_number", this.NUMBER);
5449

55-
Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt().user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate()).param("number", expanderNumber).param("query", query)).stream().chatResponse();
56-
57-
Flux<GraphResponse<StreamingOutput>> generator = FluxConverter.builder()
58-
.startingNode("expander_llm_stream")
59-
.startingState(state)
60-
.mapResult(response -> {
61-
String text = response.getResult().getOutput().getText();
62-
List<String> queryVariants = Arrays.asList(text.split("\n"));
63-
return Map.of("expander_content", queryVariants);
64-
}).build(chatResponseFlux);
65-
return Map.of("expander_content", generator);
50+
Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt()
51+
.user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate())
52+
.param("number", expanderNumber)
53+
.param("query", query))
54+
.stream().chatResponse();
55+
return Map.of("expander_content", chatResponseFlux);
6656
}
6757
}

spring-ai-alibaba-graph-example/stream-node/src/main/resources/application.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
server:
22
port: 8080
3+
servlet:
4+
encoding:
5+
charset: UTF-8
6+
enabled: true
7+
force: true
38
spring:
49
application:
510
name: stream-node

0 commit comments

Comments
 (0)