Skip to content

Commit 8ad1dbe

Browse files
committed
kafkatest.VerifiableConsumer: add --verbose argument to match Java counterpart
1 parent f70b82f commit 8ad1dbe

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

confluent_kafka/kafkatest/verifiable_consumer.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,13 @@ def msg_consume(self, msg):
191191
self.err('Consume failed: %s' % msg.error(), term=False)
192192
return
193193

194-
if False:
195-
self.dbg('Read msg from %s [%d] @ %d' %
196-
(msg.topic(), msg.partition(), msg.offset()))
194+
if self.verbose:
195+
self.send({'name': 'record_data',
196+
'topic': msg.topic(),
197+
'partition': msg.partition(),
198+
'key': msg.key(),
199+
'value': msg.value(),
200+
'offset': msg.offset()})
197201

198202
if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs:
199203
return # ignore extra messages
@@ -247,6 +251,7 @@ def to_dict(self):
247251
parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)
248252
parser.add_argument('--assignment-strategy', dest='conf_partition.assignment.strategy')
249253
parser.add_argument('--reset-policy', dest='topicconf_auto.offset.reset', default='earliest')
254+
parser.add_argument('--verbose', action='store_true', dest='verbose', default=False, help='Per-message stats')
250255
parser.add_argument('--consumer.config', dest='consumer_config')
251256
parser.add_argument('-X', nargs=1, dest='extra_conf', action='append', help='Configuration property', default=[])
252257
args = vars(parser.parse_args())
@@ -267,6 +272,7 @@ def to_dict(self):
267272
vc = VerifiableConsumer(conf)
268273
vc.use_auto_commit = args['conf_enable.auto.commit']
269274
vc.max_msgs = args['max_messages']
275+
vc.verbose = args['verbose']
270276

271277
vc.dbg('Pid %d' % os.getpid())
272278
vc.dbg('Using config: %s' % conf)

0 commit comments

Comments
 (0)