Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/node/src/app/analytics-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ export class Analytics extends NodeEmitter implements CoreAnalytics {
return version
}

/**
* Instantly flush any queued events, if there are any.
* @returns a promise resolving after the batch request has been successfully sent.
*/
public flush(): Promise<void> {
return this._publisher.flush()
}

/**
* Call this method to stop collecting new events and flush all existing events.
* This method also waits for any event method-specific callbacks to be triggered,
Expand Down
19 changes: 18 additions & 1 deletion packages/node/src/plugins/customerio/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function sleep(timeoutInMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, timeoutInMs))
}

function noop() { }
function noop() {}

interface PendingItem {
resolver: (ctx: Context) => void
Expand Down Expand Up @@ -89,6 +89,23 @@ export class Publisher {
this._batch = undefined
}

/**
* Instantly flush any queued events, if there are any.
* @returns a promise resolving after the batch request has been successfully sent.
*/
async flush(): Promise<void> {
const batch = this._batch

if (!batch || batch.length === 0) {
return
}

const resultPromise = this.send(batch)
this.clearBatch()
// If the send request fails, the caller will throw when "awaiting" this returned promise
return resultPromise
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Concurrent Flush Causes Duplicate Requests

The flush() method can send the same batch multiple times when called concurrently with itself or enqueue(), resulting in duplicate HTTP requests and potential double-resolution of event promises. This occurs because clearBatch() is called after send(). Additionally, the flush() method's comment incorrectly states send() throws on failure; send() always resolves, marking individual events as failed.

Fix in Cursor Fix in Web

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the first bit as the logic is similar in flushAfterClose. But true the send call won't throw for some response errors. Not sure if we can surface the request send result to the caller, so if it fails we can try to recover somehow.

What I really want in my specific case is:

  1. Flush all queued events (e.g. some identity calls adding users)
  2. If success - Send a transactional message (not part of this library currently) to one or more of those users.

Without flushing we would try to send a transactional message to a user which does not exist, which I guess would fail (not 100% sure)


flushAfterClose(pendingItemsCount: number) {
if (!pendingItemsCount) {
// if number of pending items is 0, there will never be anything else entering the batch, since the app is closed.
Expand Down