Skip to content

Commit 12c02b6

Browse files
authored
Fix LLM Observability Data Submission (#9476)
* adjust llm observability writers to submit data correctly * remove unneeded block * add tests * review comment * split log line into two lines
1 parent f7ec25f commit 12c02b6

File tree

2 files changed

+91
-10
lines changed

2 files changed

+91
-10
lines changed

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,6 @@ public static Writer createWriter(
9494
"CI Visibility functionality is limited. Please upgrade to Agent v6.40+ or v7.40+ or enable Agentless mode.");
9595
}
9696
}
97-
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isLlmObsEnabled())) {
98-
featuresDiscovery.discoverIfOutdated();
99-
if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) {
100-
configuredType = DD_INTAKE_WRITER_TYPE;
101-
} else {
102-
log.info("LLM Observability functionality is limited.");
103-
// TODO: add supported agent version to this log line for llm obs
104-
}
105-
}
10697

10798
RemoteWriter remoteWriter;
10899
if (DD_INTAKE_WRITER_TYPE.equals(configuredType)) {
@@ -190,8 +181,21 @@ private static RemoteApi createDDIntakeRemoteApi(
190181
TrackType trackType) {
191182
featuresDiscovery.discoverIfOutdated();
192183
boolean evpProxySupported = featuresDiscovery.supportsEvpProxy();
184+
185+
boolean useLlmObsAgentless = config.isLlmObsAgentlessEnabled() || !evpProxySupported;
186+
if (useLlmObsAgentless && !config.isLlmObsAgentlessEnabled()) {
187+
boolean agentRunning = null != featuresDiscovery.getTraceEndpoint();
188+
log.info(
189+
"LLM Observability configured to use agent proxy, but is not compatible or agent is not running (agentRunning={}, compatible={})",
190+
agentRunning,
191+
evpProxySupported);
192+
log.info(
193+
"LLM Observability will use agentless data submission instead. Compatible agent versions are >=7.55.0 (found version={}",
194+
featuresDiscovery.getVersion());
195+
}
196+
193197
boolean useProxyApi =
194-
(evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled())
198+
(TrackType.LLMOBS == trackType && !useLlmObsAgentless)
195199
|| (evpProxySupported
196200
&& (TrackType.CITESTCOV == trackType || TrackType.CITESTCYCLE == trackType)
197201
&& !config.isCiVisibilityAgentlessEnabled());

dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE
55
import datadog.communication.ddagent.DDAgentFeaturesDiscovery
66
import datadog.communication.ddagent.SharedCommunicationObjects
77
import datadog.trace.api.Config
8+
import datadog.trace.api.intake.TrackType
89
import datadog.trace.common.sampling.Sampler
910
import datadog.trace.common.writer.ddagent.DDAgentApi
1011
import datadog.trace.common.writer.ddagent.Prioritization
@@ -97,6 +98,73 @@ class WriterFactoryTest extends DDSpecification {
9798
"not-found" | false | false | false | DDAgentWriter | [DDAgentApi] | false
9899
}
99100

101+
def "test writer creation for #configuredType when agentHasEvpProxy=#hasEvpProxy llmObsAgentless=#isLlmObsAgentlessEnabled for LLM Observability"() {
102+
setup:
103+
def config = Mock(Config)
104+
config.apiKey >> "my-api-key"
105+
config.agentUrl >> "http://my-agent.url"
106+
config.getEnumValue(PRIORITIZATION_TYPE, _, _) >> Prioritization.FAST_LANE
107+
config.tracerMetricsEnabled >> true
108+
config.isLlmObsEnabled() >> true
109+
110+
// Mock agent info response
111+
def response
112+
if (agentRunning) {
113+
response = buildHttpResponse(hasEvpProxy, true, HttpUrl.parse(config.agentUrl + "/info"))
114+
} else {
115+
response = buildHttpResponseNotOk(HttpUrl.parse(config.agentUrl + "/info"))
116+
}
117+
118+
// Mock HTTP client that simulates delayed response for async feature discovery
119+
def mockCall = Mock(Call)
120+
def mockHttpClient = Mock(OkHttpClient)
121+
mockCall.execute() >> {
122+
// Add a delay
123+
sleep(400)
124+
return response
125+
}
126+
mockHttpClient.newCall(_ as Request) >> mockCall
127+
128+
// Create SharedCommunicationObjects with mocked HTTP client
129+
def sharedComm = new SharedCommunicationObjects()
130+
sharedComm.okHttpClient = mockHttpClient
131+
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
132+
sharedComm.createRemaining(config)
133+
134+
def sampler = Mock(Sampler)
135+
136+
when:
137+
config.llmObsAgentlessEnabled >> isLlmObsAgentlessEnabled
138+
139+
def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, configuredType)
140+
def llmObsApiClasses = ((RemoteWriter) writer).apis
141+
.stream()
142+
.filter(api -> {
143+
try {
144+
def trackTypeField = api.class.getDeclaredField("trackType")
145+
trackTypeField.setAccessible(true)
146+
return trackTypeField.get(api) == TrackType.LLMOBS
147+
} catch (Exception e) {
148+
return false
149+
}
150+
})
151+
.map(Object::getClass)
152+
.collect(Collectors.toList())
153+
154+
then:
155+
writer.class == expectedWriterClass
156+
llmObsApiClasses == expectedLlmObsApiClasses
157+
158+
where:
159+
configuredType | agentRunning | hasEvpProxy | isLlmObsAgentlessEnabled |expectedWriterClass | expectedLlmObsApiClasses
160+
"DDIntakeWriter" | true | true | false | DDIntakeWriter | [DDEvpProxyApi]
161+
"DDIntakeWriter" | true | false | false | DDIntakeWriter | [DDIntakeApi]
162+
"DDIntakeWriter" | false | false | false | DDIntakeWriter | [DDIntakeApi]
163+
"DDIntakeWriter" | true | true | true | DDIntakeWriter | [DDIntakeApi]
164+
"DDIntakeWriter" | true | false | true | DDIntakeWriter | [DDIntakeApi]
165+
"DDIntakeWriter" | false | false | true | DDIntakeWriter | [DDIntakeApi]
166+
}
167+
100168
Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) {
101169
def endpoints = []
102170
if (hasEvpProxy && evpProxySupportsCompression) {
@@ -120,4 +188,13 @@ class WriterFactoryTest extends DDSpecification {
120188
.body(ResponseBody.create(MediaType.parse("application/json"), new JsonBuilder(response).toString()))
121189
return builder.build()
122190
}
191+
192+
Response buildHttpResponseNotOk(HttpUrl agentUrl) {
193+
def builder = new Response.Builder()
194+
.code(500)
195+
.message("ERROR")
196+
.protocol(Protocol.HTTP_1_1)
197+
.request(new Request.Builder().url(agentUrl.resolve("/info")).build())
198+
return builder.build()
199+
}
123200
}

0 commit comments

Comments
 (0)