Skip to content

Commit b8dffa7

Browse files
authored
Fix Otlp Aws exporters failures for GZIP compressed telemetry exports (#1124)
1 parent 8a3b772 commit b8dffa7

File tree

9 files changed

+216
-41
lines changed

9 files changed

+216
-41
lines changed

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ public final class AwsApplicationSignalsCustomizerProvider
135135
private static final String OTEL_TRACES_SAMPLER = "otel.traces.sampler";
136136
private static final String OTEL_TRACES_SAMPLER_ARG = "otel.traces.sampler.arg";
137137
static final String OTEL_EXPORTER_OTLP_LOGS_HEADERS = "otel.exporter.otlp.logs.headers";
138+
private static final String OTEL_EXPORTER_OTLP_COMPRESSION_CONFIG =
139+
"otel.exporter.otlp.compression";
140+
private static final String OTEL_EXPORTER_OTLP_TRACES_COMPRESSION_CONFIG =
141+
"otel.exporter.otlp.traces.compression";
142+
private static final String OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG =
143+
"otel.exporter.otlp.logs.compression";
138144

139145
// UDP packet can be upto 64KB. To limit the packet size, we limit the exported batch size.
140146
// This is a bit of a magic number, as there is no simple way to tell how many spans can make a
@@ -372,11 +378,18 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c
372378
// and OTEL_EXPORTER_OTLP_TRACES_PROTOCOL is http/protobuf
373379
// so the given spanExporter will be an instance of OtlpHttpSpanExporter
374380

381+
// get compression method from environment
382+
String compression =
383+
configProps.getString(
384+
OTEL_EXPORTER_OTLP_TRACES_COMPRESSION_CONFIG,
385+
configProps.getString(OTEL_EXPORTER_OTLP_COMPRESSION_CONFIG, "none"));
386+
375387
try {
376388
spanExporter =
377389
OtlpAwsSpanExporterBuilder.create(
378390
(OtlpHttpSpanExporter) spanExporter,
379391
configProps.getString(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT))
392+
.setCompression(compression)
380393
.build();
381394
} catch (Exception e) {
382395
// This technically should never happen as the validator checks for the correct env
@@ -408,10 +421,17 @@ LogRecordExporter customizeLogsExporter(
408421
// OTEL_EXPORTER_OTLP_LOGS_PROTOCOL is http/protobuf
409422
// so the given logsExporter will be an instance of OtlpHttpLogRecorderExporter
410423

424+
// get compression method from environment
425+
String compression =
426+
configProps.getString(
427+
OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG,
428+
configProps.getString(OTEL_EXPORTER_OTLP_COMPRESSION_CONFIG, "none"));
429+
411430
try {
412431
return OtlpAwsLogsExporterBuilder.create(
413432
(OtlpHttpLogRecordExporter) logsExporter,
414433
configProps.getString(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT))
434+
.setCompression(compression)
415435
.build();
416436
} catch (Exception e) {
417437
// This technically should never happen as the validator checks for the correct env
Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
package software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common;
1717

1818
import java.io.ByteArrayInputStream;
19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
1921
import java.net.URI;
2022
import java.util.*;
2123
import java.util.function.Supplier;
2224
import java.util.logging.Level;
2325
import java.util.logging.Logger;
26+
import java.util.zip.GZIPOutputStream;
2427
import software.amazon.awssdk.auth.credentials.AwsCredentials;
2528
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
2629
import software.amazon.awssdk.http.SdkHttpFullRequest;
@@ -29,19 +32,19 @@
2932
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
3033
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
3134

32-
final class SigV4AuthHeaderSupplier implements Supplier<Map<String, String>> {
35+
final class AwsAuthHeaderSupplier implements Supplier<Map<String, String>> {
3336
BaseOtlpAwsExporter exporter;
3437
Logger logger;
3538

36-
public SigV4AuthHeaderSupplier(BaseOtlpAwsExporter exporter) {
39+
public AwsAuthHeaderSupplier(BaseOtlpAwsExporter exporter) {
3740
this.exporter = exporter;
3841
this.logger = Logger.getLogger(exporter.getClass().getName());
3942
}
4043

4144
@Override
4245
public Map<String, String> get() {
4346
try {
44-
byte[] data = exporter.data.get();
47+
ByteArrayOutputStream data = exporter.data.get();
4548

4649
SdkHttpRequest httpRequest =
4750
SdkHttpFullRequest.builder()
@@ -50,6 +53,14 @@ public Map<String, String> get() {
5053
.putHeader("Content-Type", "application/x-protobuf")
5154
.build();
5255

56+
// Compress the data before signing with gzip
57+
ByteArrayOutputStream compressedData;
58+
if (exporter.getCompression().equals(CompressionMethod.GZIP)) {
59+
compressedData = compressWithGzip(data);
60+
} else {
61+
compressedData = data;
62+
}
63+
5364
AwsCredentials credentials = DefaultCredentialsProvider.create().resolveCredentials();
5465

5566
SignedRequest signedRequest =
@@ -60,7 +71,7 @@ public Map<String, String> get() {
6071
.request(httpRequest)
6172
.putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, exporter.serviceName())
6273
.putProperty(AwsV4HttpSigner.REGION_NAME, exporter.awsRegion)
63-
.payload(() -> new ByteArrayInputStream(data)));
74+
.payload(() -> new ByteArrayInputStream(compressedData.toByteArray())));
6475

6576
Map<String, String> result = new HashMap<>();
6677

@@ -84,4 +95,21 @@ public Map<String, String> get() {
8495
return Collections.emptyMap();
8596
}
8697
}
98+
99+
/**
100+
* Compresses the given byte array using GZIP compression.
101+
*
102+
* @param data the byte array stream to compress
103+
* @return the compressed byte as a ByteArrayOutputStream
104+
* @throws IOException if compression fails
105+
*/
106+
private ByteArrayOutputStream compressWithGzip(ByteArrayOutputStream data) throws IOException {
107+
ByteArrayOutputStream compressedData = new ByteArrayOutputStream();
108+
109+
try (GZIPOutputStream gzipOut = new GZIPOutputStream(compressedData)) {
110+
data.writeTo(gzipOut);
111+
}
112+
113+
return compressedData;
114+
}
87115
}

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/common/BaseOtlpAwsExporter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common;
1717

18+
import java.io.ByteArrayOutputStream;
1819
import java.util.Map;
1920
import java.util.concurrent.atomic.AtomicReference;
2021
import java.util.function.Supplier;
@@ -27,15 +28,21 @@ public abstract class BaseOtlpAwsExporter {
2728

2829
protected final String awsRegion;
2930
protected final String endpoint;
30-
protected final AtomicReference<byte[]> data;
31+
protected final AtomicReference<ByteArrayOutputStream> data;
3132
protected final Supplier<Map<String, String>> headerSupplier;
33+
protected final CompressionMethod compression;
3234

33-
protected BaseOtlpAwsExporter(String endpoint) {
35+
protected BaseOtlpAwsExporter(String endpoint, CompressionMethod compression) {
3436
this.endpoint = endpoint.toLowerCase();
37+
this.compression = compression;
3538
this.awsRegion = endpoint.split("\\.")[1];
3639
this.data = new AtomicReference<>();
37-
this.headerSupplier = new SigV4AuthHeaderSupplier(this);
40+
this.headerSupplier = new AwsAuthHeaderSupplier(this);
3841
}
3942

4043
public abstract String serviceName();
44+
45+
public CompressionMethod getCompression() {
46+
return this.compression;
47+
}
4148
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common;
17+
18+
public enum CompressionMethod {
19+
NONE,
20+
GZIP
21+
}

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/logs/OtlpAwsLogsExporter.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.StringJoiner;
2929
import javax.annotation.Nonnull;
3030
import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common.BaseOtlpAwsExporter;
31+
import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common.CompressionMethod;
3132

3233
/**
3334
* This exporter extends the functionality of the OtlpHttpLogsRecordExporter to allow logs to be
@@ -42,19 +43,18 @@ public final class OtlpAwsLogsExporter extends BaseOtlpAwsExporter implements Lo
4243
private final OtlpHttpLogRecordExporter parentExporter;
4344

4445
static OtlpAwsLogsExporter getDefault(String endpoint) {
45-
return new OtlpAwsLogsExporter(endpoint);
46+
return new OtlpAwsLogsExporter(
47+
OtlpHttpLogRecordExporter.getDefault(), endpoint, CompressionMethod.NONE);
4648
}
4749

48-
static OtlpAwsLogsExporter create(OtlpHttpLogRecordExporter parent, String endpoint) {
49-
return new OtlpAwsLogsExporter(parent, endpoint);
50+
static OtlpAwsLogsExporter create(
51+
OtlpHttpLogRecordExporter parent, String endpoint, CompressionMethod compression) {
52+
return new OtlpAwsLogsExporter(parent, endpoint, compression);
5053
}
5154

52-
private OtlpAwsLogsExporter(String endpoint) {
53-
this(OtlpHttpLogRecordExporter.getDefault(), endpoint);
54-
}
55-
56-
private OtlpAwsLogsExporter(OtlpHttpLogRecordExporter parentExporter, String endpoint) {
57-
super(endpoint);
55+
private OtlpAwsLogsExporter(
56+
OtlpHttpLogRecordExporter parentExporter, String endpoint, CompressionMethod compression) {
57+
super(endpoint, compression);
5858

5959
this.parentExporterBuilder =
6060
parentExporter.toBuilder()
@@ -75,7 +75,7 @@ public CompletableResultCode export(@Nonnull Collection<LogRecordData> logs) {
7575
try {
7676
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
7777
LogsRequestMarshaler.create(logs).writeBinaryTo(buffer);
78-
this.data.set(buffer.toByteArray());
78+
this.data.set(buffer);
7979
return this.parentExporter.export(logs);
8080
} catch (IOException e) {
8181
return CompletableResultCode.ofFailure();

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/logs/OtlpAwsLogsExporterBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import static java.util.Objects.requireNonNull;
1919

2020
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
21+
import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common.CompressionMethod;
2122

2223
public class OtlpAwsLogsExporterBuilder {
2324
private final OtlpHttpLogRecordExporter parentExporter;
2425
private final String endpoint;
26+
private String compression;
2527

2628
public static OtlpAwsLogsExporterBuilder create(
2729
OtlpHttpLogRecordExporter parentExporter, String endpoint) {
@@ -32,8 +34,18 @@ public static OtlpAwsLogsExporter getDefault(String endpoint) {
3234
return OtlpAwsLogsExporter.getDefault(endpoint);
3335
}
3436

37+
public OtlpAwsLogsExporterBuilder setCompression(String compression) {
38+
this.compression = compression;
39+
return this;
40+
}
41+
3542
public OtlpAwsLogsExporter build() {
36-
return OtlpAwsLogsExporter.create(this.parentExporter, this.endpoint);
43+
CompressionMethod compression = CompressionMethod.NONE;
44+
if (this.compression != null && "gzip".equalsIgnoreCase(this.compression)) {
45+
compression = CompressionMethod.GZIP;
46+
}
47+
48+
return OtlpAwsLogsExporter.create(this.parentExporter, this.endpoint, compression);
3749
}
3850

3951
private OtlpAwsLogsExporterBuilder(OtlpHttpLogRecordExporter parentExporter, String endpoint) {

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/traces/OtlpAwsSpanExporter.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.StringJoiner;
2929
import javax.annotation.Nonnull;
3030
import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common.BaseOtlpAwsExporter;
31+
import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common.CompressionMethod;
3132

3233
/**
3334
* This exporter extends the functionality of the OtlpHttpSpanExporter to allow spans to be exported
@@ -40,19 +41,18 @@ public final class OtlpAwsSpanExporter extends BaseOtlpAwsExporter implements Sp
4041
private final OtlpHttpSpanExporter parentExporter;
4142

4243
static OtlpAwsSpanExporter getDefault(String endpoint) {
43-
return new OtlpAwsSpanExporter(endpoint);
44+
return new OtlpAwsSpanExporter(
45+
OtlpHttpSpanExporter.getDefault(), endpoint, CompressionMethod.NONE);
4446
}
4547

46-
static OtlpAwsSpanExporter create(OtlpHttpSpanExporter parent, String endpoint) {
47-
return new OtlpAwsSpanExporter(parent, endpoint);
48+
static OtlpAwsSpanExporter create(
49+
OtlpHttpSpanExporter parent, String endpoint, CompressionMethod compression) {
50+
return new OtlpAwsSpanExporter(parent, endpoint, compression);
4851
}
4952

50-
private OtlpAwsSpanExporter(String endpoint) {
51-
this(OtlpHttpSpanExporter.getDefault(), endpoint);
52-
}
53-
54-
private OtlpAwsSpanExporter(OtlpHttpSpanExporter parentExporter, String endpoint) {
55-
super(endpoint);
53+
private OtlpAwsSpanExporter(
54+
OtlpHttpSpanExporter parentExporter, String endpoint, CompressionMethod compression) {
55+
super(endpoint, compression);
5656

5757
this.parentExporterBuilder =
5858
parentExporter.toBuilder()
@@ -73,7 +73,7 @@ public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {
7373
try {
7474
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
7575
TraceRequestMarshaler.create(spans).writeBinaryTo(buffer);
76-
this.data.set(buffer.toByteArray());
76+
this.data.set(buffer);
7777
return this.parentExporter.export(spans);
7878
} catch (IOException e) {
7979
return CompletableResultCode.ofFailure();

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/traces/OtlpAwsSpanExporterBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import static java.util.Objects.requireNonNull;
1919

2020
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
21+
import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.common.CompressionMethod;
2122

2223
public class OtlpAwsSpanExporterBuilder {
2324
private final OtlpHttpSpanExporter parentExporter;
2425
private final String endpoint;
26+
private String compression;
2527

2628
public static OtlpAwsSpanExporterBuilder create(
2729
OtlpHttpSpanExporter parentExporter, String endpoint) {
@@ -32,12 +34,22 @@ public static OtlpAwsSpanExporter getDefault(String endpoint) {
3234
return OtlpAwsSpanExporter.getDefault(endpoint);
3335
}
3436

37+
public OtlpAwsSpanExporterBuilder setCompression(String compression) {
38+
this.compression = compression;
39+
return this;
40+
}
41+
3542
private OtlpAwsSpanExporterBuilder(OtlpHttpSpanExporter parentExporter, String endpoint) {
3643
this.parentExporter = requireNonNull(parentExporter, "Must set a parentExporter");
3744
this.endpoint = requireNonNull(endpoint, "Must set an endpoint");
3845
}
3946

4047
public OtlpAwsSpanExporter build() {
41-
return OtlpAwsSpanExporter.create(this.parentExporter, this.endpoint);
48+
CompressionMethod compression = CompressionMethod.NONE;
49+
if (this.compression != null && "gzip".equalsIgnoreCase(this.compression)) {
50+
compression = CompressionMethod.GZIP;
51+
}
52+
53+
return OtlpAwsSpanExporter.create(this.parentExporter, this.endpoint, compression);
4254
}
4355
}

0 commit comments

Comments
 (0)