@@ -488,7 +488,6 @@ def print_wmark(consumer, parts):
488488 lo , hi = c .get_watermark_offsets (assignment [0 ], cached = True )
489489 print ('Cached offsets for %s: %d - %d' % (assignment [0 ], lo , hi ))
490490
491-
492491 # Query broker for offsets
493492 lo , hi = c .get_watermark_offsets (assignment [0 ], timeout = 1.0 )
494493 print ('Queried offsets for %s: %d - %d' % (assignment [0 ], lo , hi ))
@@ -620,7 +619,7 @@ def verify_batch_consumer():
620619
621620 # Consume messages (error()==0) or event (error()!=0)
622621 msglist = c .consume (batch_cnt , 10.0 )
623- assert len (msglist ) == batch_cnt
622+ assert len (msglist ) == batch_cnt , 'expected %d messages, not %d' % ( batch_cnt , len ( msglist ))
624623
625624 for msg in msglist :
626625 if msg .error ():
@@ -823,7 +822,33 @@ def stats_cb(stats_json_str):
823822 c .close ()
824823
825824
825+ def print_usage (exitcode , reason = None ):
826+ """ Print usage and exit with exitcode """
827+ if reason is not None :
828+ print ('Error: %s' % reason )
829+ print ('Usage: %s <broker> [opts] [<topic>] [<schema_registry>]' % sys .argv [0 ])
830+ print ('Options:' )
831+ print (' --consumer, --producer, --avro, --performance - limit to matching tests' )
832+
833+ sys .exit (exitcode )
834+
835+
826836if __name__ == '__main__' :
837+ modes = list ()
838+
839+ # Parse options
840+ while len (sys .argv ) > 1 and sys .argv [1 ].startswith ('--' ):
841+ opt = sys .argv .pop (1 )
842+ if opt == '--consumer' :
843+ modes .append ('consumer' )
844+ elif opt == '--producer' :
845+ modes .append ('producer' )
846+ elif opt == '--avro' :
847+ modes .append ('avro' )
848+ elif opt == '--performance' :
849+ modes .append ('performance' )
850+ else :
851+ print_usage (1 , 'unknown option ' + opt )
827852
828853 if len (sys .argv ) > 1 :
829854 bootstrap_servers = sys .argv [1 ]
@@ -832,37 +857,46 @@ def stats_cb(stats_json_str):
832857 if len (sys .argv ) > 3 :
833858 schema_registry_url = sys .argv [3 ]
834859 else :
835- print ('Usage: %s <broker> [<topic>] [<schema_registry>]' % sys .argv [0 ])
836- sys .exit (1 )
860+ print_usage (1 )
861+
862+ if len (modes ) == 0 :
863+ modes = ['consumer' , 'producer' , 'avro' , 'performance' ]
837864
838865 print ('Using confluent_kafka module version %s (0x%x)' % confluent_kafka .version ())
839866 print ('Using librdkafka version %s (0x%x)' % confluent_kafka .libversion ())
840867
841- print ('=' * 30 , 'Verifying Producer' , '=' * 30 )
842- verify_producer ()
868+ if 'producer' in modes :
869+ print ('=' * 30 , 'Verifying Producer' , '=' * 30 )
870+ verify_producer ()
843871
844- print ('=' * 30 , 'Verifying Consumer' , '=' * 30 )
845- verify_consumer ()
872+ if 'performance' in modes :
873+ print ('=' * 30 , 'Verifying Producer performance (with dr_cb)' , '=' * 30 )
874+ verify_producer_performance (with_dr_cb = True )
846875
847- print ('=' * 30 , 'Verifying batch Consumer' , '=' * 30 )
848- verify_batch_consumer ()
876+ if 'performance' in modes :
877+ print ('=' * 30 , 'Verifying Producer performance (without dr_cb)' , '=' * 30 )
878+ verify_producer_performance (with_dr_cb = False )
849879
850- print ('=' * 30 , 'Verifying Producer performance (with dr_cb)' , '=' * 30 )
851- verify_producer_performance (with_dr_cb = True )
880+ if 'consumer' in modes :
881+ print ('=' * 30 , 'Verifying Consumer' , '=' * 30 )
882+ verify_consumer ()
852883
853- print ('=' * 30 , 'Verifying Producer performance (without dr_cb) ' , '=' * 30 )
854- verify_producer_performance ( with_dr_cb = False )
884+ print ('=' * 30 , 'Verifying batch Consumer ' , '=' * 30 )
885+ verify_batch_consumer ( )
855886
856- print ('=' * 30 , 'Verifying Consumer performance' , '=' * 30 )
857- verify_consumer_performance ()
887+ if 'performance' in modes :
888+ print ('=' * 30 , 'Verifying Consumer performance' , '=' * 30 )
889+ verify_consumer_performance ()
858890
859- print ('=' * 30 , 'Verifying batch Consumer performance' , '=' * 30 )
860- verify_batch_consumer_performance ()
891+ print ('=' * 30 , 'Verifying batch Consumer performance' , '=' * 30 )
892+ verify_batch_consumer_performance ()
861893
862- print ('=' * 30 , 'Verifying stats_cb' , '=' * 30 )
863- verify_stats_cb ()
894+ # The stats test is utilizing the consumer.
895+ print ('=' * 30 , 'Verifying stats_cb' , '=' * 30 )
896+ verify_stats_cb ()
864897
865- print ('=' * 30 , 'Verifying AVRO' , '=' * 30 )
866- verify_avro ()
898+ if 'avro' in modes :
899+ print ('=' * 30 , 'Verifying AVRO' , '=' * 30 )
900+ verify_avro ()
867901
868902 print ('=' * 30 , 'Done' , '=' * 30 )
0 commit comments