Skip to content

Commit fe370cc

Browse files
authored
feat: onConfigurationChange (#97)
* feat: callback manager class and test * feat: onConfigurationChange
1 parent 5544f7c commit fe370cc

File tree

5 files changed

+331
-9
lines changed

5 files changed

+331
-9
lines changed

src/main/java/cloud/eppo/BaseEppoClient.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020
import java.util.Timer;
2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
2223
import org.jetbrains.annotations.NotNull;
2324
import org.jetbrains.annotations.Nullable;
2425
import org.slf4j.Logger;
@@ -580,6 +581,17 @@ public void setIsGracefulFailureMode(boolean isGracefulFailureMode) {
580581
this.isGracefulMode = isGracefulFailureMode;
581582
}
582583

584+
/**
585+
* Subscribe to changes to the configuration.
586+
*
587+
* @param callback A function to be executed when the configuration changes.
588+
* @return a Runnable which, when called unsubscribes the callback from configuration change
589+
* events.
590+
*/
591+
public Runnable onConfigurationChange(Consumer<Configuration> callback) {
592+
return requestor.onConfigurationChange(callback);
593+
}
594+
583595
/**
584596
* Returns the configuration object used by the EppoClient for assignment and bandit evaluation.
585597
*

src/main/java/cloud/eppo/ConfigurationRequestor.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package cloud.eppo;
22

33
import cloud.eppo.api.Configuration;
4+
import cloud.eppo.callback.CallbackManager;
45
import java.util.concurrent.CompletableFuture;
56
import java.util.concurrent.ExecutionException;
7+
import java.util.function.Consumer;
68
import org.jetbrains.annotations.NotNull;
79
import org.slf4j.Logger;
810
import org.slf4j.LoggerFactory;
@@ -19,6 +21,8 @@ public class ConfigurationRequestor {
1921
private CompletableFuture<Boolean> configurationFuture = null;
2022
private boolean initialConfigSet = false;
2123

24+
private final CallbackManager<Configuration> configChangeManager = new CallbackManager<>();
25+
2226
public ConfigurationRequestor(
2327
@NotNull IConfigurationStore configurationStore,
2428
@NotNull EppoHttpClient client,
@@ -36,8 +40,7 @@ public void setInitialConfiguration(@NotNull Configuration configuration) {
3640
throw new IllegalStateException("Initial configuration has already been set");
3741
}
3842

39-
initialConfigSet =
40-
configurationStore.saveConfiguration(configuration).thenApply(v -> true).join();
43+
initialConfigSet = saveConfigurationAndNotify(configuration).thenApply(v -> true).join();
4144
}
4245

4346
/**
@@ -65,10 +68,7 @@ public CompletableFuture<Boolean> setInitialConfiguration(
6568
return false;
6669
} else {
6770
initialConfigSet =
68-
configurationStore
69-
.saveConfiguration(config)
70-
.thenApply((s) -> true)
71-
.join();
71+
saveConfigurationAndNotify(config).thenApply((s) -> true).join();
7272
return true;
7373
}
7474
}
@@ -98,7 +98,7 @@ void fetchAndSaveFromRemote() {
9898
configBuilder.banditParameters(banditParametersJsonBytes);
9999
}
100100

101-
configurationStore.saveConfiguration(configBuilder.build()).join();
101+
saveConfigurationAndNotify(configBuilder.build()).join();
102102
}
103103

104104
/** Loads configuration asynchronously from the API server, off-thread. */
@@ -115,7 +115,7 @@ CompletableFuture<Void> fetchAndSaveFromRemoteAsync() {
115115
remoteFetchFuture =
116116
client
117117
.getAsync(Constants.FLAG_CONFIG_ENDPOINT)
118-
.thenApply(
118+
.thenCompose(
119119
flagConfigJsonBytes -> {
120120
synchronized (this) {
121121
Configuration.Builder configBuilder =
@@ -137,9 +137,23 @@ CompletableFuture<Void> fetchAndSaveFromRemoteAsync() {
137137
}
138138
}
139139

140-
return configurationStore.saveConfiguration(configBuilder.build()).join();
140+
return saveConfigurationAndNotify(configBuilder.build());
141141
}
142142
});
143143
return remoteFetchFuture;
144144
}
145+
146+
private CompletableFuture<Void> saveConfigurationAndNotify(Configuration configuration) {
147+
CompletableFuture<Void> saveFuture = configurationStore.saveConfiguration(configuration);
148+
return saveFuture.thenRun(
149+
() -> {
150+
synchronized (configChangeManager) {
151+
configChangeManager.notifyCallbacks(configuration);
152+
}
153+
});
154+
}
155+
156+
public Runnable onConfigurationChange(Consumer<Configuration> callback) {
157+
return configChangeManager.subscribe(callback);
158+
}
145159
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package cloud.eppo.callback;
2+
3+
import java.util.Map;
4+
import java.util.UUID;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.function.Consumer;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
/**
11+
* A generic callback manager that allows registration and notification of callbacks.
12+
*
13+
* @param <T> The type of data that will be passed to the callbacks
14+
*/
15+
public class CallbackManager<T> {
16+
private static final Logger log = LoggerFactory.getLogger(CallbackManager.class);
17+
private final Map<String, Consumer<T>> subscribers;
18+
19+
public CallbackManager() {
20+
this.subscribers = new ConcurrentHashMap<>();
21+
}
22+
23+
/**
24+
* Register a callback to be notified when events occur.
25+
*
26+
* @param callback The callback function to be called with event data
27+
* @return A Runnable that can be called to unsubscribe the callback
28+
*/
29+
public Runnable subscribe(Consumer<T> callback) {
30+
String id = UUID.randomUUID().toString();
31+
subscribers.put(id, callback);
32+
33+
return () -> subscribers.remove(id);
34+
}
35+
36+
/**
37+
* Notify all subscribers with the provided data.
38+
*
39+
* @param data The data to pass to all callbacks
40+
*/
41+
public void notifyCallbacks(T data) {
42+
subscribers
43+
.values()
44+
.forEach(
45+
callback -> {
46+
try {
47+
callback.accept(data);
48+
} catch (Exception e) {
49+
log.error("Eppo SDK: Error thrown by callback: {}", e.getMessage());
50+
}
51+
});
52+
}
53+
54+
/** Remove all subscribers. */
55+
public void clear() {
56+
subscribers.clear();
57+
}
58+
}

src/test/java/cloud/eppo/ConfigurationRequestorTest.java

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@
22

33
import static org.junit.jupiter.api.Assertions.*;
44
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.ArgumentMatchers.anyString;
56
import static org.mockito.Mockito.mock;
67
import static org.mockito.Mockito.when;
78

89
import cloud.eppo.api.Configuration;
910
import java.io.File;
1011
import java.io.IOException;
1112
import java.nio.charset.StandardCharsets;
13+
import java.util.ArrayList;
14+
import java.util.List;
1215
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.atomic.AtomicInteger;
1317
import org.apache.commons.io.FileUtils;
18+
import org.junit.jupiter.api.BeforeEach;
1419
import org.junit.jupiter.api.Test;
1520
import org.mockito.Mockito;
1621

@@ -160,4 +165,136 @@ public void testCacheWritesAfterBrokenFetch() throws IOException {
160165

161166
assertNull(configStore.getConfiguration().getFlag("boolean_flag"));
162167
}
168+
169+
private ConfigurationStore mockConfigStore;
170+
private EppoHttpClient mockHttpClient;
171+
private ConfigurationRequestor requestor;
172+
173+
@BeforeEach
174+
public void setup() {
175+
mockConfigStore = mock(ConfigurationStore.class);
176+
mockHttpClient = mock(EppoHttpClient.class);
177+
requestor = new ConfigurationRequestor(mockConfigStore, mockHttpClient, false, true);
178+
}
179+
180+
@Test
181+
public void testConfigurationChangeListener() throws IOException {
182+
// Setup mock response
183+
String flagConfig = FileUtils.readFileToString(initialFlagConfigFile, StandardCharsets.UTF_8);
184+
when(mockHttpClient.get(anyString())).thenReturn(flagConfig.getBytes());
185+
when(mockConfigStore.saveConfiguration(any()))
186+
.thenReturn(CompletableFuture.completedFuture(null));
187+
188+
List<Configuration> receivedConfigs = new ArrayList<>();
189+
190+
// Subscribe to configuration changes
191+
Runnable unsubscribe = requestor.onConfigurationChange(receivedConfigs::add);
192+
193+
// Initial fetch should trigger the callback
194+
requestor.fetchAndSaveFromRemote();
195+
assertEquals(1, receivedConfigs.size());
196+
197+
// Another fetch should trigger the callback again (fetches aren't optimized with eTag yet).
198+
requestor.fetchAndSaveFromRemote();
199+
assertEquals(2, receivedConfigs.size());
200+
201+
// Unsubscribe should prevent further callbacks
202+
unsubscribe.run();
203+
requestor.fetchAndSaveFromRemote();
204+
assertEquals(2, receivedConfigs.size()); // Count should remain the same
205+
}
206+
207+
@Test
208+
public void testMultipleConfigurationChangeListeners() {
209+
// Setup mock response
210+
when(mockHttpClient.get(anyString())).thenReturn("{}".getBytes());
211+
when(mockConfigStore.saveConfiguration(any()))
212+
.thenReturn(CompletableFuture.completedFuture(null));
213+
214+
AtomicInteger callCount1 = new AtomicInteger(0);
215+
AtomicInteger callCount2 = new AtomicInteger(0);
216+
217+
// Subscribe multiple listeners
218+
Runnable unsubscribe1 = requestor.onConfigurationChange(v -> callCount1.incrementAndGet());
219+
Runnable unsubscribe2 = requestor.onConfigurationChange(v -> callCount2.incrementAndGet());
220+
221+
// Fetch should trigger both callbacks
222+
requestor.fetchAndSaveFromRemote();
223+
assertEquals(1, callCount1.get());
224+
assertEquals(1, callCount2.get());
225+
226+
// Unsubscribe first listener
227+
unsubscribe1.run();
228+
requestor.fetchAndSaveFromRemote();
229+
assertEquals(1, callCount1.get()); // Should not increase
230+
assertEquals(2, callCount2.get()); // Should increase
231+
232+
// Unsubscribe second listener
233+
unsubscribe2.run();
234+
requestor.fetchAndSaveFromRemote();
235+
assertEquals(1, callCount1.get()); // Should not increase
236+
assertEquals(2, callCount2.get()); // Should not increase
237+
}
238+
239+
@Test
240+
public void testConfigurationChangeListenerIgnoresFailedFetch() {
241+
// Setup mock response to simulate failure
242+
when(mockHttpClient.get(anyString())).thenThrow(new RuntimeException("Fetch failed"));
243+
244+
AtomicInteger callCount = new AtomicInteger(0);
245+
requestor.onConfigurationChange(v -> callCount.incrementAndGet());
246+
247+
// Failed fetch should not trigger the callback
248+
try {
249+
requestor.fetchAndSaveFromRemote();
250+
} catch (Exception e) {
251+
// Expected
252+
}
253+
assertEquals(0, callCount.get());
254+
}
255+
256+
@Test
257+
public void testConfigurationChangeListenerIgnoresFailedSave() {
258+
// Setup mock responses
259+
when(mockHttpClient.get(anyString())).thenReturn("{}".getBytes());
260+
when(mockConfigStore.saveConfiguration(any()))
261+
.thenReturn(
262+
CompletableFuture.supplyAsync(
263+
() -> {
264+
throw new RuntimeException("Save failed");
265+
}));
266+
267+
AtomicInteger callCount = new AtomicInteger(0);
268+
requestor.onConfigurationChange(v -> callCount.incrementAndGet());
269+
270+
// Failed save should not trigger the callback
271+
try {
272+
requestor.fetchAndSaveFromRemote();
273+
} catch (RuntimeException e) {
274+
// Pass
275+
}
276+
assertEquals(0, callCount.get());
277+
}
278+
279+
@Test
280+
public void testConfigurationChangeListenerAsyncSave() {
281+
// Setup mock responses
282+
when(mockHttpClient.getAsync(anyString()))
283+
.thenReturn(CompletableFuture.completedFuture("{\"flags\":{}}".getBytes()));
284+
285+
CompletableFuture<Void> saveFuture = new CompletableFuture<>();
286+
when(mockConfigStore.saveConfiguration(any())).thenReturn(saveFuture);
287+
288+
AtomicInteger callCount = new AtomicInteger(0);
289+
requestor.onConfigurationChange(v -> callCount.incrementAndGet());
290+
291+
// Start fetch
292+
CompletableFuture<Void> fetch = requestor.fetchAndSaveFromRemoteAsync();
293+
assertEquals(0, callCount.get()); // Callback should not be called yet
294+
295+
// Complete the save
296+
saveFuture.complete(null);
297+
fetch.join();
298+
assertEquals(1, callCount.get()); // Callback should be called after save completes
299+
}
163300
}

0 commit comments

Comments
 (0)