@@ -6,6 +6,7 @@
  topicPartitionOffsetToRdKafka,
  topicPartitionOffsetMetadataToRdKafka,
  topicPartitionOffsetMetadataToKafkaJS,
+ createBindingMessageMetadata,
  createKafkaJsErrorFromLibRdKafkaError,
  notImplemented,
  loggerTrampoline,
@@ -171,6 +172,20 @@ class Consumer {
   */
  #topicPartitionToBatchPayload = new Map();

+  /**
+   * The client name used by the consumer for logging - determined by librdkafka
+   * using a combination of clientId and an integer.
+   * @type {string|undefined}
+   */
+  #clientName = undefined;
+
+  /**
+   * Convenience function to create the metadata object needed for logging.
+   */
+  #createConsumerBindingMessageMetadata() {
+    return createBindingMessageMetadata(this.#clientName);
+  }
+
  /**
   * @constructor
   * @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
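For readers skimming the diff, the snippet below sketches the (message, metadata) logging pattern that every call site in this change adopts. It is a hypothetical illustration only: the real createBindingMessageMetadata helper is imported from the binding's shared utilities and is not part of this diff, so the metadata object shape and the stand-in logger here are assumptions.

// Hypothetical sketch - not the helper shipped with this change.
// Assumption: the metadata is a plain object tagging each log line with the
// client name that librdkafka derives from clientId plus an integer suffix.
function createBindingMessageMetadata(clientName) {
  return { name: clientName }; // assumed shape
}

// Stand-in logger using the (message, metadata) signature seen throughout the diff.
const logger = {
  info: (msg, metadata) => console.log(`INFO [${metadata ? metadata.name : '?'}] ${msg}`),
};

logger.info('Consumer connected', createBindingMessageMetadata('my-client#consumer-1'));
// Prints: INFO [my-client#consumer-1] Consumer connected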
@@ -222,8 +237,8 @@ class Consumer {
    }

    /* TODO: we should cry more about this and render the consumer unusable. */
-   await Promise.all(seekPromises).catch(err => this.#logger.error("Seek error. This is effectively a fatal error:" + err));
-
+   await Promise.all(seekPromises)
+     .catch(err => this.#logger.error(`Seek error. This is effectively a fatal error: ${err}`, this.#createConsumerBindingMessageMetadata()));

    /* Clear the cache and stored offsets.
     * We need to do this only if topicPartitions = null (global cache expiry).
@@ -242,6 +257,9 @@ class Consumer {
  #rebalanceCallback(err, assignment) {
    err = LibrdKafkaError.create(err);
    const userSpecifiedRebalanceCb = this.#userConfig['rebalance_cb'];
+   this.#logger.info(
+     `Received rebalance event with message: '${err.message}' and ${assignment.length} partition(s)`,
+     this.#createConsumerBindingMessageMetadata());

    let assignmentFnCalled = false;
    function assignmentFn(userAssignment) {
@@ -559,6 +577,9 @@ class Consumer {
    /* Slight optimization for cases where the size of messages in our subscription is less than the cache size. */
    this.#internalClient.setDefaultIsTimeoutOnlyForFirstMessage(true);

+   this.#clientName = this.#internalClient.name;
+   this.#logger.info('Consumer connected', this.#createConsumerBindingMessageMetadata());
+
    // Resolve the promise.
    this.#connectPromiseFunc['resolve']();
  }
@@ -571,7 +592,7 @@ class Consumer {
    if (this.#state === ConsumerState.CONNECTING) {
      this.#connectPromiseFunc['reject'](err);
    } else {
-     this.#logger.error(err);
+     this.#logger.error(err, this.#createConsumerBindingMessageMetadata());
    }
  }
@@ -660,8 +681,7 @@ class Consumer {
      this.#lastConsumedOffsets.set(key, offset + 1);
    } catch (e) {
      /* Not much we can do, except log the error. */
-     if (this.#logger)
-       this.#logger.error(`Consumer encountered error while storing offset. Error details: ${e}:${e.stack}`);
+     this.#logger.error(`Consumer encountered error while storing offset. Error details: ${e}:${e.stack}`, this.#createConsumerBindingMessageMetadata());
    }
  }
@@ -782,6 +802,7 @@ class Consumer {
    }

    this.#fetchInProgress = true;
+   this.#logger.debug(`Attempting to fetch ${this.#messageCache.maxSize} messages to the message cache`, this.#createConsumerBindingMessageMetadata());
    return new Promise((resolve, reject) => {
      this.#internalClient.consume(this.#messageCache.maxSize, (err, messages) => {
        this.#fetchInProgress = false;
@@ -828,6 +849,7 @@ class Consumer {
    }

    this.#fetchInProgress = true;
+   this.#logger.debug(`Attempting to fetch ${this.#messageCache.maxSize} messages to the message cache`, this.#createConsumerBindingMessageMetadata());
    return new Promise((resolve, reject) => {
      this.#internalClient.consume(this.#messageCache.maxSize, (err, messages) => {
        this.#fetchInProgress = false;
@@ -968,6 +990,7 @@ class Consumer {
    });

    this.#storedSubscriptions = subscription.replace ? topics : this.#storedSubscriptions.concat(topics);
+   this.#logger.debug(`${subscription.replace ? 'Replacing' : 'Adding'} topics [${topics.join(', ')}] to subscription`, this.#createConsumerBindingMessageMetadata());
    this.#internalClient.subscribe(this.#storedSubscriptions);
  }
@@ -1043,12 +1066,12 @@ class Consumer {
       * However, we don't do this inside the catch, but just outside it. This is because throwing an
       * error is not the only case where we might want to seek back.
       *
-      * So - do nothing but a debug log, but at this point eachMessageProcessed is false.
+      * So - do nothing but a log, but at this point eachMessageProcessed is false.
+      * TODO: log error only if error type is not KafkaJSError and if no pause() has been called, else log debug.
       */
-     this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
-
-     /* TODO: log error if error type is not KafkaJSError and if no pause() has been called */
-     this.#logger.error(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
+     this.#logger.error(
+       `Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`,
+       this.#createConsumerBindingMessageMetadata());
    }

    /* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
@@ -1067,8 +1090,7 @@ class Consumer {
      this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
    } catch (e) {
      /* Not much we can do, except log the error. */
-     if (this.#logger)
-       this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
+     this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`, this.#createConsumerBindingMessageMetadata());
    }
  }
@@ -1123,13 +1145,13 @@ class Consumer {
       * if the user has not called `resolveOffset` manually in case of using eachBatch without
       * eachBatchAutoResolve being set.
       *
-      * So - do nothing but a debug log, but at this point eachMessageProcessed needs to be false unless
+      * So - do nothing but a log, but at this point eachMessageProcessed needs to be false unless
       * the user has explicitly marked it as true.
+      * TODO: log error only if error type is not KafkaJSError and if no pause() has been called, else log debug.
       */
-     this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
-
-     /* TODO: log error if error type is not KafkaJSError and if no pause() has been called */
-     this.#logger.error(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
+     this.#logger.error(
+       `Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`,
+       this.#createConsumerBindingMessageMetadata());

      /* The value of eachBatchAutoResolve is not important. The only place where a message is marked processed
       * despite an error is if the user says so, and the user can use resolveOffset for both the possible
@@ -1177,6 +1199,7 @@ class Consumer {
      /* Invalidate the message cache if needed */
      const locallyStale = this.#messageCache.popLocallyStale();
      if (this.#messageCache.isStale()) {
+       this.#logger.debug("Scheduling worker termination", this.#createConsumerBindingMessageMetadata());
        this.#workerTerminationScheduled = true;
        break;
      } else if (locallyStale.length !== 0) {
@@ -1191,7 +1214,7 @@ class Consumer {
        /* Since this error cannot be exposed to the user in the current situation, just log and retry.
         * This is due to restartOnFailure being set to always true. */
        if (this.#logger)
-         this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e}: ${e.stack}`);
+         this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${e}: ${e.stack}`, this.#createConsumerBindingMessageMetadata());
      });

      nextIdx = -1;
@@ -1229,14 +1252,15 @@ class Consumer {
    while (!this.#disconnectStarted) {
      this.#workerTerminationScheduled = false;
      const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
+     this.#logger.debug(`Spawning ${workersToSpawn} workers`, this.#createConsumerBindingMessageMetadata());
      this.#workers =
        Array(workersToSpawn)
          .fill()
          .map((_, i) =>
            this.#worker(config, perMessageProcessor.bind(this), fetcher.bind(this))
              .catch(e => {
                if (this.#logger)
-                 this.#logger.error(`Worker ${i} encountered an error: ${e}:${e.stack}`);
+                 this.#logger.error(`Worker ${i} encountered an error: ${e}:${e.stack}`, this.#createConsumerBindingMessageMetadata());
              }));

      /* Best we can do is log errors on worker issues - handled by the catch block above. */
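A quick worked example of the worker-count expression reported in the new debug log above; the numbers are illustrative, the formula is the one already in the diff.

// workersToSpawn = Math.max(1, Math.min(concurrency, partitionCount))
console.log(Math.max(1, Math.min(5, 3))); // concurrency 5, 3 partitions assigned -> 3 workers
console.log(Math.max(1, Math.min(2, 8))); // concurrency 2, 8 partitions assigned -> 2 workers
console.log(Math.max(1, Math.min(4, 0))); // no partitions assigned yet           -> 1 worker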
@@ -1427,7 +1451,10 @@ class Consumer {
    /* It's assumed that topicPartition is already assigned, and thus can be seeked to and committed to.
     * Errors are logged to detect bugs in the internal code. */
    /* TODO: is it worth awaiting seeks to finish? */
-   this.#internalClient.seek(topicPartitionOffset, 0, err => err ? this.#logger.error(err) : null);
+   this.#internalClient.seek(topicPartitionOffset, 0, err => {
+     if (err)
+       this.#logger.error(`Error while calling seek from within seekInternal: ${err}`, this.#createConsumerBindingMessageMetadata());
+   });
    offsetsToCommit.push({
      topic: topicPartition.topic,
      partition: topicPartition.partition,
@@ -1539,6 +1566,8 @@ class Consumer {
      throw new error.KafkaJSError('Pause can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
    }

+   this.#logger.debug(`Pausing ${topics.length} topics`, this.#createConsumerBindingMessageMetadata());
+
    const toppars = [];
    for (let topic of topics) {
      if (typeof topic.topic !== 'string') {
@@ -1612,6 +1641,8 @@ class Consumer {
      throw new error.KafkaJSError('Resume can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
    }

+   this.#logger.debug(`Resuming ${topics.length} topics`, this.#createConsumerBindingMessageMetadata());
+
    const toppars = [];
    for (let topic of topics) {
      if (typeof topic.topic !== 'string') {
@@ -1677,6 +1708,8 @@ class Consumer {
    this.#disconnectStarted = true;
    this.#workerTerminationScheduled = true;
+
+   this.#logger.debug("Signalling disconnection attempt to workers", this.#createConsumerBindingMessageMetadata());
    while (!(await acquireOrLog(this.#lock, this.#logger))); /* Just retry... */

    this.#state = ConsumerState.DISCONNECTING;
@@ -1691,6 +1724,7 @@ class Consumer {
        return;
      }
      this.#state = ConsumerState.DISCONNECTED;
+     this.#logger.info("Consumer disconnected", this.#createConsumerBindingMessageMetadata());
      resolve();
    };
    this.#internalClient.disconnect(cb);