Skip to content

Commit 55f0fa1

Browse files
zihenzzzhiyizi
authored andcommitted
feat: add langfuse (spring-ai-alibaba#442)
* feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse * feat: add langfuse doc
1 parent cc65f16 commit 55f0fa1

File tree

16 files changed

+587
-8
lines changed

16 files changed

+587
-8
lines changed

data-agent-management/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,25 @@
128128
<artifactId>junit-jupiter</artifactId>
129129
<scope>test</scope>
130130
</dependency>
131+
<!-- OpenTelemetry -->
132+
<dependency>
133+
<groupId>io.opentelemetry</groupId>
134+
<artifactId>opentelemetry-api</artifactId>
135+
</dependency>
136+
<dependency>
137+
<groupId>io.opentelemetry</groupId>
138+
<artifactId>opentelemetry-sdk</artifactId>
139+
</dependency>
140+
<dependency>
141+
<groupId>io.opentelemetry</groupId>
142+
<artifactId>opentelemetry-exporter-otlp</artifactId>
143+
</dependency>
144+
<dependency>
145+
<groupId>io.opentelemetry</groupId>
146+
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
147+
</dependency>
148+
149+
131150
<dependency>
132151
<groupId>org.testcontainers</groupId>
133152
<artifactId>mysql</artifactId>

data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/config/DataAgentConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ public StateGraph nl2sqlGraph(NodeBeanUtil nodeBeanUtil, CodeExecutorProperties
174174
// Human Review keys
175175
keyStrategyHashMap.put(HUMAN_REVIEW_ENABLED, KeyStrategy.REPLACE);
176176
keyStrategyHashMap.put(HUMAN_FEEDBACK_DATA, KeyStrategy.REPLACE);
177+
// Langfuse 追踪:threadId 透传
178+
keyStrategyHashMap.put(TRACE_THREAD_ID, KeyStrategy.REPLACE);
177179
// Final result
178180
keyStrategyHashMap.put(RESULT, KeyStrategy.REPLACE);
179181
return keyStrategyHashMap;
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.alibaba.cloud.ai.dataagent.config;
17+
18+
import com.alibaba.cloud.ai.dataagent.constant.Constant;
19+
import io.opentelemetry.api.OpenTelemetry;
20+
import io.opentelemetry.api.common.AttributeKey;
21+
import io.opentelemetry.api.common.Attributes;
22+
import io.opentelemetry.api.trace.Tracer;
23+
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
24+
import io.opentelemetry.sdk.OpenTelemetrySdk;
25+
import io.opentelemetry.sdk.resources.Resource;
26+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
27+
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
28+
import jakarta.annotation.PreDestroy;
29+
import lombok.Data;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.springframework.boot.context.properties.ConfigurationProperties;
32+
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
34+
35+
import java.nio.charset.StandardCharsets;
36+
import java.util.Base64;
37+
import java.util.concurrent.TimeUnit;
38+
39+
/**
40+
* @author zihenzzz
41+
* @date 2026/2/16 13:55
42+
*/
43+
44+
@Slf4j
45+
@Data
46+
@Configuration
47+
@ConfigurationProperties(prefix = Constant.PROJECT_PROPERTIES_PREFIX + ".langfuse")
48+
public class OpenTelemetryConfig {
49+
50+
private static final String SERVICE_NAME = "data-agent";
51+
52+
private boolean enabled = true;
53+
54+
private String host;
55+
56+
private String publicKey;
57+
58+
private String secretKey;
59+
60+
private SdkTracerProvider tracerProvider;
61+
62+
@Bean
63+
public OpenTelemetry openTelemetry() {
64+
if (!enabled) {
65+
return OpenTelemetry.noop();
66+
}
67+
68+
String auth = publicKey + ":" + secretKey;
69+
String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
70+
71+
OtlpHttpSpanExporter spanExporter = OtlpHttpSpanExporter.builder()
72+
.setEndpoint(host + "/api/public/otel/v1/traces")
73+
.addHeader("Authorization", "Basic " + encodedAuth)
74+
.setTimeout(10, TimeUnit.SECONDS)
75+
.build();
76+
77+
Resource resource = Resource.getDefault()
78+
.merge(Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), SERVICE_NAME)));
79+
80+
tracerProvider = SdkTracerProvider.builder()
81+
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter)
82+
.setScheduleDelay(1, TimeUnit.SECONDS)
83+
.setMaxExportBatchSize(100)
84+
.build())
85+
.setResource(resource)
86+
.build();
87+
88+
OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
89+
90+
log.info("OpenTelemetry initialized with Langfuse OTLP HTTP exporter");
91+
92+
return openTelemetrySdk;
93+
}
94+
95+
@Bean
96+
public Tracer langfuseTracer(OpenTelemetry openTelemetry) {
97+
return openTelemetry.getTracer(SERVICE_NAME);
98+
}
99+
100+
@PreDestroy
101+
public void shutdown() {
102+
if (tracerProvider != null) {
103+
tracerProvider.close();
104+
}
105+
}
106+
107+
}

data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/constant/Constant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,7 @@ private Constant() {
156156

157157
public static final String STREAM_EVENT_ERROR = "error";
158158

159+
// Langfuse 追踪:threadId 透传到 graph state,用于 token 累计
160+
public static final String TRACE_THREAD_ID = "TRACE_THREAD_ID";
161+
159162
}

data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/aimodelconfig/DynamicModelFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public ChatModel createChatModel(ModelConfigDTO config) {
7676
.model(config.getModelName())
7777
.temperature(config.getTemperature())
7878
.maxTokens(config.getMaxTokens())
79+
.streamUsage(true)
7980
.build();
8081
// 4. 返回统一的 OpenAiChatModel
8182
return OpenAiChatModel.builder().openAiApi(openAiApi).defaultOptions(openAiChatOptions).build();

data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/Context/StreamContext.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.alibaba.cloud.ai.dataagent.enums.TextType;
1919
import com.alibaba.cloud.ai.dataagent.vo.GraphNodeResponse;
20+
import io.opentelemetry.api.trace.Span;
2021
import lombok.Data;
2122
import org.springframework.http.codec.ServerSentEvent;
2223
import reactor.core.Disposable;
@@ -37,8 +38,23 @@ public class StreamContext {
3738

3839
private Sinks.Many<ServerSentEvent<GraphNodeResponse>> sink;
3940

41+
private Span span;
42+
4043
private TextType textType;
4144

45+
/**
46+
* 收集流式输出内容,用于 Langfuse 上报
47+
*/
48+
private final StringBuilder outputCollector = new StringBuilder();
49+
50+
public void appendOutput(String chunk) {
51+
outputCollector.append(chunk);
52+
}
53+
54+
public String getCollectedOutput() {
55+
return outputCollector.toString();
56+
}
57+
4258
/**
4359
* 标记是否已经清理,用于防止重复清理
4460
*/

data-agent-management/src/main/java/com/alibaba/cloud/ai/dataagent/service/graph/GraphServiceImpl.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.alibaba.cloud.ai.dataagent.service.graph;
1717

18+
import com.alibaba.cloud.ai.dataagent.service.langfuse.LangfuseService;
1819
import com.alibaba.cloud.ai.dataagent.enums.TextType;
1920
import com.alibaba.cloud.ai.dataagent.workflow.node.PlannerNode;
2021
import com.alibaba.cloud.ai.dataagent.dto.GraphRequest;
@@ -25,6 +26,7 @@
2526
import com.alibaba.cloud.ai.graph.exception.GraphRunnerException;
2627
import com.alibaba.cloud.ai.graph.exception.GraphStateException;
2728
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
29+
import io.opentelemetry.api.trace.Span;
2830
import lombok.extern.slf4j.Slf4j;
2931
import org.springframework.http.codec.ServerSentEvent;
3032
import org.springframework.stereotype.Service;
@@ -54,11 +56,15 @@ public class GraphServiceImpl implements GraphService {
5456

5557
private final MultiTurnContextManager multiTurnContextManager;
5658

59+
private final LangfuseService langfuseReporter;
60+
5761
public GraphServiceImpl(StateGraph stateGraph, ExecutorService executorService,
58-
MultiTurnContextManager multiTurnContextManager) throws GraphStateException {
62+
MultiTurnContextManager multiTurnContextManager, LangfuseService langfuseReporter)
63+
throws GraphStateException {
5964
this.compiledGraph = stateGraph.compile(CompileConfig.builder().interruptBefore(HUMAN_FEEDBACK_NODE).build());
6065
this.executor = executorService;
6166
this.multiTurnContextManager = multiTurnContextManager;
67+
this.langfuseReporter = langfuseReporter;
6268
}
6369

6470
@Override
@@ -100,6 +106,10 @@ public void stopStreamProcessing(String threadId) {
100106
multiTurnContextManager.discardPending(threadId);
101107
StreamContext context = streamContextMap.remove(threadId);
102108
if (context != null) {
109+
// 客户端断开,结束 Langfuse span
110+
if (context.getSpan() != null && context.getSpan().isRecording()) {
111+
langfuseReporter.endSpanSuccess(context.getSpan(), threadId, context.getCollectedOutput());
112+
}
103113
context.cleanup();
104114
log.info("Cleaned up stream context for threadId: {}", threadId);
105115
}
@@ -123,11 +133,15 @@ private void handleNewProcess(GraphRequest graphRequest) {
123133
log.warn("StreamContext already cleaned for threadId: {}, skipping stream start", threadId);
124134
return;
125135
}
136+
// 开始 Langfuse 追踪
137+
Span span = langfuseReporter.startLLMSpan("graph-stream", graphRequest);
138+
context.setSpan(span);
139+
126140
String multiTurnContext = multiTurnContextManager.buildContext(threadId);
127141
multiTurnContextManager.beginTurn(threadId, query);
128142
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(
129143
Map.of(IS_ONLY_NL2SQL, nl2sqlOnly, INPUT_KEY, query, AGENT_ID, agentId, HUMAN_REVIEW_ENABLED,
130-
humanReviewEnabled, MULTI_TURN_CONTEXT, multiTurnContext),
144+
humanReviewEnabled, MULTI_TURN_CONTEXT, multiTurnContext, TRACE_THREAD_ID, threadId),
131145
RunnableConfig.builder().threadId(threadId).build());
132146
subscribeToFlux(context, nodeOutputFlux, graphRequest, agentId, threadId);
133147
}
@@ -147,6 +161,10 @@ private void handleHumanFeedback(GraphRequest graphRequest) {
147161
log.warn("StreamContext already cleaned for threadId: {}, skipping stream start", threadId);
148162
return;
149163
}
164+
// 开始 Langfuse 追踪
165+
Span span = langfuseReporter.startLLMSpan("graph-feedback", graphRequest);
166+
context.setSpan(span);
167+
150168
Map<String, Object> feedbackData = Map.of("feedback", !graphRequest.isRejectedPlan(), "feedback_content",
151169
feedbackContent);
152170
if (graphRequest.isRejectedPlan()) {
@@ -213,9 +231,13 @@ private void subscribeToFlux(StreamContext context, Flux<NodeOutput> nodeOutputF
213231
private void handleStreamError(String agentId, String threadId, Throwable error) {
214232
log.error("Error in stream processing for threadId: {}: ", threadId, error);
215233
StreamContext context = streamContextMap.remove(threadId);
216-
if (context != null && !context.isCleaned() && context.getSink() != null) {
217-
// 检查 sink 是否还有订阅者
218-
if (context.getSink().currentSubscriberCount() > 0) {
234+
if (context != null && !context.isCleaned()) {
235+
// 结束 Langfuse span(失败)
236+
if (context.getSpan() != null) {
237+
langfuseReporter.endSpanError(context.getSpan(), threadId,
238+
error instanceof Exception ? (Exception) error : new RuntimeException(error));
239+
}
240+
if (context.getSink() != null && context.getSink().currentSubscriberCount() > 0) {
219241
context.getSink()
220242
.tryEmitNext(ServerSentEvent
221243
.builder(GraphNodeResponse.error(agentId, threadId,
@@ -236,8 +258,12 @@ private void handleStreamComplete(String agentId, String threadId) {
236258
log.info("Stream processing completed successfully for threadId: {}", threadId);
237259
multiTurnContextManager.finishTurn(threadId);
238260
StreamContext context = streamContextMap.remove(threadId);
239-
if (context != null && !context.isCleaned() && context.getSink() != null) {
240-
if (context.getSink().currentSubscriberCount() > 0) {
261+
if (context != null && !context.isCleaned()) {
262+
// 结束 Langfuse span(成功)
263+
if (context.getSpan() != null) {
264+
langfuseReporter.endSpanSuccess(context.getSpan(), threadId, context.getCollectedOutput());
265+
}
266+
if (context.getSink() != null && context.getSink().currentSubscriberCount() > 0) {
241267
context.getSink()
242268
.tryEmitNext(ServerSentEvent.builder(GraphNodeResponse.complete(agentId, threadId))
243269
.event(STREAM_EVENT_COMPLETE)
@@ -294,6 +320,7 @@ private void handleStreamNodeOutput(GraphRequest request, StreamingOutput output
294320
}
295321
// 文本标记符号不返回给前端
296322
if (!isTypeSign) {
323+
context.appendOutput(chunk);
297324
if (PlannerNode.class.getSimpleName().equals(node)) {
298325
multiTurnContextManager.appendPlannerChunk(threadId, chunk);
299326
}

0 commit comments

Comments
 (0)