Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pink-states-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@thirdweb-dev/service-utils": patch
---

[service-utils] Use @confluentinc/kafka-javascript
4 changes: 1 addition & 3 deletions packages/service-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@
],
"sideEffects": false,
"dependencies": {
"@confluentinc/kafka-javascript": "^1.2.0",
"aws4fetch": "1.0.20",
"kafkajs": "2.2.4",
"lz4js": "0.2.0",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no longer need to import and configure lz4 for compression

"zod": "3.24.2"
},
"devDependencies": {
"@cloudflare/workers-types": "4.20250224.0",
"@types/lz4js": "0.2.1",
"@types/node": "22.13.5",
"typescript": "5.7.3",
"vitest": "3.0.7"
Expand Down
13 changes: 12 additions & 1 deletion packages/service-utils/src/core/usageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ export const USAGE_V2_SOURCES = [
] as const;
export type UsageV2Source = (typeof USAGE_V2_SOURCES)[number];
export function getTopicName(source: UsageV2Source) {
return `usage_v2.raw_${source}`;
switch (source) {
// Some sources are sent from clients and are written to an "untrusted" table.
case "sdk":
case "engine":
return `usage_v2.untrusted_raw_${source}`;
default:
return `usage_v2.raw_${source}`;
}
}

export interface ClientUsageV2Event {
Expand Down Expand Up @@ -55,6 +62,10 @@ export interface ClientUsageV2Event {
* The product version, if available.
*/
product_version?: string;
/**
* The event version. Defaults to 1.
*/
version?: number;
/**
* An object of arbitrary key-value pairs.
* Values can be boolean, number, string, Date, or null.
Expand Down
121 changes: 45 additions & 76 deletions packages/service-utils/src/node/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { checkServerIdentity } from "node:tls";
import { CompressionTypes, Kafka, type Producer } from "kafkajs";
import { compress, decompress } from "lz4js";

// CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
import KafkaJS from "kafkajs";
const { CompressionCodecs } = KafkaJS;
import {
KafkaJS,
type ProducerGlobalConfig,
} from "@confluentinc/kafka-javascript";

/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
Expand Down Expand Up @@ -33,104 +30,73 @@ export interface KafkaProducerSendOptions {
* ```
*/
export class KafkaProducer {
private kafka: Kafka;
private producer: Producer | null = null;
private compression: CompressionTypes;
private producer: KafkaJS.Producer;
private isConnected = false;

constructor(config: {
constructor(options: {
/**
* A descriptive name for your service. Example: "storage-server"
*/
producerName: string;
/**
* The environment the service is running in.
*/
environment: "development" | "production";
/**
* Whether to compress the events.
* A comma-separated list of `host[:port]` Kafka servers.
*/
shouldCompress?: boolean;

kafkaServers: string;
username: string;
password: string;

/**
* Configuration for the Kafka producer.
*/
config?: ProducerGlobalConfig;
}) {
const {
producerName,
environment,
shouldCompress = true,
username,
password,
} = config;
const { producerName, kafkaServers, username, password, config } = options;

this.kafka = new Kafka({
clientId: `${producerName}-${environment}`,
brokers:
environment === "production"
? ["warpstream.thirdweb.xyz:9092"]
: ["warpstream-dev.thirdweb.xyz:9092"],
ssl: {
checkServerIdentity(hostname, cert) {
return checkServerIdentity(hostname.toLowerCase(), cert);
},
},
sasl: {
mechanism: "plain",
username,
password,
},
this.producer = new KafkaJS.Kafka({}).producer({
"client.id": producerName,
"bootstrap.servers": kafkaServers,
"security.protocol": "sasl_ssl",
"sasl.mechanisms": "PLAIN",
"sasl.username": username,
"sasl.password": password,
"compression.codec": "lz4",
"allow.auto.create.topics": true,
// All configuration can be overridden.
...config,
});
}

if (shouldCompress) {
this.compression = CompressionTypes.LZ4;

CompressionCodecs[CompressionTypes.LZ4] = () => ({
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
compress: (encoder: { buffer: Buffer }) => {
const compressed = compress(encoder.buffer);
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
return Buffer.from(compressed);
},
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
decompress: (buffer: Buffer) => {
const decompressed = decompress(buffer);
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
return Buffer.from(decompressed);
},
});
} else {
this.compression = CompressionTypes.None;
}
/**
* Connects the producer. Can be called explicitly at the start of your service, or will be called automatically when sending messages.
*/
async connect() {
await this.producer.connect();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not be concurrency-safe. Might need to wrap in a try/catch?

this.isConnected = true;
}

/**
* Send messages to a Kafka topic.
* This method may throw. To call this non-blocking:
* ```ts
* void kafka.send(topic, events).catch((e) => console.error(e))
* ```
*
* @param topic
* @param messages
* @param configOverrides
*/
async send(
topic: string,
messages: Record<string, unknown>[],
options?: KafkaProducerSendOptions,
): Promise<void> {
if (!this.producer) {
this.producer = this.kafka.producer({
allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false,
maxInFlightRequests: options?.maxInFlightRequests ?? 2000,
retry: { retries: options?.retries ?? 5 },
});
await this.producer.connect();
if (!this.isConnected) {
await this.connect();
}

await this.producer.send({
topic,
messages: messages.map((m) => ({
value: JSON.stringify(m),
})),
compression: this.compression,
acks: options?.acks ?? -1, // Default: All brokers must acknowledge
timeout: options?.timeout ?? 10_000, // Default: 10 seconds
});
}

Expand All @@ -139,9 +105,12 @@ export class KafkaProducer {
* Useful when shutting down the service to flush in-flight events.
*/
async disconnect() {
if (this.producer) {
await this.producer.disconnect();
this.producer = null;
if (this.isConnected) {
try {
await this.producer.flush();
await this.producer.disconnect();
} catch {}
this.isConnected = false;
}
}
}
23 changes: 9 additions & 14 deletions packages/service-utils/src/node/usageV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type UsageV2Source,
getTopicName,
} from "../core/usageV2.js";
import { KafkaProducer, type KafkaProducerSendOptions } from "./kafka.js";
import { KafkaProducer } from "./kafka.js";

/**
* Creates a UsageV2Producer which opens a persistent TCP connection.
Expand Down Expand Up @@ -35,18 +35,13 @@ export class UsageV2Producer {
* The product where usage is coming from.
*/
source: UsageV2Source;
/**
* Whether to compress the events.
*/
shouldCompress?: boolean;

username: string;
password: string;
}) {
this.kafkaProducer = new KafkaProducer({
producerName: config.producerName,
environment: config.environment,
shouldCompress: config.shouldCompress,
username: config.username,
password: config.password,
});
Expand All @@ -56,25 +51,25 @@ export class UsageV2Producer {
/**
* Send usageV2 events.
* This method may throw. To call this non-blocking:
* ```ts
* void usageV2.sendEvents(events).catch((e) => console.error(e))
* ```
*
* @param events
*/
async sendEvents(
events: UsageV2Event[],
/**
* Reference: https://kafka.js.org/docs/producing#producing-messages
*/
options?: KafkaProducerSendOptions,
): Promise<void> {
async sendEvents(events: UsageV2Event[]): Promise<void> {
const parsedEvents = events.map((event) => ({
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not setting a fallback for version here.
version is intended to be defined by the caller optionally.

...event,
// Default to a generated UUID.
id: event.id ?? randomUUID(),
// Default to now.
created_at: event.created_at ?? new Date(),
// Remove the "team_" prefix, if any.
team_id: event.team_id.startsWith("team_")
? event.team_id.slice(5)
: event.team_id,
}));
await this.kafkaProducer.send(this.topic, parsedEvents, options);
await this.kafkaProducer.send(this.topic, parsedEvents);
}

/**
Expand Down
Loading
Loading