Skip to content
Open
88 changes: 44 additions & 44 deletions src/main/java/cloud/eppo/BaseEppoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
Expand All @@ -43,57 +41,43 @@ public class BaseEppoClient {
private final IAssignmentCache banditAssignmentCache;
private Timer pollTimer;

@Nullable protected CompletableFuture<Boolean> getInitialConfigFuture() {
return initialConfigFuture;
}

private final CompletableFuture<Boolean> initialConfigFuture;

// Fields useful for testing in situations where we want to mock the http client or configuration
// store (accessed via reflection)
/** @noinspection FieldMayBeFinal */
private static EppoHttpClient httpClientOverride = null;
private static IEppoHttpClient httpClientOverride = null;

// It is important that the bandit assignment cache expire with a short-enough TTL to last about
// one user session.
// The recommended is 10 minutes (per @Sven)
/** @param host To be removed in v4. use `apiBaseUrl` instead. */
/** */
protected BaseEppoClient(
@NotNull String apiKey,
@NotNull String sdkKey,
@NotNull String sdkName,
@NotNull String sdkVersion,
@Deprecated @Nullable String host,
@Nullable String apiBaseUrl,
@Nullable AssignmentLogger assignmentLogger,
@Nullable BanditLogger banditLogger,
@Nullable IConfigurationStore configurationStore,
boolean isGracefulMode,
boolean expectObfuscatedConfig,
boolean supportBandits,
@Nullable CompletableFuture<Configuration> initialConfiguration,
@Nullable Configuration initialConfiguration,
@Nullable IAssignmentCache assignmentCache,
@Nullable IAssignmentCache banditAssignmentCache) {

if (apiBaseUrl == null) {
apiBaseUrl = host != null ? Constants.appendApiPathToHost(host) : Constants.DEFAULT_BASE_URL;
}

this.assignmentCache = assignmentCache;
this.banditAssignmentCache = banditAssignmentCache;

EppoHttpClient httpClient =
buildHttpClient(apiBaseUrl, new SDKKey(apiKey), sdkName, sdkVersion);
IEppoHttpClient httpClient =
buildHttpClient(apiBaseUrl, new SDKKey(sdkKey), sdkName, sdkVersion);
this.configurationStore =
configurationStore != null ? configurationStore : new ConfigurationStore();

// For now, the configuration is only obfuscated for Android clients
requestor =
new ConfigurationRequestor(
this.configurationStore, httpClient, expectObfuscatedConfig, supportBandits);
initialConfigFuture =
initialConfiguration != null
? requestor.setInitialConfiguration(initialConfiguration)
: null;
requestor = new ConfigurationRequestor(this.configurationStore, httpClient, supportBandits);

if (initialConfiguration != null) {
requestor.activateConfiguration(initialConfiguration);
}

this.assignmentLogger = assignmentLogger;
this.banditLogger = banditLogger;
Expand All @@ -103,7 +87,7 @@ protected BaseEppoClient(
this.sdkVersion = sdkVersion;
}

private EppoHttpClient buildHttpClient(
private IEppoHttpClient buildHttpClient(
String apiBaseUrl, SDKKey sdkKey, String sdkName, String sdkVersion) {
ApiEndpoints endpointHelper = new ApiEndpoints(sdkKey, apiBaseUrl);

Expand All @@ -112,6 +96,10 @@ private EppoHttpClient buildHttpClient(
: new EppoHttpClient(endpointHelper.getBaseUrl(), sdkKey.getToken(), sdkName, sdkVersion);
}

public void activateConfiguration(Configuration configuration) {
requestor.activateConfiguration(configuration);
}

protected void loadConfiguration() {
try {
requestor.fetchAndSaveFromRemote();
Expand Down Expand Up @@ -172,22 +160,25 @@ protected void startPolling(long pollingIntervalMs, long pollingJitterMs) {
fetchConfigurationsTask.scheduleNext();
}

protected CompletableFuture<Void> loadConfigurationAsync() {
CompletableFuture<Void> future = new CompletableFuture<>();
protected void loadConfigurationAsync(EppoActionCallback<Configuration> callback) {

requestor
.fetchAndSaveFromRemoteAsync()
.exceptionally(
ex -> {
log.error("Encountered Exception while loading configuration", ex);
if (!isGracefulMode) {
future.completeExceptionally(ex);
}
return null;
})
.thenAccept(future::complete);
requestor.fetchAndSaveFromRemoteAsync(
new EppoActionCallback<Configuration>() {
@Override
public void onSuccess(Configuration data) {
callback.onSuccess(data);
}

return future;
@Override
public void onFailure(Throwable error) {
log.error("Encountered Exception while loading configuration", error);
if (isGracefulMode) {
callback.onSuccess(null);
} else {
callback.onFailure(error);
}
}
});
}

protected EppoValue getTypedAssignment(
Expand Down Expand Up @@ -577,14 +568,23 @@ public void setIsGracefulFailureMode(boolean isGracefulFailureMode) {
this.isGracefulMode = isGracefulFailureMode;
}

public abstract static class EppoListener implements Configuration.ConfigurationCallback {
@Override
public void accept(Configuration configuration) {
this.onConfigurationChanged(configuration);
}

public abstract void onConfigurationChanged(Configuration newConfig);
}

/**
* Subscribe to changes to the configuration.
*
* @param callback A function to be executed when the configuration changes.
* @param callback A listener which is notified of configuration changes.
* @return a Runnable which, when called unsubscribes the callback from configuration change
* events.
*/
public Runnable onConfigurationChange(Consumer<Configuration> callback) {
public Runnable onConfigurationChange(EppoListener callback) {
return requestor.onConfigurationChange(callback);
}

Expand Down
164 changes: 61 additions & 103 deletions src/main/java/cloud/eppo/ConfigurationRequestor.java
Original file line number Diff line number Diff line change
@@ -1,84 +1,47 @@
package cloud.eppo;

import cloud.eppo.api.Configuration;
import cloud.eppo.api.EppoActionCallback;
import cloud.eppo.callback.CallbackManager;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigurationRequestor {
private static final Logger log = LoggerFactory.getLogger(ConfigurationRequestor.class);

private final EppoHttpClient client;
private final IEppoHttpClient client;
private final IConfigurationStore configurationStore;
private final boolean expectObfuscatedConfig;
private final boolean supportBandits;

private CompletableFuture<Void> remoteFetchFuture = null;
private CompletableFuture<Boolean> configurationFuture = null;
private boolean initialConfigSet = false;

private final CallbackManager<Configuration> configChangeManager = new CallbackManager<>();
private final CallbackManager<Configuration, Configuration.ConfigurationCallback>
configChangeManager =
new CallbackManager<>(
// no lambdas before java8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are they android 21 compatible through its "desugaring"? Would that be ok to support or, do we need stay away from that entirely?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handy thing is that for the dev, if they're using a platform that supports lambdas, they can use a lambda when they register a callback (this is in EppoClient). Since the callback interface there is a once function interface, Java will treat it interchangeably with a lambda whose signature matches the function.

new CallbackManager.Dispatcher<Configuration, Configuration.ConfigurationCallback>() {
@Override
public void dispatch(
Configuration.ConfigurationCallback callback, Configuration data) {
callback.accept(data);
}
});

public ConfigurationRequestor(
@NotNull IConfigurationStore configurationStore,
@NotNull EppoHttpClient client,
boolean expectObfuscatedConfig,
@NotNull IEppoHttpClient client,
boolean supportBandits) {
this.configurationStore = configurationStore;
this.client = client;
this.expectObfuscatedConfig = expectObfuscatedConfig;
this.supportBandits = supportBandits;
}

// Synchronously set the initial configuration.
public void setInitialConfiguration(@NotNull Configuration configuration) {
if (initialConfigSet || this.configurationFuture != null) {
throw new IllegalStateException("Initial configuration has already been set");
}

initialConfigSet = saveConfigurationAndNotify(configuration).thenApply(v -> true).join();
}

/**
* Asynchronously sets the initial configuration. Resolves to `true` if the initial configuration
* was used, false if not (due to being empty, a fetched config taking precedence, etc.)
* Synchronously sets and activates the initial configuration.
*
* @param configuration The configuration to activate
*/
public CompletableFuture<Boolean> setInitialConfiguration(
@NotNull CompletableFuture<Configuration> configurationFuture) {
if (initialConfigSet || this.configurationFuture != null) {
throw new IllegalStateException("Configuration future has already been set");
}
this.configurationFuture =
configurationFuture
.thenApply(
(config) -> {
synchronized (configurationStore) {
if (config == null || config.isEmpty()) {
log.debug("Initial configuration future returned empty/null");
return false;
} else if (remoteFetchFuture != null
&& remoteFetchFuture.isDone()
&& !remoteFetchFuture.isCompletedExceptionally()) {
// Don't clobber a successful fetch.
log.debug("Fetch has completed; ignoring initial config load.");
return false;
} else {
initialConfigSet =
saveConfigurationAndNotify(config).thenApply((s) -> true).join();
return true;
}
}
})
.exceptionally(
(e) -> {
log.error("Error setting initial config", e);
return false;
});
return this.configurationFuture;
public void activateConfiguration(@NotNull Configuration configuration) {
saveConfigurationAndNotify(configuration);
}

/** Loads configuration synchronously from the API server. */
Expand All @@ -90,70 +53,65 @@ void fetchAndSaveFromRemote() {

byte[] flagConfigurationJsonBytes = client.get(Constants.FLAG_CONFIG_ENDPOINT);
Configuration.Builder configBuilder =
Configuration.builder(flagConfigurationJsonBytes, expectObfuscatedConfig)
.banditParametersFromConfig(lastConfig);
Configuration.builder(flagConfigurationJsonBytes).banditParametersFromConfig(lastConfig);

if (supportBandits && configBuilder.requiresUpdatedBanditModels()) {
byte[] banditParametersJsonBytes = client.get(Constants.BANDIT_ENDPOINT);
configBuilder.banditParameters(banditParametersJsonBytes);
}

saveConfigurationAndNotify(configBuilder.build()).join();
saveConfigurationAndNotify(configBuilder.build());
}

/** Loads configuration asynchronously from the API server, off-thread. */
CompletableFuture<Void> fetchAndSaveFromRemoteAsync() {
void fetchAndSaveFromRemoteAsync(EppoActionCallback<Configuration> callback) {
log.debug("Fetching configuration from API server");
final Configuration lastConfig = configurationStore.getConfiguration();

if (remoteFetchFuture != null && !remoteFetchFuture.isDone()) {
log.debug("Remote fetch is active. Cancelling and restarting");
remoteFetchFuture.cancel(true);
remoteFetchFuture = null;
}

remoteFetchFuture =
client
.getAsync(Constants.FLAG_CONFIG_ENDPOINT)
.thenCompose(
flagConfigJsonBytes -> {
synchronized (this) {
Configuration.Builder configBuilder =
Configuration.builder(flagConfigJsonBytes, expectObfuscatedConfig)
.banditParametersFromConfig(
lastConfig); // possibly reuse last bandit models loaded.

if (supportBandits && configBuilder.requiresUpdatedBanditModels()) {
byte[] banditParametersJsonBytes;
try {
banditParametersJsonBytes =
client.getAsync(Constants.BANDIT_ENDPOINT).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Error fetching from remote: " + e.getMessage());
throw new RuntimeException(e);
}
if (banditParametersJsonBytes != null) {
configBuilder.banditParameters(banditParametersJsonBytes);
}
}

return saveConfigurationAndNotify(configBuilder.build());
}
});
return remoteFetchFuture;
}
client.getAsync(
Constants.FLAG_CONFIG_ENDPOINT,
new IEppoHttpClient.EppoHttpCallback() {
@Override
public void onSuccess(byte[] flagConfigJsonBytes) {
synchronized (this) {
Configuration.Builder configBuilder =
Configuration.builder(flagConfigJsonBytes)
.banditParametersFromConfig(
lastConfig); // possibly reuse last bandit models loaded.

if (supportBandits && configBuilder.requiresUpdatedBanditModels()) {
byte[] banditParametersJsonBytes;

banditParametersJsonBytes = client.get(Constants.BANDIT_ENDPOINT);

if (banditParametersJsonBytes != null) {
configBuilder.banditParameters(banditParametersJsonBytes);
}
}

Configuration config = configBuilder.build();
saveConfigurationAndNotify(config);
callback.onSuccess(config);
}
}

private CompletableFuture<Void> saveConfigurationAndNotify(Configuration configuration) {
CompletableFuture<Void> saveFuture = configurationStore.saveConfiguration(configuration);
return saveFuture.thenRun(
() -> {
synchronized (configChangeManager) {
configChangeManager.notifyCallbacks(configuration);
@Override
public void onFailure(Throwable error) {
log.error(
"Failed to fetch configuration from API server: {}", error.getMessage(), error);
callback.onFailure(error);
}
});
}

public Runnable onConfigurationChange(Consumer<Configuration> callback) {
private void saveConfigurationAndNotify(Configuration configuration) {
configurationStore.saveConfiguration(configuration);
synchronized (configChangeManager) {
configChangeManager.notifyCallbacks(configuration);
}
}

public Runnable onConfigurationChange(Configuration.ConfigurationCallback callback) {
return configChangeManager.subscribe(callback);
}
}
4 changes: 1 addition & 3 deletions src/main/java/cloud/eppo/ConfigurationStore.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cloud.eppo;

import cloud.eppo.api.Configuration;
import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;

/** Memory-only configuration store. */
Expand All @@ -12,9 +11,8 @@ public class ConfigurationStore implements IConfigurationStore {

public ConfigurationStore() {}

public CompletableFuture<Void> saveConfiguration(@NotNull final Configuration configuration) {
public void saveConfiguration(@NotNull final Configuration configuration) {
this.configuration = configuration;
return CompletableFuture.completedFuture(null);
}

@NotNull public Configuration getConfiguration() {
Expand Down
Loading