Skip to content

Commit b341414

Browse files
committed
Implement LLMObs "span.finished" metric
1 parent b19cbb9 commit b341414

File tree

7 files changed

+306
-18
lines changed

7 files changed

+306
-18
lines changed

dd-java-agent/instrumentation/openai-java/openai-java-3.0/src/main/java/datadog/trace/instrumentation/openai_java/HttpResponseWrapper.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,7 @@ public static <T> CompletableFuture<HttpResponseFor<T>> wrapFuture(
2828
BiConsumer<AgentSpan, T> decorate) {
2929
return future
3030
.thenApply(response -> wrap(response, span, decorate))
31-
.whenComplete(
32-
(r, t) -> {
33-
try {
34-
if (t != null) {
35-
DECORATE.onError(span, t);
36-
}
37-
DECORATE.beforeFinish(span);
38-
} finally {
39-
span.finish();
40-
}
41-
});
31+
.whenComplete((_r, t) -> DECORATE.finishSpan(span, t));
4232
}
4333

4434
private final HttpResponseFor<T> delegate;

dd-java-agent/instrumentation/openai-java/openai-java-3.0/src/main/java/datadog/trace/instrumentation/openai_java/OpenAiDecorator.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.openai.core.http.Headers;
55
import datadog.trace.api.Config;
66
import datadog.trace.api.llmobs.LLMObsContext;
7+
import datadog.trace.api.telemetry.LLMObsMetricCollector;
78
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
89
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
910
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
@@ -14,32 +15,34 @@
1415
public class OpenAiDecorator extends ClientDecorator {
1516
public static final OpenAiDecorator DECORATE = new OpenAiDecorator();
1617

18+
public static final String INTEGRATION = "openai";
1719
public static final String INSTRUMENTATION_NAME = "openai-java";
1820
public static final CharSequence SPAN_NAME = UTF8BytesString.create("openai.request");
1921

2022
public static final String REQUEST_MODEL = "openai.request.model";
2123
public static final String RESPONSE_MODEL = "openai.response.model";
2224
public static final String OPENAI_ORGANIZATION_NAME = "openai.organization";
2325

24-
private static final CharSequence COMPONENT_NAME = UTF8BytesString.create("openai");
26+
private static final CharSequence COMPONENT_NAME = UTF8BytesString.create(INTEGRATION);
2527

2628
private final boolean llmObsEnabled = Config.get().isLlmObsEnabled();
2729

2830
public AgentSpan startSpan(ClientOptions clientOptions) {
2931
AgentSpan span = AgentTracer.startSpan(INSTRUMENTATION_NAME, SPAN_NAME);
3032
afterStart(span);
3133
span.setTag("openai.api_base", clientOptions.baseUrl());
32-
// TODO api_version (either last part of the URL, or api-version param if Azure)
33-
// clientOptions.queryParams().values("api-version")
3434
return span;
3535
}
3636

3737
public void finishSpan(AgentSpan span, Throwable err) {
38-
if (err != null) {
39-
onError(span, err);
38+
try {
39+
if (err != null) {
40+
onError(span, err);
41+
}
42+
DECORATE.beforeFinish(span);
43+
} finally {
44+
span.finish();
4045
}
41-
DECORATE.beforeFinish(span);
42-
span.finish();
4346
}
4447

4548
@Override
@@ -70,6 +73,20 @@ public AgentSpan afterStart(AgentSpan span) {
7073
return super.afterStart(span);
7174
}
7275

76+
@Override
77+
public AgentSpan beforeFinish(AgentSpan span) {
78+
if (llmObsEnabled) {
79+
Object spanKindTag = span.getTag("_ml_obs_tag.span.kind");
80+
if (spanKindTag != null) {
81+
String spanKind = spanKindTag.toString();
82+
boolean isRootSpan = span.getLocalRootSpan() == span;
83+
LLMObsMetricCollector.get()
84+
.recordSpanFinished(INTEGRATION, spanKind, isRootSpan, true, span.isError());
85+
}
86+
}
87+
return super.beforeFinish(span);
88+
}
89+
7390
public void withHttpResponse(AgentSpan span, Headers headers) {
7491
if (!llmObsEnabled) {
7592
return;
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package datadog.trace.api.telemetry;
2+
3+
import datadog.trace.api.cache.DDCache;
4+
import datadog.trace.api.cache.DDCaches;
5+
import java.util.ArrayList;
6+
import java.util.Arrays;
7+
import java.util.Collection;
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.concurrent.ArrayBlockingQueue;
11+
import java.util.concurrent.BlockingQueue;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
public final class LLMObsMetricCollector
16+
implements MetricCollector<LLMObsMetricCollector.LLMObsMetric> {
17+
private static final String METRIC_NAMESPACE = "mlobs";
18+
19+
private static final Logger log = LoggerFactory.getLogger(LLMObsMetricCollector.class);
20+
private static final LLMObsMetricCollector INSTANCE = new LLMObsMetricCollector();
21+
22+
public static LLMObsMetricCollector get() {
23+
return INSTANCE;
24+
}
25+
26+
public static final String SPAN_FINISHED_METRIC = "span.finished";
27+
public static final String COUNT_METRIC_TYPE = "count";
28+
29+
private static final String IS_ROOT_SPAN_TRUE = "is_root_span:1";
30+
private static final String IS_ROOT_SPAN_FALSE = "is_root_span:0";
31+
private static final String AUTOINSTRUMENTED_TRUE = "autoinstrumented:1";
32+
private static final String AUTOINSTRUMENTED_FALSE = "autoinstrumented:0";
33+
private static final String ERROR_TRUE = "error:1";
34+
private static final String ERROR_FALSE = "error:0";
35+
36+
private final BlockingQueue<LLMObsMetric> metricsQueue;
37+
private final DDCache<String, String> integrationTagCache;
38+
private final DDCache<String, String> spanKindTagCache;
39+
40+
private LLMObsMetricCollector() {
41+
this.metricsQueue = new ArrayBlockingQueue<>(RAW_QUEUE_SIZE);
42+
this.integrationTagCache = DDCaches.newFixedSizeCache(8);
43+
this.spanKindTagCache = DDCaches.newFixedSizeCache(8);
44+
}
45+
46+
/**
47+
* Record a span finished metric for LLMObs telemetry.
48+
*
49+
* @param integration the integration name (e.g., "openai")
50+
* @param spanKind the span kind (e.g., "llm", "embedding")
51+
* @param isRootSpan whether this is a root span
52+
* @param isAutoInstrumented whether this span was auto-instrumented
53+
* @param hasError whether the span had an error
54+
*/
55+
public void recordSpanFinished(
56+
String integration,
57+
String spanKind,
58+
boolean isRootSpan,
59+
boolean isAutoInstrumented,
60+
boolean hasError) {
61+
String integrationTag =
62+
integrationTagCache.computeIfAbsent(integration, key -> "integration:" + key);
63+
String spanKindTag = spanKindTagCache.computeIfAbsent(spanKind, key -> "span_kind:" + key);
64+
65+
List<String> tags =
66+
Arrays.asList(
67+
integrationTag,
68+
spanKindTag,
69+
isRootSpan ? IS_ROOT_SPAN_TRUE : IS_ROOT_SPAN_FALSE,
70+
isAutoInstrumented ? AUTOINSTRUMENTED_TRUE : AUTOINSTRUMENTED_FALSE,
71+
hasError ? ERROR_TRUE : ERROR_FALSE);
72+
73+
LLMObsMetric metric =
74+
new LLMObsMetric(METRIC_NAMESPACE, true, SPAN_FINISHED_METRIC, COUNT_METRIC_TYPE, 1L, tags);
75+
if (!metricsQueue.offer(metric)) {
76+
log.debug("Unable to add telemetry metric {} for {}", SPAN_FINISHED_METRIC, integration);
77+
}
78+
}
79+
80+
@Override
81+
public void prepareMetrics() {
82+
// metrics are added directly via recordSpanFinished; no additional preparation needed
83+
}
84+
85+
@Override
86+
public Collection<LLMObsMetric> drain() {
87+
if (this.metricsQueue.isEmpty()) {
88+
return Collections.emptyList();
89+
}
90+
List<LLMObsMetric> drained = new ArrayList<>(this.metricsQueue.size());
91+
this.metricsQueue.drainTo(drained);
92+
return drained;
93+
}
94+
95+
public static class LLMObsMetric extends MetricCollector.Metric {
96+
public LLMObsMetric(
97+
String namespace,
98+
boolean common,
99+
String metricName,
100+
String type,
101+
Number value,
102+
List<String> tags) {
103+
super(namespace, common, metricName, type, value, tags);
104+
}
105+
}
106+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package datadog.trace.api.telemetry
2+
3+
import datadog.trace.test.util.DDSpecification
4+
5+
class LLMObsMetricCollectorTest extends DDSpecification {
6+
LLMObsMetricCollector collector = LLMObsMetricCollector.get()
7+
8+
void setup() {
9+
// clear any previous metrics
10+
collector.drain()
11+
}
12+
13+
def "no metrics - drain empty list"() {
14+
when:
15+
collector.prepareMetrics()
16+
17+
then:
18+
collector.drain().isEmpty()
19+
}
20+
21+
def "record and drain span finished metrics"() {
22+
when:
23+
collector.recordSpanFinished("openai", "llm", true, true, false)
24+
collector.recordSpanFinished("openai", "llm", false, true, false)
25+
collector.recordSpanFinished("anthropic", "embedding", true, false, true)
26+
collector.prepareMetrics()
27+
def metrics = collector.drain()
28+
29+
then:
30+
metrics.size() == 3
31+
32+
def metric1 = metrics[0]
33+
metric1.type == 'count'
34+
metric1.value == 1
35+
metric1.namespace == 'mlobs'
36+
metric1.metricName == 'span.finished'
37+
metric1.tags.sort() == [
38+
'integration:openai',
39+
'span_kind:llm',
40+
'is_root_span:1',
41+
'autoinstrumented:1',
42+
'error:0'
43+
].sort()
44+
45+
def metric2 = metrics[1]
46+
metric2.type == 'count'
47+
metric2.value == 1
48+
metric2.namespace == 'mlobs'
49+
metric2.metricName == 'span.finished'
50+
metric2.tags.toSet() == [
51+
'integration:openai',
52+
'span_kind:llm',
53+
'is_root_span:0',
54+
'autoinstrumented:1',
55+
'error:0'
56+
].toSet()
57+
58+
def metric3 = metrics[2]
59+
metric3.type == 'count'
60+
metric3.value == 1
61+
metric3.namespace == 'mlobs'
62+
metric3.metricName == 'span.finished'
63+
metric3.tags.toSet() == [
64+
'integration:anthropic',
65+
'span_kind:embedding',
66+
'is_root_span:1',
67+
'autoinstrumented:0',
68+
'error:1'
69+
].toSet()
70+
}
71+
}
72+

telemetry/src/main/java/datadog/telemetry/TelemetrySystem.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import datadog.telemetry.metric.ConfigInversionMetricPeriodicAction;
1414
import datadog.telemetry.metric.CoreMetricsPeriodicAction;
1515
import datadog.telemetry.metric.IastMetricPeriodicAction;
16+
import datadog.telemetry.metric.LLMObsMetricPeriodicAction;
1617
import datadog.telemetry.metric.OtelEnvMetricPeriodicAction;
1718
import datadog.telemetry.metric.WafMetricPeriodicAction;
1819
import datadog.telemetry.products.ProductChangeAction;
@@ -64,6 +65,9 @@ static Thread createTelemetryRunnable(
6465
if (Config.get().isCiVisibilityEnabled() && Config.get().isCiVisibilityTelemetryEnabled()) {
6566
actions.add(new CiVisibilityMetricPeriodicAction());
6667
}
68+
if (Config.get().isLlmObsEnabled()) {
69+
actions.add(new LLMObsMetricPeriodicAction());
70+
}
6771
}
6872
if (null != dependencyService) {
6973
actions.add(new DependencyPeriodicAction(dependencyService));
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package datadog.telemetry.metric;
2+
3+
import datadog.trace.api.telemetry.LLMObsMetricCollector;
4+
import datadog.trace.api.telemetry.MetricCollector;
5+
import edu.umd.cs.findbugs.annotations.NonNull;
6+
7+
public class LLMObsMetricPeriodicAction extends MetricPeriodicAction {
8+
@NonNull
9+
@Override
10+
public MetricCollector collector() {
11+
return LLMObsMetricCollector.get();
12+
}
13+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package datadog.telemetry.metric
2+
3+
import datadog.telemetry.TelemetryService
4+
import datadog.telemetry.api.Metric
5+
import datadog.trace.api.telemetry.LLMObsMetricCollector
6+
import datadog.trace.test.util.DDSpecification
7+
8+
class LLMObsMetricPeriodicActionTest extends DDSpecification {
9+
LLMObsMetricPeriodicAction periodicAction = new LLMObsMetricPeriodicAction()
10+
TelemetryService telemetryService = Mock()
11+
LLMObsMetricCollector collector = LLMObsMetricCollector.get()
12+
13+
void setup() {
14+
// clear any previous metrics
15+
collector.drain()
16+
}
17+
18+
void 'test multiple span finished metrics with different tags'() {
19+
when:
20+
collector.recordSpanFinished('openai', 'llm', true, true, false)
21+
collector.recordSpanFinished('openai', 'llm', false, true, false)
22+
collector.recordSpanFinished('anthropic', 'embedding', true, false, true)
23+
periodicAction.doIteration(telemetryService)
24+
25+
then:
26+
1 * telemetryService.addMetric({ Metric metric ->
27+
metric.namespace == 'mlobs' &&
28+
metric.metric == 'span.finished' &&
29+
metric.tags.toSet() == [
30+
'integration:openai',
31+
'span_kind:llm',
32+
'is_root_span:1',
33+
'autoinstrumented:1',
34+
'error:0'
35+
].toSet()
36+
})
37+
1 * telemetryService.addMetric({ Metric metric ->
38+
metric.namespace == 'mlobs' &&
39+
metric.metric == 'span.finished' &&
40+
metric.tags.toSet() == [
41+
'integration:openai',
42+
'span_kind:llm',
43+
'is_root_span:0',
44+
'autoinstrumented:1',
45+
'error:0'
46+
].toSet()
47+
})
48+
1 * telemetryService.addMetric({ Metric metric ->
49+
metric.namespace == 'mlobs' &&
50+
metric.metric == 'span.finished' &&
51+
metric.tags.toSet() == [
52+
'integration:anthropic',
53+
'span_kind:embedding',
54+
'is_root_span:1',
55+
'autoinstrumented:0',
56+
'error:1'
57+
].toSet()
58+
})
59+
0 * _
60+
}
61+
62+
void 'test aggregation of identical metrics'() {
63+
when:
64+
collector.recordSpanFinished('openai', 'llm', true, true, false)
65+
collector.recordSpanFinished('openai', 'llm', true, true, false)
66+
collector.recordSpanFinished('openai', 'llm', true, true, false)
67+
periodicAction.doIteration(telemetryService)
68+
69+
then:
70+
1 * telemetryService.addMetric({ Metric metric ->
71+
metric.namespace == 'mlobs' &&
72+
metric.metric == 'span.finished' &&
73+
metric.points.size() == 3 &&
74+
metric.points.every { it[1] == 1 } &&
75+
metric.tags.toSet() == [
76+
'integration:openai',
77+
'span_kind:llm',
78+
'is_root_span:1',
79+
'autoinstrumented:1',
80+
'error:0'
81+
].toSet()
82+
})
83+
0 * _
84+
}
85+
}
86+

0 commit comments

Comments
 (0)