1- import { checkServerIdentity } from "node:tls" ;
2- import { CompressionTypes , Kafka , type Producer } from "kafkajs" ;
3- import { compress , decompress } from "lz4js" ;
1+ import {
2+ KafkaJS ,
3+ type ProducerGlobalConfig ,
4+ } from "@confluentinc/kafka-javascript" ;
45
5- // CompressionCodecs is not exported properly in kafkajs. Source: https://github.com/tulios/kafkajs/issues/1391
6- import KafkaJS from "kafkajs" ;
7- const { CompressionCodecs } = KafkaJS ;
6+ const KAFKA_URL : Record < "development" | "production" , string > = {
7+ development : "warpstream-dev.thirdweb.xyz:9092" ,
8+ production : "warpstream.thirdweb.xyz:9092" ,
9+ } as const ;
810
911/**
1012 * Reference: https://kafka.js.org/docs/producing#producing-messages
@@ -33,11 +35,10 @@ export interface KafkaProducerSendOptions {
3335 * ```
3436 */
3537export class KafkaProducer {
36- private kafka : Kafka ;
37- private producer : Producer | null = null ;
38- private compression : CompressionTypes ;
38+ private producer : KafkaJS . Producer ;
39+ private isConnected = false ;
3940
40- constructor ( config : {
41+ constructor ( options : {
4142 /**
4243 * A descriptive name for your service. Example: "storage-server"
4344 */
@@ -46,91 +47,61 @@ export class KafkaProducer {
4647 * The environment the service is running in.
4748 */
4849 environment : "development" | "production" ;
49- /**
50- * Whether to compress the events.
51- */
52- shouldCompress ?: boolean ;
53-
5450 username : string ;
5551 password : string ;
52+
53+ /**
54+ * Configuration for the Kafka producer.
55+ */
56+ config ?: ProducerGlobalConfig ;
5657 } ) {
57- const {
58- producerName,
59- environment,
60- shouldCompress = true ,
61- username,
62- password,
63- } = config ;
58+ const { producerName, environment, username, password, config } = options ;
6459
65- this . kafka = new Kafka ( {
66- clientId : `${ producerName } -${ environment } ` ,
67- brokers :
68- environment === "production"
69- ? [ "warpstream.thirdweb.xyz:9092" ]
70- : [ "warpstream-dev.thirdweb.xyz:9092" ] ,
71- ssl : {
72- checkServerIdentity ( hostname , cert ) {
73- return checkServerIdentity ( hostname . toLowerCase ( ) , cert ) ;
74- } ,
75- } ,
76- sasl : {
77- mechanism : "plain" ,
78- username,
79- password,
80- } ,
60+ this . producer = new KafkaJS . Kafka ( { } ) . producer ( {
61+ "client.id" : `${ producerName } -${ environment } ` ,
62+ "bootstrap.servers" : KAFKA_URL [ environment ] ,
63+ "security.protocol" : "sasl_ssl" ,
64+ "sasl.mechanisms" : "PLAIN" ,
65+ "sasl.username" : username ,
66+ "sasl.password" : password ,
67+ "compression.codec" : "lz4" ,
68+ "allow.auto.create.topics" : true ,
69+ // All configuration can be overridden.
70+ ...config ,
8171 } ) ;
72+ }
8273
83- if ( shouldCompress ) {
84- this . compression = CompressionTypes . LZ4 ;
85-
86- CompressionCodecs [ CompressionTypes . LZ4 ] = ( ) => ( {
87- // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
88- compress : ( encoder : { buffer : Buffer } ) => {
89- const compressed = compress ( encoder . buffer ) ;
90- // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
91- return Buffer . from ( compressed ) ;
92- } ,
93- // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
94- decompress : ( buffer : Buffer ) => {
95- const decompressed = decompress ( buffer ) ;
96- // biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
97- return Buffer . from ( decompressed ) ;
98- } ,
99- } ) ;
100- } else {
101- this . compression = CompressionTypes . None ;
102- }
74+ /**
75+ * Connects the producer. Can be called explicitly at the start of your service, or will be called automatically when sending messages.
76+ */
77+ async connect ( ) {
78+ await this . producer . connect ( ) ;
79+ this . isConnected = true ;
10380 }
10481
10582 /**
10683 * Send messages to a Kafka topic.
10784 * This method may throw. To call this non-blocking:
85+ * ```ts
86+ * void kafka.send(topic, events).catch((e) => console.error(e))
87+ * ```
88+ *
10889 * @param topic
10990 * @param messages
110- * @param configOverrides
11191 */
11292 async send (
11393 topic : string ,
11494 messages : Record < string , unknown > [ ] ,
115- options ?: KafkaProducerSendOptions ,
11695 ) : Promise < void > {
117- if ( ! this . producer ) {
118- this . producer = this . kafka . producer ( {
119- allowAutoTopicCreation : options ?. allowAutoTopicCreation ?? false ,
120- maxInFlightRequests : options ?. maxInFlightRequests ?? 2000 ,
121- retry : { retries : options ?. retries ?? 5 } ,
122- } ) ;
123- await this . producer . connect ( ) ;
96+ if ( ! this . isConnected ) {
97+ await this . connect ( ) ;
12498 }
12599
126100 await this . producer . send ( {
127101 topic,
128102 messages : messages . map ( ( m ) => ( {
129103 value : JSON . stringify ( m ) ,
130104 } ) ) ,
131- compression : this . compression ,
132- acks : options ?. acks ?? - 1 , // Default: All brokers must acknowledge
133- timeout : options ?. timeout ?? 10_000 , // Default: 10 seconds
134105 } ) ;
135106 }
136107
@@ -139,9 +110,12 @@ export class KafkaProducer {
139110 * Useful when shutting down the service to flush in-flight events.
140111 */
141112 async disconnect ( ) {
142- if ( this . producer ) {
143- await this . producer . disconnect ( ) ;
144- this . producer = null ;
113+ if ( this . isConnected ) {
114+ try {
115+ await this . producer . flush ( ) ;
116+ await this . producer . disconnect ( ) ;
117+ } catch { }
118+ this . isConnected = false ;
145119 }
146120 }
147121}
0 commit comments