Skip to content

Commit 3265beb

Browse files
author
Sunita Prajapati
committed
chore: added waiting plugin
1 parent 9d1aee7 commit 3265beb

File tree

2 files changed

+118
-1
lines changed

2 files changed

+118
-1
lines changed

packages/core/src/analytics.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ import {
7272
translateHTTPError,
7373
} from './errors';
7474
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
75+
import { WaitingPlugin } from './plugins/Waiting';
7576

7677
type OnPluginAddedCallback = (plugin: Plugin) => void;
7778

@@ -97,6 +98,11 @@ export class SegmentClient {
9798
private isAddingPlugins = false;
9899

99100
private timeline: Timeline;
101+
// running state (matches Kotlin's running flag)
102+
private isRunning = true;
103+
104+
// Waiting plugin instance (buffers events while paused)
105+
private waitingPlugin?: WaitingPlugin;
100106

101107
private pluginsToAdd: Plugin[] = [];
102108

@@ -200,6 +206,18 @@ export class SegmentClient {
200206
this.store = store;
201207
this.timeline = new Timeline();
202208

209+
// create and add waiting plugin immediately so early events get buffered.
210+
try {
211+
this.waitingPlugin = new WaitingPlugin();
212+
// add directly to timeline via addPlugin to ensure configure() is called immediately
213+
this.addPlugin(this.waitingPlugin);
214+
// initial running state false until init completes (mirrors Kotlin semantics)
215+
this.isRunning = false;
216+
} catch (e) {
217+
// if WaitingPlugin instantiation or add fails, fallback to running=true
218+
this.isRunning = true;
219+
}
220+
203221
// Initialize the watchables
204222
this.context = {
205223
get: this.store.context.get,
@@ -296,6 +314,9 @@ export class SegmentClient {
296314
await this.storageReady();
297315
}
298316

317+
// Pause pipeline at init start (buffer events until init completes)
318+
this.pauseEventProcessing();
319+
299320
// Get new settings from segment
300321
// It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins
301322
// which make use of the settings object to initialize
@@ -309,7 +330,8 @@ export class SegmentClient {
309330
]);
310331
await this.onReady();
311332
this.isReady.value = true;
312-
333+
// Resume pipeline before processing pending events so WaitingPlugin flushes
334+
await this.resumeEventProcessing();
313335
// Process all pending events
314336
await this.processPendingEvents();
315337
// Trigger manual flush
@@ -1027,4 +1049,45 @@ export class SegmentClient {
10271049

10281050
return totalEventsCount;
10291051
}
1052+
/*
1053+
* Running / pause/resume helpers (Kotlin parity)
1054+
*/
1055+
1056+
public running() {
1057+
return this.isRunning;
1058+
}
1059+
/**
1060+
* Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin.
1061+
* An auto-resume will be scheduled after `timeout` ms.
1062+
*/
1063+
public pauseEventProcessing(timeout = 30000) {
1064+
if (!this.isRunning) {
1065+
return;
1066+
}
1067+
1068+
this.isRunning = false;
1069+
try {
1070+
this.waitingPlugin?.pause();
1071+
} catch {
1072+
// ignore if plugin not present
1073+
}
1074+
1075+
// auto-resume after timeout to avoid permanent blocking
1076+
setTimeout(() => {
1077+
void this.resumeEventProcessing();
1078+
}, timeout);
1079+
}
1080+
public async resumeEventProcessing() {
1081+
if (this.isRunning) {
1082+
return;
1083+
}
1084+
1085+
this.isRunning = true;
1086+
1087+
try {
1088+
await this.waitingPlugin?.resume();
1089+
} catch {
1090+
// ignore plugin errors during resume
1091+
}
1092+
}
10301093
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { SegmentClient } from 'src';
2+
import { Plugin } from 'src/plugin';
3+
import { PluginType, SegmentEvent } from 'src/types';
4+
5+
/**
6+
* WaitingPlugin
7+
* Buffers events when paused and releases them when resumed.
8+
*/
9+
export class WaitingPlugin extends Plugin {
10+
public type = PluginType.before;
11+
private paused = true;
12+
private buffer: SegmentEvent[] = [];
13+
14+
configure(analytics: SegmentClient) {
15+
super.configure(analytics);
16+
}
17+
18+
isPaused() {
19+
return this.paused;
20+
}
21+
22+
pause() {
23+
this.paused = true;
24+
}
25+
26+
async resume() {
27+
if (!this.paused) {
28+
return;
29+
}
30+
31+
this.paused = false;
32+
33+
const events = [...this.buffer];
34+
this.buffer = [];
35+
36+
for (const event of events) {
37+
try {
38+
if (this.analytics !== undefined) {
39+
await this.analytics.process(event);
40+
}
41+
} catch (err) {
42+
// Ignore individual errors
43+
}
44+
}
45+
}
46+
47+
execute(event: SegmentEvent): SegmentEvent | undefined {
48+
if (this.paused) {
49+
this.buffer.push(event);
50+
return undefined;
51+
}
52+
return event;
53+
}
54+
}

0 commit comments

Comments
 (0)