-
Notifications
You must be signed in to change notification settings - Fork 623
[service-utils] switch to @confluentinc/kafka-javascript #6345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@thirdweb-dev/service-utils": patch | ||
| --- | ||
|
|
||
| [service-utils] Use @confluentinc/kafka-javascript |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,12 @@ | ||
| import { checkServerIdentity } from "node:tls"; | ||
| import { CompressionTypes, Kafka, type Producer } from "kafkajs"; | ||
| import { compress, decompress } from "lz4js"; | ||
| import { | ||
| KafkaJS, | ||
| type ProducerGlobalConfig, | ||
| } from "@confluentinc/kafka-javascript"; | ||
|
|
||
| // CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391 | ||
| import KafkaJS from "kafkajs"; | ||
| const { CompressionCodecs } = KafkaJS; | ||
| const KAFKA_URL: Record<"development" | "production", string> = { | ||
| development: "warpstream-dev.thirdweb.xyz:9092", | ||
| production: "warpstream.thirdweb.xyz:9092", | ||
| } as const; | ||
|
|
||
| /** | ||
| * Reference: https://kafka.js.org/docs/producing#producing-messages | ||
|
|
@@ -33,11 +35,10 @@ export interface KafkaProducerSendOptions { | |
| * ``` | ||
| */ | ||
| export class KafkaProducer { | ||
| private kafka: Kafka; | ||
| private producer: Producer | null = null; | ||
| private compression: CompressionTypes; | ||
| private producer: KafkaJS.Producer; | ||
| private isConnected = false; | ||
|
|
||
| constructor(config: { | ||
| constructor(options: { | ||
| /** | ||
| * A descriptive name for your service. Example: "storage-server" | ||
| */ | ||
|
|
@@ -46,91 +47,61 @@ export class KafkaProducer { | |
| * The environment the service is running in. | ||
| */ | ||
| environment: "development" | "production"; | ||
| /** | ||
| * Whether to compress the events. | ||
| */ | ||
| shouldCompress?: boolean; | ||
|
|
||
| username: string; | ||
| password: string; | ||
|
|
||
| /** | ||
| * Configuration for the Kafka producer. | ||
| */ | ||
| config?: ProducerGlobalConfig; | ||
| }) { | ||
| const { | ||
| producerName, | ||
| environment, | ||
| shouldCompress = true, | ||
| username, | ||
| password, | ||
| } = config; | ||
| const { producerName, environment, username, password, config } = options; | ||
|
|
||
| this.kafka = new Kafka({ | ||
| clientId: `${producerName}-${environment}`, | ||
| brokers: | ||
| environment === "production" | ||
| ? ["warpstream.thirdweb.xyz:9092"] | ||
| : ["warpstream-dev.thirdweb.xyz:9092"], | ||
| ssl: { | ||
| checkServerIdentity(hostname, cert) { | ||
| return checkServerIdentity(hostname.toLowerCase(), cert); | ||
| }, | ||
| }, | ||
| sasl: { | ||
| mechanism: "plain", | ||
| username, | ||
| password, | ||
| }, | ||
| this.producer = new KafkaJS.Kafka({}).producer({ | ||
| "client.id": `${producerName}-${environment}`, | ||
| "bootstrap.servers": KAFKA_URL[environment], | ||
| "security.protocol": "sasl_ssl", | ||
| "sasl.mechanisms": "PLAIN", | ||
| "sasl.username": username, | ||
| "sasl.password": password, | ||
| "compression.codec": "lz4", | ||
| "allow.auto.create.topics": true, | ||
| // All configuration can be overridden. | ||
| ...config, | ||
| }); | ||
| } | ||
|
|
||
| if (shouldCompress) { | ||
| this.compression = CompressionTypes.LZ4; | ||
|
|
||
| CompressionCodecs[CompressionTypes.LZ4] = () => ({ | ||
| // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer | ||
| compress: (encoder: { buffer: Buffer }) => { | ||
| const compressed = compress(encoder.buffer); | ||
| // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer | ||
| return Buffer.from(compressed); | ||
| }, | ||
| // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer | ||
| decompress: (buffer: Buffer) => { | ||
| const decompressed = decompress(buffer); | ||
| // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer | ||
| return Buffer.from(decompressed); | ||
| }, | ||
| }); | ||
| } else { | ||
| this.compression = CompressionTypes.None; | ||
| } | ||
| /** | ||
| * Connects the producer. Can be called explicitly at the start of your service, or will be called automatically when sending messages. | ||
| */ | ||
| async connect() { | ||
| await this.producer.connect(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This may not be concurrency-safe. Might need to wrap in a try/catch? |
||
| this.isConnected = true; | ||
| } | ||
|
|
||
| /** | ||
| * Send messages to a Kafka topic. | ||
| * This method may throw. To call this non-blocking: | ||
| * ```ts | ||
| * void kafka.send(topic, events).catch((e) => console.error(e)) | ||
| * ``` | ||
| * | ||
| * @param topic | ||
| * @param messages | ||
| * @param configOverrides | ||
| */ | ||
| async send( | ||
| topic: string, | ||
| messages: Record<string, unknown>[], | ||
| options?: KafkaProducerSendOptions, | ||
| ): Promise<void> { | ||
| if (!this.producer) { | ||
| this.producer = this.kafka.producer({ | ||
| allowAutoTopicCreation: options?.allowAutoTopicCreation ?? false, | ||
| maxInFlightRequests: options?.maxInFlightRequests ?? 2000, | ||
| retry: { retries: options?.retries ?? 5 }, | ||
| }); | ||
| await this.producer.connect(); | ||
| if (!this.isConnected) { | ||
| await this.connect(); | ||
| } | ||
|
|
||
| await this.producer.send({ | ||
| topic, | ||
| messages: messages.map((m) => ({ | ||
| value: JSON.stringify(m), | ||
| })), | ||
| compression: this.compression, | ||
| acks: options?.acks ?? -1, // Default: All brokers must acknowledge | ||
| timeout: options?.timeout ?? 10_000, // Default: 10 seconds | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -139,9 +110,12 @@ export class KafkaProducer { | |
| * Useful when shutting down the service to flush in-flight events. | ||
| */ | ||
| async disconnect() { | ||
| if (this.producer) { | ||
| await this.producer.disconnect(); | ||
| this.producer = null; | ||
| if (this.isConnected) { | ||
| try { | ||
| await this.producer.flush(); | ||
| await this.producer.disconnect(); | ||
| } catch {} | ||
| this.isConnected = false; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ import { | |
| type UsageV2Source, | ||
| getTopicName, | ||
| } from "../core/usageV2.js"; | ||
| import { KafkaProducer, type KafkaProducerSendOptions } from "./kafka.js"; | ||
| import { KafkaProducer } from "./kafka.js"; | ||
|
|
||
| /** | ||
| * Creates a UsageV2Producer which opens a persistent TCP connection. | ||
|
|
@@ -35,18 +35,13 @@ export class UsageV2Producer { | |
| * The product where usage is coming from. | ||
| */ | ||
| source: UsageV2Source; | ||
| /** | ||
| * Whether to compress the events. | ||
| */ | ||
| shouldCompress?: boolean; | ||
|
|
||
| username: string; | ||
| password: string; | ||
| }) { | ||
| this.kafkaProducer = new KafkaProducer({ | ||
| producerName: config.producerName, | ||
| environment: config.environment, | ||
| shouldCompress: config.shouldCompress, | ||
| username: config.username, | ||
| password: config.password, | ||
| }); | ||
|
|
@@ -56,25 +51,25 @@ export class UsageV2Producer { | |
| /** | ||
| * Send usageV2 events. | ||
| * This method may throw. To call this non-blocking: | ||
| * ```ts | ||
| * void usageV2.sendEvents(events).catch((e) => console.error(e)) | ||
| * ``` | ||
| * | ||
| * @param events | ||
| */ | ||
| async sendEvents( | ||
| events: UsageV2Event[], | ||
| /** | ||
| * Reference: https://kafka.js.org/docs/producing#producing-messages | ||
| */ | ||
| options?: KafkaProducerSendOptions, | ||
| ): Promise<void> { | ||
| async sendEvents(events: UsageV2Event[]): Promise<void> { | ||
| const parsedEvents = events.map((event) => ({ | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not setting a fallback for |
||
| ...event, | ||
| // Default to a generated UUID. | ||
| id: event.id ?? randomUUID(), | ||
| // Default to now. | ||
| created_at: event.created_at ?? new Date(), | ||
| // Remove the "team_" prefix, if any. | ||
| team_id: event.team_id.startsWith("team_") | ||
| ? event.team_id.slice(5) | ||
| : event.team_id, | ||
| })); | ||
| await this.kafkaProducer.send(this.topic, parsedEvents, options); | ||
| await this.kafkaProducer.send(this.topic, parsedEvents); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
no longer need to import and configure lz4 for compression