Commit 3cd36a2

Stream the SymDB serialization and compression
The current SymDB serialization uses a full intermediate buffer for each step:
- serializing to a String
- converting to a UTF-8 byte array
- compressing with gzip

Each of these steps allocates a potentially large buffer (several tens of MB). On constrained heaps this can lead to an OutOfMemoryError. We now stream the process through an OutputStream, using a buffered sink for serialization, so fewer large buffers are in flight at once.
1 parent ffc2be9 commit 3cd36a2
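
For reference, a minimal standalone sketch of the streaming pattern adopted by this commit: the Moshi adapter writes JSON through an Okio BufferedSink directly into a GZIPOutputStream, so only the compressed bytes are held in memory. This is illustrative only; it assumes Moshi and Okio are on the classpath and uses a List<String> payload as a stand-in for the real ServiceVersion/Scope model used by SymbolSink.

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import okio.BufferedSink;
import okio.Okio;

public class StreamingSerializationSketch {
  public static void main(String[] args) throws IOException {
    // Stand-in payload; the real code serializes a ServiceVersion wrapping Scope objects.
    List<String> scopes = Arrays.asList("jar-scope-a", "jar-scope-b");
    JsonAdapter<List<String>> adapter =
        new Moshi.Builder()
            .build()
            .adapter(Types.newParameterizedType(List.class, String.class));

    // Single destination buffer holding only the compressed result.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream(2 * 1024 * 1024);
    try (OutputStream gzip = new GZIPOutputStream(compressed)) {
      // Serialize straight into the gzip stream: no intermediate JSON String
      // and no uncompressed UTF-8 byte[] is ever materialized.
      BufferedSink sink = Okio.buffer(Okio.sink(gzip));
      adapter.toJson(sink, scopes);
      sink.flush();
    }
    byte[] payload = compressed.toByteArray(); // ready to upload, e.g. as multipart content
    System.out.println("compressed payload size=" + payload.length);
  }
}

Closing the GZIPOutputStream via try-with-resources finishes the gzip trailer before toByteArray() is called, mirroring how the new serializeAndUpload method builds its payload.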

File tree

1 file changed: +51 −20 lines
  • dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java

Lines changed: 51 additions & 20 deletions
@@ -13,6 +13,7 @@
 import datadog.trace.util.TagsHelper;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -22,6 +23,9 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.zip.GZIPOutputStream;
 import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okio.BufferedSink;
+import okio.Okio;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -103,11 +107,49 @@ public void flush() {
     if (scopesToSerialize.isEmpty()) {
       return;
     }
-    String json =
+    serializeAndUpload(scopesToSerialize);
+  }
+
+  private void serializeAndUpload(List<Scope> scopesToSerialize) {
+    try {
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024);
+      try (OutputStream outputStream =
+          isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) {
+        BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
         SERVICE_VERSION_ADAPTER.toJson(
-            new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
-    updateStats(scopesToSerialize, json);
-    doUpload(scopesToSerialize, json);
+            sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
+        sink.flush();
+      }
+      doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed);
+    } catch (IOException e) {
+      LOGGER.debug("Error serializing scopes", e);
+    }
+  }
+
+  private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed) {
+    if (payload.length > maxPayloadSize) {
+      LOGGER.warn(
+          "Payload is too big: {}/{} isCompressed={}",
+          payload.length,
+          maxPayloadSize,
+          isCompressed);
+      splitAndSend(scopesToSerialize);
+      return;
+    }
+    updateStats(scopesToSerialize, payload.length);
+    LOGGER.debug(
+        "Sending {} jar scopes size={} isCompressed={}",
+        scopesToSerialize.size(),
+        payload.length,
+        isCompressed);
+    String fileName = "file.json";
+    MediaType mediaType = APPLICATION_JSON;
+    if (isCompressed) {
+      fileName = "file.gz";
+      mediaType = APPLICATION_GZIP;
+    }
+    symbolUploader.uploadAsMultipart(
+        "", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType));
   }

   private void doUpload(List<Scope> scopesToSerialize, String json) {
@@ -164,27 +206,16 @@ private void splitAndSend(List<Scope> scopesToSerialize) {
       // try to split by jar scopes: one scope per request
       if (scopesToSerialize.size() < BatchUploader.MAX_ENQUEUED_REQUESTS) {
        for (Scope scope : scopesToSerialize) {
-          String json =
-              SERVICE_VERSION_ADAPTER.toJson(
-                  new ServiceVersion(
-                      serviceName, env, version, "JAVA", Collections.singletonList(scope)));
-          LOGGER.debug("Sending {} jar scope size={}", scope.getName(), json.length());
-          doUpload(Collections.singletonList(scope), json);
+          serializeAndUpload(Collections.singletonList(scope));
        }
      } else {
        // split the list of jar scope in 2 list jar scopes with half of the jar scopes
        int half = scopesToSerialize.size() / 2;
        List<Scope> firstHalf = scopesToSerialize.subList(0, half);
        List<Scope> secondHalf = scopesToSerialize.subList(half, scopesToSerialize.size());
        LOGGER.debug("split jar scope list in 2: {} and {}", firstHalf.size(), secondHalf.size());
-        String jsonFirstHalf =
-            SERVICE_VERSION_ADAPTER.toJson(
-                new ServiceVersion(serviceName, env, version, "JAVA", firstHalf));
-        doUpload(firstHalf, jsonFirstHalf);
-        String jsonSecondHalf =
-            SERVICE_VERSION_ADAPTER.toJson(
-                new ServiceVersion(serviceName, env, version, "JAVA", secondHalf));
-        doUpload(secondHalf, jsonSecondHalf);
+        serializeAndUpload(firstHalf);
+        serializeAndUpload(secondHalf);
      }
    } else {
      Scope jarScope = scopesToSerialize.get(0);
@@ -217,12 +248,12 @@ private static Scope createJarScope(String jarName, List<Scope> classScopes) {
     return Scope.builder(ScopeType.JAR, jarName, 0, 0).name(jarName).scopes(classScopes).build();
   }

-  private void updateStats(List<Scope> scopesToSerialize, String json) {
+  private void updateStats(List<Scope> scopesToSerialize, long size) {
     int totalClasses = 0;
     for (Scope scope : scopesToSerialize) {
       totalClasses += scope.getScopes() != null ? scope.getScopes().size() : 0;
     }
-    stats.updateStats(totalClasses, json.length());
+    stats.updateStats(totalClasses, size);
     LOGGER.debug("SymbolSink stats: {}", stats);
   }
