Skip to content
This repository was archived by the owner on Mar 15, 2021. It is now read-only.

Commit 646f88e

Browse files
author
William Huba
authored
Add optional compression to the OpenTelemetry gRPC requests (#245)
1 parent 92bfb52 commit 646f88e

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryOutputPlugin.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,21 @@ public class OpenTelemetryOutputPlugin extends OutputPlugin {
4141

4242
private final Map<String, String> headers;
4343
private final String endpoint;
44+
private final String compression;
4445

4546
@JsonCreator
4647
public OpenTelemetryOutputPlugin(
4748
@JsonProperty("filter") Optional<Filter> filter,
4849
@JsonProperty("flushInterval") @Nullable Long flushInterval,
4950
@JsonProperty("batching") Optional<Batching> batching,
5051
@JsonProperty("headers") Optional<Map<String, String>> headers,
51-
@JsonProperty("endpoint") @Nullable String endpoint
52+
@JsonProperty("endpoint") @Nullable String endpoint,
53+
@JsonProperty("compression") @Nullable String compression
5254
) {
5355
super(filter, Batching.from(flushInterval, batching));
5456
this.headers = headers.orElse(new HashMap<>());
5557
this.endpoint = Objects.requireNonNull(endpoint, "endpoint must be set");
58+
this.compression = compression;
5659
}
5760

5861
@Override
@@ -62,7 +65,7 @@ public Module module(Key<PluginSink> key, String id) {
6265
protected void configure() {
6366
bind(Logger.class).toInstance(LoggerFactory.getLogger(id));
6467
bind(key).toInstance(
65-
new OpenTelemetryPluginSink(endpoint, headers));
68+
new OpenTelemetryPluginSink(endpoint, headers, compression));
6669
expose(key);
6770
}
6871
};

modules/opentelemetry/src/main/java/com/spotify/ffwd/opentelemetry/OpenTelemetryPluginSink.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.Map;
4444
import java.util.concurrent.TimeUnit;
4545
import java.util.stream.Collectors;
46+
import javax.annotation.Nullable;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
4849

@@ -55,13 +56,16 @@ public class OpenTelemetryPluginSink implements PluginSink {
5556
private Map<String, String> headers;
5657
private ManagedChannel channel;
5758
private MetricsServiceGrpc.MetricsServiceBlockingStub stub;
59+
@Nullable private String compression;
5860

5961
OpenTelemetryPluginSink(
6062
String endpoint,
61-
Map<String, String> headers
63+
Map<String, String> headers,
64+
@Nullable String compression
6265
) {
6366
this.endpoint = endpoint;
6467
this.headers = headers;
68+
this.compression = compression;
6569
}
6670

6771

@@ -122,6 +126,10 @@ public AsyncFuture<Void> start() {
122126
MetricsServiceGrpc.MetricsServiceBlockingStub stub =
123127
MetricsServiceGrpc.newBlockingStub(channel);
124128

129+
if (compression != null) {
130+
stub = stub.withCompression(compression);
131+
}
132+
125133
Metadata extraHeaders = new Metadata();
126134
headers.forEach((key, value) -> {
127135
Metadata.Key<String> header = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);

0 commit comments

Comments
 (0)