-
Notifications
You must be signed in to change notification settings - Fork 1
feat(events): Add EventDispatcher
and related classes
#138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
d5d426e
726a6ac
acdf827
4314045
e543f2d
d297322
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
{ | ||
"name": "@eppo/js-client-sdk-common", | ||
"version": "4.3.0", | ||
"version": "4.4.0", | ||
"description": "Eppo SDK for client-side JavaScript applications (base for both web and react native)", | ||
"main": "dist/index.js", | ||
"files": [ | ||
|
@@ -46,6 +46,7 @@ | |
"@types/lodash": "^4.17.5", | ||
"@types/md5": "^2.3.2", | ||
"@types/semver": "^7.5.6", | ||
"@types/uuid": "^10.0.0", | ||
"@typescript-eslint/eslint-plugin": "^5.13.0", | ||
"@typescript-eslint/parser": "^5.13.0", | ||
"eslint": "^8.17.0", | ||
|
@@ -71,7 +72,8 @@ | |
"js-base64": "^3.7.7", | ||
"md5": "^2.3.0", | ||
"pino": "^8.19.0", | ||
"semver": "^7.5.4" | ||
"semver": "^7.5.4", | ||
"uuid": "^8.3.2" | ||
}, | ||
"packageManager": "[email protected]+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
import { v4 as randomUUID } from 'uuid'; | ||
|
||
import ApiEndpoints from '../api-endpoints'; | ||
import { logger } from '../application-logger'; | ||
import { IAssignmentEvent, IAssignmentLogger } from '../assignment-logger'; | ||
|
@@ -21,7 +23,8 @@ | |
import { Evaluator, FlagEvaluation, noneResult } from '../evaluator'; | ||
import ArrayBackedNamedEventQueue from '../events/array-backed-named-event-queue'; | ||
import { BoundedEventQueue } from '../events/bounded-event-queue'; | ||
import NamedEventQueue from '../events/named-event-queue'; | ||
import EventDispatcher from '../events/event-dispatcher'; | ||
import NoOpEventDispatcher from '../events/no-op-event-dispatcher'; | ||
import { | ||
FlagEvaluationDetailsBuilder, | ||
IFlagEvaluationDetails, | ||
|
@@ -77,8 +80,17 @@ | |
treatmentVariationEntries: Array<T>; | ||
} | ||
|
||
const DEFAULT_EVENT_DISPATCHER_CONFIG = { | ||
Check warning on line 83 in src/client/eppo-client.ts
|
||
// TODO: Replace with actual ingestion URL | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will be handled in a follow up PR by parsing it from the SDK key |
||
ingestionUrl: 'https://example.com/events', | ||
batchSize: 10, | ||
flushInterval: 10_000, | ||
retryInterval: 5_000, | ||
|
||
maxRetries: 3, | ||
}; | ||
|
||
export default class EppoClient { | ||
private readonly eventQueue: NamedEventQueue<unknown>; | ||
private readonly eventDispatcher: EventDispatcher; | ||
private readonly assignmentEventsQueue: BoundedEventQueue<IAssignmentEvent> = | ||
newBoundedArrayEventQueue<IAssignmentEvent>('assignments'); | ||
private readonly banditEventsQueue: BoundedEventQueue<IBanditEvent> = | ||
|
@@ -99,23 +111,23 @@ | |
private readonly evaluator = new Evaluator(); | ||
|
||
constructor({ | ||
eventQueue = new ArrayBackedNamedEventQueue('events'), | ||
eventDispatcher = new NoOpEventDispatcher(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default implementation is a no-op if none provided |
||
isObfuscated = false, | ||
flagConfigurationStore, | ||
banditVariationConfigurationStore, | ||
banditModelConfigurationStore, | ||
configurationRequestParameters, | ||
}: { | ||
// Queue for arbitrary, application-level events (not to be confused with Eppo specific assignment | ||
// Dispatcher for arbitrary, application-level events (not to be confused with Eppo specific assignment | ||
// or bandit events). These events are application-specific and captures by EppoClient#track API. | ||
eventQueue?: NamedEventQueue<unknown>; | ||
eventDispatcher?: EventDispatcher; | ||
flagConfigurationStore: IConfigurationStore<Flag | ObfuscatedFlag>; | ||
banditVariationConfigurationStore?: IConfigurationStore<BanditVariation[]>; | ||
banditModelConfigurationStore?: IConfigurationStore<BanditParameters>; | ||
configurationRequestParameters?: FlagConfigurationRequestParameters; | ||
isObfuscated?: boolean; | ||
}) { | ||
this.eventQueue = eventQueue; | ||
this.eventDispatcher = eventDispatcher; | ||
this.flagConfigurationStore = flagConfigurationStore; | ||
this.banditVariationConfigurationStore = banditVariationConfigurationStore; | ||
this.banditModelConfigurationStore = banditModelConfigurationStore; | ||
|
@@ -546,7 +558,7 @@ | |
); | ||
evaluationDetails.banditAction = action; | ||
} | ||
} catch (err: any) { | ||
Check warning on line 561 in src/client/eppo-client.ts
|
||
logger.error('Error determining bandit action', err); | ||
if (!this.isGracefulFailureMode) { | ||
throw err; | ||
|
@@ -781,7 +793,7 @@ | |
expectedVariationType, | ||
); | ||
return this.parseVariationWithDetails(result, defaultValue, expectedVariationType); | ||
} catch (error: any) { | ||
Check warning on line 796 in src/client/eppo-client.ts
|
||
const eppoValue = this.rethrowIfNotGraceful(error, defaultValue); | ||
if (error instanceof FlagEvaluationError && error.flagEvaluationDetails) { | ||
return { | ||
|
@@ -814,7 +826,7 @@ | |
} | ||
const eppoValue = EppoValue.valueOf(variation.value, expectedVariationType); | ||
return { eppoValue, flagEvaluationDetails }; | ||
} catch (error: any) { | ||
Check warning on line 829 in src/client/eppo-client.ts
|
||
const eppoValue = this.rethrowIfNotGraceful(error, defaultValue); | ||
return { eppoValue, flagEvaluationDetails }; | ||
} | ||
|
@@ -909,9 +921,10 @@ | |
return result; | ||
} | ||
|
||
/** TODO */ | ||
// noinspection JSUnusedGlobalSymbols | ||
track(event: unknown, params: Record<string, unknown>) { | ||
this.eventQueue.push({ event, params }); | ||
this.eventDispatcher.dispatch({ id: randomUUID(), data: event, params }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use a random uuid for avoiding duplicates server side There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking the payload would contain 4 fields:
What is intended use of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's pretty arbitrary and will change. I wouldn't worry too much about it just yet, since we're not using the actual types here, it's a placeholder. |
||
} | ||
|
||
private newFlagEvaluationDetailsBuilder(flagKey: string): FlagEvaluationDetailsBuilder { | ||
|
@@ -929,7 +942,9 @@ | |
return { | ||
configFetchedAt: this.flagConfigurationStore.getConfigFetchedAt() ?? '', | ||
configPublishedAt: this.flagConfigurationStore.getConfigPublishedAt() ?? '', | ||
configEnvironment: this.flagConfigurationStore.getEnvironment() ?? { name: '' }, | ||
configEnvironment: this.flagConfigurationStore.getEnvironment() ?? { | ||
name: '', | ||
}, | ||
}; | ||
} | ||
|
||
|
@@ -1037,7 +1052,7 @@ | |
eventsToFlush.forEach((event) => { | ||
try { | ||
logFunction(event); | ||
} catch (error: any) { | ||
Check warning on line 1055 in src/client/eppo-client.ts
|
||
logger.error(`[Eppo SDK] Error flushing event to logger: ${error.message}`); | ||
} | ||
}); | ||
|
@@ -1085,7 +1100,7 @@ | |
allocationKey: allocationKey ?? '__eppo_no_allocation', | ||
variationKey: variation?.key ?? '__eppo_no_variation', | ||
}); | ||
} catch (error: any) { | ||
Check warning on line 1103 in src/client/eppo-client.ts
|
||
logger.error(`[Eppo SDK] Error logging assignment event: ${error.message}`); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,11 @@ | ||
import NamedEventQueue from './named-event-queue'; | ||
|
||
/** A named event queue backed by an array. */ | ||
/** | ||
* @internal | ||
* A named event queue backed by an **unbounded** array. | ||
* This class probably should NOT be used directly, but only as a backing store for | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this shouldn't ever be used directly, I think this should be built in to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
* {@link BoundedEventQueue}. | ||
*/ | ||
export default class ArrayBackedNamedEventQueue<T> implements NamedEventQueue<T> { | ||
private readonly events: T[] = []; | ||
|
||
|
@@ -22,7 +27,11 @@ export default class ArrayBackedNamedEventQueue<T> implements NamedEventQueue<T> | |
return this.events[Symbol.iterator](); | ||
} | ||
|
||
shift(): T | undefined { | ||
return this.events.shift(); | ||
splice(count: number): T[] { | ||
return this.events.splice(0, count); | ||
} | ||
|
||
isEmpty(): boolean { | ||
return this.events.length === 0; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import ArrayBackedNamedEventQueue from './array-backed-named-event-queue'; | ||
import BatchEventProcessor from './batch-event-processor'; | ||
|
||
describe('BatchEventProcessor', () => { | ||
describe('nextBatch', () => { | ||
it('should return a batch and remove items from the queue', () => { | ||
const eventQueue = new ArrayBackedNamedEventQueue('test-queue'); | ||
const processor = new BatchEventProcessor(eventQueue, 2); | ||
expect(processor.isEmpty()).toBeTruthy(); | ||
expect(processor.nextBatch()).toHaveLength(0); | ||
processor.push({ id: 'foo-1', data: 'event1', params: {} }); | ||
processor.push({ id: 'foo-2', data: 'event2', params: {} }); | ||
processor.push({ id: 'foo-3', data: 'event3', params: {} }); | ||
expect(processor.isEmpty()).toBeFalsy(); | ||
const batch = processor.nextBatch(); | ||
expect(batch).toEqual([ | ||
{ id: 'foo-1', data: 'event1', params: {} }, | ||
{ id: 'foo-2', data: 'event2', params: {} }, | ||
]); | ||
Comment on lines
+16
to
+19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
expect(processor.nextBatch()).toEqual([{ id: 'foo-3', data: 'event3', params: {} }]); | ||
expect(processor.isEmpty()).toBeTruthy(); | ||
}); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import NamedEventQueue from './named-event-queue'; | ||
|
||
export default class BatchEventProcessor { | ||
constructor( | ||
private readonly eventQueue: NamedEventQueue<unknown>, | ||
private readonly batchSize: number, | ||
) {} | ||
|
||
nextBatch(): unknown[] { | ||
return this.eventQueue.splice(this.batchSize); | ||
} | ||
|
||
push(event: unknown): void { | ||
this.eventQueue.push(event); | ||
} | ||
|
||
isEmpty(): boolean { | ||
return this.eventQueue.isEmpty(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import { logger } from '../application-logger'; | ||
|
||
import EventDelivery from './event-delivery'; | ||
|
||
/** | ||
* Attempts to retry delivering a batch of events to the ingestionUrl up to `maxRetries` times | ||
* using exponential backoff. | ||
*/ | ||
export default class BatchRetryManager { | ||
/** | ||
* @param config.retryInterval - The minimum retry interval in milliseconds | ||
* @param config.maxRetryDelayMs - The maximum retry delay in milliseconds | ||
* @param config.maxRetries - The maximum number of retries | ||
*/ | ||
constructor( | ||
private readonly delivery: EventDelivery, | ||
private readonly config: { | ||
retryIntervalMs: number; | ||
maxRetryDelayMs: number; | ||
maxRetries: number; | ||
}, | ||
) {} | ||
|
||
async retry(batch: unknown[], attempt = 0): Promise<void> { | ||
const { retryIntervalMs, maxRetryDelayMs, maxRetries } = this.config; | ||
const delay = Math.min(retryIntervalMs * Math.pow(2, attempt), maxRetryDelayMs); | ||
logger.info(`[BatchRetryManager] Retrying batch delivery in ${delay}ms...`); | ||
await new Promise((resolve) => setTimeout(resolve, delay)); | ||
|
||
const success = await this.delivery.deliver(batch); | ||
if (success) { | ||
logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt} retries.`); | ||
return; | ||
} | ||
if (attempt < maxRetries) { | ||
return this.retry(batch, attempt + 1); | ||
} else { | ||
// TODO: Persist batch to avoid data loss | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will also do this in a future PR |
||
logger.warn( | ||
`[BatchRetryManager] Failed to deliver batch after ${maxRetries} retries, bailing`, | ||
); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a fairly old version of uuid (4 years old) but the last one that's compatible with our module settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we don't want to add an additional dependency, we can delegate the event ID generation to consuming libraries (js-client, node, etc)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you hoping to use uuid v7 because of this https://buildkite.com/resources/blog/goodbye-integers-hello-uuids/ or another feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunately we can't use uuid v7 because it's not available in the version of the library we're using. looks like it was added on uuid v10. however yes, that could be a pretty decent primary key for these events. I don't want to make any assumptions here, the only goal was to have a unique identifier and we can't use sequential IDs reliably because we can't guarantee the sequence ordering (we'd have to persist additional state for the last ID used, which is inconvenient).