Skip to content

Commit eaf397f

Browse files
committed
working impl supporting both agentless and agent
1 parent eda73dd commit eaf397f

File tree

7 files changed

+382
-2
lines changed

7 files changed

+382
-2
lines changed

dd-java-agent/agent-llmobs/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ minimumInstructionCoverage = 0.0
2222

2323
dependencies {
2424
api libs.slf4j
25+
implementation libs.jctools
2526

2627
implementation project(':communication')
2728
implementation project(':components:json')
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package datadog.trace.llmobs;
2+
3+
import static datadog.trace.util.AgentThreadFactory.AgentThread.LLMOBS_EVALS_PROCESSOR;
4+
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
5+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
6+
7+
import com.squareup.moshi.JsonAdapter;
8+
import com.squareup.moshi.Moshi;
9+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
10+
import datadog.communication.ddagent.SharedCommunicationObjects;
11+
import datadog.communication.http.HttpRetryPolicy;
12+
import datadog.communication.http.OkHttpUtils;
13+
import datadog.trace.api.Config;
14+
import datadog.trace.llmobs.domain.LLMObsEval;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.TimeUnit;
18+
import okhttp3.Headers;
19+
import okhttp3.HttpUrl;
20+
import okhttp3.OkHttpClient;
21+
import okhttp3.Request;
22+
import okhttp3.RequestBody;
23+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
/**
28+
* Worker which applies rules to traces and serializes the results. Upon completion, the serialized
29+
* traces are published in batches to the Datadog Agent}.
30+
*
31+
* <p>publishing to the buffer will not block the calling thread, but instead will return false if
32+
* the buffer is full. This is to avoid impacting an application thread.
33+
*/
34+
public class EvalProcessingWorker implements AutoCloseable {
35+
36+
private static final String EVAL_METRIC_API_DOMAIN = "api";
37+
private static final String EVAL_METRIC_API_PATH = "api/intake/llm-obs/v1/eval-metric";
38+
39+
private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
40+
private static final String DD_API_KEY_HEADER_NAME = "DD-API-KEY";
41+
42+
private static final Logger log = LoggerFactory.getLogger(EvalProcessingWorker.class);
43+
44+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
45+
private final Thread serializerThread;
46+
47+
public EvalProcessingWorker(
48+
final int capacity,
49+
final long flushInterval,
50+
final TimeUnit timeUnit,
51+
final SharedCommunicationObjects sco,
52+
Config config) {
53+
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
54+
55+
boolean isAgentless = config.isLlmObsAgentlessEnabled();
56+
if (isAgentless && (config.getApiKey() == null || config.getApiKey().isEmpty())) {
57+
log.error("Agentless eval metric submission requires an API key");
58+
}
59+
60+
Headers headers;
61+
HttpUrl submissionUrl;
62+
if (isAgentless) {
63+
submissionUrl = HttpUrl.get("https://" + EVAL_METRIC_API_DOMAIN + "." + config.getSite() + "/" + EVAL_METRIC_API_PATH);
64+
headers = Headers.of(DD_API_KEY_HEADER_NAME, config.getApiKey());
65+
} else {
66+
submissionUrl = HttpUrl.get(sco.agentUrl.toString() + DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT + EVAL_METRIC_API_PATH);
67+
headers = Headers.of(EVP_SUBDOMAIN_HEADER_NAME, EVAL_METRIC_API_DOMAIN);
68+
}
69+
70+
EvalSerializingHandler serializingHandler = new EvalSerializingHandler(
71+
queue,
72+
flushInterval,
73+
timeUnit,
74+
submissionUrl,
75+
headers
76+
);
77+
this.serializerThread = newAgentThread(LLMOBS_EVALS_PROCESSOR, serializingHandler);
78+
}
79+
80+
public void start() {
81+
this.serializerThread.start();
82+
}
83+
84+
public boolean addToQueue(final LLMObsEval eval) {
85+
return queue.offer(eval);
86+
}
87+
88+
@Override
89+
public void close() {
90+
serializerThread.interrupt();
91+
try {
92+
serializerThread.join(THREAD_JOIN_TIMOUT_MS);
93+
} catch (InterruptedException ignored) {
94+
}
95+
}
96+
97+
public static class EvalSerializingHandler implements Runnable {
98+
99+
private static final Logger log = LoggerFactory.getLogger(EvalSerializingHandler.class);
100+
private static final int FLUSH_THRESHOLD = 50;
101+
102+
private final MpscBlockingConsumerArrayQueue<LLMObsEval> queue;
103+
private final long ticksRequiredToFlush;
104+
private long lastTicks;
105+
106+
private final Moshi moshi;
107+
private final JsonAdapter<LLMObsEval.Request> evalJsonAdapter;
108+
private final OkHttpClient httpClient;
109+
private final HttpUrl submissionUrl;
110+
private final Headers headers;
111+
112+
private final List<LLMObsEval> buffer = new ArrayList<>();
113+
114+
public EvalSerializingHandler(
115+
final MpscBlockingConsumerArrayQueue<LLMObsEval> queue,
116+
final long flushInterval,
117+
final TimeUnit timeUnit,
118+
final HttpUrl submissionUrl,
119+
final Headers headers) {
120+
this.queue = queue;
121+
this.moshi = new Moshi.Builder().add(LLMObsEval.class, new LLMObsEval.Adapter()).build();
122+
123+
this.evalJsonAdapter = moshi.adapter(LLMObsEval.Request.class);
124+
this.httpClient = new OkHttpClient();
125+
this.submissionUrl = submissionUrl;
126+
this.headers = headers;
127+
128+
this.lastTicks = System.nanoTime();
129+
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
130+
131+
log.debug("starting eval metric serializer, url={}", submissionUrl);
132+
}
133+
134+
@Override
135+
public void run() {
136+
try {
137+
runDutyCycle();
138+
} catch (InterruptedException e) {
139+
Thread.currentThread().interrupt();
140+
}
141+
log.debug(
142+
"eval processor worker exited. submitting evals stopped. unsubmitted evals left: "
143+
+ !queuesAreEmpty());
144+
}
145+
146+
private void runDutyCycle() throws InterruptedException {
147+
Thread thread = Thread.currentThread();
148+
while (!thread.isInterrupted()) {
149+
consumeBatch();
150+
flushIfNecessary();
151+
}
152+
}
153+
154+
private void consumeBatch() {
155+
queue.drain(buffer::add, queue.size());
156+
}
157+
158+
protected void flushIfNecessary() {
159+
if (buffer.isEmpty()) {
160+
return;
161+
}
162+
if (shouldFlush()) {
163+
LLMObsEval.Request llmobsEvalReq = new LLMObsEval.Request(this.buffer);
164+
HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true);
165+
166+
String reqBod = evalJsonAdapter.toJson(llmobsEvalReq);
167+
168+
RequestBody requestBody = RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
169+
Request request =
170+
new Request.Builder()
171+
.headers(headers)
172+
.url(submissionUrl)
173+
.post(requestBody)
174+
.build();
175+
176+
try (okhttp3.Response response =
177+
OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) {
178+
179+
if (response.isSuccessful()) {
180+
log.debug("successfully flushed evaluation request with {} evals", this.buffer.size());
181+
this.buffer.clear();
182+
} else {
183+
log.error(
184+
"Could not submit eval metrics (HTTP code "
185+
+ response.code()
186+
+ ")"
187+
+ (response.body() != null ? ": " + response.body().string() : ""));
188+
}
189+
} catch (Exception e) {
190+
log.error("Could not submit eval metrics", e);
191+
}
192+
}
193+
}
194+
195+
private boolean shouldFlush() {
196+
long nanoTime = System.nanoTime();
197+
long ticks = nanoTime - lastTicks;
198+
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
199+
lastTicks = nanoTime;
200+
return true;
201+
}
202+
return false;
203+
}
204+
205+
protected boolean queuesAreEmpty() {
206+
return queue.isEmpty();
207+
}
208+
}
209+
}

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
import datadog.trace.api.llmobs.LLMObsTags;
88
import datadog.trace.bootstrap.instrumentation.api.Tags;
99
import datadog.trace.llmobs.domain.DDLLMObsSpan;
10+
import datadog.trace.llmobs.domain.LLMObsEval;
1011
import datadog.trace.llmobs.domain.LLMObsInternal;
1112
import java.lang.instrument.Instrumentation;
13+
import java.util.Map;
14+
import java.util.concurrent.TimeUnit;
1215
import org.jetbrains.annotations.Nullable;
1316
import org.slf4j.Logger;
1417
import org.slf4j.LoggerFactory;
@@ -29,9 +32,61 @@ public static void start(Instrumentation inst, SharedCommunicationObjects sco) {
2932
sco.createRemaining(config);
3033

3134
LLMObsServices llmObsServices = new LLMObsServices(config, sco);
35+
36+
String mlApp = config.getLlmObsMlApp();
3237
LLMObsInternal.setLLMObsSpanFactory(
3338
new LLMObsManualSpanFactory(
34-
config.getLlmObsMlApp(), config.getServiceName(), llmObsServices));
39+
mlApp, config.getServiceName(), llmObsServices));
40+
41+
LLMObsInternal.setLLMObsEvalProcessor(new LLMObsCustomEvalProcessor(mlApp, sco, config));
42+
}
43+
44+
private static class LLMObsCustomEvalProcessor implements LLMObs.LLMObsEvalProcessor {
45+
private final String defaultMLApp;
46+
private final EvalProcessingWorker evalProcessingWorker;
47+
48+
public LLMObsCustomEvalProcessor(String defaultMLApp, SharedCommunicationObjects sco, Config config) {
49+
50+
this.defaultMLApp = defaultMLApp;
51+
this.evalProcessingWorker = new EvalProcessingWorker(1024, 100, TimeUnit.MILLISECONDS, sco, config);
52+
this.evalProcessingWorker.start();
53+
}
54+
55+
@Override
56+
public void SubmitEvaluation(LLMObsSpan llmObsSpan, String label, double numericalValue, Map<String, Object> tags) {
57+
LOGGER.debug("Submitting score evaluation for {}", llmObsSpan);
58+
SubmitEvaluation(llmObsSpan, label, numericalValue, defaultMLApp, tags);
59+
}
60+
61+
@Override
62+
public void SubmitEvaluation(LLMObsSpan llmObsSpan, String label, double numericalValue, String mlApp, Map<String, Object> tags) {
63+
String traceID = llmObsSpan.getTraceId().toHexString();
64+
long spanID = llmObsSpan.getSpanId();
65+
LLMObsEval.Score score = new LLMObsEval.Score(traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, numericalValue);
66+
if (!this.evalProcessingWorker.addToQueue(score)) {
67+
LOGGER.warn("queue full, failed to add score eval, ml_app={}, trace_id={}, span_id={}, label={}", mlApp, traceID, spanID, label);
68+
} else {
69+
LOGGER.debug("added score eval");
70+
}
71+
}
72+
73+
@Override
74+
public void SubmitEvaluation(LLMObsSpan llmObsSpan, String label, String categoricalValue, Map<String, Object> tags) {
75+
LOGGER.debug("Submitting categorical evaluation for {}", llmObsSpan);
76+
SubmitEvaluation(llmObsSpan, label, categoricalValue, defaultMLApp, tags);
77+
}
78+
79+
@Override
80+
public void SubmitEvaluation(LLMObsSpan llmObsSpan, String label, String categoricalValue, String mlApp, Map<String, Object> tags) {
81+
String traceID = llmObsSpan.getTraceId().toHexString();
82+
long spanID = llmObsSpan.getSpanId();
83+
LLMObsEval.Categorical category = new LLMObsEval.Categorical(traceID, spanID, System.currentTimeMillis(), mlApp, label, tags, categoricalValue);
84+
if (!this.evalProcessingWorker.addToQueue(category)) {
85+
LOGGER.warn("queue full, failed to add categorical eval, ml_app={}, trace_id={}, span_id={}, label={}", mlApp, traceID, spanID, label);
86+
} else {
87+
LOGGER.debug("added categorical eval");
88+
}
89+
}
3590
}
3691

3792
private static class LLMObsManualSpanFactory implements LLMObs.LLMObsSpanFactory {
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package datadog.trace.llmobs.domain;
2+
3+
import com.squareup.moshi.JsonAdapter;
4+
import com.squareup.moshi.JsonDataException;
5+
import com.squareup.moshi.JsonReader;
6+
import com.squareup.moshi.JsonWriter;
7+
import com.squareup.moshi.Moshi;
8+
import java.io.IOException;
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
import java.util.Map;
12+
import org.jetbrains.annotations.Nullable;
13+
14+
public abstract class LLMObsEval {
15+
private static final String METRIC_TYPE_SCORE = "score";
16+
private static final String METRIC_TYPE_CATEGORICAL = "categorical";
17+
18+
public final String trace_id;
19+
public final String span_id;
20+
public final long timestamp_ms;
21+
public final String ml_app;
22+
public final String metric_type;
23+
public final String label;
24+
public final List<String> tags;
25+
26+
public LLMObsEval(String traceID, String spanID, long timestampMs, String mlApp, String metricType, String label, Map<String, Object> tags) {
27+
this.trace_id = traceID;
28+
this.span_id = spanID;
29+
this.timestamp_ms = timestampMs;
30+
this.ml_app = mlApp;
31+
this.metric_type = metricType;
32+
this.label = label;
33+
List<String> tagsList = new ArrayList<>(tags.size());
34+
for (Map.Entry<String, Object> entry : tags.entrySet()) {
35+
tagsList.add(entry.getKey() + ":" + entry.getValue());
36+
}
37+
this.tags = tagsList;
38+
}
39+
40+
public final static class Adapter extends JsonAdapter<LLMObsEval> {
41+
private final Moshi moshi = new Moshi.Builder().build();
42+
private final JsonAdapter<Score> scoreJsonAdapter = moshi.adapter(Score.class);
43+
private final JsonAdapter<Categorical> categoricalJsonAdapter = moshi.adapter(Categorical.class);
44+
45+
@Nullable
46+
@Override
47+
public LLMObsEval fromJson(JsonReader reader) throws IOException {
48+
return null;
49+
}
50+
51+
@Override
52+
public void toJson(JsonWriter writer, @Nullable LLMObsEval value) throws IOException {
53+
if (value instanceof Score) {
54+
scoreJsonAdapter.toJson(writer, (Score) value);
55+
} else if (value instanceof Categorical) {
56+
categoricalJsonAdapter.toJson(writer, (Categorical) value);
57+
} else {
58+
throw new JsonDataException("Unknown llm obs eval subclass: " + value.getClass());
59+
}
60+
}
61+
}
62+
63+
64+
public final static class Score extends LLMObsEval {
65+
public final double score_value;
66+
67+
public Score(String traceID, long spanID, long timestampMS, String mlApp, String label, Map<String, Object> tags, double scoreValue) {
68+
super(traceID, String.valueOf(spanID), timestampMS, mlApp, METRIC_TYPE_SCORE, label, tags);
69+
this.score_value = scoreValue;
70+
}
71+
}
72+
73+
public final static class Categorical extends LLMObsEval {
74+
public final String categorical_value;
75+
76+
public Categorical(String traceID, long spanID, long timestampMS, String mlApp, String label, Map<String, Object> tags, String categoricalValue) {
77+
super(traceID, String.valueOf(spanID), timestampMS, mlApp, METRIC_TYPE_CATEGORICAL, label, tags);
78+
this.categorical_value = categoricalValue;
79+
}
80+
}
81+
82+
public final static class Request {
83+
public final Data data;
84+
85+
public static class Data {
86+
public final String type = "evaluation_metric";
87+
public Attributes attributes;
88+
}
89+
90+
public static class Attributes {
91+
public List<LLMObsEval> metrics;
92+
}
93+
94+
public Request(List<LLMObsEval> metrics) {
95+
this.data = new Data();
96+
this.data.attributes = new Attributes();
97+
this.data.attributes.metrics = metrics;
98+
}
99+
100+
101+
}
102+
103+
}

0 commit comments

Comments
 (0)