Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 113 additions & 16 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,8 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) {


static void sig_usr1(int sig) {
rd_kafka_dump(stdout, global_rk);
if (global_rk)
rd_kafka_dump(stdout, global_rk);
}


Expand Down Expand Up @@ -878,24 +879,13 @@ int main(int argc, char **argv) {
rd_kafka_conf_set(conf, "message.send.max.retries", "3", NULL, 0);
rd_kafka_conf_set(conf, "retry.backoff.ms", "500", NULL, 0);

/* Consumer config */
/* Tell rdkafka to (try to) maintain 1M messages
* in its internal receive buffers. This is to avoid
* application -> rdkafka -> broker per-message ping-pong
* latency.
* The larger the local queue, the higher the performance.
* Try other values with: ... -X queued.min.messages=1000
*/
rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);

topics = rd_kafka_topic_partition_list_new(1);

while ((opt = getopt(argc, argv,
"PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
"PCG:S:t:p:b:s:k:c:fi:MDd:m:F:x:"
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
switch (opt) {
case 'S':
case 'G':
if (rd_kafka_conf_set(conf, "group.id", optarg, errstr,
sizeof(errstr)) !=
Expand Down Expand Up @@ -940,7 +930,7 @@ int main(int argc, char **argv) {
case 'm':
msgpattern = optarg;
break;
case 'S':
case 'F':
seq = strtoull(optarg, NULL, 10);
do_seq = 1;
break;
Expand Down Expand Up @@ -1138,6 +1128,7 @@ int main(int argc, char **argv) {
" Options:\n"
" -C | -P | Consumer or Producer mode\n"
" -G <groupid> High-level Kafka Consumer mode\n"
" -S <groupid> Share Consumer mode\n"
" -t <topic> Topic to consume / produce\n"
" -p <num> Partition (defaults to random). "
"Multiple partitions are allowed in -C consumer mode.\n"
Expand All @@ -1153,7 +1144,7 @@ int main(int argc, char **argv) {
" -D Copy/Duplicate data buffer (producer)\n"
" -i <ms> Display interval\n"
" -m <msg> Message payload pattern\n"
" -S <start> Send a sequence number starting at "
" -F <start> Send a sequence number starting at "
"<start> as payload\n"
" -R <seed> Random seed value (defaults to time)\n"
" -a <acks> Required acks (producer): "
Expand Down Expand Up @@ -1195,12 +1186,30 @@ int main(int argc, char **argv) {
"\n"
" In Producer mode:\n"
" writes messages of size -s <..> and prints thruput\n"
"\n"
" In Share Consumer mode (-S):\n"
" consumes messages using share groups and prints "
"thruput\n"
"\n",
argv[0], rd_kafka_version_str(), rd_kafka_version(),
RD_KAFKA_DEBUG_CONTEXTS);
exit(1);
}

if (mode == 'C' || mode == 'G') {
/* Tell rdkafka to (try to) maintain 1M messages
* in its internal receive buffers. This is to avoid
* application -> rdkafka -> broker per-message ping-pong
* latency.
* The larger the local queue, the higher the performance.
* Try other values with: ... -X queued.min.messages=1000
*/
rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL,
0);
rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL,
0);
}

dispintvl *= 1000; /* us */

Expand Down Expand Up @@ -1314,6 +1323,27 @@ int main(int argc, char **argv) {
rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL,
0);

if (mode == 'S') {
if (partition_cnt > 0) {
fprintf(stderr,
"%% -p is not supported with "
"share consumer mode\n");
exit(1);
}
if (start_offset != 0) {
fprintf(stderr,
"%% -o is not supported with "
"share consumer mode\n");
exit(1);
}
if (exit_eof) {
fprintf(stderr,
"%% -e is not supported with "
"share consumer mode\n");
exit(1);
}
}

if (read_hdrs && mode == 'P') {
fprintf(stderr, "%% producer can not read headers\n");
exit(1);
Expand Down Expand Up @@ -1748,6 +1778,73 @@ int main(int argc, char **argv) {
free(rkmessages);
rd_kafka_queue_destroy(rkqu);
rd_kafka_destroy(rk);

} else if (mode == 'S') {
/* Share Consumer */
rd_kafka_share_t *rkshare;
rd_kafka_message_t **rkmessages = NULL;
int share_batch_size;

share_batch_size = batch_size ? batch_size : 10001;
rkmessages = malloc(sizeof(*rkmessages) * share_batch_size);

/* Create share consumer instance. */
rkshare =
rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
if (!rkshare) {
fprintf(stderr,
"%% Failed to create share consumer: %s\n",
errstr);
exit(1);
}

err = rd_kafka_share_subscribe(rkshare, topics);
if (err) {
fprintf(stderr, "%% Share subscribe failed: %s\n",
rd_kafka_err2str(err));
exit(1);
}
fprintf(stderr,
"%% Share consumer subscribed, "
"waiting for messages...\n");

while (run && (msgcnt == -1 || msgcnt > (int)cnt.msgs)) {
uint64_t fetch_latency;
size_t rcvd_msgs = 0;
size_t i;
rd_kafka_error_t *error;

fetch_latency = rd_clock();

error = rd_kafka_share_consume_batch(
rkshare, 1000, rkmessages, &rcvd_msgs);

cnt.t_fetch_latency += rd_clock() - fetch_latency;

if (error) {
fprintf(stderr, "%% Share consume error: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
continue;
}

for (i = 0; i < rcvd_msgs; i++) {
msg_consume(rkmessages[i], NULL);
rd_kafka_message_destroy(rkmessages[i]);
}

if (rcvd_msgs > 0 && rate_sleep)
do_sleep(rate_sleep);

print_stats(NULL, mode, otype, compression);
}
cnt.t_end = rd_clock();

fprintf(stderr, "%% Closing share consumer\n");
rd_kafka_share_consumer_close(rkshare);
rd_kafka_share_destroy(rkshare);

free(rkmessages);
}

if (hdrs)
Expand Down