Skip to content

[FSSDK-11003] disposable service implementation #981

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions lib/event_processor/batch_event_processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ describe('QueueingEventProcessor', async () => {
await expect(processor.onRunning()).resolves.not.toThrow();
});

it('should start dispatchRepeater and failedEventRepeater', () => {
it('should start failedEventRepeater', () => {
const eventDispatcher = getMockDispatcher();
const dispatchRepeater = getMockRepeater();
const failedEventRepeater = getMockRepeater();
Expand All @@ -107,7 +107,6 @@ describe('QueueingEventProcessor', async () => {
});

processor.start();
expect(dispatchRepeater.start).toHaveBeenCalledOnce();
expect(failedEventRepeater.start).toHaveBeenCalledOnce();
});

Expand Down Expand Up @@ -167,14 +166,33 @@ describe('QueueingEventProcessor', async () => {

processor.start();
await processor.onRunning();
for(let i = 0; i < 100; i++) {
for(let i = 0; i < 99; i++) {
const event = createImpressionEvent(`id-${i}`);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
});

it('should start the dispatchRepeater if it is not running', async () => {
const eventDispatcher = getMockDispatcher();
const dispatchRepeater = getMockRepeater();

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater,
batchSize: 100,
});

processor.start();
await processor.onRunning();

const event = createImpressionEvent('id-1');
await processor.process(event);

expect(dispatchRepeater.start).toHaveBeenCalledOnce();
});

it('should dispatch events if queue is full and clear queue', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
Expand All @@ -190,30 +208,33 @@ describe('QueueingEventProcessor', async () => {
await processor.onRunning();

let events: ProcessableEvent[] = [];
for(let i = 0; i < 100; i++) {
for(let i = 0; i < 99; i++){
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);

let event = createImpressionEvent('id-100');
let event = createImpressionEvent('id-99');
events.push(event);
await processor.process(event);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));

events = [event];
for(let i = 101; i < 200; i++) {
events = [];

for(let i = 100; i < 199; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);

event = createImpressionEvent('id-200');
event = createImpressionEvent('id-199');
events.push(event);
await processor.process(event);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -257,6 +278,40 @@ describe('QueueingEventProcessor', async () => {
expect(eventDispatcher.dispatchEvent.mock.calls[1][0]).toEqual(buildLogEvent([newEvent]));
});

it('should flush queue immediately regardless of batchSize, if event processor is disposable', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});

const dispatchRepeater = getMockRepeater();
const failedEventRepeater = getMockRepeater();

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater,
failedEventRepeater,
batchSize: 100,
});

processor.makeDisposable();
processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
const event = createImpressionEvent('id-1');
events.push(event);
await processor.process(event);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
expect(dispatchRepeater.reset).toHaveBeenCalledTimes(1);
expect(dispatchRepeater.start).not.toHaveBeenCalled();
expect(failedEventRepeater.start).not.toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
expect(processor.retryConfig?.maxRetries).toEqual(5);
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add tests for the following as well when disposable

  1. FailedEventRepeater is not started
  2. dispatchRepeater is not started
  3. maxRetry is limited

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding your number 2 - Current logic is - dispatchRepeater starts, dispatch immediately and then stops. Isn't this expected ? Or am I missing something here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the refactored logic in this PR, if batchSize == 1, dispatch repeater should never start. It will immediately dispatch the event, the repeater start is in the else branch

it('should store the event in the eventStore with increasing ids', async () => {
const eventDispatcher = getMockDispatcher();
const eventStore = getMockSyncCache<EventWithId>();
Expand Down
39 changes: 30 additions & 9 deletions lib/event_processor/batch_event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import { areEventContextsEqual } from "./event_builder/user_event";
import { EVENT_PROCESSOR_STOPPED, FAILED_TO_DISPATCH_EVENTS, FAILED_TO_DISPATCH_EVENTS_WITH_ARG } from "../exception_messages";
import { sprintf } from "../utils/fns";

export const DEFAULT_MIN_BACKOFF = 1000;
export const DEFAULT_MAX_BACKOFF = 32000;

export type EventWithId = {
id: string;
event: ProcessableEvent;
Expand Down Expand Up @@ -209,7 +212,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
if (!batch) {
return;
}


this.dispatchRepeater.reset();
this.dispatchBatch(batch, closing);
}

Expand All @@ -218,10 +222,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
return Promise.reject('Event processor is not running');
}

if (this.eventQueue.length == this.batchSize) {
this.flush();
}

const eventWithId = {
id: this.idGenerator.getId(),
event: event,
Expand All @@ -232,29 +232,50 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
this.flush();
}
this.eventQueue.push(eventWithId);

this.eventQueue.push(eventWithId);

if (this.eventQueue.length == this.batchSize) {
this.flush();
} else if (!this.dispatchRepeater.isRunning()) {
this.dispatchRepeater.start();
}

}

start(): void {
if (!this.isNew()) {
return;
}

super.start();
this.state = ServiceState.Running;
this.dispatchRepeater.start();
this.failedEventRepeater?.start();

if(!this.disposable) {
this.failedEventRepeater?.start();
}

this.retryFailedEvents();
this.startPromise.resolve();
}

makeDisposable(): void {
super.makeDisposable();
this.batchSize = 1;
this.retryConfig = {
maxRetries: Math.min(this.retryConfig?.maxRetries ?? 5, 5),
backoffProvider:
this.retryConfig?.backoffProvider ||
(() => new ExponentialBackoff(DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, 500)),
}
}

stop(): void {
if (this.isDone()) {
return;
}

if (this.isNew()) {
// TOOD: replace message with imported constants
this.startPromise.reject(new Error(EVENT_PROCESSOR_STOPPED));
}

Expand Down
4 changes: 2 additions & 2 deletions lib/event_processor/event_processor_factory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import { describe, it, expect, beforeEach, vi, MockInstance } from 'vitest';
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, getBatchEventProcessor } from './event_processor_factory';
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId } from './batch_event_processor';
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, getBatchEventProcessor } from './event_processor_factory';
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId,DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF } from './batch_event_processor';
import { ExponentialBackoff, IntervalRepeater } from '../utils/repeater/repeater';
import { getMockSyncCache } from '../tests/mock/mock_cache';
import { LogLevel } from '../modules/logging';
Expand Down
4 changes: 1 addition & 3 deletions lib/event_processor/event_processor_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import { StartupLog } from "../service";
import { ExponentialBackoff, IntervalRepeater } from "../utils/repeater/repeater";
import { EventDispatcher } from "./event_dispatcher/event_dispatcher";
import { EventProcessor } from "./event_processor";
import { BatchEventProcessor, EventWithId, RetryConfig } from "./batch_event_processor";
import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor";
import { AsyncPrefixCache, Cache, SyncPrefixCache } from "../utils/cache/cache";

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1000;
export const DEFAULT_EVENT_MAX_QUEUE_SIZE = 10000;
export const DEFAULT_MIN_BACKOFF = 1000;
export const DEFAULT_MAX_BACKOFF = 32000;
export const FAILED_EVENT_RETRY_INTERVAL = 20 * 1000;
export const EVENT_STORE_PREFIX = 'optly_event:';

Expand Down
1 change: 0 additions & 1 deletion lib/exception_messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export const DATAFILE_MANAGER_STOPPED = 'Datafile manager stopped before it coul
export const DATAFILE_MANAGER_FAILED_TO_START = 'Datafile manager failed to start';
export const FAILED_TO_FETCH_DATAFILE = 'Failed to fetch datafile';
export const FAILED_TO_STOP = 'Failed to stop';
export const YOU_MUST_PROVIDE_DATAFILE_IN_SSR = 'You must provide datafile in SSR';
export const YOU_MUST_PROVIDE_AT_LEAST_ONE_OF_SDKKEY_OR_DATAFILE = 'You must provide at least one of sdkKey or datafile';
export const RETRY_CANCELLED = 'Retry cancelled';
export const REQUEST_TIMEOUT = 'Request timeout';
Expand Down
34 changes: 34 additions & 0 deletions lib/odp/event_manager/odp_event_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,40 @@ describe('DefaultOdpEventManager', () => {
}
});

it('should flush the queue immediately if disposable, regardless of the batchSize', async () => {
const apiManager = getMockApiManager();
const repeater = getMockRepeater()
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });

const odpEventManager = new DefaultOdpEventManager({
repeater,
apiManager: apiManager,
batchSize: 10,
retryConfig: {
maxRetries: 3,
backoffProvider: vi.fn(),
},
});

odpEventManager.updateConfig({
integrated: true,
odpConfig: config,
});
odpEventManager.makeDisposable();
odpEventManager.start();

await expect(odpEventManager.onRunning()).resolves.not.toThrow();

const event = makeEvent(0);
odpEventManager.sendEvent(event);
await exhaustMicrotasks();

expect(apiManager.sendEvents).toHaveBeenCalledTimes(1);
expect(apiManager.sendEvents).toHaveBeenNthCalledWith(1, config, [event]);
expect(repeater.reset).toHaveBeenCalledTimes(1);
expect(repeater.start).not.toHaveBeenCalled();
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add tests for repeater stop when disposable = true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the actual implementationrepeater.reset calls repeater.stop. But in the mock version of the repeater thats not the case. Whats your suggestion on that? Should I update the mock to support this ? (Not meaningful IMO).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see we have assertions for repeater.reset here. We can ignore this comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a test to assert the repeater is not started, similar to eventProcessor

it('drops events and logs if the state is not running', async () => {
const apiManager = getMockApiManager();
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });
Expand Down
7 changes: 7 additions & 0 deletions lib/odp/event_manager/odp_event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,20 @@ export class DefaultOdpEventManager extends BaseService implements OdpEventManag
}

super.start();

if (this.odpIntegrationConfig) {
this.goToRunningState();
} else {
this.state = ServiceState.Starting;
}
}

makeDisposable(): void {
super.makeDisposable();
this.retryConfig.maxRetries = Math.min(this.retryConfig.maxRetries, 5);
this.batchSize = 1;
}

updateConfig(odpIntegrationConfig: OdpIntegrationConfig): void {
if (this.isDone()) {
return;
Expand Down
15 changes: 15 additions & 0 deletions lib/odp/odp_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const getMockOdpEventManager = () => {
getState: vi.fn(),
updateConfig: vi.fn(),
sendEvent: vi.fn(),
makeDisposable: vi.fn(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add test that it makes the eventManager disposable when its makeDisposable() is called?

};
};

Expand Down Expand Up @@ -696,5 +697,19 @@ describe('DefaultOdpManager', () => {
eventManagerTerminatedPromise.reject(new Error(FAILED_TO_STOP));
await expect(odpManager.onTerminated()).rejects.toThrow();
});

it('should call makeDisposable() on eventManager when makeDisposable() is called on odpManager', async () => {
const eventManager = getMockOdpEventManager();
const segmentManager = getMockOdpSegmentManager();

const odpManager = new DefaultOdpManager({
segmentManager,
eventManager,
});

odpManager.makeDisposable();

expect(eventManager.makeDisposable).toHaveBeenCalled();
})
});

5 changes: 5 additions & 0 deletions lib/odp/odp_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ export class DefaultOdpManager extends BaseService implements OdpManager {
});
}

makeDisposable(): void {
super.makeDisposable();
this.eventManager.makeDisposable();
}

private handleStartSuccess() {
if (this.isDone()) {
return;
Expand Down
Loading
Loading