Skip to content

Commit 92ab2a7

Browse files
[FSSDK-11003] disposable service implementation (#981)
1 parent 51e8c1a commit 92ab2a7

19 files changed

+261
-102
lines changed

lib/event_processor/batch_event_processor.spec.ts

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ describe('QueueingEventProcessor', async () => {
9494
await expect(processor.onRunning()).resolves.not.toThrow();
9595
});
9696

97-
it('should start dispatchRepeater and failedEventRepeater', () => {
97+
it('should start failedEventRepeater', () => {
9898
const eventDispatcher = getMockDispatcher();
9999
const dispatchRepeater = getMockRepeater();
100100
const failedEventRepeater = getMockRepeater();
@@ -107,7 +107,6 @@ describe('QueueingEventProcessor', async () => {
107107
});
108108

109109
processor.start();
110-
expect(dispatchRepeater.start).toHaveBeenCalledOnce();
111110
expect(failedEventRepeater.start).toHaveBeenCalledOnce();
112111
});
113112

@@ -167,14 +166,33 @@ describe('QueueingEventProcessor', async () => {
167166

168167
processor.start();
169168
await processor.onRunning();
170-
for(let i = 0; i < 100; i++) {
169+
for(let i = 0; i < 99; i++) {
171170
const event = createImpressionEvent(`id-${i}`);
172171
await processor.process(event);
173172
}
174173

175174
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
176175
});
177176

177+
it('should start the dispatchRepeater if it is not running', async () => {
178+
const eventDispatcher = getMockDispatcher();
179+
const dispatchRepeater = getMockRepeater();
180+
181+
const processor = new BatchEventProcessor({
182+
eventDispatcher,
183+
dispatchRepeater,
184+
batchSize: 100,
185+
});
186+
187+
processor.start();
188+
await processor.onRunning();
189+
190+
const event = createImpressionEvent('id-1');
191+
await processor.process(event);
192+
193+
expect(dispatchRepeater.start).toHaveBeenCalledOnce();
194+
});
195+
178196
it('should dispatch events if queue is full and clear queue', async () => {
179197
const eventDispatcher = getMockDispatcher();
180198
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
@@ -190,30 +208,33 @@ describe('QueueingEventProcessor', async () => {
190208
await processor.onRunning();
191209

192210
let events: ProcessableEvent[] = [];
193-
for(let i = 0; i < 100; i++) {
211+
for(let i = 0; i < 99; i++){
194212
const event = createImpressionEvent(`id-${i}`);
195213
events.push(event);
196214
await processor.process(event);
197215
}
198216

199217
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
200218

201-
let event = createImpressionEvent('id-100');
219+
let event = createImpressionEvent('id-99');
220+
events.push(event);
202221
await processor.process(event);
203-
222+
204223
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
205224
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
206225

207-
events = [event];
208-
for(let i = 101; i < 200; i++) {
226+
events = [];
227+
228+
for(let i = 100; i < 199; i++) {
209229
const event = createImpressionEvent(`id-${i}`);
210230
events.push(event);
211231
await processor.process(event);
212232
}
213233

214234
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
215235

216-
event = createImpressionEvent('id-200');
236+
event = createImpressionEvent('id-199');
237+
events.push(event);
217238
await processor.process(event);
218239

219240
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
@@ -257,6 +278,40 @@ describe('QueueingEventProcessor', async () => {
257278
expect(eventDispatcher.dispatchEvent.mock.calls[1][0]).toEqual(buildLogEvent([newEvent]));
258279
});
259280

281+
it('should flush queue immediately regardless of batchSize, if event processor is disposable', async () => {
282+
const eventDispatcher = getMockDispatcher();
283+
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
284+
mockDispatch.mockResolvedValue({});
285+
286+
const dispatchRepeater = getMockRepeater();
287+
const failedEventRepeater = getMockRepeater();
288+
289+
const processor = new BatchEventProcessor({
290+
eventDispatcher,
291+
dispatchRepeater,
292+
failedEventRepeater,
293+
batchSize: 100,
294+
});
295+
296+
processor.makeDisposable();
297+
processor.start();
298+
await processor.onRunning();
299+
300+
const events: ProcessableEvent[] = [];
301+
const event = createImpressionEvent('id-1');
302+
events.push(event);
303+
await processor.process(event);
304+
305+
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
306+
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
307+
expect(dispatchRepeater.reset).toHaveBeenCalledTimes(1);
308+
expect(dispatchRepeater.start).not.toHaveBeenCalled();
309+
expect(failedEventRepeater.start).not.toHaveBeenCalled();
310+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
311+
// @ts-ignore
312+
expect(processor.retryConfig?.maxRetries).toEqual(5);
313+
});
314+
260315
it('should store the event in the eventStore with increasing ids', async () => {
261316
const eventDispatcher = getMockDispatcher();
262317
const eventStore = getMockSyncCache<EventWithId>();

lib/event_processor/batch_event_processor.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import { areEventContextsEqual } from "./event_builder/user_event";
3030
import { EVENT_PROCESSOR_STOPPED, FAILED_TO_DISPATCH_EVENTS, FAILED_TO_DISPATCH_EVENTS_WITH_ARG } from "../exception_messages";
3131
import { sprintf } from "../utils/fns";
3232

33+
export const DEFAULT_MIN_BACKOFF = 1000;
34+
export const DEFAULT_MAX_BACKOFF = 32000;
35+
3336
export type EventWithId = {
3437
id: string;
3538
event: ProcessableEvent;
@@ -209,7 +212,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
209212
if (!batch) {
210213
return;
211214
}
212-
215+
216+
this.dispatchRepeater.reset();
213217
this.dispatchBatch(batch, closing);
214218
}
215219

@@ -218,10 +222,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
218222
return Promise.reject('Event processor is not running');
219223
}
220224

221-
if (this.eventQueue.length == this.batchSize) {
222-
this.flush();
223-
}
224-
225225
const eventWithId = {
226226
id: this.idGenerator.getId(),
227227
event: event,
@@ -232,29 +232,50 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
232232
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
233233
this.flush();
234234
}
235-
this.eventQueue.push(eventWithId);
235+
236+
this.eventQueue.push(eventWithId);
237+
238+
if (this.eventQueue.length == this.batchSize) {
239+
this.flush();
240+
} else if (!this.dispatchRepeater.isRunning()) {
241+
this.dispatchRepeater.start();
242+
}
243+
236244
}
237245

238246
start(): void {
239247
if (!this.isNew()) {
240248
return;
241249
}
250+
242251
super.start();
243252
this.state = ServiceState.Running;
244-
this.dispatchRepeater.start();
245-
this.failedEventRepeater?.start();
253+
254+
if(!this.disposable) {
255+
this.failedEventRepeater?.start();
256+
}
246257

247258
this.retryFailedEvents();
248259
this.startPromise.resolve();
249260
}
250261

262+
makeDisposable(): void {
263+
super.makeDisposable();
264+
this.batchSize = 1;
265+
this.retryConfig = {
266+
maxRetries: Math.min(this.retryConfig?.maxRetries ?? 5, 5),
267+
backoffProvider:
268+
this.retryConfig?.backoffProvider ||
269+
(() => new ExponentialBackoff(DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, 500)),
270+
}
271+
}
272+
251273
stop(): void {
252274
if (this.isDone()) {
253275
return;
254276
}
255277

256278
if (this.isNew()) {
257-
// TOOD: replace message with imported constants
258279
this.startPromise.reject(new Error(EVENT_PROCESSOR_STOPPED));
259280
}
260281

lib/event_processor/event_processor_factory.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
*/
1616

1717
import { describe, it, expect, beforeEach, vi, MockInstance } from 'vitest';
18-
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, getBatchEventProcessor } from './event_processor_factory';
19-
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId } from './batch_event_processor';
18+
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, getBatchEventProcessor } from './event_processor_factory';
19+
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId,DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF } from './batch_event_processor';
2020
import { ExponentialBackoff, IntervalRepeater } from '../utils/repeater/repeater';
2121
import { getMockSyncCache } from '../tests/mock/mock_cache';
2222
import { LogLevel } from '../modules/logging';

lib/event_processor/event_processor_factory.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ import { StartupLog } from "../service";
1919
import { ExponentialBackoff, IntervalRepeater } from "../utils/repeater/repeater";
2020
import { EventDispatcher } from "./event_dispatcher/event_dispatcher";
2121
import { EventProcessor } from "./event_processor";
22-
import { BatchEventProcessor, EventWithId, RetryConfig } from "./batch_event_processor";
22+
import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor";
2323
import { AsyncPrefixCache, Cache, SyncPrefixCache } from "../utils/cache/cache";
2424

2525
export const DEFAULT_EVENT_BATCH_SIZE = 10;
2626
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1000;
2727
export const DEFAULT_EVENT_MAX_QUEUE_SIZE = 10000;
28-
export const DEFAULT_MIN_BACKOFF = 1000;
29-
export const DEFAULT_MAX_BACKOFF = 32000;
3028
export const FAILED_EVENT_RETRY_INTERVAL = 20 * 1000;
3129
export const EVENT_STORE_PREFIX = 'optly_event:';
3230

lib/exception_messages.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ export const DATAFILE_MANAGER_STOPPED = 'Datafile manager stopped before it coul
3131
export const DATAFILE_MANAGER_FAILED_TO_START = 'Datafile manager failed to start';
3232
export const FAILED_TO_FETCH_DATAFILE = 'Failed to fetch datafile';
3333
export const FAILED_TO_STOP = 'Failed to stop';
34-
export const YOU_MUST_PROVIDE_DATAFILE_IN_SSR = 'You must provide datafile in SSR';
3534
export const YOU_MUST_PROVIDE_AT_LEAST_ONE_OF_SDKKEY_OR_DATAFILE = 'You must provide at least one of sdkKey or datafile';
3635
export const RETRY_CANCELLED = 'Retry cancelled';
3736
export const REQUEST_TIMEOUT = 'Request timeout';

lib/odp/event_manager/odp_event_manager.spec.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,40 @@ describe('DefaultOdpEventManager', () => {
207207
}
208208
});
209209

210+
it('should flush the queue immediately if disposable, regardless of the batchSize', async () => {
211+
const apiManager = getMockApiManager();
212+
const repeater = getMockRepeater()
213+
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });
214+
215+
const odpEventManager = new DefaultOdpEventManager({
216+
repeater,
217+
apiManager: apiManager,
218+
batchSize: 10,
219+
retryConfig: {
220+
maxRetries: 3,
221+
backoffProvider: vi.fn(),
222+
},
223+
});
224+
225+
odpEventManager.updateConfig({
226+
integrated: true,
227+
odpConfig: config,
228+
});
229+
odpEventManager.makeDisposable();
230+
odpEventManager.start();
231+
232+
await expect(odpEventManager.onRunning()).resolves.not.toThrow();
233+
234+
const event = makeEvent(0);
235+
odpEventManager.sendEvent(event);
236+
await exhaustMicrotasks();
237+
238+
expect(apiManager.sendEvents).toHaveBeenCalledTimes(1);
239+
expect(apiManager.sendEvents).toHaveBeenNthCalledWith(1, config, [event]);
240+
expect(repeater.reset).toHaveBeenCalledTimes(1);
241+
expect(repeater.start).not.toHaveBeenCalled();
242+
})
243+
210244
it('drops events and logs if the state is not running', async () => {
211245
const apiManager = getMockApiManager();
212246
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });

lib/odp/event_manager/odp_event_manager.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,20 @@ export class DefaultOdpEventManager extends BaseService implements OdpEventManag
107107
}
108108

109109
super.start();
110+
110111
if (this.odpIntegrationConfig) {
111112
this.goToRunningState();
112113
} else {
113114
this.state = ServiceState.Starting;
114115
}
115116
}
116117

118+
makeDisposable(): void {
119+
super.makeDisposable();
120+
this.retryConfig.maxRetries = Math.min(this.retryConfig.maxRetries, 5);
121+
this.batchSize = 1;
122+
}
123+
117124
updateConfig(odpIntegrationConfig: OdpIntegrationConfig): void {
118125
if (this.isDone()) {
119126
return;

lib/odp/odp_manager.spec.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const getMockOdpEventManager = () => {
5151
getState: vi.fn(),
5252
updateConfig: vi.fn(),
5353
sendEvent: vi.fn(),
54+
makeDisposable: vi.fn(),
5455
};
5556
};
5657

@@ -696,5 +697,19 @@ describe('DefaultOdpManager', () => {
696697
eventManagerTerminatedPromise.reject(new Error(FAILED_TO_STOP));
697698
await expect(odpManager.onTerminated()).rejects.toThrow();
698699
});
700+
701+
it('should call makeDisposable() on eventManager when makeDisposable() is called on odpManager', async () => {
702+
const eventManager = getMockOdpEventManager();
703+
const segmentManager = getMockOdpSegmentManager();
704+
705+
const odpManager = new DefaultOdpManager({
706+
segmentManager,
707+
eventManager,
708+
});
709+
710+
odpManager.makeDisposable();
711+
712+
expect(eventManager.makeDisposable).toHaveBeenCalled();
713+
})
699714
});
700715

lib/odp/odp_manager.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ export class DefaultOdpManager extends BaseService implements OdpManager {
108108
});
109109
}
110110

111+
makeDisposable(): void {
112+
super.makeDisposable();
113+
this.eventManager.makeDisposable();
114+
}
115+
111116
private handleStartSuccess() {
112117
if (this.isDone()) {
113118
return;

0 commit comments

Comments
 (0)