Skip to content

Commit 0007547

Browse files
authored
Send only snapshots to Debugger Track (#9501)
We introduce a separate BatchUploader instance to send captureSnapshot=true snapshots (including variables) to the dedicated Datadog endpoint for the DEBUGGER track. SnapshotSink now expects 2 BatchUploaders (lowRate for snapshots and highRate for templated logs only). BatchUploader now takes a name to identify in debug logs which instance is used. Smoke tests are updated to use processRequest everywhere and get rid of retrieveSnapshots, which is not correct.
1 parent b84c018 commit 0007547

File tree

19 files changed

+469
-325
lines changed

19 files changed

+469
-325
lines changed

communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ private static class State {
8989
boolean supportsDropping;
9090
String state;
9191
String configEndpoint;
92-
String debuggerEndpoint;
92+
String debuggerLogEndpoint;
93+
String debuggerSnapshotEndpoint;
9394
String debuggerDiagnosticsEndpoint;
9495
String evpProxyEndpoint;
9596
String version;
@@ -266,15 +267,17 @@ private boolean processInfoResponse(State newState, String response) {
266267
}
267268
}
268269

269-
// both debugger endpoint v2 and diagnostics endpoint are forwarding events to the DEBUGGER
270-
// intake
271-
// because older agents support diagnostics, we fallback to it before falling back to v1
270+
if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) {
271+
newState.debuggerLogEndpoint = DEBUGGER_ENDPOINT_V1;
272+
}
273+
// both debugger v2 and diagnostics endpoints are forwarding events to the DEBUGGER intake
274+
// because older agents support diagnostics, we fall back to it before falling back to v1
272275
if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V2)) {
273-
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V2;
276+
newState.debuggerSnapshotEndpoint = DEBUGGER_ENDPOINT_V2;
274277
} else if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
275-
newState.debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
278+
newState.debuggerSnapshotEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
276279
} else if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) {
277-
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V1;
280+
newState.debuggerSnapshotEndpoint = DEBUGGER_ENDPOINT_V1;
278281
}
279282
if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
280283
newState.debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
@@ -359,11 +362,15 @@ public boolean supportsMetrics() {
359362
}
360363

361364
public boolean supportsDebugger() {
362-
return discoveryState.debuggerEndpoint != null;
365+
return discoveryState.debuggerLogEndpoint != null;
366+
}
367+
368+
public String getDebuggerSnapshotEndpoint() {
369+
return discoveryState.debuggerSnapshotEndpoint;
363370
}
364371

365-
public String getDebuggerEndpoint() {
366-
return discoveryState.debuggerEndpoint;
372+
public String getDebuggerLogEndpoint() {
373+
return discoveryState.debuggerLogEndpoint;
367374
}
368375

369376
public boolean supportsDebuggerDiagnostics() {

communication/src/test/groovy/datadog/communication/ddagent/DDAgentFeaturesDiscoveryTest.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
6666
features.state() == INFO_STATE
6767
features.getConfigEndpoint() == V7_CONFIG_ENDPOINT
6868
features.supportsDebugger()
69-
features.getDebuggerEndpoint() == "debugger/v2/input"
69+
features.getDebuggerSnapshotEndpoint() == "debugger/v2/input"
7070
features.supportsDebuggerDiagnostics()
7171
features.supportsEvpProxy()
7272
features.supportsContentEncodingHeadersWithEvpProxy()
@@ -440,7 +440,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
440440
1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_TELEMETRY_PROXY_RESPONSE) }
441441
features.supportsTelemetryProxy()
442442
features.supportsDebugger()
443-
features.getDebuggerEndpoint() == "debugger/v1/input"
443+
features.getDebuggerSnapshotEndpoint() == "debugger/v1/input"
444444
!features.supportsDebuggerDiagnostics()
445445
0 * _
446446
}
@@ -459,7 +459,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
459459
features.getEvpProxyEndpoint() == "evp_proxy/v2/" // v3 is advertised, but the tracer should ignore it
460460
!features.supportsContentEncodingHeadersWithEvpProxy()
461461
features.supportsDebugger()
462-
features.getDebuggerEndpoint() == "debugger/v1/diagnostics"
462+
features.getDebuggerSnapshotEndpoint() == "debugger/v1/diagnostics"
463463
features.supportsDebuggerDiagnostics()
464464
0 * _
465465
}

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerAgent.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,19 @@ private static DebuggerSink createDebuggerSink(
282282
DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery,
283283
ProbeStatusSink probeStatusSink) {
284284
String tags = getDefaultTagsMergedWithGlobalTags(config);
285-
SnapshotSink snapshotSink =
286-
new SnapshotSink(
285+
BatchUploader lowRateUploader =
286+
new BatchUploader(
287+
"Snapshots",
287288
config,
288-
tags,
289-
new BatchUploader(
290-
config,
291-
getDebuggerEndpoint(config, ddAgentFeaturesDiscovery),
292-
SnapshotSink.RETRY_POLICY));
289+
getSnapshotEndpoint(config, ddAgentFeaturesDiscovery),
290+
SnapshotSink.RETRY_POLICY);
291+
BatchUploader highRateUploader =
292+
new BatchUploader(
293+
"Logs",
294+
config,
295+
getLogEndpoint(config, ddAgentFeaturesDiscovery),
296+
SnapshotSink.RETRY_POLICY);
297+
SnapshotSink snapshotSink = new SnapshotSink(config, tags, lowRateUploader, highRateUploader);
293298
SymbolSink symbolSink = new SymbolSink(config);
294299
return new DebuggerSink(
295300
config,
@@ -323,11 +328,21 @@ public static String getDefaultTagsMergedWithGlobalTags(Config config) {
323328
return debuggerTags + "," + globalTags;
324329
}
325330

326-
private static String getDebuggerEndpoint(
331+
private static String getLogEndpoint(
332+
Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) {
333+
if (ddAgentFeaturesDiscovery.supportsDebugger()) {
334+
return ddAgentFeaturesDiscovery
335+
.buildUrl(ddAgentFeaturesDiscovery.getDebuggerLogEndpoint())
336+
.toString();
337+
}
338+
return config.getFinalDebuggerSnapshotUrl();
339+
}
340+
341+
private static String getSnapshotEndpoint(
327342
Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) {
328343
if (ddAgentFeaturesDiscovery.supportsDebugger()) {
329344
return ddAgentFeaturesDiscovery
330-
.buildUrl(ddAgentFeaturesDiscovery.getDebuggerEndpoint())
345+
.buildUrl(ddAgentFeaturesDiscovery.getDebuggerSnapshotEndpoint())
331346
.toString();
332347
}
333348
return config.getFinalDebuggerSnapshotUrl();

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/agent/DebuggerTransformer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,15 @@ public DebuggerTransformer(Config config, Configuration configuration) {
181181
config,
182182
"",
183183
new BatchUploader(
184-
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
184+
"Snapshots",
185+
config,
186+
config.getFinalDebuggerSnapshotUrl(),
187+
SnapshotSink.RETRY_POLICY),
188+
new BatchUploader(
189+
"Logs",
190+
config,
191+
config.getFinalDebuggerSnapshotUrl(),
192+
SnapshotSink.RETRY_POLICY)),
185193
new SymbolSink(config)));
186194
}
187195

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/DebuggerSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ public DebuggerSink(Config config, ProbeStatusSink probeStatusSink) {
5151
config,
5252
null,
5353
new BatchUploader(
54-
config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
54+
"Snapshots",
55+
config,
56+
config.getFinalDebuggerSnapshotUrl(),
57+
SnapshotSink.RETRY_POLICY),
58+
new BatchUploader(
59+
"Logs", config, config.getFinalDebuggerSnapshotUrl(), SnapshotSink.RETRY_POLICY)),
5560
new SymbolSink(config));
5661
}
5762

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/ProbeStatusSink.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ public class ProbeStatusSink {
4848
private final boolean useMultiPart;
4949

5050
public ProbeStatusSink(Config config, String diagnosticsEndpoint, boolean useMultiPart) {
51-
this(config, new BatchUploader(config, diagnosticsEndpoint, RETRY_POLICY), useMultiPart);
51+
this(
52+
config,
53+
new BatchUploader("Diagnostics", config, diagnosticsEndpoint, RETRY_POLICY),
54+
useMultiPart);
5255
}
5356

5457
ProbeStatusSink(Config config, BatchUploader diagnosticUploader, boolean useMultiPart) {

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SnapshotSink.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import org.slf4j.Logger;
1919
import org.slf4j.LoggerFactory;
2020

21-
/** Collects snapshots that needs to be sent to the backend */
21+
/**
22+
* Collects snapshots that need to be sent to the backend. The notion of high/low rate is used to
23+
* control the flush interval: low rate is used for snapshots with Captures (deep variable capture,
24+
* captureSnapshot = true); high rate is used for snapshots without Captures (dynamic templated logs).
25+
*/
2226
public class SnapshotSink {
2327
private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotSink.class);
2428
public static final int MAX_SNAPSHOT_SIZE = 1024 * 1024;
@@ -32,25 +36,32 @@ public class SnapshotSink {
3236
static final long HIGH_RATE_STEP_SIZE = 10;
3337
public static final BatchUploader.RetryPolicy RETRY_POLICY = new BatchUploader.RetryPolicy(0);
3438

39+
// low rate queue (aka snapshots)
3540
private final BlockingQueue<Snapshot> lowRateSnapshots =
3641
new ArrayBlockingQueue<>(LOW_RATE_CAPACITY);
42+
// high rate queue (aka dynamic templated logs)
3743
private final BlockingQueue<Snapshot> highRateSnapshots =
3844
new ArrayBlockingQueue<>(HIGH_RATE_CAPACITY);
3945
private final String serviceName;
4046
private final int batchSize;
4147
private final String tags;
42-
private final BatchUploader snapshotUploader;
48+
// uploader for low rate (aka snapshots)
49+
private final BatchUploader lowRateUploader;
50+
// uploader for high rate (aka dynamic templated logs)
51+
private final BatchUploader highRateUploader;
4352
private final AgentTaskScheduler highRateScheduler =
4453
new AgentTaskScheduler(AgentThreadFactory.AgentThread.DEBUGGER_SNAPSHOT_SERIALIZER);
4554
private final AtomicBoolean started = new AtomicBoolean();
4655
private volatile AgentTaskScheduler.Scheduled<SnapshotSink> highRateScheduled;
4756
private volatile long currentHighRateFlushInterval = HIGH_RATE_MAX_FLUSH_INTERVAL_MS;
4857

49-
public SnapshotSink(Config config, String tags, BatchUploader snapshotUploader) {
58+
public SnapshotSink(
59+
Config config, String tags, BatchUploader lowRateUploader, BatchUploader highRateUploader) {
5060
this.serviceName = TagsHelper.sanitize(config.getServiceName());
5161
this.batchSize = config.getDynamicInstrumentationUploadBatchSize();
5262
this.tags = tags;
53-
this.snapshotUploader = snapshotUploader;
63+
this.lowRateUploader = lowRateUploader;
64+
this.highRateUploader = highRateUploader;
5465
}
5566

5667
public void start() {
@@ -66,7 +77,8 @@ public void stop() {
6677
if (localScheduled != null) {
6778
localScheduled.cancel();
6879
}
69-
snapshotUploader.shutdown();
80+
lowRateUploader.shutdown();
81+
highRateUploader.shutdown();
7082
started.set(false);
7183
}
7284

@@ -75,7 +87,7 @@ public void lowRateFlush(String tags) {
7587
if (snapshots.isEmpty()) {
7688
return;
7789
}
78-
uploadPayloads(snapshots, tags);
90+
uploadPayloads(lowRateUploader, snapshots, tags);
7991
}
8092

8193
public void highRateFlush(SnapshotSink ignored) {
@@ -87,12 +99,12 @@ public void highRateFlush(SnapshotSink ignored) {
8799
}
88100
int count = snapshots.size();
89101
reconsiderHighRateFlushInterval(count);
90-
uploadPayloads(snapshots, tags);
102+
uploadPayloads(highRateUploader, snapshots, tags);
91103
} while (!highRateSnapshots.isEmpty());
92104
}
93105

94106
public HttpUrl getUrl() {
95-
return snapshotUploader.getUrl();
107+
return lowRateUploader.getUrl();
96108
}
97109

98110
public long remainingCapacity() {
@@ -194,10 +206,10 @@ private String serializeSnapshot(String serviceName, Snapshot snapshot) {
194206
return prunedStr;
195207
}
196208

197-
private void uploadPayloads(List<String> payloads, String tags) {
209+
private static void uploadPayloads(BatchUploader uploader, List<String> payloads, String tags) {
198210
List<byte[]> batches = IntakeBatchHelper.createBatches(payloads);
199211
for (byte[] batch : batches) {
200-
snapshotUploader.upload(batch, tags);
212+
uploader.upload(batch, tags);
201213
}
202214
}
203215
}

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class SymbolSink {
5353
public SymbolSink(Config config) {
5454
this(
5555
config,
56-
new BatchUploader(config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY),
56+
new BatchUploader("SymDB", config, config.getFinalDebuggerSymDBUrl(), RETRY_POLICY),
5757
MAX_SYMDB_UPLOAD_SIZE);
5858
}
5959

0 commit comments

Comments
 (0)