diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index e6caf7816..9f4ec7315 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -71,6 +71,7 @@ import { SegmentError, translateHTTPError, } from './errors'; +import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; type OnPluginAddedCallback = (plugin: Plugin) => void; @@ -982,4 +983,46 @@ export class SegmentClient { userId: userInfo.userId, }; }; + /* Method for clearing flush queue */ + clear() { + const plugins = this.getPlugins(); + + plugins.forEach(async (plugin) => { + if (plugin instanceof SegmentDestination) { + const timelinePlugins = plugin.timeline?.plugins?.after ?? []; + + for (const subPlugin of timelinePlugins) { + if (subPlugin instanceof QueueFlushingPlugin) { + await subPlugin.dequeueEvents(); + } + } + } + }); + + this.flushPolicyExecuter.reset(); + } + + /** + * Method to get count of events in flush queue. + */ + async pendingEvents() { + const plugins = this.getPlugins(); + let totalEventsCount = 0; + + for (const plugin of plugins) { + // We're looking inside SegmentDestination's `after` plugins + if (plugin instanceof SegmentDestination) { + const timelinePlugins = plugin.timeline?.plugins?.after ?? []; + + for (const subPlugin of timelinePlugins) { + if (subPlugin instanceof QueueFlushingPlugin) { + const eventsCount = await subPlugin.pendingEvents(); + totalEventsCount += eventsCount; + } + } + } + } + + return totalEventsCount; + } } diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 4ca749aac..1580ee288 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -130,4 +130,20 @@ export class QueueFlushingPlugin extends UtilityPlugin { return { events: filteredEvents }; }); } + /** + * Clear all events from the queue + */ + async dequeueEvents() { + await this.queueStore?.dispatch(() => { + return { events: [] }; + }); + } + + /** + * * Returns the count of items in the queue + */ + async pendingEvents() { + const events = (await this.queueStore?.getState(true))?.events ?? []; + return events.length; + } } diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index 81eabac7b..b3b02d802 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -77,4 +77,67 @@ describe('QueueFlushingPlugin', () => { // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); }); + it('should clear all events from the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(2); + await queuePlugin.dequeueEvents(); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); + }); + it('should return the count of items in the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + + let eventsCount = await queuePlugin.pendingEvents(); + expect(eventsCount).toBe(2); + + await queuePlugin.dequeue(event1); + + eventsCount = await queuePlugin.pendingEvents(); + expect(eventsCount).toBe(1); + + await queuePlugin.dequeueEvents(); + + eventsCount = await queuePlugin.pendingEvents(); + expect(eventsCount).toBe(0); + }); }); diff --git a/packages/core/src/storage/types.ts b/packages/core/src/storage/types.ts index 162398ee6..91955b715 100644 --- a/packages/core/src/storage/types.ts +++ b/packages/core/src/storage/types.ts @@ -92,7 +92,6 @@ export interface Storage { readonly pendingEvents: Watchable & Settable & Queue; - readonly enabled: Watchable & Settable; } export type DeepLinkData = {