Skip to content

Commit 318a10e

Browse files
committed
revisit peer tags aggregation rules according to the rfc
1 parent 877e8cf commit 318a10e

File tree

2 files changed

+88
-17
lines changed

2 files changed

+88
-17
lines changed

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package datadog.trace.common.metrics;
22

33
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT;
4+
import static datadog.trace.api.DDTags.BASE_SERVICE;
45
import static datadog.trace.api.Functions.UTF8_ENCODE;
56
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
67
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
78
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
9+
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL;
810
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
911
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
1012
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
@@ -75,12 +77,15 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
7577
value -> UTF8BytesString.create(key + ":" + value));
7678
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
7779

78-
private static final Set<String> ELIGIBLE_SPAN_KINDS =
80+
private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
7981
unmodifiableSet(
8082
new HashSet<>(
8183
Arrays.asList(
8284
SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER)));
8385

86+
private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
87+
unmodifiableSet(new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER)));
88+
8489
private final Set<String> ignoredResources;
8590
private final Queue<Batch> batchPool;
8691
private final NonBlockingHashMap<MetricKey, Batch> pending;
@@ -269,10 +274,11 @@ private boolean shouldComputeMetric(CoreSpan<?> span) {
269274
private boolean spanKindEligible(CoreSpan<?> span) {
270275
final Object spanKind = span.getTag(SPAN_KIND);
271276
// use toString since it could be a CharSequence...
272-
return spanKind != null && ELIGIBLE_SPAN_KINDS.contains(spanKind.toString());
277+
return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
273278
}
274279

275280
private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
281+
final CharSequence spanKind = span.getTag(SPAN_KIND, "");
276282
MetricKey newKey =
277283
new MetricKey(
278284
span.getResourceName(),
@@ -282,8 +288,9 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
282288
span.getHttpStatusCode(),
283289
isSynthetic(span),
284290
span.isTopLevel(),
285-
SPAN_KINDS.computeIfAbsent(span.getTag(SPAN_KIND, ""), UTF8BytesString::create),
286-
getPeerTags(span));
291+
SPAN_KINDS.computeIfAbsent(
292+
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
293+
getPeerTags(span, spanKind.toString()));
287294
boolean isNewKey = false;
288295
MetricKey key = keys.putIfAbsent(newKey, newKey);
289296
if (null == key) {
@@ -318,17 +325,32 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
318325
return isNewKey || span.getError() > 0;
319326
}
320327

321-
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span) {
322-
List<UTF8BytesString> peerTags = new ArrayList<>();
323-
for (String peerTag : features.peerTags()) {
324-
Object value = span.getTag(peerTag);
325-
if (value != null) {
326-
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> pair =
327-
PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
328-
peerTags.add(pair.getLeft().computeIfAbsent(value.toString(), pair.getRight()));
328+
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
329+
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
330+
List<UTF8BytesString> peerTags = new ArrayList<>();
331+
for (String peerTag : features.peerTags()) {
332+
Object value = span.getTag(peerTag);
333+
if (value != null) {
334+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
335+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
336+
peerTags.add(
337+
cacheAndCreator
338+
.getLeft()
339+
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
340+
}
341+
}
342+
return peerTags;
343+
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
344+
// in this case only the base service should be aggregated if present
345+
final String baseService = span.getTag(BASE_SERVICE);
346+
if (baseService != null) {
347+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
348+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
349+
return Collections.singletonList(
350+
cacheAndCreator.getLeft().computeIfAbsent(baseService, cacheAndCreator.getRight()));
329351
}
330352
}
331-
return peerTags;
353+
return Collections.emptyList();
332354
}
333355

334356
private static boolean isSynthetic(CoreSpan<?> span) {

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,9 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
201201
CountDownLatch latch = new CountDownLatch(1)
202202
aggregator.publish([
203203
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
204-
.setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe"),
204+
.setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"),
205205
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
206-
.setTag(SPAN_KIND, "grault").setTag("country", "france").setTag("georegion", "europe")
206+
.setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
207207
])
208208
aggregator.report()
209209
def latchTriggered = latch.await(2, SECONDS)
@@ -220,7 +220,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
220220
HTTP_OK,
221221
false,
222222
false,
223-
"grault",
223+
"client",
224224
[UTF8BytesString.create("country:france")]
225225
), { AggregateMetric aggregateMetric ->
226226
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
@@ -234,7 +234,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
234234
HTTP_OK,
235235
false,
236236
false,
237-
"grault",
237+
"client",
238238
[UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")]
239239
), { AggregateMetric aggregateMetric ->
240240
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
@@ -245,6 +245,55 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
245245
aggregator.close()
246246
}
247247

248+
def "should aggregate the right peer tags for kind #kind"() {
249+
setup:
250+
MetricWriter writer = Mock(MetricWriter)
251+
Sink sink = Stub(Sink)
252+
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
253+
features.supportsMetrics() >> true
254+
features.peerTags() >> ["peer.hostname", "_dd.base_service"]
255+
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
256+
features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
257+
aggregator.start()
258+
259+
when:
260+
CountDownLatch latch = new CountDownLatch(1)
261+
aggregator.publish([
262+
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
263+
.setTag(SPAN_KIND, kind).setTag("peer.hostname", "localhost").setTag("_dd.base_service", "test")
264+
])
265+
aggregator.report()
266+
def latchTriggered = latch.await(2, SECONDS)
267+
268+
then:
269+
latchTriggered
270+
1 * writer.startBucket(1, _, _)
271+
1 * writer.add(
272+
new MetricKey(
273+
"resource",
274+
"service",
275+
"operation",
276+
"type",
277+
HTTP_OK,
278+
false,
279+
false,
280+
kind,
281+
expectedPeerTags
282+
), { AggregateMetric aggregateMetric ->
283+
aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
284+
})
285+
1 * writer.finishBucket() >> { latch.countDown() }
286+
287+
cleanup:
288+
aggregator.close()
289+
290+
where:
291+
kind | expectedPeerTags
292+
"client" | [UTF8BytesString.create("peer.hostname:localhost"), UTF8BytesString.create("_dd.base_service:test")]
293+
"internal" | [UTF8BytesString.create("_dd.base_service:test")]
294+
"server" | []
295+
}
296+
248297
def "measured spans do not contribute to top level count"() {
249298
setup:
250299
MetricWriter writer = Mock(MetricWriter)

0 commit comments

Comments
 (0)