Skip to content

Commit f6c3d4f

Browse files
felipecslpetzel
andauthored
feat(events): Add event context (#213)
* feat(events): Add event context * valida context value type * ensure context limits * fix lint * fix lint v2 * Update src/events/default-event-dispatcher.ts Co-authored-by: Eric Petzel <[email protected]> * Update src/events/default-event-dispatcher.ts Co-authored-by: Eric Petzel <[email protected]> --------- Co-authored-by: Eric Petzel <[email protected]>
1 parent 2b6753e commit f6c3d4f

9 files changed

+141
-24
lines changed

src/client/eppo-client.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,21 @@ export default class EppoClient {
165165
this.eventDispatcher = eventDispatcher;
166166
}
167167

168+
/**
169+
* Attaches a context to be included with all events dispatched by the EventDispatcher.
170+
* The context is delivered as a top-level object in the ingestion request payload.
171+
* An existing key can be removed by providing a `null` value.
172+
* Calling this method with same key multiple times causes only the last value to be used for the
173+
* given key.
174+
*
175+
* @param key - The context entry key.
176+
* @param value - The context entry value, must be a string, number, boolean, or null. If value is
177+
* an object or an array, will throw an ArgumentError.
178+
*/
179+
setContext(key: string, value: string | number | boolean | null) {
180+
this.eventDispatcher?.attachContext(key, value);
181+
}
182+
168183
// noinspection JSUnusedGlobalSymbols
169184
setBanditModelConfigurationStore(
170185
banditModelConfigurationStore: IConfigurationStore<BanditParameters>,

src/events/batch-retry-manager.spec.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ describe('BatchRetryManager', () => {
1919

2020
it('should successfully retry and deliver a batch with no failures', async () => {
2121
mockDelivery.deliver.mockResolvedValueOnce({ failedEvents: [] });
22-
const result = await batchRetryManager.retry(mockBatch);
22+
const result = await batchRetryManager.retry(mockBatch, {});
2323
expect(result).toEqual([]);
2424
expect(mockDelivery.deliver).toHaveBeenCalledTimes(1);
25-
expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch);
25+
expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch, {});
2626
});
2727

2828
it('should retry failed deliveries up to maxRetries times and return last failed batch', async () => {
2929
mockDelivery.deliver.mockResolvedValue({ failedEvents: [{ id: 'event1' }] });
30-
const result = await batchRetryManager.retry(mockBatch);
30+
const result = await batchRetryManager.retry(mockBatch, {});
3131
expect(result).toEqual([{ id: 'event1' }]);
3232
expect(mockDelivery.deliver).toHaveBeenCalledTimes(maxRetries);
3333
});
@@ -40,7 +40,7 @@ describe('BatchRetryManager', () => {
4040

4141
jest.useFakeTimers();
4242

43-
const retryPromise = batchRetryManager.retry(mockBatch);
43+
const retryPromise = batchRetryManager.retry(mockBatch, {});
4444

4545
// 1st retry: 100ms
4646
// 2nd retry: 200ms
@@ -67,7 +67,7 @@ describe('BatchRetryManager', () => {
6767

6868
jest.useFakeTimers();
6969

70-
const retryPromise = batchRetryManager.retry(mockBatch);
70+
const retryPromise = batchRetryManager.retry(mockBatch, {});
7171
// 100ms + 200ms + 300ms (maxRetryDelayMs) = 600ms
7272
await jest.advanceTimersByTimeAsync(600);
7373
const result = await retryPromise;

src/events/batch-retry-manager.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import { logger } from '../application-logger';
22

3+
import { EventContext } from './default-event-dispatcher';
34
import Event from './event';
45
import { EventDeliveryResult } from './event-delivery';
56

6-
export type IEventDelivery = {
7-
deliver(batch: Event[]): Promise<EventDeliveryResult>;
8-
};
7+
export interface IEventDelivery {
8+
deliver(batch: Event[], context: EventContext): Promise<EventDeliveryResult>;
9+
}
910

1011
/**
1112
* Attempts to retry delivering a batch of events to the ingestionUrl up to `maxRetries` times
@@ -27,22 +28,22 @@ export default class BatchRetryManager {
2728
) {}
2829

2930
/** Re-attempts delivery of the provided batch, returns the UUIDs of events that failed retry. */
30-
async retry(batch: Event[], attempt = 0): Promise<Event[]> {
31+
async retry(batch: Event[], context: EventContext, attempt = 0): Promise<Event[]> {
3132
const { retryIntervalMs, maxRetryDelayMs, maxRetries } = this.config;
3233
const delay = Math.min(retryIntervalMs * Math.pow(2, attempt), maxRetryDelayMs);
3334
logger.info(
3435
`[BatchRetryManager] Retrying batch delivery of ${batch.length} events in ${delay}ms...`,
3536
);
3637
await new Promise((resolve) => setTimeout(resolve, delay));
3738

38-
const { failedEvents } = await this.delivery.deliver(batch);
39+
const { failedEvents } = await this.delivery.deliver(batch, context);
3940
if (failedEvents.length === 0) {
4041
logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt + 1} tries.`);
4142
return [];
4243
}
4344
// attempts are zero-indexed while maxRetries is not
4445
if (attempt < maxRetries - 1) {
45-
return this.retry(failedEvents, attempt + 1);
46+
return this.retry(failedEvents, context, attempt + 1);
4647
} else {
4748
logger.warn(`[BatchRetryManager] Failed to deliver batch after ${maxRetries} tries, bailing`);
4849
return batch;

src/events/default-event-dispatcher.spec.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { v4 as randomUUID } from 'uuid';
2+
13
import ArrayBackedNamedEventQueue from './array-backed-named-event-queue';
24
import BatchEventProcessor from './batch-event-processor';
35
import DefaultEventDispatcher, {
@@ -120,6 +122,7 @@ describe('DefaultEventDispatcher', () => {
120122
let fetchOptions = fetch.mock.calls[0][1];
121123
let payload = JSON.parse(fetchOptions.body);
122124
expect(payload).toEqual({
125+
context: {},
123126
eppo_events: [
124127
expect.objectContaining({ payload: { foo: 'event1' } }),
125128
expect.objectContaining({ payload: { foo: 'event2' } }),
@@ -139,6 +142,7 @@ describe('DefaultEventDispatcher', () => {
139142
fetchOptions = fetch.mock.calls[1][1];
140143
payload = JSON.parse(fetchOptions.body);
141144
expect(payload).toEqual({
145+
context: {},
142146
eppo_events: [expect.objectContaining({ payload: { foo: 'event3' } })],
143147
});
144148
});
@@ -319,6 +323,57 @@ describe('DefaultEventDispatcher', () => {
319323
});
320324
});
321325

326+
describe('attachContext', () => {
327+
it('should throw an error if the value is an object', () => {
328+
const eventQueue = new ArrayBackedNamedEventQueue<Event>('test-queue');
329+
const { dispatcher } = createDispatcher({ maxRetries: 1 }, eventQueue);
330+
expect(() => dispatcher.attachContext('foo', {} as any)).toThrow();
331+
expect(() => dispatcher.attachContext('foo', [] as any)).toThrow();
332+
});
333+
334+
it('should not throw an error if the value is a string, number, boolean, or null', () => {
335+
const eventQueue = new ArrayBackedNamedEventQueue<Event>('test-queue');
336+
const { dispatcher } = createDispatcher({ maxRetries: 1 }, eventQueue);
337+
expect(() => dispatcher.attachContext('foo', 'bar')).not.toThrow();
338+
expect(() => dispatcher.attachContext('foo', 1)).not.toThrow();
339+
expect(() => dispatcher.attachContext('foo', true)).not.toThrow();
340+
expect(() => dispatcher.attachContext('foo', null)).not.toThrow();
341+
});
342+
343+
it('should throw an error if the context value is too long', () => {
344+
const eventQueue = new ArrayBackedNamedEventQueue<Event>('test-queue');
345+
const { dispatcher } = createDispatcher({ maxRetries: 1 }, eventQueue);
346+
expect(() => dispatcher.attachContext('foo', 'a'.repeat(2049))).toThrow();
347+
});
348+
349+
it('attaches a context to be included with all events dispatched by this dispatcher', async () => {
350+
const eventQueue = new ArrayBackedNamedEventQueue<Event>('test-queue');
351+
const { dispatcher } = createDispatcher({ maxRetries: 1 }, eventQueue);
352+
dispatcher.attachContext('foo', 'bar');
353+
dispatcher.attachContext('baz', 'qux');
354+
const event = {
355+
uuid: randomUUID(),
356+
payload: { foo: 'event1' },
357+
timestamp: new Date().getTime(),
358+
type: 'foo',
359+
};
360+
dispatcher.dispatch(event);
361+
const fetch = global.fetch as jest.Mock;
362+
fetch.mockResolvedValue({ ok: true, json: () => Promise.resolve([]) });
363+
364+
await new Promise((resolve) => setTimeout(resolve, 100));
365+
366+
expect(global.fetch).toHaveBeenCalledWith('http://example.com', {
367+
method: 'POST',
368+
headers: { 'Content-Type': 'application/json', 'x-eppo-token': 'test-sdk-key' },
369+
body: JSON.stringify({
370+
eppo_events: [event],
371+
context: { foo: 'bar', baz: 'qux' },
372+
}),
373+
});
374+
});
375+
});
376+
322377
describe('validation', () => {
323378
it('should throw an error if the serialized event is too long', () => {
324379
const { dispatcher } = createDispatcher();

src/events/default-event-dispatcher.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ export type EventDispatcherConfig = {
2525
maxRetries?: number;
2626
};
2727

28-
export const MAX_EVENT_SERIALIZED_LENGTH = 4096;
28+
export type EventContext = Record<string, string | number | boolean | null>;
29+
30+
const MAX_CONTEXT_SERIALIZED_LENGTH = 2048;
31+
const MAX_EVENT_SERIALIZED_LENGTH = 4096;
32+
2933
export const DEFAULT_EVENT_DISPATCHER_BATCH_SIZE = 1_000;
3034
export const DEFAULT_EVENT_DISPATCHER_CONFIG: Omit<
3135
EventDispatcherConfig,
@@ -47,6 +51,7 @@ export default class DefaultEventDispatcher implements EventDispatcher {
4751
private readonly eventDelivery: EventDelivery;
4852
private readonly retryManager: BatchRetryManager;
4953
private readonly deliveryIntervalMs: number;
54+
private readonly context: EventContext = {};
5055
private dispatchTimer: NodeJS.Timeout | null = null;
5156
private isOffline = false;
5257

@@ -75,15 +80,36 @@ export default class DefaultEventDispatcher implements EventDispatcher {
7580
});
7681
}
7782

83+
attachContext(key: string, value: string | number | boolean | null): void {
84+
this.ensureValidContext(key, value);
85+
this.context[key] = value;
86+
}
87+
7888
dispatch(event: Event) {
7989
this.ensureValidEvent(event);
8090
this.batchProcessor.push(event);
8191
this.maybeScheduleNextDelivery();
8292
}
8393

94+
private ensureValidContext(key: string, value: string | number | boolean | null) {
95+
if (value && (typeof value === 'object' || Array.isArray(value))) {
96+
throw new Error('Context value must be a string, number, boolean, or null');
97+
}
98+
if (
99+
value &&
100+
JSON.stringify({ ...this.context, [key]: value }).length > MAX_CONTEXT_SERIALIZED_LENGTH
101+
) {
102+
throw new Error(
103+
`The total context size must be less than ${MAX_CONTEXT_SERIALIZED_LENGTH} characters`,
104+
);
105+
}
106+
}
107+
84108
private ensureValidEvent(event: Event) {
85109
if (JSON.stringify(event).length > MAX_EVENT_SERIALIZED_LENGTH) {
86-
throw new Error('Event serialized length exceeds maximum allowed length of 4096');
110+
throw new Error(
111+
`Event serialized length exceeds maximum allowed length of ${MAX_EVENT_SERIALIZED_LENGTH}`,
112+
);
87113
}
88114
}
89115

@@ -100,10 +126,12 @@ export default class DefaultEventDispatcher implements EventDispatcher {
100126
return;
101127
}
102128

103-
const { failedEvents } = await this.eventDelivery.deliver(batch);
129+
// make a defensive copy of the context to avoid mutating the original
130+
const context = { ...this.context };
131+
const { failedEvents } = await this.eventDelivery.deliver(batch, context);
104132
if (failedEvents.length > 0) {
105133
logger.warn('[EventDispatcher] Failed to deliver some events from batch, retrying...');
106-
const failedRetry = await this.retryManager.retry(failedEvents);
134+
const failedRetry = await this.retryManager.retry(failedEvents, context);
107135
if (failedRetry.length > 0) {
108136
// re-enqueue events that failed to retry
109137
this.batchProcessor.push(...failedRetry);

src/events/event-delivery.spec.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,40 @@ describe('EventDelivery', () => {
2020
it('should deliver events successfully when response is OK', async () => {
2121
const mockResponse = { ok: true, json: async () => ({}) };
2222
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
23-
const result = await eventDelivery.deliver(testBatch);
23+
const result = await eventDelivery.deliver(testBatch, {});
2424
expect(result).toEqual({ failedEvents: [] });
2525
expect(global.fetch).toHaveBeenCalledWith(ingestionUrl, {
2626
method: 'POST',
2727
headers: { 'Content-Type': 'application/json', 'x-eppo-token': sdkKey },
28-
body: JSON.stringify({ eppo_events: testBatch }),
28+
body: JSON.stringify({ eppo_events: testBatch, context: {} }),
2929
});
3030
});
3131

3232
it('should return failed result if response is not OK', async () => {
3333
const mockResponse = { ok: false };
3434
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
35-
const result = await eventDelivery.deliver(testBatch);
35+
const result = await eventDelivery.deliver(testBatch, {});
3636
expect(result).toEqual({ failedEvents: testBatch });
3737
});
3838

3939
it('should return failed events when response includes failed events', async () => {
4040
const failedEvents = ['1', '2'];
4141
const mockResponse = { ok: true, json: async () => ({ failed_events: failedEvents }) };
4242
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
43-
const result = await eventDelivery.deliver(testBatch);
43+
const result = await eventDelivery.deliver(testBatch, {});
4444
expect(result).toEqual({ failedEvents: [testBatch[0], testBatch[1]] });
4545
});
4646

4747
it('should return success=true if no failed events in the response', async () => {
4848
const mockResponse = { ok: true, json: async () => ({}) };
4949
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
50-
const result = await eventDelivery.deliver(testBatch);
50+
const result = await eventDelivery.deliver(testBatch, {});
5151
expect(result).toEqual({ failedEvents: [] });
5252
});
5353

5454
it('should handle fetch errors gracefully', async () => {
5555
(global.fetch as jest.Mock).mockRejectedValue(new Error('Network error'));
56-
const result = await eventDelivery.deliver(testBatch);
56+
const result = await eventDelivery.deliver(testBatch, {});
5757
expect(result).toEqual({ failedEvents: testBatch });
5858
});
5959
});

src/events/event-delivery.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { logger } from '../application-logger';
22

3+
import { IEventDelivery } from './batch-retry-manager';
4+
import { EventContext } from './default-event-dispatcher';
35
import Event from './event';
46

57
export type EventDeliveryResult = {
68
failedEvents: Event[];
79
};
810

9-
export default class EventDelivery {
11+
export default class EventDelivery implements IEventDelivery {
1012
constructor(
1113
private readonly sdkKey: string,
1214
private readonly ingestionUrl: string,
@@ -16,15 +18,15 @@ export default class EventDelivery {
1618
* Delivers a batch of events to the ingestion URL endpoint. Returns the UUIDs of any events from
1719
* the batch that failed ingestion.
1820
*/
19-
async deliver(batch: Event[]): Promise<EventDeliveryResult> {
21+
async deliver(batch: Event[], context: EventContext): Promise<EventDeliveryResult> {
2022
try {
2123
logger.info(
2224
`[EventDispatcher] Delivering batch of ${batch.length} events to ${this.ingestionUrl}...`,
2325
);
2426
const response = await fetch(this.ingestionUrl, {
2527
method: 'POST',
2628
headers: { 'Content-Type': 'application/json', 'x-eppo-token': this.sdkKey },
27-
body: JSON.stringify({ eppo_events: batch }),
29+
body: JSON.stringify({ eppo_events: batch, context }),
2830
});
2931
if (response.ok) {
3032
return await this.parseFailedEvents(response, batch);

src/events/event-dispatcher.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,15 @@ import Event from './event';
33
export default interface EventDispatcher {
44
/** Dispatches (enqueues) an event for eventual delivery. */
55
dispatch(event: Event): void;
6+
/**
7+
* Attaches a context to be included with all events dispatched by this dispatcher.
8+
* The context is delivered as a top-level object in the ingestion request payload.
9+
* An existing key can be removed by providing a `null` value.
10+
* Calling this method with same key multiple times causes only the last value to be used for the
11+
* given key.
12+
*
13+
* @param key - The context entry key.
14+
* @param value - The context entry value.
15+
*/
16+
attachContext(key: string, value: string | number | boolean | null): void;
617
}

src/events/no-op-event-dispatcher.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ import Event from './event';
22
import EventDispatcher from './event-dispatcher';
33

44
export default class NoOpEventDispatcher implements EventDispatcher {
5+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
6+
attachContext(key: string, value: string | number | boolean | null): void {
7+
// Do nothing
8+
}
9+
510
// eslint-disable-next-line @typescript-eslint/no-unused-vars
611
dispatch(_: Event): void {
712
// Do nothing

0 commit comments

Comments
 (0)