Skip to content

Commit 17d27a3

Browse files
[FSSDK-11003] disposable batch event processor
1 parent 11c850d commit 17d27a3

File tree

6 files changed

+65
-24
lines changed

6 files changed

+65
-24
lines changed

lib/event_processor/batch_event_processor.spec.ts

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ describe('QueueingEventProcessor', async () => {
167167

168168
processor.start();
169169
await processor.onRunning();
170-
for(let i = 0; i < 100; i++) {
170+
for(let i = 0; i < 99; i++) {
171171
const event = createImpressionEvent(`id-${i}`);
172172
await processor.process(event);
173173
}
@@ -190,30 +190,33 @@ describe('QueueingEventProcessor', async () => {
190190
await processor.onRunning();
191191

192192
let events: ProcessableEvent[] = [];
193-
for(let i = 0; i < 100; i++) {
193+
for(let i = 0; i < 99; i++){
194194
const event = createImpressionEvent(`id-${i}`);
195195
events.push(event);
196196
await processor.process(event);
197197
}
198198

199199
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
200200

201-
let event = createImpressionEvent('id-100');
201+
let event = createImpressionEvent('id-99');
202+
events.push(event);
202203
await processor.process(event);
203-
204+
204205
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
205206
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
206207

207-
events = [event];
208-
for(let i = 101; i < 200; i++) {
208+
events = [];
209+
210+
for(let i = 100; i < 199; i++) {
209211
const event = createImpressionEvent(`id-${i}`);
210212
events.push(event);
211213
await processor.process(event);
212214
}
213215

214216
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
215217

216-
event = createImpressionEvent('id-200');
218+
event = createImpressionEvent('id-199');
219+
events.push(event);
217220
await processor.process(event);
218221

219222
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
@@ -257,6 +260,33 @@ describe('QueueingEventProcessor', async () => {
257260
expect(eventDispatcher.dispatchEvent.mock.calls[1][0]).toEqual(buildLogEvent([newEvent]));
258261
});
259262

263+
it('should flush queue immediately regardless of batchSize, if event processor is disposable', async () => {
264+
const eventDispatcher = getMockDispatcher();
265+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
266+
mockDispatch.mockResolvedValue({});
267+
268+
const dispatchRepeater = getMockRepeater();
269+
270+
const processor = new BatchEventProcessor({
271+
eventDispatcher,
272+
dispatchRepeater,
273+
batchSize: 100,
274+
});
275+
276+
processor.makeDisposable();
277+
processor.start();
278+
await processor.onRunning();
279+
280+
const events: ProcessableEvent[] = [];
281+
const event = createImpressionEvent('id-1');
282+
events.push(event);
283+
await processor.process(event);
284+
285+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
286+
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
287+
expect(dispatchRepeater.reset).toHaveBeenCalledTimes(1);
288+
});
289+
260290
it('should store the event in the eventStore with increasing ids', async () => {
261291
const eventDispatcher = getMockDispatcher();
262292
const eventStore = getMockSyncCache<EventWithId>();

lib/event_processor/batch_event_processor.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import { areEventContextsEqual } from "./event_builder/user_event";
3030
import { EVENT_PROCESSOR_STOPPED, FAILED_TO_DISPATCH_EVENTS, FAILED_TO_DISPATCH_EVENTS_WITH_ARG } from "../exception_messages";
3131
import { sprintf } from "../utils/fns";
3232

33+
export const DEFAULT_MIN_BACKOFF = 1000;
34+
export const DEFAULT_MAX_BACKOFF = 32000;
35+
3336
export type EventWithId = {
3437
id: string;
3538
event: ProcessableEvent;
@@ -209,7 +212,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
209212
if (!batch) {
210213
return;
211214
}
212-
215+
216+
this.dispatchRepeater.reset();
213217
this.dispatchBatch(batch, closing);
214218
}
215219

@@ -218,10 +222,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
218222
return Promise.reject('Event processor is not running');
219223
}
220224

221-
if (this.eventQueue.length == this.batchSize) {
222-
this.flush();
223-
}
224-
225225
const eventWithId = {
226226
id: this.idGenerator.getId(),
227227
event: event,
@@ -232,13 +232,30 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
232232
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
233233
this.flush();
234234
}
235-
this.eventQueue.push(eventWithId);
235+
236+
this.eventQueue.push(eventWithId);
237+
238+
if (this.eventQueue.length == this.batchSize) {
239+
this.flush();
240+
} else if (!this.dispatchRepeater.isRunning()) {
241+
this.dispatchRepeater.start();
242+
}
243+
236244
}
237245

238246
start(): void {
239247
if (!this.isNew()) {
240248
return;
241249
}
250+
if(this.disposable) {
251+
this.batchSize = 1;
252+
this.retryConfig = {
253+
maxRetries: Math.min(this.retryConfig?.maxRetries ?? 5, 5),
254+
backoffProvider:
255+
this.retryConfig?.backoffProvider ||
256+
(() => new ExponentialBackoff(DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, 500)),
257+
};
258+
}
242259
super.start();
243260
this.state = ServiceState.Running;
244261
this.dispatchRepeater.start();
@@ -254,7 +271,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
254271
}
255272

256273
if (this.isNew()) {
257-
// TOOD: replace message with imported constants
258274
this.startPromise.reject(new Error(EVENT_PROCESSOR_STOPPED));
259275
}
260276

lib/event_processor/event_processor_factory.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
*/
1616

1717
import { describe, it, expect, beforeEach, vi, MockInstance } from 'vitest';
18-
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, getBatchEventProcessor } from './event_processor_factory';
19-
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId } from './batch_event_processor';
18+
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, getBatchEventProcessor } from './event_processor_factory';
19+
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId,DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF } from './batch_event_processor';
2020
import { ExponentialBackoff, IntervalRepeater } from '../utils/repeater/repeater';
2121
import { getMockSyncCache } from '../tests/mock/mock_cache';
2222
import { LogLevel } from '../modules/logging';

lib/event_processor/event_processor_factory.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ import { StartupLog } from "../service";
1919
import { ExponentialBackoff, IntervalRepeater } from "../utils/repeater/repeater";
2020
import { EventDispatcher } from "./event_dispatcher/event_dispatcher";
2121
import { EventProcessor } from "./event_processor";
22-
import { BatchEventProcessor, EventWithId, RetryConfig } from "./batch_event_processor";
22+
import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor";
2323
import { AsyncPrefixCache, Cache, SyncPrefixCache } from "../utils/cache/cache";
2424

2525
export const DEFAULT_EVENT_BATCH_SIZE = 10;
2626
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1000;
2727
export const DEFAULT_EVENT_MAX_QUEUE_SIZE = 10000;
28-
export const DEFAULT_MIN_BACKOFF = 1000;
29-
export const DEFAULT_MAX_BACKOFF = 32000;
3028
export const FAILED_EVENT_RETRY_INTERVAL = 20 * 1000;
3129
export const EVENT_STORE_PREFIX = 'optly_event:';
3230

lib/odp/event_manager/odp_event_manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ export class DefaultOdpEventManager extends BaseService implements OdpEventManag
107107
}
108108
// Override for disposable event manager
109109
if(this.disposable) {
110-
this.retryConfig.maxRetries = 5;
110+
this.retryConfig.maxRetries = Math.min(this.retryConfig.maxRetries, 5);
111111
this.batchSize = 1
112112
}
113113

lib/project_config/project_config_manager.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ import {
2929
} from '../exception_messages';
3030

3131
interface ProjectConfigManagerConfig {
32-
// TODO: Don't use object type
33-
// eslint-disable-next-line @typescript-eslint/ban-types
34-
datafile?: string | object;
32+
datafile?: string | Record<string, unknown>;
3533
jsonSchemaValidator?: Transformer<unknown, boolean>,
3634
datafileManager?: DatafileManager;
3735
logger?: LoggerFacade;
@@ -198,7 +196,6 @@ export class ProjectConfigManagerImpl extends BaseService implements ProjectConf
198196
}
199197

200198
if (this.isNew() || this.isStarting()) {
201-
// TOOD: replace message with imported constants
202199
this.startPromise.reject(new Error(DATAFILE_MANAGER_STOPPED));
203200
}
204201

0 commit comments

Comments
 (0)