Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import {
SegmentError,
translateHTTPError,
} from './errors';
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';

type OnPluginAddedCallback = (plugin: Plugin) => void;

Expand Down Expand Up @@ -982,4 +983,46 @@ export class SegmentClient {
userId: userInfo.userId,
};
};
/* Method for clearing flush queue */
clear() {
const plugins = this.getPlugins();

plugins.forEach(async (plugin) => {
if (plugin instanceof SegmentDestination) {
const timelinePlugins = plugin.timeline?.plugins?.after ?? [];

for (const subPlugin of timelinePlugins) {
if (subPlugin instanceof QueueFlushingPlugin) {
await subPlugin.dequeueEvents();
}
}
}
});

this.flushPolicyExecuter.reset();
}

/**
* Method to get count of events in flush queue.
*/
async pendingEvents() {
const plugins = this.getPlugins();
let totalEventsCount = 0;

for (const plugin of plugins) {
// We're looking inside SegmentDestination's `after` plugins
if (plugin instanceof SegmentDestination) {
const timelinePlugins = plugin.timeline?.plugins?.after ?? [];

for (const subPlugin of timelinePlugins) {
if (subPlugin instanceof QueueFlushingPlugin) {
const eventsCount = await subPlugin.pendingEvents();
totalEventsCount += eventsCount;
}
}
}
}

return totalEventsCount;
}
}
16 changes: 16 additions & 0 deletions packages/core/src/plugins/QueueFlushingPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,20 @@ export class QueueFlushingPlugin extends UtilityPlugin {
return { events: filteredEvents };
});
}
/**
* Clear all events from the queue
*/
async dequeueEvents() {
await this.queueStore?.dispatch(() => {
return { events: [] };
});
}

/**
* * Returns the count of items in the queue
*/
async pendingEvents() {
const events = (await this.queueStore?.getState(true))?.events ?? [];
return events.length;
}
}
63 changes: 63 additions & 0 deletions packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,67 @@ describe('QueueFlushingPlugin', () => {
// @ts-ignore
expect(queuePlugin.queueStore?.getState().events).toHaveLength(0);
});
it('should clear all events from the queue', async () => {
const onFlush = jest.fn().mockResolvedValue(undefined);
const queuePlugin = setupQueuePlugin(onFlush, 10);
const event1: SegmentEvent = {
type: EventType.TrackEvent,
event: 'test1',
properties: {
test: 'test1',
},
};
const event2: SegmentEvent = {
type: EventType.TrackEvent,
event: 'test2',
properties: {
test: 'test2',
},
};
await queuePlugin.execute(event1);
await queuePlugin.execute(event2);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
expect(queuePlugin.queueStore?.getState().events).toHaveLength(2);
await queuePlugin.dequeueEvents();
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
expect(queuePlugin.queueStore?.getState().events).toHaveLength(0);
});
it('should return the count of items in the queue', async () => {
const onFlush = jest.fn().mockResolvedValue(undefined);
const queuePlugin = setupQueuePlugin(onFlush, 10);

const event1: SegmentEvent = {
type: EventType.TrackEvent,
event: 'test1',
properties: {
test: 'test1',
},
};

const event2: SegmentEvent = {
type: EventType.TrackEvent,
event: 'test2',
properties: {
test: 'test2',
},
};

await queuePlugin.execute(event1);
await queuePlugin.execute(event2);

let eventsCount = await queuePlugin.pendingEvents();
expect(eventsCount).toBe(2);

await queuePlugin.dequeue(event1);

eventsCount = await queuePlugin.pendingEvents();
expect(eventsCount).toBe(1);

await queuePlugin.dequeueEvents();

eventsCount = await queuePlugin.pendingEvents();
expect(eventsCount).toBe(0);
});
});
1 change: 0 additions & 1 deletion packages/core/src/storage/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ export interface Storage {
readonly pendingEvents: Watchable<SegmentEvent[]> &
Settable<SegmentEvent[]> &
Queue<SegmentEvent, SegmentEvent[]>;

readonly enabled: Watchable<boolean> & Settable<boolean>;
}
export type DeepLinkData = {
Expand Down
Loading