From 1889745da7377e16ba4d84fce251b95c84d8f8f1 Mon Sep 17 00:00:00 2001
From: jean-philippe bempel
Date: Wed, 1 Oct 2025 09:49:40 +0200
Subject: [PATCH 1/2] Stream the SymDB serialization and compression

The current SymDB serialization allocates a full-size intermediate
buffer for every step of the process:
- serializing to a String
- converting it to a UTF-8 byte array
- compressing it with gzip

Each step implies a potentially large buffer (several tens of MBs) in
flight at the same time. On constrained heaps this can lead to an OOME.
We now stream the process through an OutputStream, with a buffered sink
for the serialization, so fewer large buffers are alive at once.
---
Note: a minimal standalone sketch of this streaming pattern, with
illustrative names, follows the two patches below.

 .../com/datadog/debugger/sink/SymbolSink.java | 71 +++++++++++++------
 1 file changed, 51 insertions(+), 20 deletions(-)

diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java
index 06116220b64..f01a9752b7e 100644
--- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java
+++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java
@@ -13,6 +13,7 @@ import datadog.trace.util.TagsHelper;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -22,6 +23,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.zip.GZIPOutputStream;
 import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okio.BufferedSink;
+import okio.Okio;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,11 +107,49 @@ public void flush() {
     if (scopesToSerialize.isEmpty()) {
       return;
     }
-    String json =
+    serializeAndUpload(scopesToSerialize);
+  }
+
+  private void serializeAndUpload(List<Scope> scopesToSerialize) {
+    try {
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024);
+      try (OutputStream outputStream =
+          isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) {
+        BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
         SERVICE_VERSION_ADAPTER.toJson(
-            new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
-    updateStats(scopesToSerialize, json);
-    doUpload(scopesToSerialize, json);
+            sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
+        sink.flush();
+      }
+      doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed);
+    } catch (IOException e) {
+      LOGGER.debug("Error serializing scopes", e);
+    }
+  }
+
+  private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed) {
+    if (payload.length > maxPayloadSize) {
+      LOGGER.warn(
+          "Payload is too big: {}/{} isCompressed={}",
+          payload.length,
+          maxPayloadSize,
+          isCompressed);
+      splitAndSend(scopesToSerialize);
+      return;
+    }
+    updateStats(scopesToSerialize, payload.length);
+    LOGGER.debug(
+        "Sending {} jar scopes size={} isCompressed={}",
+        scopesToSerialize.size(),
+        payload.length,
+        isCompressed);
+    String fileName = "file.json";
+    MediaType mediaType = APPLICATION_JSON;
+    if (isCompressed) {
+      fileName = "file.gz";
+      mediaType = APPLICATION_GZIP;
+    }
+    symbolUploader.uploadAsMultipart(
+        "", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType));
   }
 
   private void doUpload(List<Scope> scopesToSerialize, String json) {
@@ -164,12 +206,7 @@ private void splitAndSend(List<Scope> scopesToSerialize) {
       // try to split by jar scopes: one scope per request
       if (scopesToSerialize.size() < BatchUploader.MAX_ENQUEUED_REQUESTS) {
         for (Scope scope : scopesToSerialize) {
-          String json =
-              SERVICE_VERSION_ADAPTER.toJson(
-                  new ServiceVersion(
-                      serviceName, env, version, "JAVA", Collections.singletonList(scope)));
-          LOGGER.debug("Sending {} jar scope size={}", scope.getName(), json.length());
-          doUpload(Collections.singletonList(scope), json);
+          serializeAndUpload(Collections.singletonList(scope));
         }
       } else {
         // split the list of jar scope in 2 list jar scopes with half of the jar scopes
@@ -177,14 +214,8 @@ private void splitAndSend(List<Scope> scopesToSerialize) {
         List<Scope> firstHalf = scopesToSerialize.subList(0, half);
         List<Scope> secondHalf = scopesToSerialize.subList(half, scopesToSerialize.size());
         LOGGER.debug("split jar scope list in 2: {} and {}", firstHalf.size(), secondHalf.size());
-        String jsonFirstHalf =
-            SERVICE_VERSION_ADAPTER.toJson(
-                new ServiceVersion(serviceName, env, version, "JAVA", firstHalf));
-        doUpload(firstHalf, jsonFirstHalf);
-        String jsonSecondHalf =
-            SERVICE_VERSION_ADAPTER.toJson(
-                new ServiceVersion(serviceName, env, version, "JAVA", secondHalf));
-        doUpload(secondHalf, jsonSecondHalf);
+        serializeAndUpload(firstHalf);
+        serializeAndUpload(secondHalf);
       }
     } else {
       Scope jarScope = scopesToSerialize.get(0);
@@ -217,12 +248,12 @@ private static Scope createJarScope(String jarName, List<Scope> classScopes) {
     return Scope.builder(ScopeType.JAR, jarName, 0, 0).name(jarName).scopes(classScopes).build();
   }
 
-  private void updateStats(List<Scope> scopesToSerialize, String json) {
+  private void updateStats(List<Scope> scopesToSerialize, long size) {
     int totalClasses = 0;
     for (Scope scope : scopesToSerialize) {
       totalClasses += scope.getScopes() != null ? scope.getScopes().size() : 0;
    }
-    stats.updateStats(totalClasses, json.length());
+    stats.updateStats(totalClasses, size);
     LOGGER.debug("SymbolSink stats: {}", stats);
   }
 
From e4b9cf7516956e70bc3f29d17b160f519463112d Mon Sep 17 00:00:00 2001
From: jean-philippe bempel
Date: Wed, 1 Oct 2025 14:38:50 +0200
Subject: [PATCH 2/2] Remove the now-unused String-based doUpload method
---
 .../com/datadog/debugger/sink/SymbolSink.java | 30 -------------------
 1 file changed, 30 deletions(-)

diff --git a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java
index f01a9752b7e..a18cd2d3df5 100644
--- a/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java
+++ b/dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java
@@ -152,36 +152,6 @@ private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed) {
         "", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType));
   }
 
-  private void doUpload(List<Scope> scopesToSerialize, String json) {
-    byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
-    byte[] payload = null;
-    if (isCompressed) {
-      payload = compressPayload(jsonBytes);
-    }
-    if (payload == null) {
-      if (json.length() > maxPayloadSize) {
-        LOGGER.warn("Payload is too big: {}/{}", json.length(), maxPayloadSize);
-        splitAndSend(scopesToSerialize);
-        return;
-      }
-      symbolUploader.uploadAsMultipart(
-          "",
-          event,
-          new BatchUploader.MultiPartContent(jsonBytes, "file", "file.json", APPLICATION_JSON));
-    } else {
-      if (payload.length > maxPayloadSize) {
-        LOGGER.warn("Compressed payload is too big: {}/{}", payload.length, maxPayloadSize);
-        splitAndSend(scopesToSerialize);
-        return;
-      }
-      LOGGER.debug("Sending {} jar scopes size={}", scopesToSerialize.size(), payload.length);
-      symbolUploader.uploadAsMultipart(
-          "",
-          event,
-          new BatchUploader.MultiPartContent(payload, "file", "file.gz", APPLICATION_GZIP));
-    }
-  }
-
   private static byte[] compressPayload(byte[] jsonBytes) {
     // usual compression factor 40:1 for those json payload, so we are preallocating 1/40
     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(jsonBytes.length / 40);
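
For reference, the streaming pattern introduced in PATCH 1/2 reduces to
the standalone sketch below. It assumes only Moshi and Okio on the
classpath; the class name, adapter, list-of-strings payload, and buffer
sizes are illustrative stand-ins for SERVICE_VERSION_ADAPTER and
ServiceVersion, not the actual SymbolSink code:

    import com.squareup.moshi.JsonAdapter;
    import com.squareup.moshi.Moshi;
    import com.squareup.moshi.Types;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Arrays;
    import java.util.List;
    import java.util.zip.GZIPOutputStream;
    import okio.BufferedSink;
    import okio.Okio;

    public class StreamingJsonSketch {

      private static final Moshi MOSHI = new Moshi.Builder().build();
      private static final JsonAdapter<List<String>> ADAPTER =
          MOSHI.adapter(Types.newParameterizedType(List.class, String.class));

      // Serializes straight into the (optionally gzipped) output buffer:
      // Moshi writes UTF-8 bytes through the BufferedSink into the
      // GZIPOutputStream, so neither a JSON String nor an uncompressed
      // byte[] of the whole payload is ever materialized.
      static byte[] serialize(List<String> value, boolean compress) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(64 * 1024); // illustrative size
        try (OutputStream out = compress ? new GZIPOutputStream(bytes) : bytes) {
          BufferedSink sink = Okio.buffer(Okio.sink(out));
          ADAPTER.toJson(sink, value);
          sink.flush(); // push the sink's segment buffer into the gzip stream
        } // closing the GZIPOutputStream writes the gzip trailer before toByteArray()
        return bytes.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        byte[] payload = serialize(Arrays.asList("scope-a", "scope-b"), true);
        System.out.println("compressed payload: " + payload.length + " bytes");
      }
    }

With this shape, the only full-size buffer left is the output
ByteArrayOutputStream, and when compression is on it only ever holds
compressed bytes (roughly 40x smaller for these JSON payloads, per the
comment in compressPayload).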