Skip to content

Commit 469dd4a

Browse files
authored
Avoid sending empty payloads to the LLMObs endpoint (#10372) (#10410)
Add the receiving endpoint and payload size to the debug logs. (cherry picked from commit 2a6801e)
1 parent 682121c commit 469dd4a

File tree

5 files changed

+96
-4
lines changed

5 files changed

+96
-4
lines changed

communication/src/main/java/datadog/communication/serialization/FlushingBuffer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ public boolean isDirty() {
2727

2828
@Override
2929
public void mark() {
30-
mark = buffer.position();
31-
++messageCount;
30+
int current = buffer.position();
31+
if (current != mark) {
32+
// count only non-empty messages
33+
++messageCount;
34+
mark = current;
35+
}
3236
}
3337

3438
@Override
@@ -101,4 +105,9 @@ public void reset() {
101105
buffer.limit(buffer.capacity());
102106
mark = 0;
103107
}
108+
109+
// for tests only
110+
int getMessageCount() {
111+
return messageCount;
112+
}
104113
}

communication/src/test/groovy/datadog/communication/serialization/FlushingBufferTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,45 @@ public class FlushingBufferTest {
1010
public void testBufferCapacity() {
1111
assertEquals(5, new FlushingBuffer(5, (messageCount, buffer) -> {}).capacity());
1212
}
13+
14+
@Test
15+
public void testMessageCount() {
16+
FlushingBuffer fb = new FlushingBuffer(10, (messageCount, buffer) -> {});
17+
18+
// initial counter
19+
assertEquals(0, fb.getMessageCount());
20+
21+
fb.mark();
22+
fb.mark();
23+
24+
// counter doesn't change if no data pushed into the buffer
25+
assertEquals(0, fb.getMessageCount());
26+
27+
fb.put((byte) 1);
28+
// still zero because the message counter increases on mark
29+
assertEquals(0, fb.getMessageCount());
30+
31+
fb.mark();
32+
// expect increased message counter
33+
assertEquals(1, fb.getMessageCount());
34+
35+
fb.mark();
36+
fb.mark();
37+
// no change to the counter expected for consecutive mark calls
38+
39+
fb.putChar('a');
40+
fb.putChar('b');
41+
fb.putChar('c');
42+
// no change to the message counter expected before mark call
43+
assertEquals(1, fb.getMessageCount());
44+
45+
fb.mark();
46+
// expect increased message counter
47+
assertEquals(2, fb.getMessageCount());
48+
49+
fb.mark();
50+
fb.mark();
51+
// no change to the counter expected for consecutive mark calls
52+
assertEquals(2, fb.getMessageCount());
53+
}
1354
}

dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,20 @@ public void accept(int messageCount, ByteBuffer buffer) {
111111
mapper.reset();
112112
if (response.success()) {
113113
if (log.isDebugEnabled()) {
114-
log.debug("Successfully sent {} traces to the API", messageCount);
114+
log.debug(
115+
"Successfully sent {} traces {} bytes to the API {}",
116+
messageCount,
117+
buffer.position(),
118+
mapper.endpoint());
115119
}
116120
healthMetrics.onSend(messageCount, sizeInBytes, response);
117121
} else {
118122
if (log.isDebugEnabled()) {
119123
log.debug(
120-
"Failed to send {} traces of size {} bytes to the API", messageCount, sizeInBytes);
124+
"Failed to send {} traces of size {} bytes to the API {}",
125+
messageCount,
126+
sizeInBytes,
127+
mapper.endpoint());
121128
}
122129
healthMetrics.onFailedSend(messageCount, sizeInBytes, response);
123130
}

dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
9393
List<? extends CoreSpan<?>> llmobsSpans =
9494
trace.stream().filter(LLMObsSpanMapper::isLLMObsSpan).collect(Collectors.toList());
9595

96+
if (llmobsSpans.isEmpty()) {
97+
// do nothing if no llmobs spans in the trace
98+
return;
99+
}
100+
96101
writable.startMap(3);
97102

98103
writable.writeUTF8(EVENT_TYPE);

dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,36 @@ class LLMObsSpanMapperTest extends DDCoreSpecification {
110110
spanData["tags"].contains("language:jvm")
111111
}
112112

113+
def "test LLMObsSpanMapper writes no spans when none are LLMObs spans"() {
114+
setup:
115+
def mapper = new LLMObsSpanMapper()
116+
def tracer = tracerBuilder().writer(new ListWriter()).build()
117+
118+
def regularSpan1 = tracer.buildSpan("http.request")
119+
.withResourceName("GET /api/users")
120+
.withTag("http.method", "GET")
121+
.withTag("http.url", "https://example.com/api/users")
122+
.start()
123+
regularSpan1.finish()
124+
125+
def regularSpan2 = tracer.buildSpan("database.query")
126+
.withResourceName("SELECT * FROM users")
127+
.withTag("db.type", "postgresql")
128+
.start()
129+
regularSpan2.finish()
130+
131+
def trace = [regularSpan1, regularSpan2]
132+
CapturingByteBufferConsumer sink = new CapturingByteBufferConsumer()
133+
MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(1024, sink))
134+
135+
when:
136+
packer.format(trace, mapper)
137+
packer.flush()
138+
139+
then:
140+
sink.captured == null
141+
}
142+
113143
static class CapturingByteBufferConsumer implements ByteBufferConsumer {
114144

115145
ByteBuffer captured

0 commit comments

Comments
 (0)