Skip to content

Commit 51f4abb

Browse files
committed
refactor: implement new initialization/poller behavior
1 parent 60c8617 commit 51f4abb

11 files changed

+567
-182
lines changed

src/client/eppo-client.ts

Lines changed: 358 additions & 132 deletions
Large diffs are not rendered by default.

src/client/eppo-precomputed-client.spec.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
import { IConfigurationStore, ISyncStore } from '../configuration-store/configuration-store';
1515
import { MemoryOnlyConfigurationStore } from '../configuration-store/memory.store';
1616
import { IPrecomputedConfigurationResponse } from '../configuration-wire/configuration-wire-types';
17-
import { DEFAULT_POLL_INTERVAL_MS, MAX_EVENT_QUEUE_SIZE, POLL_JITTER_PCT } from '../constants';
17+
import { DEFAULT_BASE_POLLING_INTERVAL_MS, MAX_EVENT_QUEUE_SIZE, POLL_JITTER_PCT } from '../constants';
1818
import FetchHttpClient from '../http-client';
1919
import {
2020
FormatEnum,
@@ -398,7 +398,7 @@ describe('EppoPrecomputedClient E2E test', () => {
398398
const precomputedFlagKey = 'string-flag';
399399
const red = 'red';
400400

401-
const maxRetryDelay = DEFAULT_POLL_INTERVAL_MS * POLL_JITTER_PCT;
401+
const maxRetryDelay = DEFAULT_BASE_POLLING_INTERVAL_MS * POLL_JITTER_PCT;
402402

403403
beforeAll(async () => {
404404
global.fetch = jest.fn(() => {
@@ -513,7 +513,7 @@ describe('EppoPrecomputedClient E2E test', () => {
513513

514514
// Expire the cache and advance time until a reload should happen
515515
MockStore.expired = true;
516-
await jest.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL_MS * 1.5);
516+
await jest.advanceTimersByTimeAsync(DEFAULT_BASE_POLLING_INTERVAL_MS * 1.5);
517517

518518
variation = client.getStringAssignment(precomputedFlagKey, 'default');
519519
expect(variation).toBe(red);
@@ -641,7 +641,7 @@ describe('EppoPrecomputedClient E2E test', () => {
641641
expect(variation).toBe(red);
642642
expect(callCount).toBe(2);
643643

644-
await jest.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL_MS);
644+
await jest.advanceTimersByTimeAsync(DEFAULT_BASE_POLLING_INTERVAL_MS);
645645
// By default, no more polling
646646
expect(callCount).toBe(pollAfterSuccessfulInitialization ? 3 : 2);
647647
});
@@ -706,7 +706,7 @@ describe('EppoPrecomputedClient E2E test', () => {
706706
expect(client.getStringAssignment(precomputedFlagKey, 'default')).toBe('default');
707707

708708
// Advance timers so a post-init poll can take place
709-
await jest.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL_MS * 1.5);
709+
await jest.advanceTimersByTimeAsync(DEFAULT_BASE_POLLING_INTERVAL_MS * 1.5);
710710

711711
// if pollAfterFailedInitialization = true, we will poll later and get a config, otherwise not
712712
expect(callCount).toBe(pollAfterFailedInitialization ? 2 : 1);

src/client/eppo-precomputed-client.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
DEFAULT_INITIAL_CONFIG_REQUEST_RETRIES,
1515
DEFAULT_POLL_CONFIG_REQUEST_RETRIES,
1616
DEFAULT_REQUEST_TIMEOUT_MS,
17-
DEFAULT_POLL_INTERVAL_MS,
17+
DEFAULT_BASE_POLLING_INTERVAL_MS,
1818
MAX_EVENT_QUEUE_SIZE,
1919
PRECOMPUTED_BASE_URL,
2020
} from '../constants';
@@ -43,13 +43,13 @@ export interface Subject {
4343
subjectAttributes: Attributes | ContextAttributes;
4444
}
4545

46-
export type PrecomputedFlagsRequestParameters = {
46+
export type PrecomputedRequestParameters = {
4747
apiKey: string;
4848
sdkVersion: string;
4949
sdkName: string;
5050
baseUrl?: string;
5151
requestTimeoutMs?: number;
52-
pollingIntervalMs?: number;
52+
basePollingIntervalMs?: number;
5353
numInitialRequestRetries?: number;
5454
numPollRequestRetries?: number;
5555
pollAfterSuccessfulInitialization?: boolean;
@@ -64,7 +64,7 @@ interface EppoPrecomputedClientOptions {
6464
overrideStore?: ISyncStore<Variation>;
6565
subject: Subject;
6666
banditActions?: Record<FlagKey, Record<string, ContextAttributes>>;
67-
requestParameters?: PrecomputedFlagsRequestParameters;
67+
requestParameters?: PrecomputedRequestParameters;
6868
}
6969

7070
export default class EppoPrecomputedClient {
@@ -75,7 +75,7 @@ export default class EppoPrecomputedClient {
7575
private banditAssignmentCache?: AssignmentCache;
7676
private assignmentCache?: AssignmentCache;
7777
private requestPoller?: IPoller;
78-
private requestParameters?: PrecomputedFlagsRequestParameters;
78+
private requestParameters?: PrecomputedRequestParameters;
7979
private subject: {
8080
subjectKey: string;
8181
subjectAttributes: ContextAttributes;
@@ -153,10 +153,10 @@ export default class EppoPrecomputedClient {
153153
} = this.requestParameters;
154154
const { subjectKey, subjectAttributes } = this.subject;
155155

156-
let { pollingIntervalMs = DEFAULT_POLL_INTERVAL_MS } = this.requestParameters;
157-
if (pollingIntervalMs <= 0) {
158-
logger.error('pollingIntervalMs must be greater than 0. Using default');
159-
pollingIntervalMs = DEFAULT_POLL_INTERVAL_MS;
156+
let { basePollingIntervalMs = DEFAULT_BASE_POLLING_INTERVAL_MS } = this.requestParameters;
157+
if (basePollingIntervalMs <= 0) {
158+
logger.error('basePollingIntervalMs must be greater than 0. Using default');
159+
basePollingIntervalMs = DEFAULT_BASE_POLLING_INTERVAL_MS;
160160
}
161161

162162
// todo: Inject the chain of dependencies below
@@ -180,7 +180,7 @@ export default class EppoPrecomputedClient {
180180
}
181181
};
182182

183-
this.requestPoller = initPoller(pollingIntervalMs, pollingCallback, {
183+
this.requestPoller = initPoller(basePollingIntervalMs, pollingCallback, {
184184
maxStartRetries: numInitialRequestRetries,
185185
maxPollRetries: numPollRequestRetries,
186186
pollAfterSuccessfulStart: pollAfterSuccessfulInitialization,

src/configuration-poller.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import ConfigurationRequestor from './configuration-requestor';
2+
import { Listeners } from './listener';
3+
import { Configuration } from './configuration';
4+
import { randomJitterMs } from './poller';
5+
import { logger } from './application-logger';
6+
7+
/**
8+
* Polls for new configurations from the Eppo server. When a new configuration is fetched,
9+
* it is passed to the subscribers of `onConfigurationFetched`.
10+
*
11+
* The poller is created in the stopped state. Call `start` to begin polling.
12+
*
13+
* @internal
14+
*/
15+
export class ConfigurationPoller {
16+
private readonly listeners = new Listeners<[Configuration]>();
17+
private readonly basePollingIntervalMs: number;
18+
private readonly maxPollingIntervalMs: number;
19+
private isRunning = false;
20+
21+
public constructor(
22+
private readonly configurationRequestor: ConfigurationRequestor,
23+
options: {
24+
basePollingIntervalMs: number;
25+
maxPollingIntervalMs: number;
26+
},
27+
) {
28+
this.basePollingIntervalMs = options.basePollingIntervalMs;
29+
this.maxPollingIntervalMs = options.maxPollingIntervalMs;
30+
}
31+
32+
/**
33+
* Starts the configuration poller.
34+
*
35+
* This method will start polling for new configurations from the Eppo server.
36+
* It will continue to poll until the `stop` method is called.
37+
*/
38+
public start(): void {
39+
if (!this.isRunning) {
40+
this.isRunning = true;
41+
this.poll().finally(() => {
42+
// Just to be safe, reset isRunning if the poll() method throws an error or exits (it
43+
// shouldn't).
44+
this.isRunning = false;
45+
});
46+
}
47+
}
48+
49+
/**
50+
* Stops the configuration poller.
51+
*
52+
* This method will stop polling for new configurations from the Eppo server. Note that it will
53+
* not interrupt the current poll cycle / active fetch, but it will make sure that configuration
54+
* listeners are not notified of any new configurations after this method is called.
55+
*/
56+
public stop(): void {
57+
this.isRunning = false;
58+
}
59+
60+
/**
61+
* Register a listener to be notified when new configuration is fetched.
62+
* @param listener Callback function that receives the fetched `Configuration` object
63+
* @returns A function that can be called to unsubscribe the listener.
64+
*/
65+
public onConfigurationFetched(listener: (configuration: Configuration) => void): () => void {
66+
return this.listeners.addListener(listener);
67+
}
68+
69+
/**
70+
* Fetch configuration immediately without waiting for the next polling cycle.
71+
*
72+
* Note: This does not coordinate with active polling - polling intervals will not be adjusted
73+
* when using this method.
74+
*
75+
* @throws If there is an error fetching the configuration
76+
*/
77+
public async fetchImmediate(): Promise<Configuration | null> {
78+
const configuration = await this.configurationRequestor.fetchConfiguration();
79+
if (configuration) {
80+
this.listeners.notify(configuration);
81+
}
82+
return configuration;
83+
}
84+
85+
private async poll(): Promise<void> {
86+
// Number of failures we've seen in a row.
87+
let consecutiveFailures = 0;
88+
89+
while (this.isRunning) {
90+
try {
91+
const configuration = await this.configurationRequestor.fetchConfiguration();
92+
if (configuration && this.isRunning) {
93+
this.listeners.notify(configuration);
94+
}
95+
// Reset failure counter on success
96+
consecutiveFailures = 0;
97+
} catch (err) {
98+
logger.warn('Eppo SDK encountered an error polling configurations', { err });
99+
consecutiveFailures++;
100+
}
101+
102+
if (consecutiveFailures === 0) {
103+
await timeout(this.basePollingIntervalMs + randomJitterMs(this.basePollingIntervalMs));
104+
} else {
105+
// Exponential backoff capped at maxPollingIntervalMs.
106+
const baseDelayMs = Math.min((Math.pow(2, consecutiveFailures) * this.basePollingIntervalMs), this.maxPollingIntervalMs);
107+
const delayMs = baseDelayMs + randomJitterMs(baseDelayMs);
108+
109+
logger.warn('Eppo SDK will try polling again', {
110+
delayMs,
111+
consecutiveFailures,
112+
});
113+
114+
await timeout(delayMs);
115+
}
116+
}
117+
}
118+
}
119+
120+
function timeout(ms: number) {
121+
return new Promise(resolve => setTimeout(resolve, ms));
122+
}
123+

src/configuration-requestor.ts

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ export type ConfigurationRequestorOptions = {
66
wantsBandits: boolean;
77
};
88

9-
// Requests AND stores flag configurations
9+
/**
10+
* @internal
11+
*/
1012
export default class ConfigurationRequestor {
1113
private readonly options: ConfigurationRequestorOptions;
1214

13-
constructor(
15+
public constructor(
1416
private readonly httpClient: IHttpClient,
1517
private readonly configurationStore: ConfigurationStore,
1618
options: Partial<ConfigurationRequestorOptions> = {},
@@ -21,7 +23,7 @@ export default class ConfigurationRequestor {
2123
};
2224
}
2325

24-
async fetchConfiguration(): Promise<Configuration | null> {
26+
public async fetchConfiguration(): Promise<Configuration | null> {
2527
const flags = await this.httpClient.getUniversalFlagConfiguration();
2628
if (!flags?.response.flags) {
2729
return null;
@@ -32,19 +34,12 @@ export default class ConfigurationRequestor {
3234
return Configuration.fromResponses({ flags, bandits });
3335
}
3436

35-
async fetchAndStoreConfigurations(): Promise<void> {
36-
const configuration = await this.fetchConfiguration();
37-
if (configuration) {
38-
this.configurationStore.setConfiguration(configuration);
39-
}
40-
}
41-
4237
/**
4338
* Get bandits configuration matching the flags configuration.
4439
*
4540
* This function does not fetch bandits if the client does not want
4641
* them (`ConfigurationRequestorOptions.wantsBandits === false`) or
47-
* we we can reuse bandit models from `ConfigurationStore`.
42+
* if we can reuse bandit models from `ConfigurationStore`.
4843
*/
4944
private async getBanditsFor(flags: FlagsConfig): Promise<BanditsConfig | undefined> {
5045
const needsBandits =

src/configuration-store/configuration-store.ts

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Configuration } from '../configuration';
22
import { Environment } from '../interfaces';
3+
import { Listeners } from '../listener';
34

45
/**
56
* `ConfigurationStore` is a central piece of Eppo SDK and answers a
@@ -9,7 +10,7 @@ import { Environment } from '../interfaces';
910
*/
1011
export class ConfigurationStore {
1112
private configuration: Configuration;
12-
private readonly listeners: Array<(configuration: Configuration) => void> = [];
13+
private readonly listeners: Listeners<[Configuration]> = new Listeners();
1314

1415
public constructor(configuration: Configuration = Configuration.empty()) {
1516
this.configuration = configuration;
@@ -21,7 +22,7 @@ export class ConfigurationStore {
2122

2223
public setConfiguration(configuration: Configuration): void {
2324
this.configuration = configuration;
24-
this.notifyListeners();
25+
this.listeners.notify(configuration);
2526
}
2627

2728
/**
@@ -31,24 +32,7 @@ export class ConfigurationStore {
3132
* Returns a function to unsubscribe from future updates.
3233
*/
3334
public onConfigurationChange(listener: (configuration: Configuration) => void): () => void {
34-
this.listeners.push(listener);
35-
36-
return () => {
37-
const idx = this.listeners.indexOf(listener);
38-
if (idx !== -1) {
39-
this.listeners.splice(idx, 1);
40-
}
41-
};
42-
}
43-
44-
private notifyListeners(): void {
45-
for (const listener of this.listeners) {
46-
try {
47-
listener(this.configuration);
48-
} catch {
49-
// ignore
50-
}
51-
}
35+
return this.listeners.addListener(listener);
5236
}
5337
}
5438

src/configuration.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,34 @@ export class Configuration {
8787
return this.flags;
8888
}
8989

90+
/** @internal */
91+
public getFetchedAt(): Date | undefined {
92+
const flagsFetchedAt = this.flags?.fetchedAt ? new Date(this.flags.fetchedAt).getTime() : 0;
93+
const banditsFetchedAt = this.bandits?.fetchedAt ? new Date(this.bandits.fetchedAt).getTime() : 0;
94+
const maxFetchedAt = Math.max(flagsFetchedAt, banditsFetchedAt);
95+
return maxFetchedAt > 0 ? new Date(maxFetchedAt) : undefined;
96+
}
97+
98+
/** @internal */
99+
public isEmpty(): boolean {
100+
return !this.flags;
101+
}
102+
103+
/** @internal */
104+
public getAge(): number | undefined {
105+
const fetchedAt = this.getFetchedAt();
106+
if (!fetchedAt) {
107+
return undefined;
108+
}
109+
return Date.now() - fetchedAt.getTime();
110+
}
111+
112+
/** @internal */
113+
public isStale(maxAgeSeconds: number): boolean {
114+
const age = this.getAge();
115+
return !!age && age > maxAgeSeconds * 1000;
116+
}
117+
90118
/** @internal
91119
*
92120
* Returns flag configuration for the given flag key. Obfuscation is
@@ -116,6 +144,7 @@ export class Configuration {
116144
return this.flagBanditVariations[flagKey] ?? [];
117145
}
118146

147+
/** @internal */
119148
public getFlagVariationBandit(flagKey: string, variationValue: string): BanditParameters | null {
120149
const banditVariations = this.getFlagBanditVariations(flagKey);
121150
const banditKey = banditVariations?.find(

src/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { FormatEnum } from './interfaces';
22

33
export const DEFAULT_REQUEST_TIMEOUT_MS = 5000;
44
export const REQUEST_TIMEOUT_MILLIS = DEFAULT_REQUEST_TIMEOUT_MS; // for backwards compatibility
5-
export const DEFAULT_POLL_INTERVAL_MS = 30000;
5+
export const DEFAULT_BASE_POLLING_INTERVAL_MS = 30000;
66
export const POLL_JITTER_PCT = 0.1;
77
export const DEFAULT_INITIAL_CONFIG_REQUEST_RETRIES = 1;
88
export const DEFAULT_POLL_CONFIG_REQUEST_RETRIES = 7;

0 commit comments

Comments
 (0)