Skip to content

Add peer tags, span kind and trace root flag to MetricKey bucket #9178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
private volatile String version;
private volatile String telemetryProxyEndpoint;
private volatile Set<String> peerTags = emptySet();
private volatile Set<String> spanKindsToComputedStats = emptySet();

private long lastTimeDiscovered;

Expand Down Expand Up @@ -127,7 +126,6 @@ private void reset() {
lastTimeDiscovered = 0;
telemetryProxyEndpoint = null;
peerTags = emptySet();
spanKindsToComputedStats = emptySet();
}

/** Run feature discovery, unconditionally. */
Expand Down Expand Up @@ -310,12 +308,6 @@ private boolean processInfoResponse(String response) {
peer_tags instanceof List
? unmodifiableSet(new HashSet<>((List<String>) peer_tags))
: emptySet();

Object span_kinds = map.get("span_kinds_stats_computed");
spanKindsToComputedStats =
span_kinds instanceof List
? unmodifiableSet(new HashSet<>((List<String>) span_kinds))
: emptySet();
}
try {
state = Strings.sha256(response);
Expand Down Expand Up @@ -377,10 +369,6 @@ public Set<String> peerTags() {
return peerTags;
}

public Set<String> spanKindsToComputedStats() {
return spanKindsToComputedStats;
}

public String getMetricsEndpoint() {
return metricsEndpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,6 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
"tablename",
"topicname"
)
features.spanKindsToComputedStats().containsAll(
"client",
"consumer",
"producer",
"server"
)
}

def "should send container id as header on the info request and parse the hash in the response"() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package datadog.trace.common.metrics;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.monitor.Monitoring;
import datadog.trace.api.WellKnownTags;
import datadog.trace.core.CoreSpan;
import datadog.trace.util.Strings;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Benchmark)
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
@Measurement(iterations = 3, time = 30, timeUnit = SECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class ConflatingMetricsAggregatorBenchmark {
private final DDAgentFeaturesDiscovery featuresDiscovery =
new FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
private final ConflatingMetricsAggregator aggregator =
new ConflatingMetricsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
new NullSink(),
2048,
2048);
private final List<CoreSpan<?>> spans = generateTrace(64);

static List<CoreSpan<?>> generateTrace(int len) {
final List<CoreSpan<?>> trace = new ArrayList<>();
for (int i = 0; i < len; i++) {
SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1);
span.setTag("peer.hostname", Strings.random(10));
trace.add(span);
}
return trace;
}

static class NullSink implements Sink {

@Override
public void register(EventListener listener) {}

@Override
public void accept(int messageCount, ByteBuffer buffer) {}
}

static class FixedAgentFeaturesDiscovery extends DDAgentFeaturesDiscovery {
private final Set<String> peerTags;
private final Set<String> spanKinds;

public FixedAgentFeaturesDiscovery(Set<String> peerTags, Set<String> spanKinds) {
// create a fixed discovery with metrics enabled
super(null, Monitoring.DISABLED, null, false, true);
this.peerTags = peerTags;
this.spanKinds = spanKinds;
}

@Override
public void discover() {
// do nothing
}

@Override
public boolean supportsMetrics() {
return true;
}

@Override
public Set<String> peerTags() {
return peerTags;
}
}

@Benchmark
public void benchmark(Blackhole blackhole) {
blackhole.consume(aggregator.publish(spans));
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package datadog.trace.common.metrics;

import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT;
import static datadog.trace.api.DDTags.BASE_SERVICE;
import static datadog.trace.api.Functions.UTF8_ENCODE;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT;
import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.Collections.unmodifiableSet;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.Pair;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
Expand All @@ -24,14 +32,18 @@
import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.jctools.maps.NonBlockingHashMap;
import org.jctools.queues.MpscCompoundQueue;
import org.jctools.queues.SpmcArrayQueue;
Expand All @@ -48,8 +60,32 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private static final DDCache<String, UTF8BytesString> SERVICE_NAMES =
DDCaches.newFixedSizeCache(32);

private static final DDCache<CharSequence, UTF8BytesString> SPAN_KINDS =
DDCaches.newFixedSizeCache(16);
private static final DDCache<
String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
PEER_TAGS_CACHE =
DDCaches.newFixedSizeCache(
64); // it can be unbounded since those values are returned by the agent and should be
// under control. 64 entries is enough in this case to contain all the peer tags.
private static final Function<
String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
PEER_TAGS_CACHE_ADDER =
key ->
Pair.of(
DDCaches.newFixedSizeCache(512),
value -> UTF8BytesString.create(key + ":" + value));
Comment on lines +65 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 suggestion: ‏Can't we make a dedicated type for the cache and creator rather than using a Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>? It feels hard to read.

Because it always end up calling cache.computeIfAbsent(key, creator), so the whole Pair<...> thing can be simplified as a functional interface like: Function<String, UTF8BytesString>.

private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
unmodifiableSet(
new HashSet<>(
Arrays.asList(
SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER)));

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
unmodifiableSet(new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER)));

private final Set<String> ignoredResources;
private final Queue<Batch> batchPool;
private final NonBlockingHashMap<MetricKey, Batch> pending;
Expand Down Expand Up @@ -238,18 +274,23 @@ private boolean shouldComputeMetric(CoreSpan<?> span) {
private boolean spanKindEligible(CoreSpan<?> span) {
final Object spanKind = span.getTag(SPAN_KIND);
// use toString since it could be a CharSequence...
return spanKind != null && features.spanKindsToComputedStats().contains(spanKind.toString());
return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
}

private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
final CharSequence spanKind = span.getTag(SPAN_KIND, "");
MetricKey newKey =
new MetricKey(
span.getResourceName(),
SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE),
span.getOperationName(),
span.getType(),
span.getHttpStatusCode(),
isSynthetic(span));
isSynthetic(span),
span.isTopLevel(),
SPAN_KINDS.computeIfAbsent(
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
getPeerTags(span, spanKind.toString()));
boolean isNewKey = false;
MetricKey key = keys.putIfAbsent(newKey, newKey);
if (null == key) {
Expand All @@ -264,7 +305,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
// returning false means that either the batch can't take any
// more data, or it has already been consumed
if (batch.add(tag, durationNanos)) {
// added to a pending batch prior to consumption
// added to a pending batch prior to consumption,
// so skip publishing to the queue (we also know
// the key isn't rare enough to override the sampler)
return false;
Expand All @@ -284,6 +325,34 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
return isNewKey || span.getError() > 0;
}

private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
List<UTF8BytesString> peerTags = new ArrayList<>();
for (String peerTag : features.peerTags()) {
Object value = span.getTag(peerTag);
if (value != null) {
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
peerTags.add(
cacheAndCreator
.getLeft()
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
}
}
return peerTags;
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
// in this case only the base service should be aggregated if present
final String baseService = span.getTag(BASE_SERVICE);
if (baseService != null) {
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
return Collections.singletonList(
cacheAndCreator.getLeft().computeIfAbsent(baseService, cacheAndCreator.getRight()));
}
}
return Collections.emptyList();
}

private static boolean isSynthetic(CoreSpan<?> span) {
return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;

import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import java.util.Collections;
import java.util.List;

/** The aggregation key for tracked metrics. */
public final class MetricKey {
Expand All @@ -13,25 +15,43 @@ public final class MetricKey {
private final int httpStatusCode;
private final boolean synthetics;
private final int hash;
private final boolean isTraceRoot;
private final UTF8BytesString spanKind;
private final List<UTF8BytesString> peerTags;

public MetricKey(
CharSequence resource,
CharSequence service,
CharSequence operationName,
CharSequence type,
int httpStatusCode,
boolean synthetics) {
boolean synthetics,
boolean isTraceRoot,
CharSequence spanKind,
List<UTF8BytesString> peerTags) {
this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource);
this.service = null == service ? EMPTY : UTF8BytesString.create(service);
this.operationName = null == operationName ? EMPTY : UTF8BytesString.create(operationName);
this.type = null == type ? EMPTY : UTF8BytesString.create(type);
this.httpStatusCode = httpStatusCode;
this.synthetics = synthetics;
// unrolled polynomial hashcode which avoids allocating varargs
// the constants are 31^5, 31^4, 31^3, 31^2, 31^1, 31^0
this.isTraceRoot = isTraceRoot;
this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind);
this.peerTags = peerTags == null ? Collections.emptyList() : peerTags;

// Unrolled polynomial hashcode to avoid varargs allocation
// and eliminate data dependency between iterations as in Arrays.hashCode.
// Coefficient constants are powers of 31, with integer overflow (hence negative numbers).
// See
// https://richardstartin.github.io/posts/collecting-rocks-and-benchmarks
// https://richardstartin.github.io/posts/still-true-in-java-9-handwritten-hash-codes-are-faster

this.hash =
28629151 * this.resource.hashCode()
+ 923521 * this.service.hashCode()
-196513505 * Boolean.hashCode(this.isTraceRoot)
+ -1807454463 * this.spanKind.hashCode()
+ 887_503_681 * this.peerTags.hashCode() // possibly unroll here has well.
+ 28_629_151 * this.resource.hashCode()
+ 923_521 * this.service.hashCode()
+ 29791 * this.operationName.hashCode()
+ 961 * this.type.hashCode()
+ 31 * httpStatusCode
Expand Down Expand Up @@ -62,6 +82,18 @@ public boolean isSynthetics() {
return synthetics;
}

public boolean isTraceRoot() {
return isTraceRoot;
}

public UTF8BytesString getSpanKind() {
return spanKind;
}

public List<UTF8BytesString> getPeerTags() {
return peerTags;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -75,7 +107,10 @@ public boolean equals(Object o) {
&& resource.equals(metricKey.resource)
&& service.equals(metricKey.service)
&& operationName.equals(metricKey.operationName)
&& type.equals(metricKey.type);
&& type.equals(metricKey.type)
&& isTraceRoot == metricKey.isTraceRoot
&& spanKind.equals(metricKey.spanKind)
&& peerTags.equals(metricKey.peerTags);
}
return false;
}
Expand Down
Loading
Loading