Skip to content

Commit 329a2e5

Browse files
committed
Revisit peer tags aggregation rules according to the RFC
1 parent 5832565 commit 329a2e5

File tree

2 files changed

+88
-17
lines changed

2 files changed

+88
-17
lines changed

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package datadog.trace.common.metrics;
22

33
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT;
4+
import static datadog.trace.api.DDTags.BASE_SERVICE;
45
import static datadog.trace.api.Functions.UTF8_ENCODE;
56
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
67
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
78
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
9+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL;
810
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
911
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
1012
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
@@ -76,12 +78,15 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
7678
value -> UTF8BytesString.create(key + ":" + value));
7779
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
7880

79-
private static final Set<String> ELIGIBLE_SPAN_KINDS =
81+
private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
8082
unmodifiableSet(
8183
new HashSet<>(
8284
Arrays.asList(
8385
SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER)));
8486

87+
private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
88+
unmodifiableSet(new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER)));
89+
8590
private final Set<String> ignoredResources;
8691
private final Queue<Batch> batchPool;
8792
private final NonBlockingHashMap<MetricKey, Batch> pending;
@@ -293,10 +298,11 @@ private boolean shouldComputeMetric(CoreSpan<?> span) {
293298
private boolean spanKindEligible(CoreSpan<?> span) {
294299
final Object spanKind = span.getTag(SPAN_KIND);
295300
// use toString since it could be a CharSequence...
296-
return spanKind != null && ELIGIBLE_SPAN_KINDS.contains(spanKind.toString());
301+
return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
297302
}
298303

299304
private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
305+
final CharSequence spanKind = span.getTag(SPAN_KIND, "");
300306
MetricKey newKey =
301307
new MetricKey(
302308
span.getResourceName(),
@@ -306,8 +312,9 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
306312
span.getHttpStatusCode(),
307313
isSynthetic(span),
308314
span.isTopLevel(),
309-
SPAN_KINDS.computeIfAbsent(span.getTag(SPAN_KIND, ""), UTF8BytesString::create),
310-
getPeerTags(span));
315+
SPAN_KINDS.computeIfAbsent(
316+
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
317+
getPeerTags(span, spanKind.toString()));
311318
boolean isNewKey = false;
312319
MetricKey key = keys.putIfAbsent(newKey, newKey);
313320
if (null == key) {
@@ -342,17 +349,32 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
342349
return isNewKey || span.getError() > 0;
343350
}
344351

345-
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span) {
346-
List<UTF8BytesString> peerTags = new ArrayList<>();
347-
for (String peerTag : features.peerTags()) {
348-
Object value = span.getTag(peerTag);
349-
if (value != null) {
350-
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> pair =
351-
PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
352-
peerTags.add(pair.getLeft().computeIfAbsent(value.toString(), pair.getRight()));
352+
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
353+
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
354+
List<UTF8BytesString> peerTags = new ArrayList<>();
355+
for (String peerTag : features.peerTags()) {
356+
Object value = span.getTag(peerTag);
357+
if (value != null) {
358+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
359+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
360+
peerTags.add(
361+
cacheAndCreator
362+
.getLeft()
363+
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
364+
}
365+
}
366+
return peerTags;
367+
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
368+
// in this case only the base service should be aggregated if present
369+
final String baseService = span.getTag(BASE_SERVICE);
370+
if (baseService != null) {
371+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
372+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
373+
return Collections.singletonList(
374+
cacheAndCreator.getLeft().computeIfAbsent(baseService, cacheAndCreator.getRight()));
353375
}
354376
}
355-
return peerTags;
377+
return Collections.emptyList();
356378
}
357379

358380
private static boolean isSynthetic(CoreSpan<?> span) {

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
204204
CountDownLatch latch = new CountDownLatch(1)
205205
aggregator.publish([
206206
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
207-
.setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe"),
207+
.setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"),
208208
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
209-
.setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe")
209+
.setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
210210
])
211211
aggregator.report()
212212
def latchTriggered = latch.await(2, SECONDS)
@@ -223,7 +223,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
223223
HTTP_OK,
224224
false,
225225
false,
226-
"grault",
226+
"client",
227227
[UTF8BytesString.create("country:france")]
228228
), { AggregateMetric aggregateMetric ->
229229
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
@@ -237,7 +237,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
237237
HTTP_OK,
238238
false,
239239
false,
240-
"grault",
240+
"client",
241241
[UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")]
242242
), { AggregateMetric aggregateMetric ->
243243
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
@@ -248,6 +248,55 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
248248
aggregator.close()
249249
}
250250

251+
def "should aggregate the right peer tags for kind #kind"() {
252+
setup:
253+
MetricWriter writer = Mock(MetricWriter)
254+
Sink sink = Stub(Sink)
255+
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
256+
features.supportsMetrics() >> true
257+
features.peerTags() >> ["peer.hostname", "_dd.base_service"]
258+
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
259+
features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
260+
aggregator.start()
261+
262+
when:
263+
CountDownLatch latch = new CountDownLatch(1)
264+
aggregator.publish([
265+
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
266+
.setTag(SPAN_KIND, kind).setTag("peer.hostname", "localhost").setTag("_dd.base_service", "test")
267+
])
268+
aggregator.report()
269+
def latchTriggered = latch.await(2, SECONDS)
270+
271+
then:
272+
latchTriggered
273+
1 * writer.startBucket(1, _, _)
274+
1 * writer.add(
275+
new MetricKey(
276+
"resource",
277+
"service",
278+
"operation",
279+
"type",
280+
HTTP_OK,
281+
false,
282+
false,
283+
kind,
284+
expectedPeerTags
285+
), { AggregateMetric aggregateMetric ->
286+
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
287+
})
288+
1 * writer.finishBucket() >> { latch.countDown() }
289+
290+
cleanup:
291+
aggregator.close()
292+
293+
where:
294+
kind | expectedPeerTags
295+
"client" | [UTF8BytesString.create("peer.hostname:localhost"), UTF8BytesString.create("_dd.base_service:test")]
296+
"internal" | [UTF8BytesString.create("_dd.base_service:test")]
297+
"server" | []
298+
}
299+
251300
def "measured spans do not contribute to top level count"() {
252301
setup:
253302
MetricWriter writer = Mock(MetricWriter)

0 commit comments

Comments
 (0)