@@ -30,7 +30,7 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
         const kafkaConfig = {
             clientId: process.env.KAFKA_CLIENTID || "os2iot-client",
             brokers: [
-                `${process.env.KAFKA_HOSTNAME || "localhost"}:${
+                `${process.env.KAFKA_HOSTNAME || "localhost"}:${
                     process.env.KAFKA_PORT || "9093"
                 }`,
             ],
@@ -66,8 +66,8 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
         await this.consumer.on(this.consumer.events.STOP, () => {
             this.logger.debug("STOP ...");
         });
-        await this.consumer.on(this.consumer.events.CRASH, ({ error }) => {
-            this.logger.debug("CRASH ... " + error);
+        await this.consumer.on(this.consumer.events.CRASH, ({ payload }) => {
+            this.logger.debug("CRASH ... " + payload);
         });
         await this.consumer.on(this.consumer.events.DISCONNECT, () => {
             this.logger.debug("DISCONNECT ...");
@@ -96,16 +96,16 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
             autoCommit: true,
             autoCommitThreshold: 1,
             eachMessage: async ({
-                topic,
-                message,
-            }: {
+                topic,
+                message,
+            }: {
                 topic: string;
                 message: KafkaMessage;
             }) => {
                 try {
                     const arr = SUBSCRIBER_COMBINED_REF_MAP.get(topic);
                     this.logger.debug(
-                        `Got kafka message, have ${arr.length} receivers ...`
+                        `Got kafka message, have ${arr.length} receivers ...`
                     );
                     arr.forEach(async tuple => {
                         const object = tuple[0];
@@ -118,8 +118,8 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
                         await fn.apply(object, [msg]);
                     });
                 } catch (err) {
-                    this.logger.error(`Error occurred in eachMessage: ${err}`);
-                    this.logger.error(`${JSON.stringify(err)}`);
+                    this.logger.error(`Error occurred in eachMessage: ${err}`);
+                    this.logger.error(`${JSON.stringify(err)}`);
                 }
             },
         })
@@ -132,18 +132,18 @@ export class KafkaService implements OnModuleInit, OnModuleDestroy {
         kafkaTopic: string,
         kafkaMessage: KafkaPayload
     ): Promise<void | RecordMetadata[]> {
-        this.logger.debug(`Connecting producer ...`);
-        await this.producer.connect();
-        this.logger.debug(`Connected ...`);
+        const message = {
+            topic: kafkaTopic,
+            messages: [{ value: JSON.stringify(kafkaMessage) }],
+        };
         const metadata = await this.producer
-            .send({
-                topic: kafkaTopic,
-                messages: [{ value: JSON.stringify(kafkaMessage) }],
-            })
-            .catch(e => this.logger.error(e.message, e));
-        this.logger.debug(`Sent from producer`);
-        await this.producer.disconnect();
-        this.logger.debug(`Disconnected ...`);
+            .send(message)
+            .catch(async e => {
+                await this.producer.connect();
+                this.logger.warn("Kafka failed sending message, retrying", e);
+                await this.producer.send(message)
+                    .catch(e => this.logger.error(e.message, e));
+            });
         return metadata;
     }
 }
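With this change, `sendKafkaPayload` no longer connects and disconnects the producer around every single send; it reuses a producer that is already connected (e.g. from `onModuleInit`) and, if a send fails, reconnects and retries once before logging the error. A standalone sketch of the same send-with-one-retry pattern, with the broker address, topic, and payload shape assumed for illustration:

```ts
import { Kafka, Producer, ProducerRecord, RecordMetadata } from "kafkajs";

// Assumed connection details, for illustration only.
const kafka = new Kafka({
    clientId: "os2iot-client",
    brokers: ["localhost:9093"],
});
const producer: Producer = kafka.producer();

// Send once on the long-lived producer; if that fails (e.g. the connection
// was dropped while idle), reconnect and retry a single time.
async function sendWithRetry(
    topic: string,
    payload: unknown
): Promise<void | RecordMetadata[]> {
    const record: ProducerRecord = {
        topic,
        messages: [{ value: JSON.stringify(payload) }],
    };
    return producer.send(record).catch(async err => {
        await producer.connect();
        console.warn("Kafka failed sending message, retrying", err);
        return producer.send(record).catch(e => console.error(e.message, e));
    });
}

// Usage: connect once at startup, then reuse the producer for every payload.
// await producer.connect();
// await sendWithRetry("example-topic", { messageId: "abc", body: {} });
```

Keeping the producer connected for the lifetime of the service avoids a connect/metadata handshake per message, and the single retry covers the common case of a broker connection that has gone stale between sends.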