Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {
WaitingPlugin,
PluginType,
Plugin,

} from '@segment/analytics-react-native';

import type {SegmentAPISettings, SegmentClient, SegmentEvent, UpdateType} from '@segment/analytics-react-native';
export class ExampleWaitingPlugin extends WaitingPlugin {
type = PluginType.enrichment;
analytics = undefined;
tracked = false;

/**
* Called when settings are updated
*/
update(_settings: SegmentAPISettings, _type: UpdateType) {
if (this.type === PluginType.before) {
// delay 3 seconds, then resume event processing
setTimeout(() => {
this.resume();
}, 3000);
}
}

/**
* Called for track events
*/
track(event: SegmentEvent) {
this.tracked = true;
return event;
}
}
82 changes: 77 additions & 5 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import {
translateHTTPError,
} from './errors';
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
import { WaitingPlugin } from './plugin';

type OnPluginAddedCallback = (plugin: Plugin) => void;

Expand Down Expand Up @@ -200,6 +201,18 @@ export class SegmentClient {
this.store = store;
this.timeline = new Timeline();

// // create and add waiting plugin immediately so early events get buffered.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: it is a bit easier to read multiline comments if you put them in a comment block like this:

/*
my comment
my comment2
*/

// try {
// this.waitingPlugin = new WaitingPlugin();
// // add directly to timeline via addPlugin to ensure configure() is called immediately
// this.addPlugin(this.waitingPlugin);
// // initial running state false until init completes (mirrors Kotlin semantics)
// this.isRunning = false;
// } catch (e) {
// // if WaitingPlugin instantiation or add fails, fallback to running=true
// this.isRunning = true;
// }

// Initialize the watchables
this.context = {
get: this.store.context.get,
Expand Down Expand Up @@ -295,7 +308,6 @@ export class SegmentClient {
if ((await this.store.isReady.get(true)) === false) {
await this.storageReady();
}

// Get new settings from segment
// It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins
// which make use of the settings object to initialize
Expand All @@ -309,7 +321,6 @@ export class SegmentClient {
]);
await this.onReady();
this.isReady.value = true;

// Process all pending events
await this.processPendingEvents();
// Trigger manual flush
Expand Down Expand Up @@ -465,17 +476,24 @@ export class SegmentClient {
settings
);
}

if (!this.isReady.value) {
console.log('!this.isReady.value', !this.isReady.value);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should clean up & remove debug console.logs

if (!this.isReady.value && !(plugin instanceof WaitingPlugin)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't prevent waiting plugins from being queued to be added before the client is done initializing

this.pluginsToAdd.push(plugin);
} else {
console.log('this.addPlugin');
this.addPlugin(plugin);
}
}

private addPlugin(plugin: Plugin) {
plugin.configure(this);
this.timeline.add(plugin);
//check for waiting plugin here
if (plugin instanceof WaitingPlugin) {
console.log('add plugin');
this.pauseEventProcessingForPlugin(plugin);
}

this.triggerOnPluginLoaded(plugin);
}

Expand Down Expand Up @@ -512,7 +530,7 @@ export class SegmentClient {
): Promise<SegmentEvent | undefined> {
const event = await this.applyContextData(incomingEvent);
this.flushPolicyExecuter.notify(event);
return this.timeline.process(event);
return await this.timeline.process(event);
}

private async trackDeepLinks() {
Expand Down Expand Up @@ -1027,4 +1045,58 @@ export class SegmentClient {

return totalEventsCount;
}
private resumeTimeoutId?: ReturnType<typeof setTimeout>;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if multiple waiting plugins are running concurrently would they both set the resumeTimeoutId and overwrite the other's?

private waitingPlugins = new Set<WaitingPlugin>();

/**
* Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin.
* An auto-resume will be scheduled after `timeout` ms.
*/
pauseEventProcessingForPlugin(plugin?: WaitingPlugin) {
if (plugin) {
this.waitingPlugins.add(plugin);
}
this.pauseEventProcessing();
}
async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) {
if (plugin) {
this.waitingPlugins.delete(plugin);
}
if (this.waitingPlugins.size > 0) {
return; // still blocked
}

await this.resumeEventProcessing();
}

pauseEventProcessing(timeout = 30000) {
// IMPORTANT: ignore repeated pauses
if (!this.isReady.value) {
return;
}

this.isReady.value = false;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use our own value instead of re-using isReady, since isReady tracks initialization state and is referenced elsewhere

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example its used in the init function to ensure init isn't called twice on the client

  /**
   * Initializes the client plugins, settings and subscribers.
   * Can only be called once.
   */
  async init() {
    try {
      if (this.isReady.value) {
        this.logger.warn('SegmentClient already initialized');
        return;
      }

if we init the client, add a waiting plugin, then init the client again we would end up bypassing the safety check and running init twice

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my suggestion would be to make a new variable in the state called "running" to match the kotlin sdk, then update QueueFlushingPlugin.ts to only remove events from the queue and add them to the state only when state.running === true.

then we can pause/unpause event flushing in analytics.ts


// clear previous timeout if any
if (this.resumeTimeoutId) {
clearTimeout(this.resumeTimeoutId);
}

this.resumeTimeoutId = setTimeout(async () => {
await this.resumeEventProcessing();
}, timeout);
}
async resumeEventProcessing() {
if (this.isReady.value) {
return;
}

if (this.resumeTimeoutId) {
clearTimeout(this.resumeTimeoutId);
this.resumeTimeoutId = undefined;
}
// this.waitingPlugins.clear();
this.isReady.value = true;
await this.processPendingEvents();
}
}
33 changes: 31 additions & 2 deletions packages/core/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin {
key = '';

timeline = new Timeline();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
store: any;

private hasSettings() {
return this.analytics?.settings.get()?.[this.key] !== undefined;
}

private isEnabled(event: SegmentEvent): boolean {
protected isEnabled(event: SegmentEvent): boolean {
let customerDisabled = false;
if (event.integrations?.[this.key] === false) {
customerDisabled = true;
Expand All @@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin {
if (analytics) {
plugin.configure(analytics);
}

if (analytics && plugin instanceof WaitingPlugin) {
analytics.pauseEventProcessingForPlugin(plugin);
}
this.timeline.add(plugin);
return plugin;
}
Expand Down Expand Up @@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin {
type: PluginType.before,
event,
});

if (beforeResult === undefined) {
return;
}
Expand Down Expand Up @@ -210,3 +215,27 @@ export class UtilityPlugin extends EventPlugin {}

// For internal platform-specific bits
export class PlatformPlugin extends Plugin {}

export { PluginType };

/**
* WaitingPlugin
* Buffers events when paused and releases them when resumed.
*/
export class WaitingPlugin extends Plugin {
constructor() {
super();
}

configure(analytics: SegmentClient) {
super.configure(analytics);
}

pause() {
this.analytics?.pauseEventProcessingForPlugin(this);
}

async resume() {
await this.analytics?.resumeEventProcessingForPlugin(this);
}
}
Loading
Loading