Skip to content

Commit ccffe56

Browse files
authored
feat(events): Redeliver subset of failed events from a batch (#176)
* wip * feat(events): Redeliver subset of failed events from a batch * improve retries
1 parent 07c19c8 commit ccffe56

12 files changed

+220
-43
lines changed

src/events/batch-event-processor.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import ArrayBackedNamedEventQueue from './array-backed-named-event-queue';
22
import BatchEventProcessor from './batch-event-processor';
3-
import { Event } from './event-dispatcher';
3+
import Event from './event';
44

55
describe('BatchEventProcessor', () => {
66
describe('nextBatch', () => {

src/events/batch-event-processor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Event } from './event-dispatcher';
1+
import Event from './event';
22
import NamedEventQueue from './named-event-queue';
33

44
export default class BatchEventProcessor {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import BatchRetryManager from './batch-retry-manager';
2+
import Event from './event';
3+
4+
describe('BatchRetryManager', () => {
5+
const mockDelivery = { deliver: jest.fn() };
6+
const retryIntervalMs = 100;
7+
const maxRetryDelayMs = 1000;
8+
const maxRetries = 3;
9+
const mockConfig = { retryIntervalMs, maxRetryDelayMs, maxRetries };
10+
11+
let batchRetryManager: BatchRetryManager;
12+
let mockBatch: Event[];
13+
14+
beforeEach(() => {
15+
jest.clearAllMocks();
16+
batchRetryManager = new BatchRetryManager(mockDelivery, mockConfig);
17+
mockBatch = [{ uuid: 'event1' }, { uuid: 'event2' }] as Event[];
18+
});
19+
20+
it('should successfully retry and deliver a batch with no failures', async () => {
21+
mockDelivery.deliver.mockResolvedValueOnce({ failedEvents: [] });
22+
const result = await batchRetryManager.retry(mockBatch);
23+
expect(result).toEqual([]);
24+
expect(mockDelivery.deliver).toHaveBeenCalledTimes(1);
25+
expect(mockDelivery.deliver).toHaveBeenCalledWith(mockBatch);
26+
});
27+
28+
it('should retry failed deliveries up to maxRetries times and return last failed batch', async () => {
29+
mockDelivery.deliver.mockResolvedValue({ failedEvents: [{ id: 'event1' }] });
30+
const result = await batchRetryManager.retry(mockBatch);
31+
expect(result).toEqual([{ id: 'event1' }]);
32+
expect(mockDelivery.deliver).toHaveBeenCalledTimes(maxRetries);
33+
});
34+
35+
it('should exponentially delay retries up to maxRetryDelayMs', async () => {
36+
mockDelivery.deliver
37+
.mockResolvedValueOnce({ failedEvents: [{ id: 'event1' }] })
38+
.mockResolvedValueOnce({ failedEvents: [{ id: 'event1' }] })
39+
.mockResolvedValueOnce({ failedEvents: [] });
40+
41+
jest.useFakeTimers();
42+
43+
const retryPromise = batchRetryManager.retry(mockBatch);
44+
45+
// 1st retry: 100ms
46+
// 2nd retry: 200ms
47+
// 3rd retry: 400ms
48+
await jest.advanceTimersByTimeAsync(100 + 200 + 400);
49+
50+
const result = await retryPromise;
51+
expect(result).toEqual([]);
52+
expect(mockDelivery.deliver).toHaveBeenCalledTimes(3);
53+
54+
jest.useRealTimers();
55+
});
56+
57+
it('should not exceed maxRetryDelayMs for delays', async () => {
58+
batchRetryManager = new BatchRetryManager(mockDelivery, {
59+
...mockConfig,
60+
maxRetryDelayMs: 300,
61+
});
62+
mockDelivery.deliver
63+
.mockResolvedValueOnce({ failedEvents: mockBatch })
64+
.mockResolvedValueOnce({ failedEvents: mockBatch })
65+
.mockResolvedValueOnce({ failedEvents: mockBatch })
66+
.mockResolvedValueOnce({ failedEvents: mockBatch });
67+
68+
jest.useFakeTimers();
69+
70+
const retryPromise = batchRetryManager.retry(mockBatch);
71+
// 100ms + 200ms + 300ms (maxRetryDelayMs) = 600ms
72+
await jest.advanceTimersByTimeAsync(600);
73+
const result = await retryPromise;
74+
expect(result).toEqual(mockBatch);
75+
jest.useRealTimers();
76+
});
77+
});

src/events/batch-retry-manager.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { logger } from '../application-logger';
22

3-
import EventDelivery from './event-delivery';
4-
import { Event } from './event-dispatcher';
3+
import Event from './event';
4+
import { EventDeliveryResult } from './event-delivery';
5+
6+
export type IEventDelivery = {
7+
deliver(batch: Event[]): Promise<EventDeliveryResult>;
8+
};
59

610
/**
711
* Attempts to retry delivering a batch of events to the ingestionUrl up to `maxRetries` times
@@ -14,34 +18,34 @@ export default class BatchRetryManager {
1418
* @param config.maxRetries - The maximum number of retries
1519
*/
1620
constructor(
17-
private readonly delivery: EventDelivery,
21+
private readonly delivery: IEventDelivery,
1822
private readonly config: {
1923
retryIntervalMs: number;
2024
maxRetryDelayMs: number;
2125
maxRetries: number;
2226
},
2327
) {}
2428

25-
/** Re-attempts delivery of the provided batch, returns whether the retry succeeded. */
26-
async retry(batch: Event[], attempt = 0): Promise<boolean> {
29+
/** Re-attempts delivery of the provided batch, returns the UUIDs of events that failed retry. */
30+
async retry(batch: Event[], attempt = 0): Promise<Event[]> {
2731
const { retryIntervalMs, maxRetryDelayMs, maxRetries } = this.config;
2832
const delay = Math.min(retryIntervalMs * Math.pow(2, attempt), maxRetryDelayMs);
29-
logger.info(`[BatchRetryManager] Retrying batch delivery in ${delay}ms...`);
33+
logger.info(
34+
`[BatchRetryManager] Retrying batch delivery of ${batch.length} events in ${delay}ms...`,
35+
);
3036
await new Promise((resolve) => setTimeout(resolve, delay));
3137

32-
const success = await this.delivery.deliver(batch);
33-
if (success) {
34-
logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt} retries.`);
35-
return true;
38+
const { failedEvents } = await this.delivery.deliver(batch);
39+
if (failedEvents.length === 0) {
40+
logger.info(`[BatchRetryManager] Batch delivery successfully after ${attempt + 1} tries.`);
41+
return [];
3642
}
3743
// attempts are zero-indexed while maxRetries is not
3844
if (attempt < maxRetries - 1) {
39-
return this.retry(batch, attempt + 1);
45+
return this.retry(failedEvents, attempt + 1);
4046
} else {
41-
logger.warn(
42-
`[BatchRetryManager] Failed to deliver batch after ${maxRetries} retries, bailing`,
43-
);
44-
return false;
47+
logger.warn(`[BatchRetryManager] Failed to deliver batch after ${maxRetries} tries, bailing`);
48+
return batch;
4549
}
4650
}
4751
}

src/events/default-event-dispatcher.spec.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import DefaultEventDispatcher, {
44
EventDispatcherConfig,
55
newDefaultEventDispatcher,
66
} from './default-event-dispatcher';
7-
import { Event } from './event-dispatcher';
7+
import Event from './event';
88
import NetworkStatusListener from './network-status-listener';
99
import NoOpEventDispatcher from './no-op-event-dispatcher';
1010

@@ -100,7 +100,7 @@ describe('DefaultEventDispatcher', () => {
100100
});
101101

102102
const fetch = global.fetch as jest.Mock;
103-
fetch.mockResolvedValue({ ok: true });
103+
fetch.mockResolvedValue({ ok: true, json: () => Promise.resolve([]) });
104104

105105
await new Promise((resolve) => setTimeout(resolve, 100));
106106

@@ -207,7 +207,7 @@ describe('DefaultEventDispatcher', () => {
207207
});
208208

209209
const fetch = global.fetch as jest.Mock;
210-
fetch.mockResolvedValue({ ok: true });
210+
fetch.mockResolvedValue({ ok: true, json: () => Promise.resolve([]) });
211211

212212
isOffline = true;
213213
// simulate the network going offline
@@ -242,7 +242,7 @@ describe('DefaultEventDispatcher', () => {
242242
// Simulate fetch failure on the first attempt
243243
(global.fetch as jest.Mock)
244244
.mockResolvedValueOnce({ ok: false }) // First attempt fails
245-
.mockResolvedValueOnce({ ok: true }); // Second attempt succeeds
245+
.mockResolvedValueOnce({ ok: true, json: () => Promise.resolve([]) }); // Second attempt succeeds
246246

247247
// Fast-forward to trigger the first attempt
248248
await new Promise((resolve) => setTimeout(resolve, 100));

src/events/default-event-dispatcher.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import { logger } from '../application-logger';
22

33
import BatchEventProcessor from './batch-event-processor';
44
import BatchRetryManager from './batch-retry-manager';
5+
import Event from './event';
56
import EventDelivery from './event-delivery';
6-
import EventDispatcher, { Event } from './event-dispatcher';
7+
import EventDispatcher from './event-dispatcher';
78
import NamedEventQueue from './named-event-queue';
89
import NetworkStatusListener from './network-status-listener';
910
import NoOpEventDispatcher from './no-op-event-dispatcher';
@@ -93,13 +94,13 @@ export default class DefaultEventDispatcher implements EventDispatcher {
9394
return;
9495
}
9596

96-
const success = await this.eventDelivery.deliver(batch);
97-
if (!success) {
98-
logger.warn('[EventDispatcher] Failed to deliver batch, retrying...');
99-
const retrySucceeded = await this.retryManager.retry(batch);
100-
if (!retrySucceeded) {
97+
const { failedEvents } = await this.eventDelivery.deliver(batch);
98+
if (failedEvents.length > 0) {
99+
logger.warn('[EventDispatcher] Failed to deliver some events from batch, retrying...');
100+
const failedRetry = await this.retryManager.retry(failedEvents);
101+
if (failedRetry.length > 0) {
101102
// re-enqueue events that failed to retry
102-
this.batchProcessor.push(...batch);
103+
this.batchProcessor.push(...failedRetry);
103104
}
104105
}
105106
logger.debug(`[EventDispatcher] Delivered batch of ${batch.length} events.`);

src/events/event-delivery.spec.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import Event from './event';
2+
import EventDelivery from './event-delivery';
3+
4+
describe('EventDelivery', () => {
5+
global.fetch = jest.fn();
6+
const sdkKey = 'test-sdk-key';
7+
const ingestionUrl = 'https://test-ingestion.url';
8+
const testBatch: Event[] = [
9+
{ uuid: '1', timestamp: Date.now(), type: 'test_event', payload: { key: 'value1' } },
10+
{ uuid: '2', timestamp: Date.now(), type: 'test_event', payload: { key: 'value2' } },
11+
{ uuid: '3', timestamp: Date.now(), type: 'test_event', payload: { key: 'value3' } },
12+
];
13+
let eventDelivery: EventDelivery;
14+
15+
beforeEach(() => {
16+
jest.clearAllMocks();
17+
eventDelivery = new EventDelivery(sdkKey, ingestionUrl);
18+
});
19+
20+
it('should deliver events successfully when response is OK', async () => {
21+
const mockResponse = { ok: true, json: async () => ({}) };
22+
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
23+
const result = await eventDelivery.deliver(testBatch);
24+
expect(result).toEqual({ failedEvents: [] });
25+
expect(global.fetch).toHaveBeenCalledWith(ingestionUrl, {
26+
method: 'POST',
27+
headers: { 'Content-Type': 'application/json', 'x-eppo-token': sdkKey },
28+
body: JSON.stringify({ eppo_events: testBatch }),
29+
});
30+
});
31+
32+
it('should return failed result if response is not OK', async () => {
33+
const mockResponse = { ok: false };
34+
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
35+
const result = await eventDelivery.deliver(testBatch);
36+
expect(result).toEqual({ failedEvents: testBatch });
37+
});
38+
39+
it('should return failed events when response includes failed events', async () => {
40+
const failedEvents = ['1', '2'];
41+
const mockResponse = { ok: true, json: async () => ({ failed_events: failedEvents }) };
42+
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
43+
const result = await eventDelivery.deliver(testBatch);
44+
expect(result).toEqual({ failedEvents: [testBatch[0], testBatch[1]] });
45+
});
46+
47+
it('should return success=true if no failed events in the response', async () => {
48+
const mockResponse = { ok: true, json: async () => ({}) };
49+
(global.fetch as jest.Mock).mockResolvedValue(mockResponse);
50+
const result = await eventDelivery.deliver(testBatch);
51+
expect(result).toEqual({ failedEvents: [] });
52+
});
53+
54+
it('should handle fetch errors gracefully', async () => {
55+
(global.fetch as jest.Mock).mockRejectedValue(new Error('Network error'));
56+
const result = await eventDelivery.deliver(testBatch);
57+
expect(result).toEqual({ failedEvents: testBatch });
58+
});
59+
});

src/events/event-delivery.ts

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
import { logger } from '../application-logger';
22

3-
import { Event } from './event-dispatcher';
3+
import Event from './event';
4+
5+
export type EventDeliveryResult = {
6+
failedEvents: Event[];
7+
};
48

59
export default class EventDelivery {
610
constructor(private readonly sdkKey: string, private readonly ingestionUrl: string) {}
711

8-
async deliver(batch: Event[]): Promise<boolean> {
12+
/**
13+
* Delivers a batch of events to the ingestion URL endpoint. Returns the UUIDs of any events from
14+
* the batch that failed ingestion.
15+
*/
16+
async deliver(batch: Event[]): Promise<EventDeliveryResult> {
917
try {
1018
logger.info(
1119
`[EventDispatcher] Delivering batch of ${batch.length} events to ${this.ingestionUrl}...`,
@@ -15,11 +23,34 @@ export default class EventDelivery {
1523
headers: { 'Content-Type': 'application/json', 'x-eppo-token': this.sdkKey },
1624
body: JSON.stringify({ eppo_events: batch }),
1725
});
18-
// TODO: Parse response to check `failed_event_uploads` for any failed event ingestions in the batch
19-
return response.ok;
20-
} catch {
21-
logger.warn('Failed to upload event batch');
22-
return false;
26+
if (response.ok) {
27+
return await this.parseFailedEvents(response, batch);
28+
} else {
29+
return { failedEvents: batch };
30+
}
31+
} catch (e: any) {
32+
logger.warn(`Failed to upload event batch`, e);
33+
return { failedEvents: batch };
34+
}
35+
}
36+
37+
private async parseFailedEvents(
38+
response: Response,
39+
batch: Event[],
40+
): Promise<EventDeliveryResult> {
41+
logger.info('[EventDispatcher] Batch delivered successfully.');
42+
const responseBody = (await response.json()) as { failed_events?: string[] };
43+
const failedEvents = new Set(responseBody?.failed_events || []);
44+
if (failedEvents.size > 0) {
45+
logger.warn(
46+
`[EventDispatcher] ${failedEvents.size}/${batch.length} events failed ingestion.`,
47+
);
48+
// even though some events may have failed to successfully deliver, we'll still consider
49+
// the batch as a whole to have been delivered successfully and just re-enqueue the failed
50+
// events for retry later
51+
return { failedEvents: batch.filter(({ uuid }) => failedEvents.has(uuid)) };
52+
} else {
53+
return { failedEvents: [] };
2354
}
2455
}
2556
}

src/events/event-dispatcher.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
export type Event = {
2-
uuid: string;
3-
timestamp: number;
4-
type: string;
5-
payload: Record<string, unknown>;
6-
};
1+
import Event from './event';
72

83
export default interface EventDispatcher {
94
/** Dispatches (enqueues) an event for eventual delivery. */

src/events/event.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
type Event = {
2+
uuid: string;
3+
timestamp: number;
4+
type: string;
5+
payload: Record<string, unknown>;
6+
};
7+
8+
export default Event;

0 commit comments

Comments
 (0)