22
33import static datadog .communication .http .OkHttpUtils .gzippedMsgpackRequestBodyOf ;
44
5+ import datadog .communication .serialization .GrowableBuffer ;
56import datadog .communication .serialization .Writable ;
7+ import datadog .communication .serialization .msgpack .MsgPackWriter ;
68import datadog .trace .api .DDTags ;
79import datadog .trace .api .intake .TrackType ;
810import datadog .trace .api .llmobs .LLMObs ;
@@ -77,15 +79,30 @@ public class LLMObsSpanMapper implements RemoteMapper {
7779
7880 private static final String PARENT_ID_TAG_INTERNAL_FULL = LLMOBS_TAG_PREFIX + "parent_id" ;
7981
80- private final LLMObsSpanMapper . MetaWriter metaWriter = new MetaWriter ();
82+ private final MetaWriter metaWriter = new MetaWriter ();
8183 private final int size ;
8284
85+ private final ByteBuffer header ;
86+ private int spansWritten ;
87+
8388 public LLMObsSpanMapper () {
8489 this (5 << 20 );
8590 }
8691
8792 private LLMObsSpanMapper (int size ) {
8893 this .size = size ;
94+
95+ GrowableBuffer header = new GrowableBuffer (64 );
96+ MsgPackWriter headerWriter = new MsgPackWriter (header );
97+
98+ headerWriter .startMap (3 );
99+ headerWriter .writeUTF8 (EVENT_TYPE );
100+ headerWriter .writeString ("span" , null );
101+ headerWriter .writeUTF8 (STAGE );
102+ headerWriter .writeString ("raw" , null );
103+ headerWriter .writeUTF8 (SPANS );
104+
105+ this .header = header .slice ();
89106 }
90107
91108 @ Override
@@ -98,16 +115,6 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
98115 return ;
99116 }
100117
101- writable .startMap (3 );
102-
103- writable .writeUTF8 (EVENT_TYPE );
104- writable .writeString ("span" , null );
105-
106- writable .writeUTF8 (STAGE );
107- writable .writeString ("raw" , null );
108-
109- writable .writeUTF8 (SPANS );
110- writable .startArray (llmobsSpans .size ());
111118 for (CoreSpan <?> span : llmobsSpans ) {
112119 writable .startMap (11 );
113120 // 1
@@ -148,6 +155,10 @@ public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
148155 /* 9 (metrics), 10 (tags), 11 meta */
149156 span .processTagsAndBaggage (metaWriter .withWritable (writable , getErrorsMap (span )));
150157 }
158+
159+ // Increase only after all spans have been written. This way, if it rolls back because of a
160+ // buffer overflow, the counter won't be skewed.
161+ spansWritten += llmobsSpans .size ();
151162 }
152163
153164 private static boolean isLLMObsSpan (CoreSpan <?> span ) {
@@ -157,7 +168,7 @@ private static boolean isLLMObsSpan(CoreSpan<?> span) {
157168
158169 @ Override
159170 public Payload newPayload () {
160- return new PayloadV1 ();
171+ return new PayloadV1 (header , spansWritten );
161172 }
162173
163174 @ Override
@@ -166,7 +177,10 @@ public int messageBufferSize() {
166177 }
167178
168179 @ Override
169- public void reset () {}
180+ public void reset () {
181+ // Reset the number of spans per message with each flush.
182+ spansWritten = 0 ;
183+ }
170184
171185 @ Override
172186 public String endpoint () {
@@ -206,7 +220,7 @@ private static final class MetaWriter implements MetadataConsumer {
206220 LLMOBS_TAG_PREFIX + LLMObsTags .MODEL_VERSION ,
207221 LLMOBS_TAG_PREFIX + LLMObsTags .METADATA )));
208222
209- LLMObsSpanMapper . MetaWriter withWritable (Writable writable , Map <String , String > errorInfo ) {
223+ MetaWriter withWritable (Writable writable , Map <String , String > errorInfo ) {
210224 this .writable = writable ;
211225 this .errorInfo = errorInfo ;
212226 return this ;
@@ -348,14 +362,20 @@ public void accept(Metadata metadata) {
348362 }
349363
350364 private static class PayloadV1 extends Payload {
365+ private final ByteBuffer header ;
366+ private final int spansWritten ;
367+
368+ public PayloadV1 (ByteBuffer header , int spansWritten ) {
369+ this .spansWritten = spansWritten ;
370+ this .header = header ;
371+ }
351372
352373 @ Override
353374 public int sizeInBytes () {
354375 if (traceCount () == 0 ) {
355376 return msgpackMapHeaderSize (0 );
356377 }
357-
358- return body .array ().length ;
378+ return header .remaining () + msgpackArrayHeaderSize (spansWritten ) + body .remaining ();
359379 }
360380
361381 @ Override
@@ -368,6 +388,8 @@ public void writeTo(WritableByteChannel channel) throws IOException {
368388 }
369389 } else {
370390 while (body .hasRemaining ()) {
391+ channel .write (header .slice ());
392+ channel .write (msgpackArrayHeader (spansWritten ));
371393 channel .write (body );
372394 }
373395 }
@@ -379,9 +401,13 @@ public RequestBody toRequest() {
379401 if (traceCount () == 0 ) {
380402 buffers = Collections .singletonList (msgpackMapHeader (0 ));
381403 } else {
382- buffers = Collections .singletonList (body );
404+ buffers =
405+ Arrays .asList (
406+ header .slice (),
407+ // Third Value: is an array of spans serialized into the body
408+ msgpackArrayHeader (spansWritten ),
409+ body );
383410 }
384-
385411 return gzippedMsgpackRequestBodyOf (buffers );
386412 }
387413 }
0 commit comments