@@ -67,15 +67,16 @@ static int exit_after = 0;
6767static int exit_eof = 0 ;
6868static FILE * stats_fp ;
6969static int dr_disp_div ;
70- static int verbosity = 1 ;
71- static int latency_mode = 0 ;
72- static FILE * latency_fp = NULL ;
73- static int msgcnt = -1 ;
74- static int incremental_mode = 0 ;
75- static int partition_cnt = 0 ;
76- static int eof_cnt = 0 ;
77- static int with_dr = 1 ;
78- static int read_hdrs = 0 ;
70+ static int verbosity = 1 ;
71+ static int latency_mode = 0 ;
72+ static FILE * latency_fp = NULL ;
73+ static int msgcnt = -1 ;
74+ static int incremental_mode = 0 ;
75+ static int partition_cnt = 0 ;
76+ static int eof_cnt = 0 ;
77+ static int with_dr = 1 ;
78+ static int read_hdrs = 0 ;
79+ static int is_group_protocol_classic = 1 ;
7980
8081
8182static void stop (int sig ) {
@@ -887,14 +888,13 @@ int main(int argc, char **argv) {
887888 * Try other values with: ... -X queued.min.messages=1000
888889 */
889890 rd_kafka_conf_set (conf , "queued.min.messages" , "1000000" , NULL , 0 );
890- rd_kafka_conf_set (conf , "session.timeout.ms" , "6000" , NULL , 0 );
891891 rd_kafka_conf_set (conf , "auto.offset.reset" , "earliest" , NULL , 0 );
892892
893893 topics = rd_kafka_topic_partition_list_new (1 );
894894
895895 while ((opt = getopt (argc , argv ,
896896 "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
897- "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:" )) != -1 ) {
897+ "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:g: " )) != -1 ) {
898898 switch (opt ) {
899899 case 'G' :
900900 if (rd_kafka_conf_set (conf , "group.id" , optarg , errstr ,
@@ -922,6 +922,16 @@ int main(int argc, char **argv) {
922922 case 'b' :
923923 brokers = optarg ;
924924 break ;
925+ case 'g' :
926+ if (rd_kafka_conf_set (conf , "group.protocol" , optarg ,
927+ errstr , sizeof (errstr )) !=
928+ RD_KAFKA_CONF_OK ) {
929+ fprintf (stderr , "%% %s\n" , errstr );
930+ exit (1 );
931+ }
932+ is_group_protocol_classic =
933+ strcmp (optarg , "classic" ) == 0 ;
934+ break ;
925935 case 's' :
926936 msgsize = atoi (optarg );
927937 break ;
@@ -1142,6 +1152,8 @@ int main(int argc, char **argv) {
11421152 " -p <num> Partition (defaults to random). "
11431153 "Multiple partitions are allowed in -C consumer mode.\n"
11441154 " -M Print consumer interval stats\n"
1155+ " -g <protocol> Group rebalance protocol to use. classic "
1156+ "or consumer (defaults to classic)\n"
11451157 " -b <brokers> Broker address list (host[:port],..)\n"
11461158 " -s <size> Message size (producer)\n"
11471159 " -k <key> Message key (producer)\n"
@@ -1201,6 +1213,11 @@ int main(int argc, char **argv) {
12011213 exit (1 );
12021214 }
12031215
1216+ if (is_group_protocol_classic ) {
1217+ /** Session timeout is moved to broker side on the new consumer
1218+ * group rebalance protocols */
1219+ rd_kafka_conf_set (conf , "session.timeout.ms" , "6000" , NULL , 0 );
1220+ }
12041221
12051222 dispintvl *= 1000 ; /* us */
12061223
0 commit comments