Skip to content

Commit affb61a

Browse files
Improve data streams performance (#7749)
* update queue used * remove deprecated features * only tag spans with schema definition sampled * fmt * add fields to reflect config
1 parent a1c2f48 commit affb61a

File tree

5 files changed

+35
-26
lines changed

5 files changed

+35
-26
lines changed

dd-java-agent/agent-bootstrap/src/main/resources/META-INF/native-image/com.datadoghq/dd-java-agent/reflect-config.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,5 +133,23 @@
133133
"fields": [
134134
{"name": "consumerIndex", "allowUnsafeAccess": true}
135135
]
136+
},
137+
{
138+
"name" : "datadog.jctools.queues.MpscArrayQueueProducerIndexField",
139+
"fields": [
140+
{"name": "producerIndex", "allowUnsafeAccess": true}
141+
]
142+
},
143+
{
144+
"name" : "datadog.jctools.queues.MpscArrayQueueProducerLimitField",
145+
"fields": [
146+
{"name": "producerLimit", "allowUnsafeAccess": true}
147+
]
148+
},
149+
{
150+
"name" : "datadog.jctools.queues.MpscArrayQueueConsumerIndexField",
151+
"fields": [
152+
{"name": "consumerIndex", "allowUnsafeAccess": true}
153+
]
136154
}
137155
]

dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/SchemaExtractor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,6 @@ public static void attachSchemaOnSpan(
143143
return;
144144
}
145145
AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
146-
span.setTag(DDTags.SCHEMA_TYPE, AVRO);
147-
span.setTag(DDTags.SCHEMA_NAME, schema.getFullName());
148-
span.setTag(DDTags.SCHEMA_OPERATION, operation);
149146

150147
if (!dsm.canSampleSchema(operation)) {
151148
return;
@@ -162,6 +159,9 @@ public static void attachSchemaOnSpan(
162159
}
163160

164161
Schema schemaData = SchemaExtractor.extractSchemas(schema);
162+
span.setTag(DDTags.SCHEMA_TYPE, AVRO);
163+
span.setTag(DDTags.SCHEMA_NAME, schema.getFullName());
164+
span.setTag(DDTags.SCHEMA_OPERATION, operation);
165165
span.setTag(DDTags.SCHEMA_DEFINITION, schemaData.definition);
166166
span.setTag(DDTags.SCHEMA_WEIGHT, weight);
167167
span.setTag(DDTags.SCHEMA_ID, schemaData.id);

dd-java-agent/instrumentation/protobuf/src/main/java/datadog/trace/instrumentation/protobuf_java/SchemaExtractor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,6 @@ public static void attachSchemaOnSpan(
183183
return;
184184
}
185185
AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
186-
span.setTag(DDTags.SCHEMA_TYPE, PROTOBUF);
187-
span.setTag(DDTags.SCHEMA_NAME, descriptor.getFullName());
188-
span.setTag(DDTags.SCHEMA_OPERATION, operation);
189186
// do a check against the schema sampler to avoid forcing the trace sampling decision too often.
190187
if (!dsm.canSampleSchema(operation)) {
191188
return;
@@ -200,6 +197,9 @@ public static void attachSchemaOnSpan(
200197
return;
201198
}
202199
Schema schema = SchemaExtractor.extractSchemas(descriptor);
200+
span.setTag(DDTags.SCHEMA_TYPE, PROTOBUF);
201+
span.setTag(DDTags.SCHEMA_NAME, descriptor.getFullName());
202+
span.setTag(DDTags.SCHEMA_OPERATION, operation);
203203
span.setTag(DDTags.SCHEMA_DEFINITION, schema.definition);
204204
span.setTag(DDTags.SCHEMA_WEIGHT, weight);
205205
span.setTag(DDTags.SCHEMA_ID, schema.id);

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package datadog.trace.core.datastreams;
22

33
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT;
4-
import static datadog.trace.api.DDTags.PATHWAY_HASH;
54
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
65
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
76
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
@@ -42,11 +41,10 @@
4241
import java.util.LinkedHashMap;
4342
import java.util.List;
4443
import java.util.Map;
45-
import java.util.concurrent.BlockingQueue;
4644
import java.util.concurrent.ConcurrentHashMap;
4745
import java.util.concurrent.TimeUnit;
4846
import java.util.function.Supplier;
49-
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
47+
import org.jctools.queues.MpscArrayQueue;
5048
import org.slf4j.Logger;
5149
import org.slf4j.LoggerFactory;
5250

@@ -61,7 +59,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
6159
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0);
6260

6361
private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
64-
private final BlockingQueue<InboxItem> inbox = new MpscBlockingConsumerArrayQueue<>(1024);
62+
private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);
6563
private final DatastreamsPayloadWriter payloadWriter;
6664
private final DDAgentFeaturesDiscovery features;
6765
private final TimeSource timeSource;
@@ -240,9 +238,6 @@ public void setCheckpoint(
240238
PathwayContext pathwayContext = span.context().getPathwayContext();
241239
if (pathwayContext != null) {
242240
pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp, payloadSizeBytes);
243-
if (pathwayContext.getHash() != 0) {
244-
span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
245-
}
246241
}
247242
}
248243

@@ -318,7 +313,11 @@ public void run() {
318313
Thread currentThread = Thread.currentThread();
319314
while (!currentThread.isInterrupted()) {
320315
try {
321-
InboxItem payload = inbox.take();
316+
InboxItem payload = inbox.poll();
317+
if (payload == null) {
318+
Thread.sleep(10);
319+
continue;
320+
}
322321

323322
if (payload == REPORT) {
324323
checkDynamicConfig();
@@ -350,8 +349,6 @@ public void run() {
350349
statsBucket.addBacklog(backlog);
351350
}
352351
}
353-
} catch (InterruptedException e) {
354-
currentThread.interrupt();
355352
} catch (Exception e) {
356353
log.debug("Error monitoring data streams", e);
357354
}

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -442,15 +442,9 @@ private long generateNodeHash(PathwayHashBuilder pathwayHashBuilder) {
442442
}
443443

444444
private long generatePathwayHash(long nodeHash, long parentHash) {
445-
lock.lock();
446-
try {
447-
outputBuffer.clear();
448-
outputBuffer.writeLongLE(nodeHash);
449-
outputBuffer.writeLongLE(parentHash);
450-
451-
return FNV64Hash.generateHash(outputBuffer.backingArray(), 0, 16, FNV64Hash.Version.v1);
452-
} finally {
453-
lock.unlock();
454-
}
445+
outputBuffer.clear();
446+
outputBuffer.writeLongLE(nodeHash);
447+
outputBuffer.writeLongLE(parentHash);
448+
return FNV64Hash.generateHash(outputBuffer.backingArray(), 0, 16, FNV64Hash.Version.v1);
455449
}
456450
}

0 commit comments

Comments
 (0)