diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c
index dab0b06b8f..16b9ad077f 100644
--- a/examples/rdkafka_performance.c
+++ b/examples/rdkafka_performance.c
@@ -705,7 +705,8 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) {
 
 
 static void sig_usr1(int sig) {
-        rd_kafka_dump(stdout, global_rk);
+        if (global_rk)
+                rd_kafka_dump(stdout, global_rk);
 }
 
 
@@ -878,24 +879,13 @@ int main(int argc, char **argv) {
         rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
         rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);
 
-        /* Consumer config */
-        /* Tell rdkafka to (try to) maintain 1M messages
-         * in its internal receive buffers. This is to avoid
-         * application -> rdkafka -> broker per-message ping-pong
-         * latency.
-         * The larger the local queue, the higher the performance.
-         * Try other values with: ... -X queued.min.messages=1000
-         */
-        rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
-        rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
-        rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
-
         topics = rd_kafka_topic_partition_list_new(1);
 
         while ((opt = getopt(argc, argv,
-                             "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
+                             "PCG:S:t:p:b:s:k:c:fi:MDd:m:F:x:"
                              "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
                 switch (opt) {
+                case 'S':
                 case 'G':
                         if (rd_kafka_conf_set(conf, "group.id", optarg,
                                               errstr, sizeof(errstr)) !=
@@ -940,7 +930,7 @@ int main(int argc, char **argv) {
                 case 'm':
                         msgpattern = optarg;
                         break;
-                case 'S':
+                case 'F':
                         seq = strtoull(optarg, NULL, 10);
                         do_seq = 1;
                         break;
@@ -1138,6 +1128,7 @@ int main(int argc, char **argv) {
                     " Options:\n"
                     "  -C | -P |    Consumer or Producer mode\n"
                     "  -G <group>   High-level Kafka Consumer mode\n"
+                    "  -S <group>   Share Consumer mode\n"
                     "  -t <topic>   Topic to consume / produce\n"
                     "  -p <num>     Partition (defaults to random). "
                     "Multiple partitions are allowed in -C consumer mode.\n"
@@ -1153,7 +1144,7 @@ int main(int argc, char **argv) {
                     "  -D           Copy/Duplicate data buffer (producer)\n"
                     "  -i <ms>      Display interval\n"
                     "  -m <msg>     Message payload pattern\n"
-                    "  -S <start>   Send a sequence number starting at "
+                    "  -F <start>   Send a sequence number starting at "
                     "<start> as payload\n"
                     "  -R <seed>    Random seed value (defaults to time)\n"
                     "  -a <acks>    Required acks (producer): "
@@ -1195,12 +1186,30 @@ int main(int argc, char **argv) {
                     "\n"
                     " In Producer mode:\n"
                     "  writes messages of size -s <..> and prints thruput\n"
+                    "\n"
+                    " In Share Consumer mode (-S):\n"
+                    "  consumes messages using share groups and prints "
+                    "thruput\n"
                     "\n",
                     argv[0], rd_kafka_version_str(), rd_kafka_version(),
                     RD_KAFKA_DEBUG_CONTEXTS);
                 exit(1);
         }
 
+        if (mode == 'C' || mode == 'G') {
+                /* Tell rdkafka to (try to) maintain 1M messages
+                 * in its internal receive buffers. This is to avoid
+                 * application -> rdkafka -> broker per-message ping-pong
+                 * latency.
+                 * The larger the local queue, the higher the performance.
+                 * Try other values with: ... -X queued.min.messages=1000
+                 */
+                rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL,
+                                  0);
+                rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
+                rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL,
+                                  0);
+        }
 
         dispintvl *= 1000; /* us */
 
@@ -1314,6 +1323,27 @@ int main(int argc, char **argv) {
         rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, 0);
 
+        if (mode == 'S') {
+                if (partition_cnt > 0) {
+                        fprintf(stderr,
+                                "%% -p is not supported with "
+                                "share consumer mode\n");
+                        exit(1);
+                }
+                if (start_offset != 0) {
+                        fprintf(stderr,
+                                "%% -o is not supported with "
+                                "share consumer mode\n");
+                        exit(1);
+                }
+                if (exit_eof) {
+                        fprintf(stderr,
+                                "%% -e is not supported with "
+                                "share consumer mode\n");
+                        exit(1);
+                }
+        }
+
         if (read_hdrs && mode == 'P') {
                 fprintf(stderr, "%% producer can not read headers\n");
                 exit(1);
@@ -1748,6 +1778,73 @@ int main(int argc, char **argv) {
                 free(rkmessages);
                 rd_kafka_queue_destroy(rkqu);
                 rd_kafka_destroy(rk);
+
+        } else if (mode == 'S') {
+                /* Share Consumer */
+                rd_kafka_share_t *rkshare;
+                rd_kafka_message_t **rkmessages = NULL;
+                int share_batch_size;
+
+                share_batch_size = batch_size ? batch_size : 10001;
+                rkmessages = malloc(sizeof(*rkmessages) * share_batch_size);
+
+                /* Create share consumer instance. */
+                rkshare =
+                    rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
+                if (!rkshare) {
+                        fprintf(stderr,
+                                "%% Failed to create share consumer: %s\n",
+                                errstr);
+                        exit(1);
+                }
+
+                err = rd_kafka_share_subscribe(rkshare, topics);
+                if (err) {
+                        fprintf(stderr, "%% Share subscribe failed: %s\n",
+                                rd_kafka_err2str(err));
+                        exit(1);
+                }
+                fprintf(stderr,
+                        "%% Share consumer subscribed, "
+                        "waiting for messages...\n");
+
+                while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
+                        uint64_t fetch_latency;
+                        size_t rcvd_msgs = 0;
+                        size_t i;
+                        rd_kafka_error_t *error;
+
+                        fetch_latency = rd_clock();
+
+                        error = rd_kafka_share_consume_batch(
+                            rkshare, 1000, rkmessages, &rcvd_msgs);
+
+                        cnt.t_fetch_latency += rd_clock() - fetch_latency;
+
+                        if (error) {
+                                fprintf(stderr, "%% Share consume error: %s\n",
+                                        rd_kafka_error_string(error));
+                                rd_kafka_error_destroy(error);
+                                continue;
+                        }
+
+                        for (i = 0; i < rcvd_msgs; i++) {
+                                msg_consume(rkmessages[i], NULL);
+                                rd_kafka_message_destroy(rkmessages[i]);
+                        }
+
+                        if (rcvd_msgs > 0 && rate_sleep)
+                                do_sleep(rate_sleep);
+
+                        print_stats(NULL, mode, otype, compression);
+                }
+                cnt.t_end = rd_clock();
+
+                fprintf(stderr, "%% Closing share consumer\n");
+                rd_kafka_share_consumer_close(rkshare);
+                rd_kafka_share_destroy(rkshare);
+
+                free(rkmessages);
         }
 
         if (hdrs)