Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datadog.trace.util.TagsHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -22,6 +23,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPOutputStream;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okio.BufferedSink;
import okio.Okio;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,41 +107,49 @@ public void flush() {
if (scopesToSerialize.isEmpty()) {
return;
}
String json =
serializeAndUpload(scopesToSerialize);
}

private void serializeAndUpload(List<Scope> scopesToSerialize) {
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024);
try (OutputStream outputStream =
isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) {
BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
SERVICE_VERSION_ADAPTER.toJson(
new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
updateStats(scopesToSerialize, json);
doUpload(scopesToSerialize, json);
sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
sink.flush();
}
doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed);
} catch (IOException e) {
LOGGER.debug("Error serializing scopes", e);
}
}

private void doUpload(List<Scope> scopesToSerialize, String json) {
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
byte[] payload = null;
if (isCompressed) {
payload = compressPayload(jsonBytes);
private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed) {
if (payload.length > maxPayloadSize) {
LOGGER.warn(
"Payload is too big: {}/{} isCompressed={}",
payload.length,
maxPayloadSize,
isCompressed);
splitAndSend(scopesToSerialize);
return;
}
if (payload == null) {
if (json.length() > maxPayloadSize) {
LOGGER.warn("Payload is too big: {}/{}", json.length(), maxPayloadSize);
splitAndSend(scopesToSerialize);
return;
}
symbolUploader.uploadAsMultipart(
"",
event,
new BatchUploader.MultiPartContent(jsonBytes, "file", "file.json", APPLICATION_JSON));
} else {
if (payload.length > maxPayloadSize) {
LOGGER.warn("Compressed payload is too big: {}/{}", payload.length, maxPayloadSize);
splitAndSend(scopesToSerialize);
return;
}
LOGGER.debug("Sending {} jar scopes size={}", scopesToSerialize.size(), payload.length);
symbolUploader.uploadAsMultipart(
"",
event,
new BatchUploader.MultiPartContent(payload, "file", "file.gz", APPLICATION_GZIP));
updateStats(scopesToSerialize, payload.length);
LOGGER.debug(
"Sending {} jar scopes size={} isCompressed={}",
scopesToSerialize.size(),
payload.length,
isCompressed);
String fileName = "file.json";
MediaType mediaType = APPLICATION_JSON;
if (isCompressed) {
fileName = "file.gz";
mediaType = APPLICATION_GZIP;
}
symbolUploader.uploadAsMultipart(
"", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType));
}

private static byte[] compressPayload(byte[] jsonBytes) {
Expand All @@ -164,27 +176,16 @@ private void splitAndSend(List<Scope> scopesToSerialize) {
// try to split by jar scopes: one scope per request
if (scopesToSerialize.size() < BatchUploader.MAX_ENQUEUED_REQUESTS) {
for (Scope scope : scopesToSerialize) {
String json =
SERVICE_VERSION_ADAPTER.toJson(
new ServiceVersion(
serviceName, env, version, "JAVA", Collections.singletonList(scope)));
LOGGER.debug("Sending {} jar scope size={}", scope.getName(), json.length());
doUpload(Collections.singletonList(scope), json);
serializeAndUpload(Collections.singletonList(scope));
}
} else {
// split the list of jar scope in 2 list jar scopes with half of the jar scopes
int half = scopesToSerialize.size() / 2;
List<Scope> firstHalf = scopesToSerialize.subList(0, half);
List<Scope> secondHalf = scopesToSerialize.subList(half, scopesToSerialize.size());
LOGGER.debug("split jar scope list in 2: {} and {}", firstHalf.size(), secondHalf.size());
String jsonFirstHalf =
SERVICE_VERSION_ADAPTER.toJson(
new ServiceVersion(serviceName, env, version, "JAVA", firstHalf));
doUpload(firstHalf, jsonFirstHalf);
String jsonSecondHalf =
SERVICE_VERSION_ADAPTER.toJson(
new ServiceVersion(serviceName, env, version, "JAVA", secondHalf));
doUpload(secondHalf, jsonSecondHalf);
serializeAndUpload(firstHalf);
serializeAndUpload(secondHalf);
}
} else {
Scope jarScope = scopesToSerialize.get(0);
Expand Down Expand Up @@ -217,12 +218,12 @@ private static Scope createJarScope(String jarName, List<Scope> classScopes) {
return Scope.builder(ScopeType.JAR, jarName, 0, 0).name(jarName).scopes(classScopes).build();
}

private void updateStats(List<Scope> scopesToSerialize, String json) {
private void updateStats(List<Scope> scopesToSerialize, long size) {
int totalClasses = 0;
for (Scope scope : scopesToSerialize) {
totalClasses += scope.getScopes() != null ? scope.getScopes().size() : 0;
}
stats.updateStats(totalClasses, json.length());
stats.updateStats(totalClasses, size);
LOGGER.debug("SymbolSink stats: {}", stats);
}

Expand Down