Skip to content

Commit f7395c5

Browse files
committed
chunk down flag assigned to not hit the max grpc limit
1 parent 5b2a9c0 commit f7395c5

File tree

5 files changed

+102
-6
lines changed

5 files changed

+102
-6
lines changed

openfeature-provider-local/src/main/java/com/spotify/confidence/GrpcWasmFlagLogger.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,21 @@
88
import io.grpc.ManagedChannel;
99
import io.grpc.ManagedChannelBuilder;
1010
import java.time.Duration;
11+
import java.util.ArrayList;
12+
import java.util.List;
1113
import java.util.Optional;
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
1216
import org.slf4j.Logger;
1317
import org.slf4j.LoggerFactory;
1418

1519
public class GrpcWasmFlagLogger implements WasmFlagLogger {
1620
private static final String CONFIDENCE_DOMAIN = "edge-grpc.spotify.com";
1721
private static final Logger logger = LoggerFactory.getLogger(GrpcWasmFlagLogger.class);
22+
// Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
23+
private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000;
1824
private final InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub stub;
25+
private final ExecutorService executorService;
1926

2027
public GrpcWasmFlagLogger(ApiSecret apiSecret) {
2128
final var channel = createConfidenceChannel();
@@ -26,15 +33,84 @@ public GrpcWasmFlagLogger(ApiSecret apiSecret) {
2633
final Channel authenticatedChannel =
2734
ClientInterceptors.intercept(channel, new JwtAuthClientInterceptor(tokenHolder));
2835
this.stub = InternalFlagLoggerServiceGrpc.newBlockingStub(authenticatedChannel);
36+
this.executorService = Executors.newCachedThreadPool();
2937
}
3038

3139
@Override
3240
public void write(WriteFlagLogsRequest request) {
33-
if (request.getClientResolveInfoList().isEmpty() && request.getFlagAssignedList().isEmpty()) {
41+
if (request.getClientResolveInfoList().isEmpty()
42+
&& request.getFlagAssignedList().isEmpty()
43+
&& request.getFlagResolveInfoList().isEmpty()) {
3444
logger.debug("Skipping empty flag log request");
3545
return;
3646
}
37-
final var ignore = stub.writeFlagLogs(request);
47+
48+
final int flagAssignedCount = request.getFlagAssignedCount();
49+
50+
// If flag_assigned list is small enough, send everything as-is
51+
if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK) {
52+
sendAsync(request);
53+
return;
54+
}
55+
56+
// Split flag_assigned into chunks and send each chunk asynchronously
57+
logger.debug(
58+
"Splitting {} flag_assigned entries into chunks of {}",
59+
flagAssignedCount,
60+
MAX_FLAG_ASSIGNED_PER_CHUNK);
61+
62+
final List<WriteFlagLogsRequest> chunks = createFlagAssignedChunks(request);
63+
for (WriteFlagLogsRequest chunk : chunks) {
64+
sendAsync(chunk);
65+
}
66+
}
67+
68+
private List<WriteFlagLogsRequest> createFlagAssignedChunks(WriteFlagLogsRequest request) {
69+
final List<WriteFlagLogsRequest> chunks = new ArrayList<>();
70+
final int totalFlags = request.getFlagAssignedCount();
71+
72+
for (int i = 0; i < totalFlags; i += MAX_FLAG_ASSIGNED_PER_CHUNK) {
73+
final int end = Math.min(i + MAX_FLAG_ASSIGNED_PER_CHUNK, totalFlags);
74+
final WriteFlagLogsRequest.Builder chunkBuilder =
75+
WriteFlagLogsRequest.newBuilder()
76+
.addAllFlagAssigned(request.getFlagAssignedList().subList(i, end));
77+
78+
// Include metadata only in the first chunk
79+
if (i == 0) {
80+
if (request.hasTelemetryData()) {
81+
chunkBuilder.setTelemetryData(request.getTelemetryData());
82+
}
83+
chunkBuilder
84+
.addAllClientResolveInfo(request.getClientResolveInfoList())
85+
.addAllFlagResolveInfo(request.getFlagResolveInfoList());
86+
}
87+
88+
chunks.add(chunkBuilder.build());
89+
}
90+
91+
return chunks;
92+
}
93+
94+
private void sendAsync(WriteFlagLogsRequest request) {
95+
executorService.submit(
96+
() -> {
97+
try {
98+
stub.writeFlagLogs(request);
99+
logger.debug(
100+
"Successfully sent flag log with {} entries", request.getFlagAssignedCount());
101+
} catch (Exception e) {
102+
logger.error("Failed to write flag logs", e);
103+
}
104+
});
105+
}
106+
107+
/**
108+
* Shutdown the executor service. This will allow any pending async writes to complete. Call this
109+
* when the application is shutting down.
110+
*/
111+
@Override
112+
public void shutdown() {
113+
executorService.shutdown();
38114
}
39115

40116
private static ManagedChannel createConfidenceChannel() {

openfeature-provider-local/src/main/java/com/spotify/confidence/LocalResolverServiceFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import com.spotify.confidence.shaded.flags.admin.v1.ResolverStateServiceGrpc.ResolverStateServiceBlockingStub;
1111
import com.spotify.confidence.shaded.flags.resolver.v1.InternalFlagLoggerServiceGrpc;
1212
import com.spotify.confidence.shaded.flags.resolver.v1.Sdk;
13-
import com.spotify.confidence.shaded.flags.resolver.v1.WriteFlagLogsResponse;
13+
import com.spotify.confidence.shaded.flags.resolver.v1.WriteFlagLogsRequest;
1414
import com.spotify.confidence.shaded.iam.v1.AuthServiceGrpc;
1515
import com.spotify.confidence.shaded.iam.v1.AuthServiceGrpc.AuthServiceBlockingStub;
1616
import com.spotify.confidence.shaded.iam.v1.ClientCredential.ClientSecret;
@@ -179,7 +179,15 @@ private static FlagResolverService createFlagResolverService(
179179
.orElse(Duration.ofMinutes(5).toSeconds());
180180
final AtomicReference<byte[]> resolverStateProtobuf =
181181
new AtomicReference<>(accountStateProvider.provide());
182-
final WasmFlagLogger flagLogger = request -> WriteFlagLogsResponse.getDefaultInstance();
182+
// No-op logger for wasm mode with AccountStateProvider
183+
final WasmFlagLogger flagLogger =
184+
new WasmFlagLogger() {
185+
@Override
186+
public void write(WriteFlagLogsRequest request) {}
187+
188+
@Override
189+
public void shutdown() {}
190+
};
183191
final ResolverApi wasmResolverApi =
184192
new ThreadLocalSwapWasmResolverApi(
185193
flagLogger, resolverStateProtobuf.get(), accountId, stickyResolveStrategy);

openfeature-provider-local/src/main/java/com/spotify/confidence/ThreadLocalSwapWasmResolverApi.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public int getInstanceCount() {
121121
@Override
122122
public void close() {
123123
resolverInstances.values().forEach(SwapWasmResolverApi::close);
124+
flagLogger.shutdown();
124125
resolverInstances.clear();
125126
}
126127
}

openfeature-provider-local/src/main/java/com/spotify/confidence/WasmResolveApi.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@
3131

3232
class IsClosedException extends Exception {}
3333

34-
@FunctionalInterface
3534
interface WasmFlagLogger {
3635
void write(WriteFlagLogsRequest request);
36+
37+
void shutdown();
3738
}
3839

3940
class WasmResolveApi {

openfeature-provider-local/src/test/java/com/spotify/confidence/TestBase.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.protobuf.util.Values;
88
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsRequest;
99
import com.spotify.confidence.shaded.flags.resolver.v1.ResolveFlagsResponse;
10+
import com.spotify.confidence.shaded.flags.resolver.v1.WriteFlagLogsRequest;
1011
import com.spotify.confidence.shaded.iam.v1.Client;
1112
import com.spotify.confidence.shaded.iam.v1.ClientCredential;
1213
import java.util.BitSet;
@@ -45,7 +46,16 @@ protected TestBase(ResolverState state, boolean isWasm) {
4546
if (isWasm) {
4647
final var wasmResolverApi =
4748
new SwapWasmResolverApi(
48-
request -> {}, desiredState.toProto().toByteArray(), "", mockFallback);
49+
new WasmFlagLogger() {
50+
@Override
51+
public void write(WriteFlagLogsRequest request) {}
52+
53+
@Override
54+
public void shutdown() {}
55+
},
56+
desiredState.toProto().toByteArray(),
57+
"",
58+
mockFallback);
4959
resolverServiceFactory =
5060
new LocalResolverServiceFactory(
5161
wasmResolverApi, resolverState, resolveTokenConverter, mock(), mockFallback);

0 commit comments

Comments
 (0)