Skip to content

Commit a74e456

Browse files
committed
working impl supporting both agentless and agent
1 parent 5fc9069 commit a74e456

File tree

8 files changed

+440
-4
lines changed

8 files changed

+440
-4
lines changed

dd-java-agent/agent-llmobs/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ minimumInstructionCoverage = 0.0
2222

2323
dependencies {
2424
api libs.slf4j
25+
implementation libs.jctools
2526

2627
implementation project(':communication')
2728
implementation project(':components:json')
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package datadog.trace.llmobs;
2+
3+
import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR;
4+
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
5+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
6+
7+
import com.squareup.moshi.JsonAdapter;
8+
import com.squareup.moshi.Moshi;
9+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
10+
import datadog.communication.ddagent.SharedCommunicationObjects;
11+
import datadog.communication.http.HttpRetryPolicy;
12+
import datadog.communication.http.OkHttpUtils;
13+
import datadog.trace.api.Config;
14+
import datadog.trace.llmobs.domain.LLMObsEval;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.TimeUnit;
18+
import okhttp3.Headers;
19+
import okhttp3.HttpUrl;
20+
import okhttp3.OkHttpClient;
21+
import okhttp3.Request;
22+
import okhttp3.RequestBody;
23+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class EvalProcessingWorker implements AutoCloseable {
28+
29+
private static final String EVAL_METRIC_API_DOMAIN = "api";
30+
private static final String EVAL_METRIC_API_PATH = "api/intake/llm-obs/v1/eval-metric";
31+
32+
private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
33+
private static final String DD_API_KEY_HEADER_NAME = "DD-API-KEY";
34+
35+
private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);
36+
37+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
38+
private final Thread serializerThread;
39+
40+
public EvalProcessingWorker(
41+
final int capacity,
42+
final long flushInterval,
43+
final TimeUnit timeUnit,
44+
final SharedCommunicationObjects sco,
45+
Config config) {
46+
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
47+
48+
boolean isAgentless = config.isLlmObsAgentlessEnabled();
49+
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
50+
log.error("Agentless eval metric submission requires an API key");
51+
}
52+
53+
Headers headers;
54+
HttpUrl submissionUrl;
55+
if (isAgentless) {
56+
submissionUrl =
57+
HttpUrl.get(
58+
"https://"
59+
+ EVAL_METRIC_API_DOMAIN
60+
+ "."
61+
+ config.getSite()
62+
+ "/"
63+
+ EVAL_METRIC_API_PATH);
64+
headers = Headers.of(DD_API_KEY_HEADER_NAME, config.getApiKey());
65+
} else {
66+
submissionUrl =
67+
HttpUrl.get(
68+
sco.agentUrl.toString()
69+
+ DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
70+
+ EVAL_METRIC_API_PATH);
71+
headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVAL_METRIC_API_DOMAIN);
72+
}
73+
74+
EvalSerializingHandler serializingHandler =
75+
new EvalSerializingHandler(queue, flushInterval, timeUnit, submissionUrl, headers);
76+
this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializingHandler);
77+
}
78+
79+
public void start() {
80+
this.serializerThread.start();
81+
}
82+
83+
public boolean addToQueue(final LLMObsEval eval) {
84+
return queue.offer(eval);
85+
}
86+
87+
@Override
88+
public void close() {
89+
serializerThread.interrupt();
90+
try {
91+
serializerThread.join(THREAD_JOIN_TIMOUT_MS);
92+
} catch (InterruptedException ignored) {
93+
}
94+
}
95+
96+
public static class EvalSerializingHandler implements Runnable {
97+
98+
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
99+
private static final int FLUSH_THRESHOLD = 50;
100+
101+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
102+
private final long ticksRequiredToFlush;
103+
private long lastTicks;
104+
105+
private final Moshi moshi;
106+
private final JsonAdapter<LLMObsEval.Request> evalJsonAdapter;
107+
private final OkHttpClient httpClient;
108+
private final HttpUrl submissionUrl;
109+
private final Headers headers;
110+
111+
private final List<LLMObsEval> buffer = new ArrayList<>();
112+
113+
public EvalSerializingHandler(
114+
final MpscBlockingConsumerArrayQueue<LLMObsEval> queue,
115+
final long flushInterval,
116+
final TimeUnit timeUnit,
117+
final HttpUrl submissionUrl,
118+
final Headers headers) {
119+
this.queue = queue;
120+
this.moshi = new Moshi.Builder().add(LLMObsEval.class, new LLMObsEval.Adapter()).build();
121+
122+
this.evalJsonAdapter = moshi.adapter(LLMObsEval.Request.class);
123+
this.httpClient = new OkHttpClient();
124+
this.submissionUrl = submissionUrl;
125+
this.headers = headers;
126+
127+
this.lastTicks = System.nanoTime();
128+
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
129+
130+
log.debug("starting eval metric serializer, url={}", submissionUrl);
131+
}
132+
133+
@Override
134+
public void run() {
135+
try {
136+
runDutyCycle();
137+
} catch (InterruptedException e) {
138+
Thread.currentThread().interrupt();
139+
}
140+
log.debug(
141+
"eval processor worker exited. submitting evals stopped. unsubmitted evals left: "
142+
+ !queuesAreEmpty());
143+
}
144+
145+
private void runDutyCycle() throws InterruptedException {
146+
Thread thread = Thread.currentThread();
147+
while (!thread.isInterrupted()) {
148+
consumeBatch();
149+
flushIfNecessary();
150+
}
151+
}
152+
153+
private void consumeBatch() {
154+
queue.drain(buffer::add, queue.size());
155+
}
156+
157+
protected void flushIfNecessary() {
158+
if (buffer.isEmpty()) {
159+
return;
160+
}
161+
if (shouldFlush()) {
162+
LLMObsEval.Request llmobsEvalReq = new LLMObsEval.Request(this.buffer);
163+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
164+
165+
String reqBod = evalJsonAdapter.toJson(llmobsEvalReq);
166+
167+
RequestBody requestBody =
168+
RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
169+
Request request =
170+
new Request.Builder().headers(headers).url(submissionUrl).post(requestBody).build();
171+
172+
try (okhttp3.Response response =
173+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
174+
175+
if (response.isSuccessful()) {
176+
log.debug("successfully flushed evaluation request with {} evals", this.buffer.size());
177+
this.buffer.clear();
178+
} else {
179+
log.error(
180+
"Could not submit eval metrics (HTTP code "
181+
+ response.code()
182+
+ ")"
183+
+ (response.body() != null ? ": " + response.body().string() : ""));
184+
}
185+
} catch (Exception e) {
186+
log.error("Could not submit eval metrics", e);
187+
}
188+
}
189+
}
190+
191+
private boolean shouldFlush() {
192+
long nanoTime = System.nanoTime();
193+
long ticks = nanoTime - lastTicks;
194+
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
195+
lastTicks = nanoTime;
196+
return true;
197+
}
198+
return false;
199+
}
200+
201+
protected boolean queuesAreEmpty() {
202+
return queue.isEmpty();
203+
}
204+
}
205+
}

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
import datadog.trace.api.llmobs.LLMObsTags;
88
import datadog.trace.bootstrap.instrumentation.api.Tags;
99
import datadog.trace.llmobs.domain.DDLLMObsSpan;
10+
import datadog.trace.llmobs.domain.LLMObsEval;
1011
import datadog.trace.llmobs.domain.LLMObsInternal;
1112
import java.lang.instrument.Instrumentation;
13+
import java.util.Map;
14+
import java.util.concurrent.TimeUnit;
1215
import org.jetbrains.annotations.Nullable;
1316
import org.slf4j.Logger;
1417
import org.slf4j.LoggerFactory;
@@ -31,6 +34,88 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) {
3134
LLMObsInternal.setLLMObsSpanFactory(
3235
new LLMObsManualSpanFactory(
3336
config.getLlmObsMlApp(), config.getServiceName()));
37+
38+
String mlApp = config.getLlmObsMlApp();
39+
LLMObsInternal.setLLMObsSpanFactory(
40+
new LLMObsManualSpanFactory(mlApp, config.getServiceName()));
41+
42+
LLMObsInternal.setLLMObsEvalProcessor(new LLMObsCustomEvalProcessor(mlApp, sco, config));
43+
}
44+
45+
private static class LLMObsCustomEvalProcessor implements LLMObs.LLMObsEvalProcessor {
46+
private final String defaultMLApp;
47+
private final EvalProcessingWorker evalProcessingWorker;
48+
49+
public LLMObsCustomEvalProcessor(
50+
String defaultMLApp, SharedCommunicationObjects sco, Config config) {
51+
52+
this.defaultMLApp = defaultMLApp;
53+
this.evalProcessingWorker =
54+
new EvalProcessingWorker(1024, 100, TimeUnit.MILLISECONDS, sco, config);
55+
this.evalProcessingWorker.start();
56+
}
57+
58+
@Override
59+
public void SubmitEvaluation(
60+
LLMObsSpan llmObsSpan, String label, double scoreValue, Map<String, Object> tags) {
61+
SubmitEvaluation(llmObsSpan, label, scoreValue, defaultMLApp, tags);
62+
}
63+
64+
@Override
65+
public void SubmitEvaluation(
66+
LLMObsSpan llmObsSpan,
67+
String label,
68+
double scoreValue,
69+
String mlApp,
70+
Map<String, Object> tags) {
71+
if (llmObsSpan == null) {
72+
LOGGER.error("null llm obs span provided, eval not recorded");
73+
}
74+
String traceID = llmObsSpan.getTraceId().toHexString();
75+
long spanID = llmObsSpan.getSpanId();
76+
LLMObsEval.Score score =
77+
new LLMObsEval.Score(
78+
traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, scoreValue);
79+
if (!this.evalProcessingWorker.addToQueue(score)) {
80+
LOGGER.warn(
81+
"queue full, failed to add score eval, ml_app={}, trace_id={}, span_id={}, label={}",
82+
mlApp,
83+
traceID,
84+
spanID,
85+
label);
86+
}
87+
}
88+
89+
@Override
90+
public void SubmitEvaluation(
91+
LLMObsSpan llmObsSpan, String label, String categoricalValue, Map<String, Object> tags) {
92+
SubmitEvaluation(llmObsSpan, label, categoricalValue, defaultMLApp, tags);
93+
}
94+
95+
@Override
96+
public void SubmitEvaluation(
97+
LLMObsSpan llmObsSpan,
98+
String label,
99+
String categoricalValue,
100+
String mlApp,
101+
Map<String, Object> tags) {
102+
if (llmObsSpan == null) {
103+
LOGGER.error("null llm obs span provided, eval not recorded");
104+
}
105+
String traceID = llmObsSpan.getTraceId().toHexString();
106+
long spanID = llmObsSpan.getSpanId();
107+
LLMObsEval.Categorical category =
108+
new LLMObsEval.Categorical(
109+
traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, categoricalValue);
110+
if (!this.evalProcessingWorker.addToQueue(category)) {
111+
LOGGER.warn(
112+
"queue full, failed to add categorical eval, ml_app={}, trace_id={}, span_id={}, label={}",
113+
mlApp,
114+
traceID,
115+
spanID,
116+
label);
117+
}
118+
}
34119
}
35120

36121
private static class LLMObsManualSpanFactory implements LLMObs.LLMObsSpanFactory {

0 commit comments

Comments
 (0)