Skip to content

Commit 63cc2aa

Browse files
committed
Send process tags once per payload
1 parent bd72002 commit 63cc2aa

File tree

8 files changed

+60
-36
lines changed

8 files changed

+60
-36
lines changed

communication/src/main/java/datadog/communication/serialization/Mapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@
33
// TODO @FunctionalInterface
44
public interface Mapper<T> {
55
void map(T data, Writable packer);
6+
7+
default void reset() {}
68
}

communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public <T> boolean format(T message, Mapper<T> mapper) {
9090
// max capacity, then reject the message
9191
if (buffer.flush()) {
9292
try {
93+
mapper.reset();
9394
mapper.map(message, this);
9495
buffer.mark();
9596
return true;

dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteMapper.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ public interface RemoteMapper extends Mapper<List<? extends CoreSpan<?>>> {
3636

3737
int messageBufferSize();
3838

39-
void reset();
40-
4139
String endpoint();
4240

4341
class NoopRemoteMapper implements RemoteMapper {
@@ -55,9 +53,6 @@ public int messageBufferSize() {
5553
return 0;
5654
}
5755

58-
@Override
59-
public void reset() {}
60-
6156
@Override
6257
public String endpoint() {
6358
return null;

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import datadog.communication.serialization.Writable;
88
import datadog.communication.serialization.msgpack.MsgPackWriter;
99
import datadog.trace.api.Config;
10-
import datadog.trace.api.ProcessTags;
1110
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
1211
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
1312
import datadog.trace.common.writer.Payload;
@@ -35,6 +34,7 @@ public final class TraceMapperV0_4 implements TraceMapper {
3534
: null;
3635

3736
private final int size;
37+
private boolean firstSpanWritten;
3838

3939
public TraceMapperV0_4(int size) {
4040
this.size = size;
@@ -47,21 +47,27 @@ public TraceMapperV0_4() {
4747
private static final class MetaWriter implements MetadataConsumer {
4848

4949
private Writable writable;
50-
private boolean firstSpanInChunk;
51-
private boolean lastSpanInChunk;
50+
private boolean firstSpanInTrace;
51+
private boolean lastSpanInTrace;
52+
private boolean firstSpanInPayload;
5253

5354
MetaWriter withWritable(Writable writable) {
5455
this.writable = writable;
5556
return this;
5657
}
5758

58-
MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) {
59-
this.firstSpanInChunk = firstSpanInChunk;
59+
MetaWriter forFirstSpanInTrace(final boolean firstSpanInTrace) {
60+
this.firstSpanInTrace = firstSpanInTrace;
6061
return this;
6162
}
6263

63-
MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
64-
this.lastSpanInChunk = lastSpanInChunk;
64+
MetaWriter forLastSpanInTrace(final boolean lastSpanInTrace) {
65+
this.lastSpanInTrace = lastSpanInTrace;
66+
return this;
67+
}
68+
69+
MetaWriter forFirstSpanInPayload(final boolean firstSpanInPayload) {
70+
this.firstSpanInPayload = firstSpanInPayload;
6571
return this;
6672
}
6773

@@ -70,9 +76,8 @@ public void accept(Metadata metadata) {
7076
if (TAG_CACHE != null) TAG_CACHE.recalibrate();
7177
if (VALUE_CACHE != null) VALUE_CACHE.recalibrate();
7278

73-
final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
74-
final UTF8BytesString processTags =
75-
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
79+
final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace;
80+
final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null;
7681
int metaSize =
7782
metadata.getBaggage().size()
7883
+ metadata.getTags().size()
@@ -301,12 +306,14 @@ public void map(List<? extends CoreSpan<?>> trace, final Writable writable) {
301306
span.processTagsAndBaggage(
302307
metaWriter
303308
.withWritable(writable)
304-
.forFirstSpanInChunk(i == 0)
305-
.forLastSpanInChunk(i == trace.size() - 1));
309+
.forFirstSpanInPayload(!firstSpanWritten)
310+
.forFirstSpanInTrace(i == 0)
311+
.forLastSpanInTrace(i == trace.size() - 1));
306312
if (!metaStruct.isEmpty()) {
307313
/* 13 */
308314
metaStructWriter.withWritable(writable).write(metaStruct);
309315
}
316+
firstSpanWritten = true;
310317
}
311318
}
312319

@@ -321,7 +328,9 @@ public int messageBufferSize() {
321328
}
322329

323330
@Override
324-
public void reset() {}
331+
public void reset() {
332+
firstSpanWritten = false;
333+
}
325334

326335
@Override
327336
public String endpoint() {

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_5.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import datadog.communication.serialization.Writable;
88
import datadog.communication.serialization.WritableFormatter;
99
import datadog.communication.serialization.msgpack.MsgPackWriter;
10-
import datadog.trace.api.ProcessTags;
1110
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
1211
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
1312
import datadog.trace.common.writer.Payload;
@@ -33,6 +32,7 @@ public final class TraceMapperV0_5 implements TraceMapper {
3332

3433
private final MetaWriter metaWriter = new MetaWriter();
3534
private final int size;
35+
private boolean firstSpanWritten;
3636

3737
public TraceMapperV0_5() {
3838
this(2 << 20);
@@ -79,10 +79,12 @@ public void map(final List<? extends CoreSpan<?>> trace, final Writable writable
7979
span.processTagsAndBaggage(
8080
metaWriter
8181
.withWritable(writable)
82-
.forFirstSpanInChunk(i == 0)
83-
.forLastSpanInChunk(i == trace.size() - 1));
82+
.forFirstSpanInTrace(i == 0)
83+
.forLastSpanInTrace(i == trace.size() - 1)
84+
.forFirstSpanInPayload(!firstSpanWritten));
8485
/* 12 */
8586
writeDictionaryEncoded(writable, span.getType());
87+
firstSpanWritten = true;
8688
}
8789
}
8890

@@ -115,6 +117,7 @@ public int messageBufferSize() {
115117
public void reset() {
116118
dictionary.reset();
117119
encoding.clear();
120+
firstSpanWritten = false;
118121
}
119122

120123
@Override
@@ -181,29 +184,34 @@ private List<ByteBuffer> toList() {
181184
private final class MetaWriter implements MetadataConsumer {
182185

183186
private Writable writable;
184-
private boolean firstSpanInChunk;
185-
private boolean lastSpanInChunk;
187+
private boolean firstSpanInTrace;
188+
private boolean lastSpanInTrace;
189+
private boolean firstSpanInPayload;
186190

187191
MetaWriter withWritable(final Writable writable) {
188192
this.writable = writable;
189193
return this;
190194
}
191195

192-
MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) {
193-
this.firstSpanInChunk = firstSpanInChunk;
196+
MetaWriter forFirstSpanInTrace(final boolean firstSpanInTrace) {
197+
this.firstSpanInTrace = firstSpanInTrace;
194198
return this;
195199
}
196200

197-
MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
198-
this.lastSpanInChunk = lastSpanInChunk;
201+
MetaWriter forLastSpanInTrace(final boolean lastSpanInTrace) {
202+
this.lastSpanInTrace = lastSpanInTrace;
203+
return this;
204+
}
205+
206+
MetaWriter forFirstSpanInPayload(final boolean firstSpanInPayload) {
207+
this.firstSpanInPayload = firstSpanInPayload;
199208
return this;
200209
}
201210

202211
@Override
203212
public void accept(Metadata metadata) {
204-
final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
205-
final UTF8BytesString processTags =
206-
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
213+
final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace;
214+
final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null;
207215
int metaSize =
208216
metadata.getBaggage().size()
209217
+ metadata.getTags().size()

dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package datadog.trace.common.writer
22

3+
import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED
4+
35
import datadog.trace.api.DDSpanId
46
import datadog.trace.api.DDTraceId
7+
import datadog.trace.api.ProcessTags
58
import datadog.trace.api.StatsDClient
69
import datadog.trace.api.sampling.PrioritySampling
710
import datadog.trace.api.datastreams.NoopPathwayContext
@@ -194,6 +197,9 @@ class DDAgentWriterCombinedTest extends DDCoreSpecification {
194197
@Timeout(30)
195198
def "test default buffer size for #agentVersion"() {
196199
setup:
200+
// disable process tags since they are only written on the first span and it will break the trace size estimation
201+
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
202+
ProcessTags.reset()
197203
def api = Mock(DDAgentApi)
198204
def discovery = Mock(DDAgentFeaturesDiscovery)
199205
def writer = DDAgentWriter.builder()

dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV04PayloadTest.groovy

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class TraceMapperV04PayloadTest extends DDSpecification {
5555
if (!packer.format(trace, traceMapper)) {
5656
verifier.skipLargeTrace()
5757
tracesFitInBuffer = false
58+
// in the real like the mapper is always reset each trace.
59+
// here we need to force it when we fail since the buffer will be reset as well
60+
traceMapper.reset()
5861
}
5962
}
6063
packer.flush()
@@ -239,7 +242,7 @@ class TraceMapperV04PayloadTest extends DDSpecification {
239242
if (expectedTraces.isEmpty() && messageCount == 0) {
240243
return
241244
}
242-
boolean hasProcessTags = false
245+
int processTagsCount = 0
243246
try {
244247
Payload payload = mapper.newPayload().withBody(messageCount, buffer)
245248
payload.writeTo(this)
@@ -351,7 +354,7 @@ class TraceMapperV04PayloadTest extends DDSpecification {
351354
assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled())
352355
assertEquals(0, k)
353356
assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue())
354-
hasProcessTags = true
357+
processTagsCount++
355358
} else {
356359
Object tag = expectedSpan.getTag(entry.getKey())
357360
if (null != tag) {
@@ -379,10 +382,10 @@ class TraceMapperV04PayloadTest extends DDSpecification {
379382
} catch (IOException e) {
380383
Assertions.fail(e.getMessage())
381384
} finally {
382-
assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled()
383385
mapper.reset()
384386
captured.position(0)
385387
captured.limit(captured.capacity())
388+
assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
386389
}
387390
}
388391

dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV05PayloadTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
216216

217217
@Override
218218
void accept(int messageCount, ByteBuffer buffer) {
219-
def hasProcessTags = false
219+
def processTagsCount = 0
220220
try {
221221
Payload payload = mapper.newPayload().withBody(messageCount, buffer)
222222
payload.writeTo(this)
@@ -268,7 +268,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
268268
} else if(DDTags.ORIGIN_KEY.equals(entry.getKey())) {
269269
assertEquals(expectedSpan.getOrigin(), entry.getValue())
270270
} else if (DDTags.PROCESS_TAGS.equals(entry.getKey())) {
271-
hasProcessTags = true
271+
processTagsCount++
272272
assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled())
273273
assertEquals(0, k)
274274
assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue())
@@ -338,7 +338,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
338338
} catch (IOException e) {
339339
Assert.fail(e.getMessage())
340340
} finally {
341-
assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled()
341+
assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
342342
mapper.reset()
343343
captured.position(0)
344344
captured.limit(captured.capacity())

0 commit comments

Comments
 (0)