Skip to content

Commit b2b7b72

Browse files
committed
chunk down flag assigned to not hit the max grpc limit
1 parent 5b2a9c0 commit b2b7b72

File tree

1 file changed

+77
-2
lines changed

1 file changed

+77
-2
lines changed

openfeature-provider-local/src/main/java/com/spotify/confidence/GrpcWasmFlagLogger.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,21 @@
88
import io.grpc.ManagedChannel;
99
import io.grpc.ManagedChannelBuilder;
1010
import java.time.Duration;
11+
import java.util.ArrayList;
12+
import java.util.List;
1113
import java.util.Optional;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
1216
import org.slf4j.Logger;
1317
import org.slf4j.LoggerFactory;
1418

1519
public class GrpcWasmFlagLogger implements WasmFlagLogger {
1620
private static final String CONFIDENCE_DOMAIN = "edge-grpc.spotify.com";
1721
private static final Logger logger = LoggerFactory.getLogger(GrpcWasmFlagLogger.class);
22+
// Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
23+
private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000;
1824
private final InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub stub;
25+
private final ExecutorService executorService;
1926

2027
public GrpcWasmFlagLogger(ApiSecret apiSecret) {
2128
final var channel = createConfidenceChannel();
@@ -26,15 +33,83 @@ public GrpcWasmFlagLogger(ApiSecret apiSecret) {
2633
final Channel authenticatedChannel =
2734
ClientInterceptors.intercept(channel, new JwtAuthClientInterceptor(tokenHolder));
2835
this.stub = InternalFlagLoggerServiceGrpc.newBlockingStub(authenticatedChannel);
36+
this.executorService = Executors.newCachedThreadPool();
2937
}
3038

3139
@Override
3240
public void write(WriteFlagLogsRequest request) {
33-
if (request.getClientResolveInfoList().isEmpty() && request.getFlagAssignedList().isEmpty()) {
41+
if (request.getClientResolveInfoList().isEmpty()
42+
&& request.getFlagAssignedList().isEmpty()
43+
&& request.getFlagResolveInfoList().isEmpty()) {
3444
logger.debug("Skipping empty flag log request");
3545
return;
3646
}
37-
final var ignore = stub.writeFlagLogs(request);
47+
48+
final int flagAssignedCount = request.getFlagAssignedCount();
49+
50+
// If flag_assigned list is small enough, send everything as-is
51+
if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK) {
52+
sendAsync(request);
53+
return;
54+
}
55+
56+
// Split flag_assigned into chunks and send each chunk asynchronously
57+
logger.debug(
58+
"Splitting {} flag_assigned entries into chunks of {}",
59+
flagAssignedCount,
60+
MAX_FLAG_ASSIGNED_PER_CHUNK);
61+
62+
final List<WriteFlagLogsRequest> chunks = createFlagAssignedChunks(request);
63+
for (WriteFlagLogsRequest chunk : chunks) {
64+
sendAsync(chunk);
65+
}
66+
}
67+
68+
private List<WriteFlagLogsRequest> createFlagAssignedChunks(WriteFlagLogsRequest request) {
69+
final List<WriteFlagLogsRequest> chunks = new ArrayList<>();
70+
final int totalFlags = request.getFlagAssignedCount();
71+
72+
for (int i = 0; i < totalFlags; i += MAX_FLAG_ASSIGNED_PER_CHUNK) {
73+
final int end = Math.min(i + MAX_FLAG_ASSIGNED_PER_CHUNK, totalFlags);
74+
final WriteFlagLogsRequest.Builder chunkBuilder =
75+
WriteFlagLogsRequest.newBuilder()
76+
.addAllFlagAssigned(request.getFlagAssignedList().subList(i, end));
77+
78+
// Include metadata only in the first chunk
79+
if (i == 0) {
80+
if (request.hasTelemetryData()) {
81+
chunkBuilder.setTelemetryData(request.getTelemetryData());
82+
}
83+
chunkBuilder
84+
.addAllClientResolveInfo(request.getClientResolveInfoList())
85+
.addAllFlagResolveInfo(request.getFlagResolveInfoList());
86+
}
87+
88+
chunks.add(chunkBuilder.build());
89+
}
90+
91+
return chunks;
92+
}
93+
94+
private void sendAsync(WriteFlagLogsRequest request) {
95+
executorService.submit(
96+
() -> {
97+
try {
98+
stub.writeFlagLogs(request);
99+
logger.debug(
100+
"Successfully sent flag log with {} entries", request.getFlagAssignedCount());
101+
} catch (Exception e) {
102+
logger.error("Failed to write flag logs", e);
103+
}
104+
});
105+
}
106+
107+
/**
108+
* Shutdown the executor service. This will allow any pending async writes to complete. Call this
109+
* when the application is shutting down.
110+
*/
111+
public void shutdown() {
112+
executorService.shutdown();
38113
}
39114

40115
private static ManagedChannel createConfidenceChannel() {

0 commit comments

Comments
 (0)