Skip to content

Conversation

@alisd23
Copy link

@alisd23 alisd23 commented Sep 1, 2025

Not sure if you guys accept public pull requests, but no one is able to file issues so I thought a pull request might help speed the process along. I'm thinking about migrating to customer.io so I'm doing some initial research on the library support.

Issue

The flush() method does not exist as documented in the customer.io docs. Not sure if this is just on the backlog still or was missed, but it is currently documented so I would call this a bug in the documentation or the library.

Solution

I did this fairly quickly so it may be naive, as there is some fairly complex queue/emitter stuff going on I don't 100% understand. Let me know what you think, and if this public PRs are helpful or just annoying.

Other thoughts

  • It would be great if we could file issues against the code directly instead of routing it through support, which in my opinion just adds an unnecessary, non-technical middleman, but maybe it's important to your internal process.
  • The current supported Node version is v14.21.3 (in the mise.toml file, and I think some of the tests are failing locally on my local version - 22). You probably already know but v14 is marked as end of life (since 2023). What are the current supported versions - will this work for example with v21/22?
  • I haven't added tests yet as I am not familiar with the codebase

this.clearBatch()
// If the send request fails, the caller will throw when "awaiting" this returned promise
return resultPromise
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Concurrent Flush Causes Duplicate Requests

The flush() method can send the same batch multiple times when called concurrently with itself or enqueue(), resulting in duplicate HTTP requests and potential double-resolution of event promises. This occurs because clearBatch() is called after send(). Additionally, the flush() method's comment incorrectly states send() throws on failure; send() always resolves, marking individual events as failed.

Fix in Cursor Fix in Web

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the first bit as the logic is similar in flushAfterClose. But true the send call won't throw for some response errors. Not sure if we can surface the request send result to the caller, so if it fails we can try to recover somehow.

What I really want in my specific case is:

  1. Flush all queued events (e.g. some identity calls adding users)
  2. If success - Send a transactional message (not part of this library currently) to one or more of those users.

Without flushing we would try to send a transactional message to a user which does not exist, which I guess would fail (not 100% sure)

@alandotcom
Copy link

@alisd23 I was looking for the same thing. I ended up just using the segment analytics node sdk, and changing the host and adding a custom http client (since we need to add auth in the header)

  import {
    Analytics,
    FetchHTTPClient,
    type HTTPClient,
    type HTTPClientRequest,
    type HTTPResponse,
  } from "@segment/analytics-node";

  class CustomerIoHttpClient implements HTTPClient {
    private readonly authHeader: string;
    private readonly baseClient: HTTPClient;

    constructor(writeKey: string) {
      this.authHeader = Buffer.from(`${writeKey}:`).toString("base64");
      this.baseClient = new FetchHTTPClient();
    }

    makeRequest(options: HTTPClientRequest): Promise<HTTPResponse> {
      const headers = {
        ...options.headers,
        Authorization: `Basic ${this.authHeader}`,
      };

      return this.baseClient.makeRequest({ ...options, headers });
    }
  }

  export function createCustomerIoAnalyticsClient(writeKey: string) {
    return new Analytics({
      writeKey,
      host: "https://cdp.customer.io",
      httpClient: new CustomerIoHttpClient(writeKey),
      flushAt: 1,
    });
  }

  // Example
  const analytics = createCustomerIoAnalyticsClient(
    process.env.CUSTOMERIO_CDP_WRITE_KEY ?? ""
  );

  analytics.track({
    userId: "user-123",
    event: "Test Event",
  });

  await analytics.flush();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants