Skip to content
This repository was archived by the owner on Mar 15, 2021. It is now read-only.

Commit ef47556

Browse files
authored
Resize pubsub batches when they are above 10MB limit (#246)
1 parent 646f88e commit ef47556

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

modules/pubsub/src/main/java/com/spotify/ffwd/pubsub/PubsubPluginSink.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.api.gax.rpc.NotFoundException;
2727
import com.google.cloud.pubsub.v1.Publisher;
2828
import com.google.common.cache.Cache;
29+
import com.google.common.collect.Lists;
2930
import com.google.common.util.concurrent.MoreExecutors;
3031
import com.google.inject.Inject;
3132
import com.google.protobuf.ByteString;
@@ -42,6 +43,7 @@
4243
import eu.toolchain.async.AsyncFramework;
4344
import eu.toolchain.async.AsyncFuture;
4445
import java.io.IOException;
46+
import java.util.ArrayList;
4547
import java.util.Collection;
4648
import java.util.Collections;
4749
import java.util.List;
@@ -58,6 +60,9 @@
5860
*/
5961
public class PubsubPluginSink implements BatchablePluginSink {
6062

63+
// Pubsub publish request limit is 10MB
64+
private final static double MAX_BATCH_SIZE_BYTES = 10_000_000.0;
65+
6166
private final Executor executorService = MoreExecutors.directExecutor();
6267
@Inject
6368
AsyncFramework async;
@@ -116,7 +121,18 @@ public AsyncFuture<Void> sendMetrics(Collection<Metric> metrics) {
116121

117122
try {
118123
final ByteString m = ByteString.copyFrom(serializer.serialize(metrics, writeCache));
119-
publishPubSub(m);
124+
125+
if (m.size() > MAX_BATCH_SIZE_BYTES) {
126+
logger.debug("Above byte limit, resizing batch");
127+
int times = (int)Math.ceil(m.size()/MAX_BATCH_SIZE_BYTES);
128+
List<List<Metric>> collections = Lists.partition(new ArrayList<>(metrics), m.size()/times);
129+
for (List<Metric> l: collections) {
130+
final ByteString mResize = ByteString.copyFrom(serializer.serialize(l, writeCache));
131+
publishPubSub(mResize);
132+
}
133+
} else {
134+
publishPubSub(m);
135+
}
120136
} catch (Exception e) {
121137
logger.error("Failed to serialize batch of metrics: ", e);
122138
}

0 commit comments

Comments
 (0)