diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java
index b527b08e13f..3ce8cecbbd4 100644
--- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java
+++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java
@@ -92,7 +92,6 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
   private volatile String version;
   private volatile String telemetryProxyEndpoint;
   private volatile Set<String> peerTags = emptySet();
-  private volatile Set<String> spanKindsToComputedStats = emptySet();

   private long lastTimeDiscovered;

@@ -127,7 +126,6 @@ private void reset() {
     lastTimeDiscovered = 0;
     telemetryProxyEndpoint = null;
     peerTags = emptySet();
-    spanKindsToComputedStats = emptySet();
   }

   /** Run feature discovery, unconditionally. */
@@ -310,12 +308,6 @@ private boolean processInfoResponse(String response) {
             peer_tags instanceof List
                 ? unmodifiableSet(new HashSet<>((List<String>) peer_tags))
                 : emptySet();
-
-        Object span_kinds = map.get("span_kinds_stats_computed");
-        spanKindsToComputedStats =
-            span_kinds instanceof List
-                ? unmodifiableSet(new HashSet<>((List<String>) span_kinds))
-                : emptySet();
       }
       try {
         state = Strings.sha256(response);
@@ -377,10 +369,6 @@ public Set<String> peerTags() {
     return peerTags;
   }

-  public Set<String> spanKindsToComputedStats() {
-    return spanKindsToComputedStats;
-  }
-
   public String getMetricsEndpoint() {
     return metricsEndpoint;
   }
diff --git a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy
index 1ee382f5775..5ac884e0814 100644
--- a/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy
+++ b/communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy
@@ -462,12 +462,6 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
       "tablename",
      "topicname"
      )
-    features.spanKindsToComputedStats().containsAll(
-      "client",
-      "consumer",
-      "producer",
-      "server"
-      )
   }

   def "should send container id as header on the info request and parse the hash in the response"() {
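With the agent-driven "span_kinds_stats_computed" list removed above, span-kind eligibility becomes a fixed set baked into the tracer (see ELIGIBLE_SPAN_KINDS_FOR_METRICS in the ConflatingMetricsAggregator diff further down). A minimal self-contained sketch of that check, assuming only the four kinds named in this patch:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class SpanKindEligibilitySketch {
  // sketch only: mirrors the hard-coded set this patch introduces instead of
  // reading "span_kinds_stats_computed" from the agent's /info response
  static final Set<String> ELIGIBLE =
      new HashSet<>(Arrays.asList("server", "client", "consumer", "producer"));

  static boolean eligible(Object spanKind) {
    // toString because span kind may arrive as a CharSequence, as in the patch
    return spanKind != null && ELIGIBLE.contains(spanKind.toString());
  }
}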
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
new file mode 100644
index 00000000000..cc3d037498d
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
@@ -0,0 +1,97 @@
+package datadog.trace.common.metrics;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
+import datadog.communication.monitor.Monitoring;
+import datadog.trace.api.WellKnownTags;
+import datadog.trace.core.CoreSpan;
+import datadog.trace.util.Strings;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
+@Measurement(iterations = 3, time = 30, timeUnit = SECONDS)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(MICROSECONDS)
+@Fork(value = 1)
+public class ConflatingMetricsAggregatorBenchmark {
+  private final DDAgentFeaturesDiscovery featuresDiscovery =
+      new FixedAgentFeaturesDiscovery(
+          Collections.singleton("peer.hostname"), Collections.emptySet());
+  private final ConflatingMetricsAggregator aggregator =
+      new ConflatingMetricsAggregator(
+          new WellKnownTags("", "", "", "", "", ""),
+          Collections.emptySet(),
+          featuresDiscovery,
+          new NullSink(),
+          2048,
+          2048);
+  private final List<CoreSpan<?>> spans = generateTrace(64);
+
+  static List<CoreSpan<?>> generateTrace(int len) {
+    final List<CoreSpan<?>> trace = new ArrayList<>();
+    for (int i = 0; i < len; i++) {
+      SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1);
+      span.setTag("peer.hostname", Strings.random(10));
+      trace.add(span);
+    }
+    return trace;
+  }
+
+  static class NullSink implements Sink {
+
+    @Override
+    public void register(EventListener listener) {}
+
+    @Override
+    public void accept(int messageCount, ByteBuffer buffer) {}
+  }
+
+  static class FixedAgentFeaturesDiscovery extends DDAgentFeaturesDiscovery {
+    private final Set<String> peerTags;
+    private final Set<String> spanKinds;
+
+    public FixedAgentFeaturesDiscovery(Set<String> peerTags, Set<String> spanKinds) {
+      // create a fixed discovery with metrics enabled
+      super(null, Monitoring.DISABLED, null, false, true);
+      this.peerTags = peerTags;
+      this.spanKinds = spanKinds;
+    }
+
+    @Override
+    public void discover() {
+      // do nothing
+    }
+
+    @Override
+    public boolean supportsMetrics() {
+      return true;
+    }
+
+    @Override
+    public Set<String> peerTags() {
+      return peerTags;
+    }
+  }
+
+  @Benchmark
+  public void benchmark(Blackhole blackhole) {
+    blackhole.consume(aggregator.publish(spans));
+  }
+}
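The benchmark above publishes a 64-span trace with randomized peer.hostname values, so each publish() call exercises the peer-tag lookup and interning path end to end. A hypothetical standalone runner, assuming the JMH runner artifact is on the classpath (only the benchmark class name is taken from the patch):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class BenchmarkRunner {
  public static void main(String[] args) throws Exception {
    // select the benchmark by simple class name; annotations supply the rest
    Options options =
        new OptionsBuilder()
            .include(ConflatingMetricsAggregatorBenchmark.class.getSimpleName())
            .build();
    new Runner(options).run();
  }
}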
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
index 1280c0acf0d..b26fb616ae2 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
@@ -1,8 +1,14 @@
 package datadog.trace.common.metrics;

 import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT;
+import static datadog.trace.api.DDTags.BASE_SERVICE;
 import static datadog.trace.api.Functions.UTF8_ENCODE;
 import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
 import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
 import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
 import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT;
@@ -10,11 +16,13 @@
 import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
 import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
 import static datadog.trace.util.AgentThreadFactory.newAgentThread;
+import static java.util.Collections.unmodifiableSet;
 import static java.util.concurrent.TimeUnit.SECONDS;

 import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
 import datadog.communication.ddagent.SharedCommunicationObjects;
 import datadog.trace.api.Config;
+import datadog.trace.api.Pair;
 import datadog.trace.api.WellKnownTags;
 import datadog.trace.api.cache.DDCache;
 import datadog.trace.api.cache.DDCaches;
@@ -24,7 +32,10 @@
 import datadog.trace.core.CoreSpan;
 import datadog.trace.core.DDTraceCoreInfo;
 import datadog.trace.util.AgentTaskScheduler;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -32,6 +43,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.jctools.maps.NonBlockingHashMap;
 import org.jctools.queues.MpscCompoundQueue;
 import org.jctools.queues.SpmcArrayQueue;
@@ -48,8 +60,32 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener {

   private static final DDCache<String, UTF8BytesString> SERVICE_NAMES =
       DDCaches.newFixedSizeCache(32);
+  private static final DDCache<CharSequence, UTF8BytesString> SPAN_KINDS =
+      DDCaches.newFixedSizeCache(16);
+  private static final DDCache<
+          String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
+      PEER_TAGS_CACHE =
+          DDCaches.newFixedSizeCache(
+              64); // it can be unbounded since those values are returned by the agent and should be
+  // under control. 64 entries is enough in this case to contain all the peer tags.
+  private static final Function<
+          String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
+      PEER_TAGS_CACHE_ADDER =
+          key ->
+              Pair.of(
+                  DDCaches.newFixedSizeCache(512),
+                  value -> UTF8BytesString.create(key + ":" + value));

   private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
+  private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
+      unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER)));
+
+  private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
+      unmodifiableSet(new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER)));
+
   private final Set<String> ignoredResources;
   private final Queue<Batch> batchPool;
   private final NonBlockingHashMap<MetricKey, Batch> pending;
@@ -238,10 +274,11 @@ private boolean shouldComputeMetric(CoreSpan<?> span) {

   private boolean spanKindEligible(CoreSpan<?> span) {
     final Object spanKind = span.getTag(SPAN_KIND);
     // use toString since it could be a CharSequence...
-    return spanKind != null && features.spanKindsToComputedStats().contains(spanKind.toString());
+    return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
   }

   private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
+    final CharSequence spanKind = span.getTag(SPAN_KIND, "");
     MetricKey newKey =
         new MetricKey(
             span.getResourceName(),
@@ -249,7 +286,11 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
             span.getOperationName(),
             span.getType(),
             span.getHttpStatusCode(),
-            isSynthetic(span));
+            isSynthetic(span),
+            span.isTopLevel(),
+            SPAN_KINDS.computeIfAbsent(
+                spanKind, UTF8BytesString::create), // save repeated utf8 conversions
+            getPeerTags(span, spanKind.toString()));
     boolean isNewKey = false;
     MetricKey key = keys.putIfAbsent(newKey, newKey);
     if (null == key) {
@@ -264,7 +305,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
       // returning false means that either the batch can't take any
       // more data, or it has already been consumed
       if (batch.add(tag, durationNanos)) {
-        // added to a pending batch prior to consumption
+        // added to a pending batch prior to consumption,
         // so skip publishing to the queue (we also know
         // the key isn't rare enough to override the sampler)
         return false;
@@ -284,6 +325,34 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
     return isNewKey || span.getError() > 0;
   }

+  private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
+    if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
+      List<UTF8BytesString> peerTags = new ArrayList<>();
+      for (String peerTag : features.peerTags()) {
+        Object value = span.getTag(peerTag);
+        if (value != null) {
+          final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
+              cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
+          peerTags.add(
+              cacheAndCreator
+                  .getLeft()
+                  .computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
+        }
+      }
+      return peerTags;
+    } else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
+      // in this case only the base service should be aggregated if present
+      final String baseService = span.getTag(BASE_SERVICE);
+      if (baseService != null) {
+        final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
+            cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
+        return Collections.singletonList(
+            cacheAndCreator.getLeft().computeIfAbsent(baseService, cacheAndCreator.getRight()));
+      }
+    }
+    return Collections.emptyList();
+  }
+
   private static boolean isSynthetic(CoreSpan<?> span) {
     return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
   }
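PEER_TAGS_CACHE above pairs, per tag name, a value cache with a formatter closure, so each distinct tag:value combination is converted to a UTF8BytesString exactly once. A rough standalone sketch of the same two-level interning idea using plain JDK maps instead of the internal DDCache (class and method names here are illustrative only):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PeerTagInterner {
  // outer level: tag name -> inner cache of formatted "name:value" strings
  private final Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();

  String intern(String name, Object value) {
    return cache
        .computeIfAbsent(name, n -> new ConcurrentHashMap<>())
        .computeIfAbsent(String.valueOf(value), v -> name + ":" + v);
  }
}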
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java
index 4ba23db6d57..73aca7d6daf 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java
@@ -3,6 +3,8 @@
 import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY;

 import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.Collections;
+import java.util.List;

 /** The aggregation key for tracked metrics. */
 public final class MetricKey {
@@ -13,6 +15,9 @@ public final class MetricKey {
   private final int httpStatusCode;
   private final boolean synthetics;
   private final int hash;
+  private final boolean isTraceRoot;
+  private final UTF8BytesString spanKind;
+  private final List<UTF8BytesString> peerTags;

   public MetricKey(
       CharSequence resource,
@@ -20,18 +25,33 @@ public MetricKey(
       CharSequence operationName,
       CharSequence type,
       int httpStatusCode,
-      boolean synthetics) {
+      boolean synthetics,
+      boolean isTraceRoot,
+      CharSequence spanKind,
+      List<UTF8BytesString> peerTags) {
     this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource);
     this.service = null == service ? EMPTY : UTF8BytesString.create(service);
     this.operationName = null == operationName ? EMPTY : UTF8BytesString.create(operationName);
     this.type = null == type ? EMPTY : UTF8BytesString.create(type);
     this.httpStatusCode = httpStatusCode;
     this.synthetics = synthetics;
-    // unrolled polynomial hashcode which avoids allocating varargs
-    // the constants are 31^5, 31^4, 31^3, 31^2, 31^1, 31^0
+    this.isTraceRoot = isTraceRoot;
+    this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind);
+    this.peerTags = peerTags == null ? Collections.emptyList() : peerTags;
+
+    // Unrolled polynomial hashcode to avoid varargs allocation
+    // and eliminate data dependency between iterations as in Arrays.hashCode.
+    // Coefficient constants are powers of 31, with integer overflow (hence negative numbers).
+    // See
+    // https://richardstartin.github.io/posts/collecting-rocks-and-benchmarks
+    // https://richardstartin.github.io/posts/still-true-in-java-9-handwritten-hash-codes-are-faster
+
     this.hash =
-        28629151 * this.resource.hashCode()
-            + 923521 * this.service.hashCode()
+        -196513505 * Boolean.hashCode(this.isTraceRoot)
+            + -1807454463 * this.spanKind.hashCode()
+            + 887_503_681 * this.peerTags.hashCode() // possibly unroll here as well.
+            + 28_629_151 * this.resource.hashCode()
+            + 923_521 * this.service.hashCode()
             + 29791 * this.operationName.hashCode()
             + 961 * this.type.hashCode()
             + 31 * httpStatusCode
@@ -62,6 +82,18 @@ public boolean isSynthetics() {
     return synthetics;
   }

+  public boolean isTraceRoot() {
+    return isTraceRoot;
+  }
+
+  public UTF8BytesString getSpanKind() {
+    return spanKind;
+  }
+
+  public List<UTF8BytesString> getPeerTags() {
+    return peerTags;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -75,7 +107,10 @@ public boolean equals(Object o) {
         && resource.equals(metricKey.resource)
         && service.equals(metricKey.service)
         && operationName.equals(metricKey.operationName)
-        && type.equals(metricKey.type);
+        && type.equals(metricKey.type)
+        && isTraceRoot == metricKey.isTraceRoot
+        && spanKind.equals(metricKey.spanKind)
+        && peerTags.equals(metricKey.peerTags);
     }
     return false;
   }
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
index 485b3e90bbc..964c51b2cbf 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
@@ -8,6 +8,7 @@
 import datadog.trace.api.ProcessTags;
 import datadog.trace.api.WellKnownTags;
 import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
+import java.util.List;

 public final class SerializingMetricWriter implements MetricWriter {

@@ -31,6 +32,9 @@ public final class SerializingMetricWriter implements MetricWriter {
   private static final byte[] OK_SUMMARY = "OkSummary".getBytes(ISO_8859_1);
   private static final byte[] ERROR_SUMMARY = "ErrorSummary".getBytes(ISO_8859_1);
   private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1);
+  private static final byte[] IS_TRACE_ROOT = "IsTraceRoot".getBytes(ISO_8859_1);
+  private static final byte[] SPAN_KIND = "SpanKind".getBytes(ISO_8859_1);
+  private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1);

   private final WellKnownTags wellKnownTags;
   private final WritableFormatter writer;
@@ -93,8 +97,7 @@ public void startBucket(int metricCount, long start, long duration) {

   @Override
   public void add(MetricKey key, AggregateMetric aggregate) {
-
-    writer.startMap(12);
+    writer.startMap(15);

     writer.writeUTF8(NAME);
     writer.writeUTF8(key.getOperationName());
@@ -114,6 +117,20 @@ public void add(MetricKey key, AggregateMetric aggregate) {
     writer.writeUTF8(SYNTHETICS);
     writer.writeBoolean(key.isSynthetics());

+    writer.writeUTF8(IS_TRACE_ROOT);
+    writer.writeBoolean(key.isTraceRoot());
+
+    writer.writeUTF8(SPAN_KIND);
+    writer.writeUTF8(key.getSpanKind());
+
+    writer.writeUTF8(PEER_TAGS);
+    final List<UTF8BytesString> peerTags = key.getPeerTags();
+    writer.startArray(peerTags.size());
+
+    for (UTF8BytesString peerTag : peerTags) {
+      writer.writeUTF8(peerTag);
+    }
+
     writer.writeUTF8(HITS);
     writer.writeInt(aggregate.getHitCount());
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy
index f93e3af1540..3c7a247cae3 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy
@@ -1,5 +1,6 @@
 package datadog.trace.common.metrics

+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
 import datadog.trace.test.util.DDSpecification

 import java.util.concurrent.BlockingDeque
@@ -51,7 +52,7 @@ class AggregateMetricTest extends DDSpecification {
     given:
     AggregateMetric aggregate = new AggregateMetric().recordDurations(3,
       new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG))
-    Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false))
+    Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")]))
     batch.add(0L, 10)
     batch.add(0L, 10)
     batch.add(0L, 10)
@@ -126,7 +127,7 @@ class AggregateMetricTest extends DDSpecification {
   def "consistent under concurrent attempts to read and write"() {
     given:
     AggregateMetric aggregate = new AggregateMetric()
-    MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false)
+    MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")])
     BlockingDeque<Batch> queue = new LinkedBlockingDeque<>(1000)
     ExecutorService reader = Executors.newSingleThreadExecutor()
     int writerCount = 10
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
index 3e2875c2993..676bbcfff14 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
@@ -99,22 +99,36 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
     aggregator.start()

     when:
     CountDownLatch latch = new CountDownLatch(1)
-    aggregator.publish([new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK)])
+    aggregator.publish([
+      new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 100, HTTP_OK)
+      .setTag(SPAN_KIND, "baz")
+    ])
     aggregator.report()
     def latchTriggered = latch.await(2, SECONDS)

     then:
     latchTriggered
     1 * writer.startBucket(1, _, _)
-    1 * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-      value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100
-    }
+    1 * writer.add(new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      true,
+      "baz",
+      []
+      ), _) >> { MetricKey key, AggregateMetric value ->
+      value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100
+    }
     1 * writer.finishBucket() >> { latch.countDown() }

     cleanup:
@@ -127,15 +141,15 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
-    features.spanKindsToComputedStats() >> ["client", "server", "producer", "consumer"]
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
     aggregator.start()

     when:
     CountDownLatch latch = new CountDownLatch(1)
-    def span = Spy(new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK))
-    span.getTag(SPAN_KIND) >> kind
+    def span = new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK)
+      .setTag(SPAN_KIND, kind)
     aggregator.publish([span])
     aggregator.report()
     def latchTriggered = latch.await(2, SECONDS)

     then:
     latchTriggered == statsComputed
     (statsComputed ? 1 : 0) * writer.startBucket(1, _, _)
-    (statsComputed ? 1 : 0) * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-      value.getHitCount() == 1 && value.getTopLevelCount() == 0 && value.getDuration() == 100
-    }
+    (statsComputed ? 1 : 0) * writer.add(
+      new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      false,
+      kind,
+      []
+      ), { AggregateMetric aggregateMetric ->
+        aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
+      })
     (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() }

     cleanup:
@@ -154,17 +179,128 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     where:
     kind                             | statsComputed
     "client"                         | true
+    "producer"                       | true
+    "consumer"                       | true
     UTF8BytesString.create("server") | true
     "internal"                       | false
     null                             | false
   }

+  def "should create bucket for each set of peer tags"() {
+    setup:
+    MetricWriter writer = Mock(MetricWriter)
+    Sink sink = Stub(Sink)
+    DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
+    features.supportsMetrics() >> true
+    features.peerTags() >>> [["country"], ["country", "georegion"],]
+    ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+      features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+    aggregator.start()
+
+    when:
+    CountDownLatch latch = new CountDownLatch(1)
+    aggregator.publish([
+      new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
+      .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"),
+      new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
+      .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe")
+    ])
+    aggregator.report()
+    def latchTriggered = latch.await(2, SECONDS)
+
+    then:
+    latchTriggered
+    1 * writer.startBucket(2, _, _)
+    1 * writer.add(
+      new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      false,
+      "client",
+      [UTF8BytesString.create("country:france")]
+      ), { AggregateMetric aggregateMetric ->
+        aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
+      })
+    1 * writer.add(
+      new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      false,
+      "client",
+      [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")]
+      ), { AggregateMetric aggregateMetric ->
+        aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
+      })
+    1 * writer.finishBucket() >> { latch.countDown() }
+
+    cleanup:
+    aggregator.close()
+  }
+
+  def "should aggregate the right peer tags for kind #kind"() {
+    setup:
+    MetricWriter writer = Mock(MetricWriter)
+    Sink sink = Stub(Sink)
+    DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
+    features.supportsMetrics() >> true
+    features.peerTags() >> ["peer.hostname", "_dd.base_service"]
+    ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
+      features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+    aggregator.start()
+
+    when:
+    CountDownLatch latch = new CountDownLatch(1)
+    aggregator.publish([
+      new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK)
+      .setTag(SPAN_KIND, kind).setTag("peer.hostname", "localhost").setTag("_dd.base_service", "test")
+    ])
+    aggregator.report()
+    def latchTriggered = latch.await(2, SECONDS)
+
+    then:
+    latchTriggered
+    1 * writer.startBucket(1, _, _)
+    1 * writer.add(
+      new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      false,
+      kind,
+      expectedPeerTags
+      ), { AggregateMetric aggregateMetric ->
+        aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100
+      })
+    1 * writer.finishBucket() >> { latch.countDown() }
+
+    cleanup:
+    aggregator.close()
+
+    where:
+    kind       | expectedPeerTags
+    "client"   | [UTF8BytesString.create("peer.hostname:localhost"), UTF8BytesString.create("_dd.base_service:test")]
+    "internal" | [UTF8BytesString.create("_dd.base_service:test")]
+    "server"   | []
+  }
+
   def "measured spans do not contribute to top level count"() {
     setup:
     MetricWriter writer = Mock(MetricWriter)
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
     aggregator.start()
@@ -173,6 +309,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     CountDownLatch latch = new CountDownLatch(1)
     aggregator.publish([
       new SimpleSpan("service", "operation", "resource", "type", measured, topLevel, false, 0, 100, HTTP_OK)
+      .setTag(SPAN_KIND, "baz")
     ])
     aggregator.report()
     def latchTriggered = latch.await(2, SECONDS)

     then:
     latchTriggered
     1 * writer.startBucket(1, _, _)
-    1 * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-      value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100
-    }
+    1 * writer.add(new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      topLevel,
+      "baz",
+      []
+      ), { AggregateMetric value ->
+        value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100
+      })
     1 * writer.finishBucket() >> { latch.countDown() }

     cleanup:
@@ -201,13 +348,14 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
     long duration = 100
     List<CoreSpan> trace = [
-      new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK),
-      new SimpleSpan("service1", "operation1", "resource1", "type", false, false, false, 0, 0, HTTP_OK),
-      new SimpleSpan("service2", "operation2", "resource2", "type", true, false, false, 0, duration * 2, HTTP_OK)
+      new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"),
+      new SimpleSpan("service1", "operation1", "resource1", "type", false, false, false, 0, 0, HTTP_OK).setTag(SPAN_KIND, "baz"),
+      new SimpleSpan("service2", "operation2", "resource2", "type", true, false, false, 0, duration * 2, HTTP_OK).setTag(SPAN_KIND, "baz")
     ]
     aggregator.start()
@@ -224,12 +372,32 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     latchTriggered
     1 * writer.finishBucket() >> { latch.countDown() }
     1 * writer.startBucket(2, _, SECONDS.toNanos(reportingInterval))
-    1 * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-      value.getHitCount() == count && value.getDuration() == count * duration
-    }
-    1 * writer.add(new MetricKey("resource2", "service2", "operation2", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-      value.getHitCount() == count && value.getDuration() == count * duration * 2
-    }
+    1 * writer.add(new MetricKey(
+      "resource",
+      "service",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      false,
+      "baz",
+      []
+      ), { AggregateMetric value ->
+        value.getHitCount() == count && value.getDuration() == count * duration
+      })
+    1 * writer.add(new MetricKey(
+      "resource2",
+      "service2",
+      "operation2",
+      "type",
+      HTTP_OK,
+      false,
+      false,
+      "baz",
+      []
+      ), { AggregateMetric value ->
+        value.getHitCount() == count && value.getDuration() == count * duration * 2
+      })

     cleanup:
     aggregator.close()
@@ -245,6 +413,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
     long duration = 100
@@ -255,6 +424,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     for (int i = 0; i < 11; ++i) {
       aggregator.publish([
         new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
+        .setTag(SPAN_KIND, "baz")
       ])
     }
     aggregator.report()
@@ -264,11 +434,31 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     latchTriggered
     1 * writer.startBucket(10, _, SECONDS.toNanos(reportingInterval))
     for (int i = 1; i < 11; ++i) {
-      1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-        value.getHitCount() == 1 && value.getDuration() == duration
-      }
+      1 * writer.add(new MetricKey(
+        "resource",
+        "service" + i,
+        "operation",
+        "type",
+        HTTP_OK,
+        false,
+        true,
+        "baz",
+        []
+        ), _) >> { MetricKey key, AggregateMetric value ->
+        value.getHitCount() == 1 && value.getDuration() == duration
+      }
     }
-    0 * writer.add(new MetricKey("resource", "service0", "operation", "type", HTTP_OK, false), _)
+    0 * writer.add(new MetricKey(
+      "resource",
+      "service0",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      true,
+      "baz",
+      []
+      ), _)
     1 * writer.finishBucket() >> { latch.countDown() }

     cleanup:
@@ -282,6 +472,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
     long duration = 100
@@ -292,6 +483,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     for (int i = 0; i < 5; ++i) {
       aggregator.publish([
         new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
+        .setTag(SPAN_KIND, "baz")
       ])
     }
     aggregator.report()
@@ -301,9 +493,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     latchTriggered
     1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval))
     for (int i = 0; i < 5; ++i) {
-      1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-        value.getHitCount() == 1 && value.getDuration() == duration
-      }
+      1 * writer.add(new MetricKey(
+        "resource",
+        "service" + i,
+        "operation",
+        "type",
+        HTTP_OK,
+        false,
+        true,
+        "baz",
+        []
+        ), { AggregateMetric value ->
+          value.getHitCount() == 1 && value.getDuration() == duration
+        })
     }
     1 * writer.finishBucket() >> { latch.countDown() }
@@ -312,6 +514,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     for (int i = 1; i < 5; ++i) {
       aggregator.publish([
         new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
+        .setTag(SPAN_KIND, "baz")
       ])
     }
     aggregator.report()
@@ -321,11 +524,31 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     latchTriggered
     1 * writer.startBucket(4, _, SECONDS.toNanos(reportingInterval))
     for (int i = 1; i < 5; ++i) {
-      1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-        value.getHitCount() == 1 && value.getDuration() == duration
-      }
+      1 * writer.add(new MetricKey(
+        "resource",
+        "service" + i,
+        "operation",
+        "type",
+        HTTP_OK,
+        false,
+        true,
+        "baz",
+        []
+        ), { AggregateMetric value ->
+          value.getHitCount() == 1 && value.getDuration() == duration
+        })
     }
-    0 * writer.add(new MetricKey("resource", "service0", "operation", "type", HTTP_OK, false), _)
+    0 * writer.add(new MetricKey(
+      "resource",
+      "service0",
+      "operation",
+      "type",
+      HTTP_OK,
+      false,
+      true,
+      "baz",
+      []
+      ), _)
     1 * writer.finishBucket() >> { latch.countDown() }

     cleanup:
@@ -339,6 +562,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
     long duration = 100
@@ -349,6 +573,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     for (int i = 0; i < 5; ++i) {
       aggregator.publish([
         new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
+        .setTag(SPAN_KIND, "quux")
       ])
     }
     aggregator.report()
@@ -358,9 +583,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     latchTriggered
     1 * writer.startBucket(5, _, SECONDS.toNanos(reportingInterval))
     for (int i = 0; i < 5; ++i) {
-      1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-        value.getHitCount() == 1 && value.getDuration() == duration
-      }
+      1 * writer.add(new MetricKey(
+        "resource",
+        "service" + i,
+        "operation",
+        "type",
+        HTTP_OK,
+        false,
+        true,
+        "quux",
+        []
+        ), { AggregateMetric value ->
+          value.getHitCount() == 1 && value.getDuration() == duration
+        })
     }
     1 * writer.finishBucket() >> { latch.countDown() }
@@ -383,6 +618,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, maxAggregates, queueSize, 1, SECONDS)
     long duration = 100
@@ -393,6 +629,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     for (int i = 0; i < 5; ++i) {
       aggregator.publish([
         new SimpleSpan("service" + i, "operation", "resource", "type", false, true, false, 0, duration, HTTP_OK)
+        .setTag(SPAN_KIND, "garply")
       ])
     }
     def latchTriggered = latch.await(2, SECONDS)
@@ -401,9 +638,19 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     latchTriggered
     1 * writer.startBucket(5, _, SECONDS.toNanos(1))
     for (int i = 0; i < 5; ++i) {
-      1 * writer.add(new MetricKey("resource", "service" + i, "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
-        value.getHitCount() == 1 && value.getDuration() == duration
-      }
+      1 * writer.add(new MetricKey(
+        "resource",
+        "service" + i,
+        "operation",
+        "type",
+        HTTP_OK,
+        false,
+        true,
+        "garply",
+        []
+        ), { AggregateMetric value ->
+          value.getHitCount() == 1 && value.getDuration() == duration
+        })
     }
     1 * writer.finishBucket() >> { latch.countDown() }

@@ -418,6 +665,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, maxAggregates, queueSize, 1, SECONDS)
     long duration = 100
@@ -456,6 +704,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, maxAggregates, queueSize, 1, SECONDS)
     long duration = 100
@@ -508,6 +757,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> false
+    features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
       features, sink, writer, 10, queueSize, 200, MILLISECONDS)
     final spans = [
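SerializingMetricWriter above grows the per-metric msgpack map from 12 to 15 entries: IsTraceRoot, SpanKind, and PeerTags (an array of "tag:value" strings). A hedged sketch of just the three new entries using msgpack-java directly, as the test below does for unpacking; the packer usage is illustrative and not the internal WritableFormatter API:

import java.io.IOException;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;

class PeerTagFieldsSketch {
  static byte[] packNewFields() throws IOException {
    MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
    packer.packString("IsTraceRoot");
    packer.packBoolean(true);
    packer.packString("SpanKind");
    packer.packString("client");
    packer.packString("PeerTags");
    packer.packArrayHeader(1); // one peer tag in this example
    packer.packString("peer.hostname:localhost");
    return packer.toByteArray();
  }
}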
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
index e75ee5f8564..559e0c5f94a 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
@@ -27,6 +27,7 @@ class FootprintForkedTest extends DDSpecification {
     ValidatingSink sink = new ValidatingSink(latch)
     DDAgentFeaturesDiscovery features = Stub(DDAgentFeaturesDiscovery) {
       it.supportsMetrics() >> true
+      it.peerTags() >> []
     }
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
       new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"),
       1000,
       1000,
       100,
-      SECONDS
-      )
+      SECONDS)
     // Removing the 'features' as it's a mock, and mocks are heavyweight, e.g. around 22MiB
     def baseline = footprint(aggregator, features)
     aggregator.start()
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
index d7c3e514ed6..a36db50b441 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy
@@ -4,6 +4,7 @@ import datadog.trace.api.Config
 import datadog.trace.api.ProcessTags
 import datadog.trace.api.WellKnownTags
 import datadog.trace.api.Pair
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
 import datadog.trace.test.util.DDSpecification
 import org.msgpack.core.MessagePack
 import org.msgpack.core.MessageUnpacker
@@ -43,11 +44,58 @@ class SerializingMetricWriterTest extends DDSpecification {
     where:
     content << [
       [
-        Pair.of(new MetricKey("resource1", "service1", "operation1", "type", 0, false), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))),
-        Pair.of(new MetricKey("resource2", "service2", "operation2", "type2", 200, true), new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)))
+        Pair.of(
+          new MetricKey(
+          "resource1",
+          "service1",
+          "operation1",
+          "type",
+          0,
+          false,
+          false,
+          "client",
+          [
+            UTF8BytesString.create("country:canada"),
+            UTF8BytesString.create("georegion:amer"),
+            UTF8BytesString.create("peer.service:remote-service")
+          ]
+          ),
+          new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
+        ),
+        Pair.of(
+          new MetricKey(
+          "resource2",
+          "service2",
+          "operation2",
+          "type2",
+          200,
+          true,
+          false,
+          "producer",
+          [
+            UTF8BytesString.create("country:canada"),
+            UTF8BytesString.create("georegion:amer"),
+            UTF8BytesString.create("peer.service:remote-service")
+          ],
+          ),
+          new AggregateMetric().recordDurations(9, new AtomicLongArray(1L))
+        )
       ],
       (0..10000).collect({ i ->
-        Pair.of(new MetricKey("resource" + i, "service" + i, "operation" + i, "type", 0, false), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)))
+        Pair.of(
+          new MetricKey(
+          "resource" + i,
+          "service" + i,
+          "operation" + i,
+          "type",
+          0,
+          false,
+          false,
+          "producer",
+          [UTF8BytesString.create("messaging.destination:dest" + i)]
+          ),
+          new AggregateMetric().recordDurations(10, new AtomicLongArray(1L))
+        )
       })
     ]
     withProcessTags << [true, false]
@@ -107,8 +155,8 @@ class SerializingMetricWriterTest extends DDSpecification {
     for (Pair<MetricKey, AggregateMetric> pair : content) {
       MetricKey key = pair.getLeft()
       AggregateMetric value = pair.getRight()
-      int size = unpacker.unpackMapHeader()
-      assert size == 12
+      int metricMapSize = unpacker.unpackMapHeader()
+      assert metricMapSize == 15
       int elementCount = 0
       assert unpacker.unpackString() == "Name"
       assert unpacker.unpackString() == key.getOperationName() as String
@@ -128,6 +176,20 @@ class SerializingMetricWriterTest extends DDSpecification {
       assert unpacker.unpackString() == "Synthetics"
       assert unpacker.unpackBoolean() == key.isSynthetics()
       ++elementCount
+      assert unpacker.unpackString() == "IsTraceRoot"
+      assert unpacker.unpackBoolean() == key.isTraceRoot()
+      ++elementCount
+      assert unpacker.unpackString() == "SpanKind"
+      assert unpacker.unpackString() == key.getSpanKind() as String
+      ++elementCount
+      assert unpacker.unpackString() == "PeerTags"
+      int peerTagsLength = unpacker.unpackArrayHeader()
+      assert peerTagsLength == key.getPeerTags().size()
+      for (int i = 0; i < peerTagsLength; i++) {
+        def unpackedPeerTag = unpacker.unpackString()
+        assert unpackedPeerTag == key.getPeerTags()[i].toString()
+      }
+      ++elementCount
       assert unpacker.unpackString() == "Hits"
       assert unpacker.unpackInt() == value.getHitCount()
       ++elementCount
@@ -146,7 +208,7 @@ class SerializingMetricWriterTest extends DDSpecification {
       assert unpacker.unpackString() == "ErrorSummary"
       validateSketch(unpacker)
       ++elementCount
-      assert elementCount == size
+      assert elementCount == metricMapSize
     }
     validated = true
   }
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy
index 008059398f7..a3fcd9e79d7 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy
@@ -19,7 +19,10 @@ class SimpleSpan implements CoreSpan<SimpleSpan> {
   private final long duration
   private final long startTime

-  SimpleSpan(String serviceName,
+  private final Map<String, Object> tags = [:]
+
+  SimpleSpan(
+    String serviceName,
     String operationName,
     CharSequence resourceName,
     String type,
@@ -28,7 +31,8 @@ class SimpleSpan implements CoreSpan<SimpleSpan> {
     boolean error,
     long startTime,
     long duration,
-    int statusCode) {
+    int statusCode
+    ) {
     this.serviceName = serviceName
     this.operationName = operationName
     this.resourceName = resourceName
@@ -38,7 +42,7 @@ class SimpleSpan implements CoreSpan<SimpleSpan> {
     this.error = error
     this.startTime = startTime
     this.duration = duration
-    this.statusCode = (short)statusCode
+    this.statusCode = (short) statusCode
   }

   @Override
@@ -118,57 +122,60 @@ class SimpleSpan implements CoreSpan<SimpleSpan> {

   @Override
   SimpleSpan setTag(String tag, String value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, boolean value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, int value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, long value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, double value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, Number value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, CharSequence value) {
-    return this
+    return setTag(tag, (Object) value)
   }

   @Override
   SimpleSpan setTag(String tag, Object value) {
+    tags.put(tag, value)
     return this
   }

   @Override
   SimpleSpan removeTag(String tag) {
+    tags.remove(tag)
     return this
   }

   @Override
   <U> U getTag(CharSequence name, U defaultValue) {
-    return defaultValue
+    def tagValue = tags.get(String.valueOf(name)) ?: defaultValue
+    return tagValue != null ? (U) tagValue : defaultValue
   }

   @Override
   <U> U getTag(CharSequence name) {
-    return null
+    return getTag(name, null)
   }

   @Override
@@ -197,8 +204,7 @@ class SimpleSpan implements CoreSpan<SimpleSpan> {
   }

   @Override
-  void processTagsAndBaggage(MetadataConsumer consumer) {
-  }
+  void processTagsAndBaggage(MetadataConsumer consumer) {}

   @Override
   SimpleSpan setSamplingPriority(int samplingPriority, int samplingMechanism) {
diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
index fa96bd90218..9984b9700d0 100644
--- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
+++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
@@ -2,6 +2,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery
 import datadog.communication.http.OkHttpUtils
 import datadog.trace.api.Config
 import datadog.trace.api.WellKnownTags
+import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
 import datadog.trace.common.metrics.AggregateMetric
 import datadog.trace.common.metrics.EventListener
 import datadog.trace.common.metrics.MetricKey
@@ -34,11 +35,11 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest {
     )
     writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10))
     writer.add(
-      new MetricKey("resource1", "service1", "operation1", "sql", 0, false),
+      new MetricKey("resource1", "service1", "operation1", "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")]),
       new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
     )
     writer.add(
-      new MetricKey("resource2", "service2", "operation2", "web", 200, false),
+      new MetricKey("resource2", "service2", "operation2", "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")]),
      new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
     )
     writer.finishBucket()
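Since MetricKey now folds the trace-root flag, span kind, and peer tags into equals and hashCode, spans that differ only in a peer tag value land in distinct aggregation buckets. A quick illustration against the constructor used throughout the tests above (a sketch with arbitrary values, assuming the dd-trace-core classes are on the classpath):

import static java.util.Collections.singletonList;

import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.metrics.MetricKey;

class MetricKeyIdentityExample {
  public static void main(String[] args) {
    MetricKey a = new MetricKey("resource", "service", "operation", "type", 200,
        false, true, "client", singletonList(UTF8BytesString.create("peer.hostname:host-1")));
    MetricKey b = new MetricKey("resource", "service", "operation", "type", 200,
        false, true, "client", singletonList(UTF8BytesString.create("peer.hostname:host-2")));
    // identical except for the peer tag value: must not conflate
    System.out.println(a.equals(b)); // false -> two separate stats buckets
  }
}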