Skip to content

Commit b655d93

Browse files
BrandonArpclaude
andcommitted
Add compression support to KairosDbSink
- Add getEnableCompression accessor method to HttpPostSink - Implement GZIP compression in KairosDbSink when enabled - Override createRequest to handle compression with proper headers 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent a3fcec1 commit b655d93

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,15 @@ ImmutableSet<Integer> getRetryableStatusCodes() {
229229
return _retryableStatusCodes;
230230
}
231231

232+
/**
233+
* Accessor for whether compression is enabled.
234+
*
235+
* @return True if compression is enabled.
236+
*/
237+
protected boolean getEnableCompression() {
238+
return _enableCompression;
239+
}
240+
232241
/**
233242
* Serialize the {@link PeriodicData} instance for posting.
234243
*

src/main/java/com/arpnetworking/tsdcore/sinks/KairosDbSink.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@
3535
import com.fasterxml.jackson.databind.ObjectMapper;
3636
import com.google.common.collect.ImmutableMap;
3737
import com.google.common.collect.Lists;
38+
import io.netty.handler.codec.http.HttpHeaderNames;
3839
import it.unimi.dsi.fastutil.doubles.Double2LongMap;
3940
import net.sf.oval.constraint.Min;
4041
import net.sf.oval.constraint.NotNull;
42+
import org.asynchttpclient.AsyncHttpClient;
43+
import org.asynchttpclient.RequestBuilder;
44+
import org.asynchttpclient.util.HttpConstants;
4145

4246
import java.io.ByteArrayOutputStream;
4347
import java.io.IOException;
@@ -49,6 +53,7 @@
4953
import java.util.Map;
5054
import java.util.Optional;
5155
import java.util.concurrent.atomic.LongAdder;
56+
import java.util.zip.GZIPOutputStream;
5257

5358
/**
5459
* Publishes to a KairosDbSink endpoint. This class is thread safe.
@@ -155,6 +160,33 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
155160
return completeChunks;
156161
}
157162

163+
@Override
164+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
165+
if (this.getEnableCompression()) {
166+
final byte[] bodyData;
167+
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
168+
GZIPOutputStream gzipStream = new GZIPOutputStream(bos)) {
169+
170+
gzipStream.write(serializedData);
171+
gzipStream.flush();
172+
bodyData = bos.toByteArray();
173+
} catch (final IOException e) {
174+
throw new RuntimeException(e);
175+
}
176+
177+
final RequestBuilder requestBuilder = new RequestBuilder()
178+
.setUri(getAysncHttpClientUri())
179+
.setHeader(HttpHeaderNames.CONTENT_TYPE, "application/gzip")
180+
.setBody(bodyData)
181+
.setMethod(HttpConstants.Methods.POST);
182+
183+
return new RequestInfo(requestBuilder.build(), serializedData.length, bodyData.length);
184+
185+
} else {
186+
return super.createRequest(client, serializedData);
187+
}
188+
}
189+
158190
private void addChunk(
159191
final ByteArrayOutputStream chunkStream,
160192
final ByteBuffer currentChunk,

0 commit comments

Comments
 (0)