Skip to content

Commit 8bb3074

Browse files
author
Sunita Prajapati
committed
feat: waiting plugin
add waiting plugin interface. any plugin implement this interface will pause event processing when it's added to analytics any plugin implement this interface can pause and resume event processing as needed
1 parent 3265beb commit 8bb3074

File tree

7 files changed

+525
-106
lines changed

7 files changed

+525
-106
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import {
2+
WaitingPlugin,
3+
PluginType,
4+
Plugin,
5+
6+
} from '@segment/analytics-react-native';
7+
8+
import type {SegmentAPISettings, SegmentClient, SegmentEvent, UpdateType} from '@segment/analytics-react-native';
9+
export class ExampleWaitingPlugin extends WaitingPlugin {
10+
type = PluginType.enrichment;
11+
analytics = undefined;
12+
tracked = false;
13+
14+
/**
15+
* Called when settings are updated
16+
*/
17+
update(_settings: SegmentAPISettings, _type: UpdateType) {
18+
if (this.type === PluginType.before) {
19+
// delay 3 seconds, then resume event processing
20+
setTimeout(() => {
21+
this.resume();
22+
}, 3000);
23+
}
24+
}
25+
26+
/**
27+
* Called for track events
28+
*/
29+
track(event: SegmentEvent) {
30+
this.tracked = true;
31+
return event;
32+
}
33+
}

packages/core/src/analytics.ts

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ import {
7272
translateHTTPError,
7373
} from './errors';
7474
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
75-
import { WaitingPlugin } from './plugins/Waiting';
75+
import { WaitingPlugin } from './plugin';
7676

7777
type OnPluginAddedCallback = (plugin: Plugin) => void;
7878

@@ -98,11 +98,6 @@ export class SegmentClient {
9898
private isAddingPlugins = false;
9999

100100
private timeline: Timeline;
101-
// running state (matches Kotlin's running flag)
102-
private isRunning = true;
103-
104-
// Waiting plugin instance (buffers events while paused)
105-
private waitingPlugin?: WaitingPlugin;
106101

107102
private pluginsToAdd: Plugin[] = [];
108103

@@ -206,17 +201,17 @@ export class SegmentClient {
206201
this.store = store;
207202
this.timeline = new Timeline();
208203

209-
// create and add waiting plugin immediately so early events get buffered.
210-
try {
211-
this.waitingPlugin = new WaitingPlugin();
212-
// add directly to timeline via addPlugin to ensure configure() is called immediately
213-
this.addPlugin(this.waitingPlugin);
214-
// initial running state false until init completes (mirrors Kotlin semantics)
215-
this.isRunning = false;
216-
} catch (e) {
217-
// if WaitingPlugin instantiation or add fails, fallback to running=true
218-
this.isRunning = true;
219-
}
204+
// // create and add waiting plugin immediately so early events get buffered.
205+
// try {
206+
// this.waitingPlugin = new WaitingPlugin();
207+
// // add directly to timeline via addPlugin to ensure configure() is called immediately
208+
// this.addPlugin(this.waitingPlugin);
209+
// // initial running state false until init completes (mirrors Kotlin semantics)
210+
// this.isRunning = false;
211+
// } catch (e) {
212+
// // if WaitingPlugin instantiation or add fails, fallback to running=true
213+
// this.isRunning = true;
214+
// }
220215

221216
// Initialize the watchables
222217
this.context = {
@@ -313,10 +308,6 @@ export class SegmentClient {
313308
if ((await this.store.isReady.get(true)) === false) {
314309
await this.storageReady();
315310
}
316-
317-
// Pause pipeline at init start (buffer events until init completes)
318-
this.pauseEventProcessing();
319-
320311
// Get new settings from segment
321312
// It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins
322313
// which make use of the settings object to initialize
@@ -330,8 +321,6 @@ export class SegmentClient {
330321
]);
331322
await this.onReady();
332323
this.isReady.value = true;
333-
// Resume pipeline before processing pending events so WaitingPlugin flushes
334-
await this.resumeEventProcessing();
335324
// Process all pending events
336325
await this.processPendingEvents();
337326
// Trigger manual flush
@@ -487,17 +476,24 @@ export class SegmentClient {
487476
settings
488477
);
489478
}
490-
491-
if (!this.isReady.value) {
479+
console.log('!this.isReady.value', !this.isReady.value);
480+
if (!this.isReady.value && !(plugin instanceof WaitingPlugin)) {
492481
this.pluginsToAdd.push(plugin);
493482
} else {
483+
console.log('this.addPlugin');
494484
this.addPlugin(plugin);
495485
}
496486
}
497487

498488
private addPlugin(plugin: Plugin) {
499489
plugin.configure(this);
500490
this.timeline.add(plugin);
491+
//check for waiting plugin here
492+
if (plugin instanceof WaitingPlugin) {
493+
console.log('add plugin');
494+
this.pauseEventProcessingForPlugin(plugin);
495+
}
496+
501497
this.triggerOnPluginLoaded(plugin);
502498
}
503499

@@ -534,7 +530,7 @@ export class SegmentClient {
534530
): Promise<SegmentEvent | undefined> {
535531
const event = await this.applyContextData(incomingEvent);
536532
this.flushPolicyExecuter.notify(event);
537-
return this.timeline.process(event);
533+
return await this.timeline.process(event);
538534
}
539535

540536
private async trackDeepLinks() {
@@ -1049,45 +1045,58 @@ export class SegmentClient {
10491045

10501046
return totalEventsCount;
10511047
}
1052-
/*
1053-
* Running / pause/resume helpers (Kotlin parity)
1054-
*/
1048+
private resumeTimeoutId?: ReturnType<typeof setTimeout>;
1049+
private waitingPlugins = new Set<WaitingPlugin>();
10551050

1056-
public running() {
1057-
return this.isRunning;
1058-
}
10591051
/**
10601052
* Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin.
10611053
* An auto-resume will be scheduled after `timeout` ms.
10621054
*/
1063-
public pauseEventProcessing(timeout = 30000) {
1064-
if (!this.isRunning) {
1055+
pauseEventProcessingForPlugin(plugin?: WaitingPlugin) {
1056+
if (plugin) {
1057+
this.waitingPlugins.add(plugin);
1058+
}
1059+
this.pauseEventProcessing();
1060+
}
1061+
async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) {
1062+
if (plugin) {
1063+
this.waitingPlugins.delete(plugin);
1064+
}
1065+
if (this.waitingPlugins.size > 0) {
1066+
return; // still blocked
1067+
}
1068+
1069+
await this.resumeEventProcessing();
1070+
}
1071+
1072+
pauseEventProcessing(timeout = 30000) {
1073+
// IMPORTANT: ignore repeated pauses
1074+
if (!this.isReady.value) {
10651075
return;
10661076
}
10671077

1068-
this.isRunning = false;
1069-
try {
1070-
this.waitingPlugin?.pause();
1071-
} catch {
1072-
// ignore if plugin not present
1078+
this.isReady.value = false;
1079+
1080+
// clear previous timeout if any
1081+
if (this.resumeTimeoutId) {
1082+
clearTimeout(this.resumeTimeoutId);
10731083
}
10741084

1075-
// auto-resume after timeout to avoid permanent blocking
1076-
setTimeout(() => {
1077-
void this.resumeEventProcessing();
1085+
this.resumeTimeoutId = setTimeout(async () => {
1086+
await this.resumeEventProcessing();
10781087
}, timeout);
10791088
}
1080-
public async resumeEventProcessing() {
1081-
if (this.isRunning) {
1089+
async resumeEventProcessing() {
1090+
if (this.isReady.value) {
10821091
return;
10831092
}
10841093

1085-
this.isRunning = true;
1086-
1087-
try {
1088-
await this.waitingPlugin?.resume();
1089-
} catch {
1090-
// ignore plugin errors during resume
1094+
if (this.resumeTimeoutId) {
1095+
clearTimeout(this.resumeTimeoutId);
1096+
this.resumeTimeoutId = undefined;
10911097
}
1098+
// this.waitingPlugins.clear();
1099+
this.isReady.value = true;
1100+
await this.processPendingEvents();
10921101
}
10931102
}

packages/core/src/plugin.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin {
115115
key = '';
116116

117117
timeline = new Timeline();
118+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
119+
store: any;
118120

119121
private hasSettings() {
120122
return this.analytics?.settings.get()?.[this.key] !== undefined;
121123
}
122124

123-
private isEnabled(event: SegmentEvent): boolean {
125+
protected isEnabled(event: SegmentEvent): boolean {
124126
let customerDisabled = false;
125127
if (event.integrations?.[this.key] === false) {
126128
customerDisabled = true;
@@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin {
140142
if (analytics) {
141143
plugin.configure(analytics);
142144
}
145+
146+
if (analytics && plugin instanceof WaitingPlugin) {
147+
analytics.pauseEventProcessingForPlugin(plugin);
148+
}
143149
this.timeline.add(plugin);
144150
return plugin;
145151
}
@@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin {
179185
type: PluginType.before,
180186
event,
181187
});
182-
183188
if (beforeResult === undefined) {
184189
return;
185190
}
@@ -210,3 +215,27 @@ export class UtilityPlugin extends EventPlugin {}
210215

211216
// For internal platform-specific bits
212217
export class PlatformPlugin extends Plugin {}
218+
219+
export { PluginType };
220+
221+
/**
222+
* WaitingPlugin
223+
* Buffers events when paused and releases them when resumed.
224+
*/
225+
export class WaitingPlugin extends Plugin {
226+
constructor() {
227+
super();
228+
}
229+
230+
configure(analytics: SegmentClient) {
231+
super.configure(analytics);
232+
}
233+
234+
pause() {
235+
this.analytics?.pauseEventProcessingForPlugin(this);
236+
}
237+
238+
async resume() {
239+
await this.analytics?.resumeEventProcessingForPlugin(this);
240+
}
241+
}

packages/core/src/plugins/Waiting.ts

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)