Skip to content

Commit 10f539b

Browse files
authored
Implement health metrics for client stats (#9377)
* Implement health metrics for client stats * Use the same naming as dd-trace-go * format
1 parent 5380221 commit 10f539b

File tree

10 files changed

+235
-44
lines changed

10 files changed

+235
-44
lines changed

communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ public boolean supportsDebuggerDiagnostics() {
365365
return debuggerDiagnosticsEndpoint != null;
366366
}
367367

368-
boolean supportsDropping() {
368+
public boolean supportsDropping() {
369369
return supportsDropping;
370370
}
371371

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import datadog.trace.common.writer.ddagent.DDAgentApi;
2424
import datadog.trace.core.CoreSpan;
2525
import datadog.trace.core.DDTraceCoreInfo;
26+
import datadog.trace.core.monitor.HealthMetrics;
2627
import datadog.trace.util.AgentTaskScheduler;
2728
import java.util.Collections;
2829
import java.util.List;
@@ -61,15 +62,19 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
6162
private final long reportingInterval;
6263
private final TimeUnit reportingIntervalTimeUnit;
6364
private final DDAgentFeaturesDiscovery features;
65+
private final HealthMetrics healthMetrics;
6466

6567
private volatile AgentTaskScheduler.Scheduled<?> cancellation;
6668

6769
public ConflatingMetricsAggregator(
68-
Config config, SharedCommunicationObjects sharedCommunicationObjects) {
70+
Config config,
71+
SharedCommunicationObjects sharedCommunicationObjects,
72+
HealthMetrics healthMetrics) {
6973
this(
7074
config.getWellKnownTags(),
7175
config.getMetricsIgnoredResources(),
7276
sharedCommunicationObjects.featuresDiscovery(config),
77+
healthMetrics,
7378
new OkHttpSink(
7479
sharedCommunicationObjects.okHttpClient,
7580
sharedCommunicationObjects.agentUrl.toString(),
@@ -85,16 +90,27 @@ public ConflatingMetricsAggregator(
8590
WellKnownTags wellKnownTags,
8691
Set<String> ignoredResources,
8792
DDAgentFeaturesDiscovery features,
93+
HealthMetrics healthMetric,
8894
Sink sink,
8995
int maxAggregates,
9096
int queueSize) {
91-
this(wellKnownTags, ignoredResources, features, sink, maxAggregates, queueSize, 10, SECONDS);
97+
this(
98+
wellKnownTags,
99+
ignoredResources,
100+
features,
101+
healthMetric,
102+
sink,
103+
maxAggregates,
104+
queueSize,
105+
10,
106+
SECONDS);
92107
}
93108

94109
ConflatingMetricsAggregator(
95110
WellKnownTags wellKnownTags,
96111
Set<String> ignoredResources,
97112
DDAgentFeaturesDiscovery features,
113+
HealthMetrics healthMetric,
98114
Sink sink,
99115
int maxAggregates,
100116
int queueSize,
@@ -103,6 +119,7 @@ public ConflatingMetricsAggregator(
103119
this(
104120
ignoredResources,
105121
features,
122+
healthMetric,
106123
sink,
107124
new SerializingMetricWriter(wellKnownTags, sink),
108125
maxAggregates,
@@ -114,6 +131,7 @@ public ConflatingMetricsAggregator(
114131
ConflatingMetricsAggregator(
115132
Set<String> ignoredResources,
116133
DDAgentFeaturesDiscovery features,
134+
HealthMetrics healthMetric,
117135
Sink sink,
118136
MetricWriter metricWriter,
119137
int maxAggregates,
@@ -126,6 +144,7 @@ public ConflatingMetricsAggregator(
126144
this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
127145
this.keys = new NonBlockingHashMap<>();
128146
this.features = features;
147+
this.healthMetrics = healthMetric;
129148
this.sink = sink;
130149
this.aggregator =
131150
new Aggregator(
@@ -215,17 +234,22 @@ public Future<Boolean> forceReport() {
215234
@Override
216235
public boolean publish(List<? extends CoreSpan<?>> trace) {
217236
boolean forceKeep = false;
237+
int counted = 0;
218238
if (features.supportsMetrics()) {
219239
for (CoreSpan<?> span : trace) {
220240
boolean isTopLevel = span.isTopLevel();
221241
if (shouldComputeMetric(span)) {
222242
if (ignoredResources.contains(span.getResourceName().toString())) {
223243
// skip publishing all children
224-
return false;
244+
forceKeep = false;
245+
break;
225246
}
247+
counted++;
226248
forceKeep |= publish(span, isTopLevel);
227249
}
228250
}
251+
healthMetrics.onClientStatTraceComputed(
252+
counted, trace.size(), features.supportsDropping() && !forceKeep);
229253
}
230254
return forceKeep;
231255
}
@@ -314,18 +338,23 @@ public void close() {
314338

315339
@Override
316340
public void onEvent(EventType eventType, String message) {
341+
healthMetrics.onClientStatPayloadSent();
317342
switch (eventType) {
318343
case DOWNGRADED:
319344
log.debug("Agent downgrade was detected");
320345
disable();
346+
healthMetrics.onClientStatDowngraded();
321347
break;
322348
case BAD_PAYLOAD:
323349
log.debug("bad metrics payload sent to trace agent: {}", message);
350+
healthMetrics.onClientStatErrorReceived();
324351
break;
325352
case ERROR:
326353
log.debug("trace agent errored receiving metrics payload: {}", message);
354+
healthMetrics.onClientStatErrorReceived();
327355
break;
328356
default:
357+
break;
329358
}
330359
}
331360

dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@
22

33
import datadog.communication.ddagent.SharedCommunicationObjects;
44
import datadog.trace.api.Config;
5+
import datadog.trace.core.monitor.HealthMetrics;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
78

89
public class MetricsAggregatorFactory {
910
private static final Logger log = LoggerFactory.getLogger(MetricsAggregatorFactory.class);
1011

1112
public static MetricsAggregator createMetricsAggregator(
12-
Config config, SharedCommunicationObjects sharedCommunicationObjects) {
13+
Config config,
14+
SharedCommunicationObjects sharedCommunicationObjects,
15+
HealthMetrics healthMetrics) {
1316
if (config.isTracerMetricsEnabled()) {
1417
log.debug("tracer metrics enabled");
15-
return new ConflatingMetricsAggregator(config, sharedCommunicationObjects);
18+
return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics);
1619
}
1720
log.debug("tracer metrics disabled");
1821
return NoOpMetricsAggregator.INSTANCE;

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ public static class CoreTracerBuilder {
316316
private Map<String, String> baggageMapping;
317317
private int partialFlushMinSpans;
318318
private StatsDClient statsDClient;
319+
private HealthMetrics healthMetrics;
319320
private TagInterceptor tagInterceptor;
320321
private boolean strictTraceWrites;
321322
private InstrumentationGateway instrumentationGateway;
@@ -419,8 +420,8 @@ public CoreTracerBuilder tagInterceptor(TagInterceptor tagInterceptor) {
419420
return this;
420421
}
421422

422-
public CoreTracerBuilder statsDClient(TagInterceptor tagInterceptor) {
423-
this.tagInterceptor = tagInterceptor;
423+
public CoreTracerBuilder healthMetrics(HealthMetrics healthMetrics) {
424+
this.healthMetrics = healthMetrics;
424425
return this;
425426
}
426427

@@ -522,6 +523,7 @@ public CoreTracer build() {
522523
baggageMapping,
523524
partialFlushMinSpans,
524525
statsDClient,
526+
healthMetrics,
525527
tagInterceptor,
526528
strictTraceWrites,
527529
instrumentationGateway,
@@ -553,6 +555,7 @@ private CoreTracer(
553555
final Map<String, String> baggageMapping,
554556
final int partialFlushMinSpans,
555557
final StatsDClient statsDClient,
558+
final HealthMetrics healthMetrics,
556559
final TagInterceptor tagInterceptor,
557560
final boolean strictTraceWrites,
558561
final InstrumentationGateway instrumentationGateway,
@@ -580,6 +583,7 @@ private CoreTracer(
580583
baggageMapping,
581584
partialFlushMinSpans,
582585
statsDClient,
586+
healthMetrics,
583587
tagInterceptor,
584588
strictTraceWrites,
585589
instrumentationGateway,
@@ -610,6 +614,7 @@ private CoreTracer(
610614
final Map<String, String> baggageMapping,
611615
final int partialFlushMinSpans,
612616
final StatsDClient statsDClient,
617+
final HealthMetrics healthMetrics,
613618
final TagInterceptor tagInterceptor,
614619
final boolean strictTraceWrites,
615620
final InstrumentationGateway instrumentationGateway,
@@ -698,11 +703,13 @@ private CoreTracer(
698703
config.isHealthMetricsEnabled()
699704
? new MonitoringImpl(this.statsDClient, 10, SECONDS)
700705
: Monitoring.DISABLED;
701-
healthMetrics =
702-
config.isHealthMetricsEnabled()
703-
? new TracerHealthMetrics(this.statsDClient)
704-
: HealthMetrics.NO_OP;
705-
healthMetrics.start();
706+
this.healthMetrics =
707+
healthMetrics != null
708+
? healthMetrics
709+
: (config.isHealthMetricsEnabled()
710+
? new TracerHealthMetrics(this.statsDClient)
711+
: HealthMetrics.NO_OP);
712+
this.healthMetrics.start();
706713
performanceMonitoring =
707714
config.isPerfMetricsEnabled()
708715
? new MonitoringImpl(this.statsDClient, 10, SECONDS)
@@ -715,7 +722,7 @@ private CoreTracer(
715722
config.getScopeDepthLimit(),
716723
config.isScopeStrictMode(),
717724
profilingContextIntegration,
718-
healthMetrics);
725+
this.healthMetrics);
719726

720727
externalAgentLauncher = new ExternalAgentLauncher(config);
721728

@@ -740,7 +747,7 @@ private CoreTracer(
740747
if (writer == null) {
741748
this.writer =
742749
WriterFactory.createWriter(
743-
config, sharedCommunicationObjects, sampler, singleSpanSampler, healthMetrics);
750+
config, sharedCommunicationObjects, sampler, singleSpanSampler, this.healthMetrics);
744751
} else {
745752
this.writer = writer;
746753
}
@@ -757,22 +764,23 @@ private CoreTracer(
757764
&& (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) {
758765
pendingTraceBuffer = PendingTraceBuffer.discarding();
759766
traceCollectorFactory =
760-
new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics);
767+
new StreamingTraceCollector.Factory(this, this.timeSource, this.healthMetrics);
761768
} else {
762769
pendingTraceBuffer =
763770
strictTraceWrites
764771
? PendingTraceBuffer.discarding()
765772
: PendingTraceBuffer.delaying(
766-
this.timeSource, config, sharedCommunicationObjects, healthMetrics);
773+
this.timeSource, config, sharedCommunicationObjects, this.healthMetrics);
767774
traceCollectorFactory =
768775
new PendingTrace.Factory(
769-
this, pendingTraceBuffer, this.timeSource, strictTraceWrites, healthMetrics);
776+
this, pendingTraceBuffer, this.timeSource, strictTraceWrites, this.healthMetrics);
770777
}
771778
pendingTraceBuffer.start();
772779

773780
sharedCommunicationObjects.whenReady(this.writer::start);
774781

775-
metricsAggregator = createMetricsAggregator(config, sharedCommunicationObjects);
782+
metricsAggregator =
783+
createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics);
776784
// Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds
777785
// (using milliseconds granularity.) This avoids a fleet of traced applications starting at the
778786
// same time from sending metrics in sync.

dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,24 @@ public void onFailedSend(
7373

7474
public void onLongRunningUpdate(final int dropped, final int write, final int expired) {}
7575

76+
/**
77+
* Report that a trace has been used to compute client stats.
78+
*
79+
* @param countedSpan the number of spans used for the stat computation
80+
* @param totalSpan the total number of spans in the trace
81+
* @param dropped true if the trace can be dropped. Note: the PayloadDispatcher also counts this.
82+
* However, this counter reports how many p0 drops we could achieve before the span
83+
* got sampled.
84+
*/
85+
public void onClientStatTraceComputed(
86+
final int countedSpan, final int totalSpan, boolean dropped) {}
87+
88+
public void onClientStatPayloadSent() {}
89+
90+
public void onClientStatErrorReceived() {}
91+
92+
public void onClientStatDowngraded() {}
93+
7694
/** @return Human-readable summary of the current health metrics. */
7795
public String summary() {
7896
return "";

0 commit comments

Comments
 (0)