Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ offset_commit_cb | C | |
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
max.poll.records | C | 1 .. 2147483647 | 500 | low | The maximum number of messages returned in a single batch consume call. <br>*Type: integer*
transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
Expand Down
92 changes: 51 additions & 41 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ int main(int argc, char **argv) {
}


if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK) {
if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
Expand Down Expand Up @@ -236,49 +236,59 @@ int main(int argc, char **argv) {
* since a rebalance may happen at any time.
* Start polling for messages. */

rd_kafka_message_t *rkmessages[500];
while (run) {
rd_kafka_message_t *rkm;

rkm = rd_kafka_consumer_poll(rk, 100);
if (!rkm)
continue; /* Timeout: no message within 100ms,
* try again. This short timeout allows
* checking for `run` at frequent intervals.
*/

/* consumer_poll() will return either a proper message
* or a consumer error (rkm->err is set). */
if (rkm->err) {
/* Consumer errors are generally to be considered
* informational as the consumer will automatically
* try to recover from all types of errors. */
fprintf(stderr, "%% Consumer error: %s\n",
rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
rd_kafka_message_t *rkm = NULL;
size_t rcvd_msgs = 0;
int i;

// rkm = rd_kafka_consumer_poll(rk, 100);
rd_kafka_error_t *error;

// fprintf(stderr, "Calling consume_batch\n");
error = rd_kafka_share_consume_batch(rk, 5000, rkmessages,
&rcvd_msgs);
if (error) {
fprintf(stderr, "%% Consume error: %s\n",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
continue;
}

/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64
" (leader epoch %" PRId32 "):\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset, rd_kafka_message_leader_epoch(rkm));

/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
printf(" Key: %.*s\n", (int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);

/* Print the message value/payload. */
if (rkm->payload && is_printable(rkm->payload, rkm->len))
printf(" Value: %.*s\n", (int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n", (int)rkm->len);

rd_kafka_message_destroy(rkm);
fprintf(stderr, "%% Received %zu messages\n", rcvd_msgs);
for (i = 0; i < (int)rcvd_msgs; i++) {
rkm = rkmessages[i];

if (rkm->err) {
fprintf(stderr, "%% Consumer error: %d: %s\n",
rkm->err, rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}

/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64
" (leader epoch %" PRId32 "):\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset, rd_kafka_message_leader_epoch(rkm));

/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
printf(" Key: %.*s\n", (int)rkm->key_len,
(const char *)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n", (int)rkm->key_len);

/* Print the message value/payload. */
if (rkm->payload &&
is_printable(rkm->payload, rkm->len))
printf(" Value: %.*s\n", (int)rkm->len,
(const char *)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n", (int)rkm->len);

rd_kafka_message_destroy(rkm);
}
}


Expand Down
Loading