@@ -2,7 +2,7 @@ import { CompressionTypes, Kafka, type Producer } from 'kafkajs';
22import { Semaphore } from 'async-mutex' ;
33import { clickHouse , TABLE_NAMES } from '@databuddy/db' ;
44
5- const BROKER = process . env . KAFKA_BROKERS as string ;
5+ const BROKER = process . env . REDPANDA_BROKER as string ;
66const SEMAPHORE_LIMIT = 15 ;
77const BUFFER_INTERVAL = 5000 ;
88const BUFFER_MAX = 1000 ;
@@ -59,33 +59,43 @@ let producer: Producer | null = null;
5959let connected = false ;
6060let failed = false ;
6161let lastRetry = 0 ;
62-
63- if ( BROKER ) {
64- kafka = new Kafka ( {
65- clientId : 'basket' ,
66- brokers : [ BROKER ] ,
67- connectionTimeout : 5000 ,
68- requestTimeout : KAFKA_TIMEOUT ,
69- sasl : {
70- mechanism : 'scram-sha-256' ,
71- username : process . env . KAFKA_USER as string ,
72- password : process . env . KAFKA_PASSWORD as string ,
73- } ,
74- } ) ;
75- producer = kafka . producer ( {
76- allowAutoTopicCreation : true ,
77- retry : {
78- initialRetryTime : 300 ,
79- retries : 3 ,
80- maxRetryTime : 3000 ,
81- } ,
82- idempotent : true ,
83- maxInFlightRequests : 5 ,
84- } ) ;
62+ const kafkaEnabled = Boolean ( BROKER ) ;
63+
64+ if ( kafkaEnabled ) {
65+ const username = process . env . REDPANDA_USER ;
66+ const password = process . env . REDPANDA_PASSWORD ;
67+
68+ if ( ! username || ! password ) {
69+ console . warn ( 'REDPANDA_BROKER is set but REDPANDA_USER or REDPANDA_PASSWORD is missing. Kafka disabled, using ClickHouse fallback only.' ) ;
70+ } else {
71+ kafka = new Kafka ( {
72+ clientId : 'basket' ,
73+ brokers : [ BROKER ] ,
74+ connectionTimeout : 5000 ,
75+ requestTimeout : KAFKA_TIMEOUT ,
76+ sasl : {
77+ mechanism : 'scram-sha-256' ,
78+ username,
79+ password,
80+ } ,
81+ } ) ;
82+ producer = kafka . producer ( {
83+ allowAutoTopicCreation : true ,
84+ retry : {
85+ initialRetryTime : 300 ,
86+ retries : 3 ,
87+ maxRetryTime : 3000 ,
88+ } ,
89+ idempotent : true ,
90+ maxInFlightRequests : 5 ,
91+ } ) ;
92+ }
93+ } else {
94+ console . log ( 'REDPANDA_BROKER not set, using ClickHouse fallback only.' ) ;
8595}
8696
8797async function connect ( ) {
88- if ( ! BROKER || ! producer || connected ) return connected ;
98+ if ( ! kafkaEnabled || ! producer || connected ) return connected ;
8999 if ( failed && Date . now ( ) - lastRetry < RECONNECT_COOLDOWN ) return false ;
90100
91101 try {
@@ -229,7 +239,7 @@ async function send(topic: string, event: any, key?: string) {
229239 const [ , release ] = await semaphore . acquire ( ) ;
230240
231241 try {
232- if ( ( await connect ( ) ) && producer && connected ) {
242+ if ( kafkaEnabled && ( await connect ( ) ) && producer && connected ) {
233243 try {
234244 await producer . send ( {
235245 topic,
@@ -279,7 +289,7 @@ export const sendEventBatch = async (topic: string, events: any[]) => {
279289 const [ , release ] = await semaphore . acquire ( ) ;
280290
281291 try {
282- if ( ( await connect ( ) ) && producer && connected ) {
292+ if ( kafkaEnabled && ( await connect ( ) ) && producer && connected ) {
283293 try {
284294 await producer . send ( {
285295 topic,
@@ -357,7 +367,7 @@ export const disconnectProducer = async () => {
357367 } ) ;
358368} ;
359369
360- export const getProducerStats = ( ) => ( { ...stats , bufferSize : buffer . length , connected, failed } ) ;
370+ export const getProducerStats = ( ) => ( { ...stats , bufferSize : buffer . length , connected, failed, kafkaEnabled } ) ;
361371
362372if ( process . env . NODE_ENV === 'development' ) {
363373 setInterval ( ( ) => {
0 commit comments