diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 40b7412ef..ddf795a9e 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -154,6 +154,7 @@ offset_commit_cb | C | |
 enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
 check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
 client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`. <br>*Type: string*
+max.poll.records | C | 1 .. 2147483647 | 500 | low | Maximum number of records returned in a single call to poll/consume. <br>*Type: integer*
 transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0. <br>*Type: string*
 transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods. <br>*Type: integer*
 enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantiation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
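
Illustrative sketch (not from the patch): how the new `max.poll.records` property and `rd_kafka_share_consumer_new()` might be used together. The broker list and group id are placeholders and error handling is trimmed.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical setup of a share consumer using this patch's API. */
static rd_kafka_t *create_share_consumer(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder broker list and group id. */
        rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr,
                          sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", "my-share-group", errstr,
                          sizeof(errstr));
        /* Cap the number of messages returned per consume call. */
        rd_kafka_conf_set(conf, "max.poll.records", "100", errstr,
                          sizeof(errstr));

        /* rd_kafka_share_consumer_new() forces share.consumer=true and
         * group.protocol=consumer internally (see src/rdkafka.c below). */
        rd_kafka_t *rk =
            rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
        if (!rk)
                fprintf(stderr, "Failed to create share consumer: %s\n",
                        errstr);
        return rk;
}
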
diff --git a/examples/consumer.c b/examples/consumer.c
index 8f33edd69..3a4ad4f32 100644
--- a/examples/consumer.c
+++ b/examples/consumer.c
@@ -165,8 +165,8 @@ int main(int argc, char **argv) {
         }
 
-        if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr,
-                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+        if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr, sizeof(errstr)) !=
+            RD_KAFKA_CONF_OK) {
                 fprintf(stderr, "%s\n", errstr);
                 rd_kafka_conf_destroy(conf);
                 return 1;
@@ -236,49 +236,59 @@ int main(int argc, char **argv) {
          * since a rebalance may happen at any time.
          * Start polling for messages. */
+        rd_kafka_message_t *rkmessages[500];
         while (run) {
-                rd_kafka_message_t *rkm;
-
-                rkm = rd_kafka_consumer_poll(rk, 100);
-                if (!rkm)
-                        continue; /* Timeout: no message within 100ms,
-                                   * try again. This short timeout allows
-                                   * checking for `run` at frequent intervals.
-                                   */
-
-                /* consumer_poll() will return either a proper message
-                 * or a consumer error (rkm->err is set). */
-                if (rkm->err) {
-                        /* Consumer errors are generally to be considered
-                         * informational as the consumer will automatically
-                         * try to recover from all types of errors. */
-                        fprintf(stderr, "%% Consumer error: %s\n",
-                                rd_kafka_message_errstr(rkm));
-                        rd_kafka_message_destroy(rkm);
+                rd_kafka_message_t *rkm = NULL;
+                size_t rcvd_msgs = 0;
+                int i;
+
+                // rkm = rd_kafka_consumer_poll(rk, 100);
+                rd_kafka_error_t *error;
+
+                // fprintf(stderr, "Calling consume_batch\n");
+                error = rd_kafka_share_consume_batch(rk, 5000, rkmessages,
+                                                     &rcvd_msgs);
+                if (error) {
+                        fprintf(stderr, "%% Consume error: %s\n",
+                                rd_kafka_error_string(error));
+                        rd_kafka_error_destroy(error);
                         continue;
                 }
 
-                /* Proper message. */
-                printf("Message on %s [%" PRId32 "] at offset %" PRId64
-                       " (leader epoch %" PRId32 "):\n",
-                       rd_kafka_topic_name(rkm->rkt), rkm->partition,
-                       rkm->offset, rd_kafka_message_leader_epoch(rkm));
-
-                /* Print the message key. */
-                if (rkm->key && is_printable(rkm->key, rkm->key_len))
-                        printf(" Key: %.*s\n", (int)rkm->key_len,
-                               (const char *)rkm->key);
-                else if (rkm->key)
-                        printf(" Key: (%d bytes)\n", (int)rkm->key_len);
-
-                /* Print the message value/payload. */
-                if (rkm->payload && is_printable(rkm->payload, rkm->len))
-                        printf(" Value: %.*s\n", (int)rkm->len,
-                               (const char *)rkm->payload);
-                else if (rkm->payload)
-                        printf(" Value: (%d bytes)\n", (int)rkm->len);
-
-                rd_kafka_message_destroy(rkm);
+                fprintf(stderr, "%% Received %zu messages\n", rcvd_msgs);
+                for (i = 0; i < (int)rcvd_msgs; i++) {
+                        rkm = rkmessages[i];
+
+                        if (rkm->err) {
+                                fprintf(stderr, "%% Consumer error: %d: %s\n",
+                                        rkm->err, rd_kafka_message_errstr(rkm));
+                                rd_kafka_message_destroy(rkm);
+                                continue;
+                        }
+
+                        /* Proper message. */
+                        printf("Message on %s [%" PRId32 "] at offset %" PRId64
+                               " (leader epoch %" PRId32 "):\n",
+                               rd_kafka_topic_name(rkm->rkt), rkm->partition,
+                               rkm->offset, rd_kafka_message_leader_epoch(rkm));
+
+                        /* Print the message key. */
+                        if (rkm->key && is_printable(rkm->key, rkm->key_len))
+                                printf(" Key: %.*s\n", (int)rkm->key_len,
+                                       (const char *)rkm->key);
+                        else if (rkm->key)
+                                printf(" Key: (%d bytes)\n", (int)rkm->key_len);
+
+                        /* Print the message value/payload. */
+                        if (rkm->payload &&
+                            is_printable(rkm->payload, rkm->len))
+                                printf(" Value: %.*s\n", (int)rkm->len,
+                                       (const char *)rkm->payload);
+                        else if (rkm->payload)
+                                printf(" Value: (%d bytes)\n", (int)rkm->len);
+
+                        rd_kafka_message_destroy(rkm);
+                }
         }
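
Note: the example sizes rkmessages[] with a literal 500, which matches the max.poll.records default but not a user override. A minimal sketch of deriving the capacity from the configuration instead, using rd_kafka_conf_get() before the conf is handed to rd_kafka_share_consumer_new() (which, like rd_kafka_new(), presumably takes ownership of the conf on success); the helper name is hypothetical.

#include <stdlib.h>
#include <librdkafka/rdkafka.h>

/* Read back "max.poll.records" from the conf and allocate a matching
 * message array. Must be called while the application still owns conf. */
static rd_kafka_message_t **alloc_msg_array(const rd_kafka_conf_t *conf,
                                            size_t *cntp) {
        char val[32];
        size_t val_size = sizeof(val);

        if (rd_kafka_conf_get(conf, "max.poll.records", val, &val_size) !=
            RD_KAFKA_CONF_OK)
                return NULL;

        *cntp = (size_t)strtoul(val, NULL, 10);
        return calloc(*cntp, sizeof(rd_kafka_message_t *));
}
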
diff --git a/src/rdkafka.c b/src/rdkafka.c
index 0904bb77a..6d0886a70 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2839,8 +2839,9 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
         return NULL;
 }
 
-rd_kafka_t *rd_kafka_share_consumer_new(
-    rd_kafka_conf_t *conf, char *errstr, size_t errstr_size) {
+rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf,
+                                        char *errstr,
+                                        size_t errstr_size) {
         rd_kafka_t *rk;
         char errstr_internal[512];
         rd_kafka_conf_res_t res;
@@ -2853,7 +2854,7 @@ rd_kafka_t *rd_kafka_share_consumer_new(
         }
 
         res = rd_kafka_conf_set(conf, "share.consumer", "true", errstr_internal,
-               sizeof(errstr_internal));
+                                sizeof(errstr_internal));
         if (res != RD_KAFKA_CONF_OK) {
                 rd_snprintf(errstr, errstr_size,
                             "rd_kafka_share_consumer_new(): "
@@ -2863,8 +2864,8 @@ rd_kafka_t *rd_kafka_share_consumer_new(
         }
 
-        res = rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr_internal,
-                                sizeof(errstr_internal));
+        res = rd_kafka_conf_set(conf, "group.protocol", "consumer",
+                                errstr_internal, sizeof(errstr_internal));
         if (res != RD_KAFKA_CONF_OK) {
                 rd_snprintf(errstr, errstr_size,
                             "rd_kafka_share_consumer_new(): "
@@ -2882,6 +2883,355 @@ rd_kafka_t *rd_kafka_share_consumer_new(
         return rk;
 }
 
+
+/**
+ * @locality main thread
+ * @locks none
+ */
+static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk,
+                                                       rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_broker_t *selected_rkb = NULL;
+        rd_kafka_topic_partition_list_t *partitions =
+            rkcg->rkcg_current_assignment;
+        // rkcg->rkcg_toppars; /* TODO: use rkcg->rkcg_toppars instead. */
+        size_t i;
+        rd_kafka_topic_partition_t *partition;
+
+        if (!partitions || partitions->cnt == 0) {
+                rd_kafka_dbg(rk, CGRP, "SHARE",
+                             "No partitions assigned to consumer, "
+                             "cannot select broker for share fetch");
+                return NULL;
+        }
+
+        /* Look through all partitions in order, find the first one which
+         * has a leader. */
+        rd_kafka_dbg(rk, CGRP, "SHARE",
+                     "Selecting broker for share fetch from %d assigned "
+                     "partitions, last picked index = %" PRIusz,
+                     partitions->cnt, rkcg->rkcg_share.last_partition_picked);
+
+        for (i = 0; i < (size_t)partitions->cnt; i++) {
+                rd_kafka_toppar_t *rktp;
+                rkcg->rkcg_share.last_partition_picked += 1;
+                if (rkcg->rkcg_share.last_partition_picked >=
+                    (size_t)partitions->cnt)
+                        rkcg->rkcg_share.last_partition_picked = 0;
+                partition =
+                    &partitions->elems[rkcg->rkcg_share.last_partition_picked];
+
+                rktp = rd_kafka_toppar_get2(rk, partition->topic,
+                                            partition->partition, 0, 1);
+
+                /* Criteria to choose a broker:
+                 * 1. It should be the leader of a partition.
+                 * 2. A share-fetch op must not already be enqueued on it. */
+                if (rktp->rktp_leader) {
+                        /* TODO: We're only going to access
+                         * rkb_share_fetch_enqueued from the main thread, except
+                         * when it's being calloc'd and destroyed. Is it safe to
+                         * access it without a lock? */
+                        rd_kafka_broker_lock(rktp->rktp_leader);
+                        if (!rktp->rktp_leader->rkb_share_fetch_enqueued) {
+                                rd_kafka_broker_keep(rktp->rktp_leader);
+                                selected_rkb = rktp->rktp_leader;
+                        }
+                        rd_kafka_broker_unlock(rktp->rktp_leader);
+                }
+
+                rd_kafka_toppar_destroy(rktp);
+
+                if (selected_rkb)
+                        break;
+        }
+        return selected_rkb;
+}
+
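Aside: the selection loop above resumes from last_partition_picked so consecutive fanouts do not always hit the first assigned partition's leader. A standalone sketch of that rotation, simplified (hypothetical types, no locking or refcounting):

#include <stddef.h>
#include <stdbool.h>

/* Simplified model: each assigned partition may or may not have a usable
 * leader (one with no share-fetch op already enqueued). */
struct partition {
        int leader_id;    /* -1 if currently leaderless */
        bool leader_busy; /* share-fetch already enqueued on the leader */
};

/* Round-robin pick: advance a persistent cursor first, wrap at cnt, and
 * scan at most cnt entries, mirroring rd_kafka_share_select_broker(). */
static int pick_leader(struct partition *parts, size_t cnt, size_t *cursor) {
        size_t i;
        for (i = 0; i < cnt; i++) {
                *cursor = (*cursor + 1) % cnt;
                struct partition *p = &parts[*cursor];
                if (p->leader_id >= 0 && !p->leader_busy)
                        return p->leader_id;
        }
        return -1; /* no eligible leader this round */
}
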
+/**
+ * @brief Timer callback for re-enqueueing SHARE_FETCH_FANOUT after a backoff.
+ * @locality main thread
+ * @locks none
+ */
+static void rd_kafka_share_fetch_fanout_renqueue(rd_kafka_timers_t *rkts,
+                                                 void *arg) {
+        rd_kafka_op_t *rko = arg;
+        rd_kafka_t *rk = rkts->rkts_rk;
+
+        rd_kafka_dbg(rk, CGRP, "SHARE", "Re-enqueueing SHARE_FETCH_FANOUT");
+        rd_kafka_q_enq(rk->rk_ops, rko);
+}
+
+/**
+ * @brief Enqueue a SHARE_FETCH_FANOUT op on the main queue.
+ * @param backoff_ms If > 0, the op is enqueued after this many milliseconds;
+ *                   otherwise it is enqueued immediately.
+ * @locality any thread
+ */
+static void rd_kafka_share_fetch_fanout_with_backoff(rd_kafka_t *rk,
+                                                     rd_ts_t abs_timeout,
+                                                     int backoff_ms) {
+        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
+        rd_kafka_op_t *rko = rd_kafka_op_new_cb(
+            rk, RD_KAFKA_OP_SHARE_FETCH_FANOUT, rd_kafka_share_fetch_fanout_op);
+        rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout;
+        rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0);
+
+        if (backoff_ms > 0)
+                rd_kafka_timer_start_oneshot(
+                    &rk->rk_timers, &rkcg->rkcg_share.share_fetch_fanout_tmr,
+                    rd_true, backoff_ms * 1000,
+                    rd_kafka_share_fetch_fanout_renqueue, rko);
+        else
+                rd_kafka_q_enq(rk->rk_ops, rko);
+}
+
+/**
+ * Handles RD_KAFKA_OP_SHARE_FETCH | RD_KAFKA_OP_REPLY.
+ * @locality main thread
+ */
+rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
+                                                rd_kafka_op_t *rko_orig) {
+        rd_kafka_resp_err_t err;
+
+        rd_kafka_assert(rk, thrd_is_current(rk->rk_thread));
+        rd_kafka_dbg(rk, CGRP, "SHAREFETCH",
+                     "Fetch share reply: %s, should_fetch=%d, broker=%s",
+                     rd_kafka_err2str(rko_orig->rko_err),
+                     rko_orig->rko_u.share_fetch.should_fetch,
+                     rko_orig->rko_u.share_fetch.target_broker
+                         ? rd_kafka_broker_name(
+                               rko_orig->rko_u.share_fetch.target_broker)
+                         : "none");
+
+        rko_orig->rko_u.share_fetch.target_broker->rkb_share_fetch_enqueued =
+            rd_false;
+
+        if (!rko_orig->rko_err && !rko_orig->rko_error)
+                return RD_KAFKA_OP_RES_HANDLED;
+
+        /* Retry fetching if there is any amount of time left. For other
+         * errors, let them drop and be retried on the next poll. */
+        /* TODO: KIP-932: Are there errors for which we should not
+         * retry, and rather propagate those errors to the user? In that
+         * case, we must write to the rkcg queue. */
+        if (!rko_orig->rko_u.share_fetch.should_fetch ||
+            !rd_timeout_remains(rko_orig->rko_u.share_fetch.abs_timeout))
+                return RD_KAFKA_OP_RES_HANDLED;
+
+        err = rko_orig->rko_err;
+        if (rko_orig->rko_error)
+                err = rd_kafka_error_code(rko_orig->rko_error);
+
+        switch (err) {
+        /* For some errors, don't resend at all.
+         * This list of errors is incomplete. */
+        case RD_KAFKA_RESP_ERR__TIMED_OUT:
+        case RD_KAFKA_RESP_ERR__DESTROY:
+        case RD_KAFKA_RESP_ERR__UNKNOWN_GROUP:
+        case RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS: /* This should not happen. */
+        case RD_KAFKA_RESP_ERR__STATE:
+        case RD_KAFKA_RESP_ERR__AUTHENTICATION:
+        case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: /* Do we need more
+                                                              handling for fatal
+                                                              errors? */
+                break;
+
+        /* For other cases, resend to a separate broker after a backoff. */
+        default:
+                rd_kafka_share_fetch_fanout_with_backoff(
+                    rk, rko_orig->rko_u.share_fetch.abs_timeout,
+                    rk->rk_conf.retry_backoff_max_ms);
+                break;
+        }
+
+        return RD_KAFKA_OP_RES_HANDLED;
+}
+
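Aside: the retry decision above hinges on a small set of errors that must not trigger a re-fanout. A standalone sketch of that classification as a predicate (the case list mirrors the switch above, which the patch itself marks as incomplete):

#include <stdbool.h>
#include <librdkafka/rdkafka.h>

/* Returns true for errors that should NOT cause the share fetch to be
 * re-enqueued; everything else is retried with a backoff. */
static bool share_fetch_err_is_permanent(rd_kafka_resp_err_t err) {
        switch (err) {
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR__DESTROY:
        case RD_KAFKA_RESP_ERR__UNKNOWN_GROUP:
        case RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS:
        case RD_KAFKA_RESP_ERR__STATE:
        case RD_KAFKA_RESP_ERR__AUTHENTICATION:
        case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
                return true;
        default:
                return false;
        }
}
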
+/**
+ * Handles RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY.
+ * @locality main thread
+ */
+rd_kafka_op_res_t
+rd_kafka_share_fetch_fanout_reply_op(rd_kafka_t *rk, rd_kafka_op_t *rko_orig) {
+        rd_kafka_resp_err_t err;
+
+        if (!rko_orig->rko_err && !rko_orig->rko_error)
+                return RD_KAFKA_OP_RES_HANDLED;
+
+        err = rko_orig->rko_err;
+        if (rko_orig->rko_error)
+                err = rd_kafka_error_code(rko_orig->rko_error);
+
+        /* TODO: KIP-932: Add error handling - either retries, or user-level
+         * propagation, later. */
+        rd_kafka_dbg(
+            rk, CGRP, "SHARE",
+            "Encountered error in SHARE_FETCH_FANOUT: %s, remains: %d",
+            rd_kafka_err2name(err),
+            rd_timeout_remains(rko_orig->rko_u.share_fetch_fanout.abs_timeout));
+
+        switch (err) {
+        /* Some errors need not be retried. */
+        case RD_KAFKA_RESP_ERR__DESTROY:
+        case RD_KAFKA_RESP_ERR__TIMED_OUT:
+                break;
+
+        /* Other errors may be retried - with a constant backoff. */
+        default:
+                rd_kafka_share_fetch_fanout_with_backoff(
+                    rk, rko_orig->rko_u.share_fetch_fanout.abs_timeout,
+                    /* TODO: KIP-932: Consider setting this to retry_backoff_ms
+                       or to a constant. */
+                    rk->rk_conf.retry_backoff_max_ms);
+                break;
+        }
+        return RD_KAFKA_OP_RES_HANDLED;
+}
+
+/**
+ * Op callback for RD_KAFKA_OP_SHARE_FETCH_FANOUT.
+ * @locality main thread
+ */
+rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk,
+                                                 rd_kafka_q_t *rkq,
+                                                 rd_kafka_op_t *rko) {
+        rd_kafka_broker_t *rkb, *selected_rkb = NULL;
+        rd_kafka_error_t *error;
+        rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk);
+        rd_kafka_op_t *reply_rko;
+        rd_ts_t abs_timeout = rko->rko_u.share_fetch_fanout.abs_timeout;
+
+        /* We should never have enqueued this op if we didn't have the rkcg */
+        rd_assert(rkcg);
+
+        if (rd_timeout_remains(abs_timeout) <= 0) {
+                rd_kafka_dbg(rk, CGRP, "SHARE",
+                             "Not issuing SHARE_FETCH_FANOUT: timeout expired");
+
+                error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__TIMED_OUT,
+                                           "rd_kafka_share_fetch_fanout_op(): "
+                                           "Share fetch fanout op timed out");
+
+                reply_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH_FANOUT |
+                                            RD_KAFKA_OP_REPLY);
+                reply_rko->rko_error = error;
+                reply_rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout;
+                rd_kafka_replyq_enq(&rko->rko_replyq, reply_rko, 0);
+                return RD_KAFKA_OP_RES_HANDLED;
+        }
+
+        if (!(selected_rkb = rd_kafka_share_select_broker(rk, rkcg))) {
+                error =
+                    rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
+                                       "rd_kafka_share_consume_batch(): "
+                                       "No broker available for share fetch");
+
+                reply_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH_FANOUT |
+                                            RD_KAFKA_OP_REPLY);
+                reply_rko->rko_error = error;
+                reply_rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout;
+                rd_kafka_replyq_enq(&rko->rko_replyq, reply_rko, 0);
+                return RD_KAFKA_OP_RES_HANDLED;
+        }
+
+        /* Issue fetch requests to all brokers */
+        rd_kafka_dbg(rk, CGRP, "SHARE",
+                     "Selected broker %s for fetching messages, issuing fetch "
+                     "requests to necessary brokers",
+                     rd_kafka_broker_name(selected_rkb));
+
+        rd_kafka_rdlock(rk);
+        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
+                rd_kafka_op_t *rko_sf;
+
+                if (rd_kafka_broker_or_instance_terminating(rkb) ||
+                    RD_KAFKA_BROKER_IS_LOGICAL(rkb)) {
+                        rd_kafka_dbg(rk, CGRP, "SHARE",
+                                     "Skipping broker %s for share fetch: "
+                                     "terminating or logical",
+                                     rd_kafka_broker_name(rkb));
+                        continue;
+                }
+
+                if (rkb->rkb_share_fetch_enqueued) {
+                        rd_kafka_dbg(rk, CGRP, "SHARE",
+                                     "Unable to enqueue op on broker %s "
+                                     "because another op is already pending.",
+                                     rd_kafka_broker_name(rkb));
+                        continue;
+                }
+                rkb->rkb_share_fetch_enqueued = rd_true;
+
+                rko_sf = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH);
+                rko_sf->rko_u.share_fetch.abs_timeout = abs_timeout;
+                rko_sf->rko_u.share_fetch.should_fetch = (rkb == selected_rkb);
+                rd_kafka_broker_keep(rkb);
+                rko_sf->rko_u.share_fetch.target_broker = rkb;
+                rko_sf->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0);
+
+                rd_kafka_dbg(rk, CGRP, "SHAREFETCH",
+                             "Enqueuing share fetch op on broker %s "
+                             "(%s fetch)",
+                             rd_kafka_broker_name(rkb),
+                             rko_sf->rko_u.share_fetch.should_fetch
+                                 ? "should"
+                                 : "should not");
+                rd_kafka_q_enq(rkb->rkb_ops, rko_sf);
+        }
+        rd_kafka_rdunlock(rk);
+
+        RD_IF_FREE(selected_rkb, rd_kafka_broker_destroy);
+
+        return RD_KAFKA_OP_RES_HANDLED;
+}
+
+rd_kafka_error_t *rd_kafka_share_consume_batch(
+    rd_kafka_t *rk,
+    int timeout_ms,
+    /* There is some benefit to making this ***rkmessages and allocating it
+       within this function, but on the flip side that would mean it is always
+       allocated on the heap. */
+    rd_kafka_message_t **rkmessages /* out */,
+    size_t *rkmessages_size /* out */) {
+        rd_kafka_cgrp_t *rkcg;
+        rd_ts_t now = rd_clock();
+        rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms);
+        size_t max_poll_records = (size_t)rk->rk_conf.share.max_poll_records;
+
+        if (!RD_KAFKA_IS_SHARE_CONSUMER(rk))
+                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+                                          "rd_kafka_share_consume_batch(): "
+                                          "rk is not a share consumer");
+
+        if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk))))
+                return rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
+                                          "rd_kafka_share_consume_batch(): "
+                                          "Consumer group not initialized");
+
+        /* If we have any pending items on the consumer queue, don't issue new
+         * requests; rather, deal with them first. */
+        if (likely(rd_kafka_q_len(rkcg->rkcg_q) == 0)) {
+                rd_kafka_dbg(rk, CGRP, "SHARE",
+                             "Issuing share fetch fanout to main thread with "
+                             "abs_timeout = %" PRId64,
+                             abs_timeout);
+                rd_kafka_share_fetch_fanout_with_backoff(rk, abs_timeout,
+                                                         0 /* no backoff */);
+        }
+
+        /* At this point, there's no reason to deviate from what we already do
+         * for returning multiple messages to the user, as the orchestration
+         * is handled by the main thread. Later on, we might need a custom
+         * loop if any changes are required. */
+        *rkmessages_size = rd_kafka_q_serve_share_rkmessages(
+            rkcg->rkcg_q, timeout_ms, /* Use this timeout directly as prior
+                                         operations aren't blocking, so no need
+                                         to re-convert the abs_timeout into a
+                                         relative one. */
+            rkmessages, max_poll_records);
+
+        return NULL;
+}
+
 /**
  * Schedules a rebootstrap of the cluster immediately.
  *
@@ -4331,6 +4681,15 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
                 res = rd_kafka_metadata_update_op(rk, rko->rko_u.metadata.mdi);
                 break;
 
+        case RD_KAFKA_OP_SHARE_FETCH | RD_KAFKA_OP_REPLY:
+                res = rd_kafka_share_fetch_reply_op(rk, rko);
+                break;
+
+        case RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY:
+                rd_kafka_assert(rk, thrd_is_current(rk->rk_thread));
+                res = rd_kafka_share_fetch_fanout_reply_op(rk, rko);
+                break;
+
         default:
                 /* If op has a callback set (e.g., OAUTHBEARER_REFRESH),
                  * call it. */
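
Aside: rd_kafka_share_consume_batch() converts timeout_ms into an absolute deadline once (rd_timeout_init0()) so the fanout retries and the final queue wait share a single time budget. A standalone approximation of that pattern with POSIX clocks; rd_clock()/rd_timeout_remains() are internal helpers, so the names below are hypothetical stand-ins.

#include <stdint.h>
#include <time.h>

/* Microsecond monotonic clock, roughly what rd_clock() provides. */
static int64_t now_us(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}

/* Turn a relative timeout into an absolute deadline, once, up front. */
static int64_t deadline_init(int timeout_ms) {
        return now_us() + (int64_t)timeout_ms * 1000;
}

/* Remaining milliseconds, clamped to 0 once the deadline has passed. */
static int remaining_ms(int64_t deadline) {
        int64_t left = deadline - now_us();
        return left > 0 ? (int)(left / 1000) : 0;
}
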
diff --git a/src/rdkafka.h b/src/rdkafka.h
index e137c0180..42f8bfdb9 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -3058,7 +3058,25 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
 
 RD_EXPORT
-rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, char *errstr, size_t errstr_size);
+rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf,
+                                        char *errstr,
+                                        size_t errstr_size);
+
+/**
+ * @brief Consume a batch of messages from the share consumer instance.
+ *
+ * @param rk Share consumer instance.
+ * @param timeout_ms Maximum time to block waiting for messages.
+ * @param rkmessages Output array of messages. Must be preallocated with
+ *                   capacity for at least `max.poll.records` messages.
+ * @param rkmessages_size Output number of messages returned in rkmessages.
+ */
+RD_EXPORT
+rd_kafka_error_t *
+rd_kafka_share_consume_batch(rd_kafka_t *rk,
+                             int timeout_ms,
+                             rd_kafka_message_t **rkmessages /* out */,
+                             size_t *rkmessages_size /* out */);
 
 /**
  * @brief Destroy Kafka handle.
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index eb8e84924..e130da7e9 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -3553,6 +3553,28 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
                 wakeup = rd_true;
                 break;
 
+        case RD_KAFKA_OP_SHARE_FETCH:
+                rd_rkb_dbg(rkb, CGRP, "SHAREFETCH",
+                           "Received SHARE_FETCH op for broker %s with "
+                           "should_fetch = %d",
+                           rd_kafka_broker_name(rkb),
+                           rko->rko_u.share_fetch.should_fetch);
+                /* This is only temporary handling for testing, to avoid
+                 * crashing on the assert - the code below will automatically
+                 * enqueue a reply, which is not the final behaviour. */
+                /* Insert errors randomly for testing; remove this code once
+                 * actual errors can be tested via the mock broker. */
+                // if (rd_jitter(0, 10) > 7) {
+                //         rd_rkb_dbg(rkb, CGRP, "SHAREFETCH",
+                //                    "Injecting error! %s : %d",
+                //                    rd_kafka_broker_name(rkb),
+                //                    rko->rko_u.share_fetch.should_fetch);
+
+                //         rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__STATE);
+                //         rko = NULL;
+                // }
+                break;
+
         default:
                 rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type");
                 break;
diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h
index a649b7445..b8f7cb6e2 100644
--- a/src/rdkafka_broker.h
+++ b/src/rdkafka_broker.h
@@ -387,6 +387,12 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */
 
         /** > 0 if this broker thread is terminating */
         rd_atomic32_t termination_in_progress;
+
+        /**
+         * Whether a share-fetch op is currently enqueued on this
+         * broker's op queue.
+         */
+        rd_bool_t rkb_share_fetch_enqueued;
 };
 
 #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt)
diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h
index 79a734f5f..8f6afedc7 100644
--- a/src/rdkafka_cgrp.h
+++ b/src/rdkafka_cgrp.h
@@ -377,6 +377,14 @@ typedef struct rd_kafka_cgrp_s {
 
         /* Timestamp of last rebalance start */
         rd_ts_t rkcg_ts_rebalance_start;
+        struct {
+                size_t last_partition_picked;            /* For round-robin
+                                                          * partition picking */
+                rd_kafka_timer_t share_fetch_fanout_tmr; /**< Timer for
+                                                          * share fetch
+                                                          * fanout */
+
+        } rkcg_share;
 } rd_kafka_cgrp_t;
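
Aside: rkb_share_fetch_enqueued acts as a per-broker "at most one outstanding share-fetch op" latch, set in the fanout and cleared in the reply handler. A minimal standalone sketch of that latch (hypothetical types, single-threaded for clarity):

#include <stdbool.h>
#include <stdio.h>

struct broker {
        const char *name;
        bool share_fetch_enqueued; /* latch: one outstanding op max */
};

/* Fanout side: skip brokers that already have an op in flight. */
static bool try_enqueue(struct broker *b) {
        if (b->share_fetch_enqueued)
                return false; /* another op is already pending */
        b->share_fetch_enqueued = true;
        printf("enqueued share-fetch on %s\n", b->name);
        return true;
}

/* Reply side: clear the latch so the next fanout may use this broker. */
static void on_reply(struct broker *b) {
        b->share_fetch_enqueued = false;
}
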
It " "corresponds with the broker config `broker.rack`.", .sdef = ""}, - {_RK_GLOBAL | _RK_HIDDEN, "share.consumer", _RK_C_BOOL, _RK(is_share_consumer), - "tba description", 0, 1, 0}, + {_RK_GLOBAL | _RK_HIDDEN, "share.consumer", _RK_C_BOOL, + _RK(share.is_share_consumer), "tba description", 0, 1, 0}, + {_RK_GLOBAL | _RK_CONSUMER, "max.poll.records", _RK_C_INT, + _RK(share.max_poll_records), "tba description,", 1, INT_MAX, 500}, /* Global producer properties */ diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index fedfc2ece..ea9e216ef 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -470,7 +470,10 @@ struct rd_kafka_conf_s { rd_kafkap_str_t *client_rack; - int is_share_consumer; /**< Is this a share consumer? */ + struct { + int is_share_consumer; /**< Is this a share consumer? */ + int max_poll_records; /**< Max records returned per poll */ + } share; /* * Producer configuration diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 9e97bf9dc..c9fe9da3e 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -95,7 +95,8 @@ typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t; #define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0) #define RD_KAFKA_IS_SHARE_CONSUMER(rk) \ - ((rk)->rk_type == RD_KAFKA_CONSUMER && (rk)->rk_conf.is_share_consumer) + ((rk)->rk_type == RD_KAFKA_CONSUMER && \ + (rk)->rk_conf.share.is_share_consumer) /** * @struct Represents a fetch position: @@ -1259,4 +1260,8 @@ int rd_kafka_rebootstrap_tmr_stop(rd_kafka_t *rk); void rd_kafka_reset_any_broker_down_reported(rd_kafka_t *rk); +rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko); + #endif /* _RDKAFKA_INT_H_ */ diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index db325b11c..e15db3f35 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -122,8 +122,9 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:RD_KAFKA_OP_SET_TELEMETRY_BROKER", [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", - [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", - [RD_KAFKA_OP_SHARE_FETCH] = "REPLY:SHARE_FETCH", + [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", + [RD_KAFKA_OP_SHARE_FETCH] = "REPLY:SHARE_FETCH", + [RD_KAFKA_OP_SHARE_FETCH_FANOUT] = "REPLY:SHARE_FETCH_FANOUT", }; if (type & RD_KAFKA_OP_REPLY) @@ -289,6 +290,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_SHARE_FETCH] = sizeof(rko->rko_u.share_fetch), + [RD_KAFKA_OP_SHARE_FETCH_FANOUT] = + sizeof(rko->rko_u.share_fetch_fanout), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -510,7 +513,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { break; case RD_KAFKA_OP_SHARE_FETCH: - /* TODO KIP-932: Add destruction code. */ + RD_IF_FREE(rko->rko_u.share_fetch.target_broker, + rd_kafka_broker_destroy); + break; + + case RD_KAFKA_OP_SHARE_FETCH_FANOUT: + /* No heap-allocated resources to clean up */ break; default: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 58a248aaf..705f16b77 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -191,6 +191,8 @@ typedef enum { * u.admin_request */ RD_KAFKA_OP_SHARE_FETCH, /**< broker op: Issue share fetch request if applicable. */ + RD_KAFKA_OP_SHARE_FETCH_FANOUT, /**< fanout share fetch operation */ + RD_KAFKA_OP__END } rd_kafka_op_type_t; @@ -734,8 +736,17 @@ struct rd_kafka_op_s { /** Absolute timeout left to complete this share-fetch. 
diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
index 92eddccae..1d25d984d 100644
--- a/src/rdkafka_queue.c
+++ b/src/rdkafka_queue.c
@@ -869,6 +869,122 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
         return cnt;
 }
 
+int rd_kafka_q_serve_share_rkmessages(rd_kafka_q_t *rkq,
+                                      int timeout_ms,
+                                      rd_kafka_message_t **rkmessages,
+                                      size_t rkmessages_size) {
+        unsigned int cnt = 0;
+        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
+        struct rd_kafka_op_tailq ctrl_msg_q =
+            TAILQ_HEAD_INITIALIZER(ctrl_msg_q);
+        rd_kafka_op_t *rko, *next;
+        rd_kafka_t *rk = rkq->rkq_rk;
+        rd_kafka_q_t *fwdq;
+        rd_ts_t abs_timeout;
+
+        mtx_lock(&rkq->rkq_lock);
+        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
+                /* Since the q_pop may block we need to release the parent
+                 * queue's lock. */
+                mtx_unlock(&rkq->rkq_lock);
+                cnt = rd_kafka_q_serve_share_rkmessages(
+                    fwdq, timeout_ms, rkmessages, rkmessages_size);
+                rd_kafka_q_destroy(fwdq);
+                return cnt;
+        }
+
+        mtx_unlock(&rkq->rkq_lock);
+
+        abs_timeout = rd_timeout_init(timeout_ms);
+
+        rd_kafka_app_poll_start(rk, rkq, 0, timeout_ms);
+
+        rd_kafka_yield_thread = 0;
+        while (cnt < rkmessages_size) {
+                rd_kafka_op_res_t res;
+
+                mtx_lock(&rkq->rkq_lock);
+
+                while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) &&
+                       !rd_kafka_q_check_yield(rkq) &&
+                       /* Only do a timed wait if no messages are ready; if we
+                        * have gotten even one message, just return with it. */
+                       cnt == 0 &&
+                       cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock,
+                                         abs_timeout) == thrd_success)
+                        ;
+
+                rd_kafka_q_mark_served(rkq);
+
+                if (!rko) {
+                        mtx_unlock(&rkq->rkq_lock);
+                        break; /* Timed out */
+                }
+
+                rd_kafka_q_deq0(rkq, rko);
+
+                mtx_unlock(&rkq->rkq_lock);
+
+                if (rd_kafka_op_version_outdated(rko, 0)) {
+                        /* Outdated op, put on discard queue */
+                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
+                        continue;
+                }
+
+                /* Serve non-FETCH callbacks */
+                res =
+                    rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
+                if (res == RD_KAFKA_OP_RES_KEEP ||
+                    res == RD_KAFKA_OP_RES_HANDLED) {
+                        /* Callback served, rko is destroyed (if HANDLED). */
+                        continue;
+                } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
+                                    rd_kafka_yield_thread)) {
+                        /* Yield. */
+                        break;
+                }
+                rd_dassert(res == RD_KAFKA_OP_RES_PASS);
+
+                /* If this is a control message, don't return it to the
+                 * application. Add it to a tmp queue from where we can store
+                 * the offset and destroy the op */
+                if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
+                        TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
+                        continue;
+                }
+
+                /* Get rkmessage from rko and append to array. */
+                rkmessages[cnt++] = rd_kafka_message_get(rko);
+        }
+
+        /* NOTE: KIP-932:
+         * For a share consumer we are not using version barriers, so ideally
+         * tmpq should be empty. However, the discard code is retained as
+         * non-share consumers might still be around. This assert exists to
+         * spot any issues as they arise during testing. */
+        rd_dassert(TAILQ_EMPTY(&tmpq));
+
+        /* Discard non-desired and already handled ops */
+        next = TAILQ_FIRST(&tmpq);
+        while (next) {
+                rko  = next;
+                next = TAILQ_NEXT(next, rko_link);
+                rd_kafka_op_destroy(rko);
+        }
+
+        /* Discard ctrl msgs */
+        next = TAILQ_FIRST(&ctrl_msg_q);
+        while (next) {
+                rko  = next;
+                next = TAILQ_NEXT(next, rko_link);
+                rd_kafka_op_destroy(rko);
+        }
+
+        rd_kafka_app_polled(rk, rkq);
+
+        return cnt;
+}
+
 
 void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu) {
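Aside: the key design choice above is that the timed wait is only entered while cnt == 0: once at least one message has been collected, the loop drains whatever is already queued and returns rather than blocking for a full batch. A standalone sketch of that wait predicate using C11 <threads.h> (librdkafka itself uses tinycthread wrappers such as cnd_timedwait_abs()), simplified to an int queue:

#include <threads.h>
#include <time.h>

struct q {
        mtx_t lock;
        cnd_t cond;
        int items[1024];
        int head, tail;
};

/* Pop up to 'max' items; block (until 'deadline') only while nothing has
 * been collected yet, mirroring the cnt == 0 condition in the patch. */
static int q_pop_batch(struct q *q, int *out, int max,
                       const struct timespec *deadline) {
        int cnt = 0;
        mtx_lock(&q->lock);
        while (cnt < max) {
                while (q->head == q->tail && cnt == 0 &&
                       cnd_timedwait(&q->cond, &q->lock, deadline) ==
                           thrd_success)
                        ;
                if (q->head == q->tail)
                        break; /* timed out, or drained after first item */
                out[cnt++] = q->items[q->head++ % 1024];
        }
        mtx_unlock(&q->lock);
        return cnt;
}
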
diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h
index ff1a46577..cfed32413 100644
--- a/src/rdkafka_queue.h
+++ b/src/rdkafka_queue.h
@@ -878,6 +878,10 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
                                 int timeout_ms,
                                 rd_kafka_message_t **rkmessages,
                                 size_t rkmessages_size);
+int rd_kafka_q_serve_share_rkmessages(rd_kafka_q_t *rkq,
+                                      int timeout_ms,
+                                      rd_kafka_message_t **rkmessages,
+                                      size_t rkmessages_size);
 rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms);
 
 int rd_kafka_q_apply(rd_kafka_q_t *rkq,