Skip to content

Commit 7904d04

Browse files
authored
feat(events): Add EventDispatcher and related classes (#138)
* feat(events): Add `EventDispatcher` and related classes * nits * moar nits * add uuid * ignore explicit any * code review nits
1 parent 7f838c2 commit 7904d04

19 files changed

+506
-22
lines changed

.eslintrc.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ module.exports = {
2424
],
2525
'import/named': 'off',
2626
'import/no-unresolved': 'off',
27+
'@typescript-eslint/no-explicit-any': 'off',
2728
'import/order': [
2829
'warn',
2930
{
@@ -33,7 +34,7 @@ module.exports = {
3334
group: 'parent',
3435
position: 'before',
3536
},
36-
],
37+
],
3738
groups: ['builtin', 'external', 'parent', 'sibling', 'index'],
3839
'newlines-between': 'always',
3940
alphabetize: {

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"@types/lodash": "^4.17.5",
4747
"@types/md5": "^2.3.2",
4848
"@types/semver": "^7.5.6",
49+
"@types/uuid": "^10.0.0",
4950
"@typescript-eslint/eslint-plugin": "^5.13.0",
5051
"@typescript-eslint/parser": "^5.13.0",
5152
"eslint": "^8.17.0",
@@ -71,7 +72,8 @@
7172
"js-base64": "^3.7.7",
7273
"md5": "^2.3.0",
7374
"pino": "^8.19.0",
74-
"semver": "^7.5.4"
75+
"semver": "^7.5.4",
76+
"uuid": "^8.3.2"
7577
},
7678
"packageManager": "[email protected]+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
7779
}

src/client/eppo-client.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { v4 as randomUUID } from 'uuid';
2+
13
import ApiEndpoints from '../api-endpoints';
24
import { logger } from '../application-logger';
35
import { IAssignmentEvent, IAssignmentLogger } from '../assignment-logger';
@@ -20,7 +22,8 @@ import { EppoValue } from '../eppo_value';
2022
import { Evaluator, FlagEvaluation, noneResult } from '../evaluator';
2123
import ArrayBackedNamedEventQueue from '../events/array-backed-named-event-queue';
2224
import { BoundedEventQueue } from '../events/bounded-event-queue';
23-
import NamedEventQueue from '../events/named-event-queue';
25+
import EventDispatcher from '../events/event-dispatcher';
26+
import NoOpEventDispatcher from '../events/no-op-event-dispatcher';
2427
import {
2528
FlagEvaluationDetailsBuilder,
2629
IFlagEvaluationDetails,
@@ -76,8 +79,17 @@ export interface IContainerExperiment<T> {
7679
treatmentVariationEntries: Array<T>;
7780
}
7881

82+
const DEFAULT_EVENT_DISPATCHER_CONFIG = {
83+
// TODO: Replace with actual ingestion URL
84+
ingestionUrl: 'https://example.com/events',
85+
batchSize: 10,
86+
flushIntervalMs: 10_000,
87+
retryIntervalMs: 5_000,
88+
maxRetries: 3,
89+
};
90+
7991
export default class EppoClient {
80-
private readonly eventQueue: NamedEventQueue<unknown>;
92+
private readonly eventDispatcher: EventDispatcher;
8193
private readonly assignmentEventsQueue: BoundedEventQueue<IAssignmentEvent> =
8294
newBoundedArrayEventQueue<IAssignmentEvent>('assignments');
8395
private readonly banditEventsQueue: BoundedEventQueue<IBanditEvent> =
@@ -98,23 +110,23 @@ export default class EppoClient {
98110
private readonly evaluator = new Evaluator();
99111

100112
constructor({
101-
eventQueue = new ArrayBackedNamedEventQueue('events'),
113+
eventDispatcher = new NoOpEventDispatcher(),
102114
isObfuscated = false,
103115
flagConfigurationStore,
104116
banditVariationConfigurationStore,
105117
banditModelConfigurationStore,
106118
configurationRequestParameters,
107119
}: {
108-
// Queue for arbitrary, application-level events (not to be confused with Eppo specific assignment
120+
// Dispatcher for arbitrary, application-level events (not to be confused with Eppo specific assignment
109121
// or bandit events). These events are application-specific and captures by EppoClient#track API.
110-
eventQueue?: NamedEventQueue<unknown>;
122+
eventDispatcher?: EventDispatcher;
111123
flagConfigurationStore: IConfigurationStore<Flag | ObfuscatedFlag>;
112124
banditVariationConfigurationStore?: IConfigurationStore<BanditVariation[]>;
113125
banditModelConfigurationStore?: IConfigurationStore<BanditParameters>;
114126
configurationRequestParameters?: FlagConfigurationRequestParameters;
115127
isObfuscated?: boolean;
116128
}) {
117-
this.eventQueue = eventQueue;
129+
this.eventDispatcher = eventDispatcher;
118130
this.flagConfigurationStore = flagConfigurationStore;
119131
this.banditVariationConfigurationStore = banditVariationConfigurationStore;
120132
this.banditModelConfigurationStore = banditModelConfigurationStore;
@@ -908,9 +920,10 @@ export default class EppoClient {
908920
return result;
909921
}
910922

923+
/** TODO */
911924
// noinspection JSUnusedGlobalSymbols
912925
track(event: unknown, params: Record<string, unknown>) {
913-
this.eventQueue.push({ event, params });
926+
this.eventDispatcher.dispatch({ id: randomUUID(), data: event, params });
914927
}
915928

916929
private newFlagEvaluationDetailsBuilder(flagKey: string): FlagEvaluationDetailsBuilder {
@@ -928,7 +941,9 @@ export default class EppoClient {
928941
return {
929942
configFetchedAt: this.flagConfigurationStore.getConfigFetchedAt() ?? '',
930943
configPublishedAt: this.flagConfigurationStore.getConfigPublishedAt() ?? '',
931-
configEnvironment: this.flagConfigurationStore.getEnvironment() ?? { name: '' },
944+
configEnvironment: this.flagConfigurationStore.getEnvironment() ?? {
945+
name: '',
946+
},
932947
};
933948
}
934949

@@ -1131,6 +1146,6 @@ export function checkValueTypeMatch(
11311146
}
11321147
}
11331148

1134-
export function newBoundedArrayEventQueue<T>(name: string): BoundedEventQueue<T> {
1149+
function newBoundedArrayEventQueue<T>(name: string): BoundedEventQueue<T> {
11351150
return new BoundedEventQueue<T>(new ArrayBackedNamedEventQueue<T>(name));
11361151
}

src/evaluator.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ export class Evaluator {
159159
split: Split,
160160
subjectKey: string,
161161
expectedVariationType: VariationType | undefined,
162-
): { flagEvaluationCode: FlagEvaluationCode; flagEvaluationDescription: string } => {
162+
): {
163+
flagEvaluationCode: FlagEvaluationCode;
164+
flagEvaluationDescription: string;
165+
} => {
163166
if (!checkValueTypeMatch(expectedVariationType, variation.value)) {
164167
const { key: vKey, value: vValue } = variation;
165168
return {

src/events/array-backed-named-event-queue.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import NamedEventQueue from './named-event-queue';
22

3-
/** A named event queue backed by an array. */
3+
/**
4+
* @internal
5+
* A named event queue backed by an **unbounded** array.
6+
* This class probably should NOT be used directly, but only as a backing store for
7+
* {@link BoundedEventQueue}.
8+
*/
49
export default class ArrayBackedNamedEventQueue<T> implements NamedEventQueue<T> {
510
private readonly events: T[] = [];
611

@@ -22,7 +27,11 @@ export default class ArrayBackedNamedEventQueue<T> implements NamedEventQueue<T>
2227
return this.events[Symbol.iterator]();
2328
}
2429

25-
shift(): T | undefined {
26-
return this.events.shift();
30+
splice(count: number): T[] {
31+
return this.events.splice(0, count);
32+
}
33+
34+
isEmpty(): boolean {
35+
return this.events.length === 0;
2736
}
2837
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import ArrayBackedNamedEventQueue from './array-backed-named-event-queue';
2+
import BatchEventProcessor from './batch-event-processor';
3+
4+
describe('BatchEventProcessor', () => {
5+
describe('nextBatch', () => {
6+
it('should return a batch and remove items from the queue', () => {
7+
const eventQueue = new ArrayBackedNamedEventQueue('test-queue');
8+
const processor = new BatchEventProcessor(eventQueue, 2);
9+
expect(processor.isEmpty()).toBeTruthy();
10+
expect(processor.nextBatch()).toHaveLength(0);
11+
processor.push({ id: 'foo-1', data: 'event1', params: {} });
12+
processor.push({ id: 'foo-2', data: 'event2', params: {} });
13+
processor.push({ id: 'foo-3', data: 'event3', params: {} });
14+
expect(processor.isEmpty()).toBeFalsy();
15+
const batch = processor.nextBatch();
16+
expect(batch).toEqual([
17+
{ id: 'foo-1', data: 'event1', params: {} },
18+
{ id: 'foo-2', data: 'event2', params: {} },
19+
]);
20+
expect(processor.nextBatch()).toEqual([{ id: 'foo-3', data: 'event3', params: {} }]);
21+
expect(processor.isEmpty()).toBeTruthy();
22+
});
23+
});
24+
});
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import NamedEventQueue from './named-event-queue';
2+
3+
export default class BatchEventProcessor {
4+
constructor(
5+
private readonly eventQueue: NamedEventQueue<unknown>,
6+
private readonly batchSize: number,
7+
) {}
8+
9+
nextBatch(): unknown[] {
10+
return this.eventQueue.splice(this.batchSize);
11+
}
12+
13+
push(event: unknown): void {
14+
this.eventQueue.push(event);
15+
}
16+
17+
isEmpty(): boolean {
18+
return this.eventQueue.isEmpty();
19+
}
20+
}

src/events/batch-retry-manager.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { logger } from '../application-logger';
2+
3+
import EventDelivery from './event-delivery';
4+
5+
/**
6+
* Attempts to retry delivering a batch of events to the ingestionUrl up to `maxRetries` times
7+
* using exponential backoff.
8+
*/
9+
export default class BatchRetryManager {
10+
/**
11+
* @param config.retryInterval - The minimum retry interval in milliseconds
12+
* @param config.maxRetryDelayMs - The maximum retry delay in milliseconds
13+
* @param config.maxRetries - The maximum number of retries
14+
*/
15+
constructor(
16+
private readonly delivery: EventDelivery,
17+
private readonly config: {
18+
retryIntervalMs: number;
19+
maxRetryDelayMs: number;
20+
maxRetries: number;
21+
},
22+
) {}
23+
24+
async retry(batch: unknown[], attempt = 0): Promise<void> {
25+
const { retryIntervalMs, maxRetryDelayMs, maxRetries } = this.config;
26+
const delay = Math.min(retryIntervalMs * Math.pow(2, attempt), maxRetryDelayMs);
27+
logger.info(`[BatchRetryManager] Retrying batch delivery in ${delay}ms...`);
28+
await new Promise((resolve) => setTimeout(resolve, delay));
29+
30+
const success = await this.delivery.deliver(batch);
31+
if (success) {
32+
logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt} retries.`);
33+
return;
34+
}
35+
if (attempt < maxRetries) {
36+
return this.retry(batch, attempt + 1);
37+
} else {
38+
// TODO: Persist batch to avoid data loss
39+
logger.warn(
40+
`[BatchRetryManager] Failed to deliver batch after ${maxRetries} retries, bailing`,
41+
);
42+
}
43+
}
44+
}

src/events/bounded-event-queue.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,4 @@ export class BoundedEventQueue<T> {
2424
this.queue.length = 0;
2525
return events;
2626
}
27-
28-
/** Returns the first event in the queue and removes it. */
29-
shift(): T | undefined {
30-
return this.queue.shift();
31-
}
3227
}

0 commit comments

Comments
 (0)