Skip to content

Commit 5d7449d

Browse files
authored
Add zstd compressor implementation for OTLP exporters (#1108)
1 parent 17b6dc6 commit 5d7449d

File tree

12 files changed

+324
-1
lines changed

12 files changed

+324
-1
lines changed

.github/component_owners.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ components:
1919
aws-xray-propagator:
2020
- wangzlei
2121
- srprash
22+
compressors:
23+
- jack-berg
2224
consistent-sampling:
2325
- oertl
2426
- PeterF778

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ feature or via instrumentation, this project is hopefully for you.
1212
* [AWS Resources](./aws-resources/README.md)
1313
* [AWS X-Ray SDK Support](./aws-xray/README.md)
1414
* [AWS X-Ray Propagator](./aws-xray-propagator/README.md)
15+
* [zstd Compressor](./compressors/zstd/README.md)
1516
* [Consistent Sampling](./consistent-sampling/README.md)
1617
* [Disk Buffering](./disk-buffering/README.md)
1718
* [JMX Metric Gatherer](./jmx-metrics/README.md)

buildSrc/src/main/kotlin/otel.java-conventions.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ val DEFAULT_JAVA_VERSION = JavaVersion.VERSION_17
2222
java {
2323
toolchain {
2424
languageVersion.set(
25-
otelJava.minJavaVersionSupported.map { JavaLanguageVersion.of(Math.max(it.majorVersion.toInt(), DEFAULT_JAVA_VERSION.majorVersion.toInt())) }
25+
otelJava.minJavaVersionSupported.map { JavaLanguageVersion.of(Math.max(it.majorVersion.toInt(), DEFAULT_JAVA_VERSION.majorVersion.toInt())) }
2626
)
2727
}
2828

compressors/compressor-zstd/README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# zstd Compressor
2+
3+
A [zstd](https://en.wikipedia.org/wiki/Zstd) implementation of [Compressor](https://github.com/open-telemetry/opentelemetry-java/blob/d9f9812d4375a4229caff43bd681c50b7a45776a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/Compressor.java) and [CompressorProvider](https://github.com/open-telemetry/opentelemetry-java/blob/d9f9812d4375a4229caff43bd681c50b7a45776a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorProvider.java) based on [luben/zstd-jni](https://github.com/luben/zstd-jni).
4+
5+
This enables zstd compression with [opentelemetry-java's](https://github.com/open-telemetry/opentelemetry-java) [OTLP exporters](https://opentelemetry.io/docs/instrumentation/java/exporters/#otlp).
6+
7+
## Usage
8+
9+
Add dependency, replacing `{{version}}` with the latest release version.
10+
11+
**Maven:**
12+
13+
```xml
14+
<dependency>
15+
<groupId>io.opentelemetry.contrib</groupId>
16+
<artifactId>opentelemetry-compressor-zstd</artifactId>
17+
<version>{{version}}</version>
18+
</dependency>
19+
```
20+
21+
**Gradle:**
22+
23+
```groovy
24+
dependencies {
25+
implementation "io.opentelemetry.contrib:opentelemetry-compressor-zstd:{{version}}"
26+
}
27+
```
28+
29+
If programmatically configuring the exporter:
30+
31+
```java
32+
// same pattern applies to OtlpHttpMetricExporter, OtlpHttpSpanExporter, and the gRPC variants
33+
OtlpHttpLogRecordExporter.builder()
34+
.setCompression("zstd")
35+
// ...additional configuration omitted for brevity
36+
.build()
37+
```
38+
39+
If using [autoconfigure](https://github.com/open-telemetry/opentelemetry-java/tree/main/sdk-extensions/autoconfigure):
40+
41+
```shell
42+
export OTEL_EXPORTER_OTLP_COMPRESSION=zstd
43+
```
44+
45+
## Component owners
46+
47+
- [Jack Berg](https://github.com/jack-berg), New Relic
48+
49+
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
plugins {
2+
id("otel.java-conventions")
3+
id("otel.publish-conventions")
4+
}
5+
6+
description = "zstd compressor implementation for use with OTLP exporters"
7+
otelJava.moduleName.set("io.opentelemetry.contrib.compressor.zstd")
8+
9+
dependencies {
10+
// TODO(jack-berg): Use version from :depedencyManagement when opentelemetry-instrumentation-bom-alpha depends on opentelemetry-java 1.34.0
11+
var openTelemetryVersion = "1.34.0"
12+
api("io.opentelemetry:opentelemetry-exporter-common:$openTelemetryVersion")
13+
14+
implementation("com.github.luben:zstd-jni:1.5.5-10")
15+
16+
testImplementation("io.opentelemetry:opentelemetry-sdk-testing:$openTelemetryVersion")
17+
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp:$openTelemetryVersion")
18+
19+
testImplementation("io.opentelemetry.proto:opentelemetry-proto")
20+
testImplementation("com.linecorp.armeria:armeria-junit5")
21+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.compressor.zstd;
7+
8+
import com.github.luben.zstd.ZstdOutputStream;
9+
import io.opentelemetry.exporter.internal.compression.Compressor;
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
13+
public final class ZstdCompressor implements Compressor {
14+
15+
private static final ZstdCompressor INSTANCE = new ZstdCompressor();
16+
17+
private ZstdCompressor() {}
18+
19+
public static ZstdCompressor getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
@Override
24+
public String getEncoding() {
25+
return "zstd";
26+
}
27+
28+
@Override
29+
public OutputStream compress(OutputStream outputStream) throws IOException {
30+
return new ZstdOutputStream(outputStream);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.compressor.zstd;
7+
8+
import io.opentelemetry.exporter.internal.compression.Compressor;
9+
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
10+
11+
public final class ZstdCompressorProvider implements CompressorProvider {
12+
@Override
13+
public Compressor getInstance() {
14+
return ZstdCompressor.getInstance();
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
io.opentelemetry.contrib.compressor.zstd.ZstdCompressorProvider
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.compressor.zstd;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import com.github.luben.zstd.ZstdInputStream;
11+
import com.linecorp.armeria.common.HttpRequest;
12+
import com.linecorp.armeria.common.HttpResponse;
13+
import com.linecorp.armeria.common.HttpStatus;
14+
import com.linecorp.armeria.common.MediaType;
15+
import com.linecorp.armeria.server.ServerBuilder;
16+
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
17+
import io.opentelemetry.api.common.Attributes;
18+
import io.opentelemetry.api.logs.Severity;
19+
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
20+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
21+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
22+
import io.opentelemetry.proto.common.v1.AnyValue;
23+
import io.opentelemetry.proto.common.v1.InstrumentationScope;
24+
import io.opentelemetry.proto.common.v1.KeyValue;
25+
import io.opentelemetry.proto.logs.v1.ResourceLogs;
26+
import io.opentelemetry.proto.logs.v1.SeverityNumber;
27+
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
28+
import io.opentelemetry.sdk.logs.data.LogRecordData;
29+
import io.opentelemetry.sdk.resources.Resource;
30+
import io.opentelemetry.sdk.testing.logs.TestLogRecordData;
31+
import java.io.ByteArrayInputStream;
32+
import java.io.ByteArrayOutputStream;
33+
import java.io.IOException;
34+
import java.io.UncheckedIOException;
35+
import java.time.Instant;
36+
import java.util.Collections;
37+
import java.util.concurrent.ConcurrentLinkedQueue;
38+
import java.util.concurrent.TimeUnit;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.extension.RegisterExtension;
41+
42+
class ZstdCompressorProviderTest {
43+
44+
private static final HttpResponse SUCCESS =
45+
HttpResponse.of(
46+
HttpStatus.OK,
47+
MediaType.parse("application/x-protobuf"),
48+
ExportLogsServiceResponse.getDefaultInstance().toByteArray());
49+
50+
private static final ConcurrentLinkedQueue<HttpRequest> httpRequests =
51+
new ConcurrentLinkedQueue<>();
52+
private static final ConcurrentLinkedQueue<ResourceLogs> exportedTelemetry =
53+
new ConcurrentLinkedQueue<>();
54+
55+
@RegisterExtension
56+
static final ServerExtension server =
57+
new ServerExtension() {
58+
@Override
59+
protected void configure(ServerBuilder sb) {
60+
sb.service(
61+
"/v1/logs",
62+
(ctx, req) -> {
63+
httpRequests.add(ctx.request());
64+
return HttpResponse.of(
65+
req.aggregate()
66+
.thenApply(
67+
aggReq -> {
68+
byte[] payload = aggReq.content().array();
69+
try {
70+
if (req.headers().contains("content-encoding", "zstd")) {
71+
ZstdInputStream is =
72+
new ZstdInputStream(new ByteArrayInputStream(payload));
73+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
74+
for (int result = is.read(); result != -1; result = is.read()) {
75+
baos.write((byte) result);
76+
}
77+
payload = baos.toByteArray();
78+
}
79+
ExportLogsServiceRequest parsed =
80+
ExportLogsServiceRequest.parseFrom(payload);
81+
exportedTelemetry.addAll(parsed.getResourceLogsList());
82+
return SUCCESS;
83+
} catch (IOException e) {
84+
throw new UncheckedIOException(e);
85+
}
86+
}));
87+
});
88+
sb.http(0);
89+
}
90+
};
91+
92+
@Test
93+
void exporterWithZstd() {
94+
try (OtlpHttpLogRecordExporter exporter =
95+
OtlpHttpLogRecordExporter.builder()
96+
.setEndpoint(server.httpUri() + "/v1/logs")
97+
.setCompression("zstd")
98+
.build()) {
99+
assertThat(
100+
exporter
101+
.export(Collections.singletonList(generateFakeLogRecordData()))
102+
.join(10, TimeUnit.SECONDS)
103+
.isSuccess())
104+
.isTrue();
105+
106+
assertThat(httpRequests)
107+
.satisfiesExactly(
108+
req -> assertThat(req.headers().contains("content-encoding", "zstd")).isTrue());
109+
assertThat(exportedTelemetry)
110+
.satisfiesExactly(
111+
resourceLogs ->
112+
assertThat(resourceLogs.getScopeLogsList())
113+
.satisfiesExactly(
114+
scopeLogs -> {
115+
InstrumentationScope scope = scopeLogs.getScope();
116+
assertThat(scope.getName()).isEqualTo("testLib");
117+
assertThat(scope.getVersion()).isEqualTo("1.0");
118+
assertThat(scopeLogs.getSchemaUrl()).isEqualTo("http://url");
119+
assertThat(scopeLogs.getLogRecordsList())
120+
.satisfiesExactly(
121+
logRecord -> {
122+
assertThat(logRecord.getBody().getStringValue())
123+
.isEqualTo("log body");
124+
assertThat(logRecord.getAttributesList())
125+
.isEqualTo(
126+
Collections.singletonList(
127+
KeyValue.newBuilder()
128+
.setKey("key")
129+
.setValue(
130+
AnyValue.newBuilder()
131+
.setStringValue("value")
132+
.build())
133+
.build()));
134+
assertThat(logRecord.getSeverityText()).isEqualTo("INFO");
135+
assertThat(logRecord.getSeverityNumber())
136+
.isEqualTo(SeverityNumber.SEVERITY_NUMBER_INFO);
137+
assertThat(logRecord.getTimeUnixNano()).isGreaterThan(0);
138+
assertThat(logRecord.getObservedTimeUnixNano())
139+
.isGreaterThan(0);
140+
});
141+
}));
142+
}
143+
}
144+
145+
/** Generate a fake {@link LogRecordData}. */
146+
public static LogRecordData generateFakeLogRecordData() {
147+
return TestLogRecordData.builder()
148+
.setResource(Resource.getDefault())
149+
.setInstrumentationScopeInfo(
150+
InstrumentationScopeInfo.builder("testLib")
151+
.setVersion("1.0")
152+
.setSchemaUrl("http://url")
153+
.build())
154+
.setBody("log body")
155+
.setAttributes(Attributes.builder().put("key", "value").build())
156+
.setSeverity(Severity.INFO)
157+
.setSeverityText(Severity.INFO.name())
158+
.setTimestamp(Instant.now())
159+
.setObservedTimestamp(Instant.now().plusNanos(100))
160+
.build();
161+
}
162+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.compressor.zstd;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
import com.github.luben.zstd.ZstdInputStream;
11+
import java.io.ByteArrayInputStream;
12+
import java.io.ByteArrayOutputStream;
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.io.OutputStream;
16+
import java.nio.charset.StandardCharsets;
17+
import org.junit.jupiter.api.Test;
18+
19+
class ZstdCompressorTest {
20+
21+
@Test
22+
void roundTrip() throws IOException {
23+
String content = "hello world";
24+
25+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
26+
OutputStream os = ZstdCompressor.getInstance().compress(baos);
27+
os.write(content.getBytes(StandardCharsets.UTF_8));
28+
os.close();
29+
30+
byte[] decompressed = new byte[content.length()];
31+
InputStream is = new ZstdInputStream(new ByteArrayInputStream(baos.toByteArray()));
32+
is.read(decompressed);
33+
is.close();
34+
35+
assertThat(new String(decompressed, StandardCharsets.UTF_8)).isEqualTo(content);
36+
}
37+
}

0 commit comments

Comments
 (0)