From 847d04bb6f4b7b90f19547a4db5293824a35e9df Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 17 Oct 2025 05:04:25 +0000 Subject: [PATCH 1/7] Add preliminary polling --- examples/consumer.c | 19 ++- src/rdkafka.c | 287 ++++++++++++++++++++++++++++++++++++++++++- src/rdkafka.h | 11 +- src/rdkafka_broker.c | 11 ++ src/rdkafka_broker.h | 6 + src/rdkafka_cgrp.h | 5 + src/rdkafka_int.h | 4 + src/rdkafka_op.c | 13 +- src/rdkafka_op.h | 15 +++ 9 files changed, 359 insertions(+), 12 deletions(-) diff --git a/examples/consumer.c b/examples/consumer.c index 8f33edd69c..04ff0e43c7 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -165,8 +165,8 @@ int main(int argc, char **argv) { } - if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr, - sizeof(errstr)) != RD_KAFKA_CONF_OK) { + if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; @@ -237,9 +237,20 @@ int main(int argc, char **argv) { * Start polling for messages. */ while (run) { - rd_kafka_message_t *rkm; + rd_kafka_message_t *rkm = NULL; + + // rkm = rd_kafka_consumer_poll(rk, 100); + rd_kafka_error_t *error; + + // fprintf(stderr, "Calling consume_batch\n"); + error = rd_kafka_share_consume_batch(rk, 1000, NULL, NULL); + if (error) { + fprintf(stderr, "%% Consume error: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + continue; + } - rkm = rd_kafka_consumer_poll(rk, 100); if (!rkm) continue; /* Timeout: no message within 100ms, * try again. This short timeout allows diff --git a/src/rdkafka.c b/src/rdkafka.c index 0904bb77ac..a040da0411 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2839,8 +2839,9 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, return NULL; } -rd_kafka_t *rd_kafka_share_consumer_new( - rd_kafka_conf_t *conf, char *errstr, size_t errstr_size) { +rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, + char *errstr, + size_t errstr_size) { rd_kafka_t *rk; char errstr_internal[512]; rd_kafka_conf_res_t res; @@ -2853,7 +2854,7 @@ rd_kafka_t *rd_kafka_share_consumer_new( } res = rd_kafka_conf_set(conf, "share.consumer", "true", errstr_internal, - sizeof(errstr_internal)); + sizeof(errstr_internal)); if (res != RD_KAFKA_CONF_OK) { rd_snprintf(errstr, errstr_size, "rd_kafka_share_consumer_new(): " @@ -2863,8 +2864,8 @@ rd_kafka_t *rd_kafka_share_consumer_new( } - res = rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr_internal, - sizeof(errstr_internal)); + res = rd_kafka_conf_set(conf, "group.protocol", "consumer", + errstr_internal, sizeof(errstr_internal)); if (res != RD_KAFKA_CONF_OK) { rd_snprintf(errstr, errstr_size, "rd_kafka_share_consumer_new(): " @@ -2882,6 +2883,271 @@ rd_kafka_t *rd_kafka_share_consumer_new( return rk; } + +/** + * @locality main thread + * @locks none + */ +static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, + rd_kafka_cgrp_t *rkcg) { + rd_kafka_broker_t *selected_rkb = NULL; + rd_kafka_topic_partition_list_t *partitions = + rkcg->rkcg_current_assignment; + int32_t broker_id = -1; + size_t i; + rd_kafka_topic_partition_t *partition; + + + if (!partitions || partitions->cnt == 0) { + rd_kafka_dbg(rk, CGRP, "SHARE", + "No partitions assigned to consumer, " + "cannot select broker for share fetch"); + return NULL; + } + + /* Look through all partitions in order, find the first one which + * has a leader. */ + rd_kafka_dbg(rk, CGRP, "SHARE", + "Selecting broker for share fetch from %d assigned " + "partitions, offset = %d", + partitions->cnt, rkcg->rkcg_share.last_partition_picked); + + for (i = 0; i < partitions->cnt && !selected_rkb; i++) { + rd_kafka_toppar_t *rktp; + rkcg->rkcg_share.last_partition_picked += 1; + if (rkcg->rkcg_share.last_partition_picked >= partitions->cnt) + rkcg->rkcg_share.last_partition_picked = 0; + partition = + &partitions->elems[rkcg->rkcg_share.last_partition_picked]; + + rktp = rd_kafka_toppar_get2(rk, partition->topic, + partition->partition, 0, 1); + + /* Criteria to choose a broker: + * 1. It should be the leader of a partition. + * 2. A share-fetch op must not already be enqueued on it. */ + if (rktp->rktp_leader) { + /* TODO: We're only going to access + * rkb_share_fetch_enqueued from the main thread, except + * when it's being calloc'd and destroyed. Is it safe to + * access it without a lock? */ + rd_kafka_broker_lock(rktp->rktp_leader); + if (!rktp->rktp_leader->rkb_share_fetch_enqueued) { + rd_kafka_broker_keep(rktp->rktp_leader); + selected_rkb = rktp->rktp_leader; + } + rd_kafka_broker_unlock(rktp->rktp_leader); + } + + rd_kafka_toppar_destroy(rktp); + } + + rd_kafka_dbg(rk, CGRP, "SHARE", "Selected broker %s for share fetch", + selected_rkb ? rd_kafka_broker_name(selected_rkb) + : "none"); + return selected_rkb; +} + +rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, + rd_kafka_op_t *rko) { + rd_kafka_assert(rk, thrd_is_current(rk->rk_thread)); + rd_kafka_dbg( + rk, TOPIC, "SHAREFETCH", + "Fetch share reply: %s, should_fetch=%d, broker=%s", + rd_kafka_err2str(rko->rko_err), rko->rko_u.share_fetch.should_fetch, + rko->rko_u.share_fetch.target_broker + ? rd_kafka_broker_name(rko->rko_u.share_fetch.target_broker) + : "none"); + + rd_kafka_broker_lock(rko->rko_u.share_fetch.target_broker); + rko->rko_u.share_fetch.target_broker->rkb_share_fetch_enqueued = + rd_false; + rd_kafka_broker_unlock(rko->rko_u.share_fetch.target_broker); + + if (rko->rko_error) { + rd_kafka_dbg(rk, CGRP, "SHARE", "Share fetch failed: %s", + rd_kafka_error_string(rko->rko_error)); + /* Retry fetching if there is any amount of time left. */ + // if (rko->rko_u.share_fetch.should_fetch && + // rd_timeout_remains(rko->rko_u.share_fetch.abs_timeout)) { + + // } + } + + return RD_KAFKA_OP_RES_HANDLED; +} +/** + * @locality main thread + */ +rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_broker_t *rkb, *selected_rkb = NULL; + rd_kafka_error_t *error; + rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk); + rd_kafka_op_t *reply_rko; + + /* We should never have enqueued this op if we didn't have the rkcg */ + rd_assert(rkcg); + + if (!(selected_rkb = rd_kafka_share_select_broker(rk, rkcg))) { + error = + rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, + "rd_kafka_share_consume_batch(): " + "No broker available for share fetch"); + + reply_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH_FANOUT | + RD_KAFKA_OP_REPLY); + reply_rko->rko_error = error; + rd_kafka_replyq_enq(&rko->rko_replyq, reply_rko, 0); + return RD_KAFKA_OP_RES_HANDLED; + } + + /* Issue fetch requests to all brokers */ + rd_kafka_dbg(rk, CGRP, "SHARE", + "Selected broker %s for share fetch, issuing " + "fetch requests to all %d brokers", + rd_kafka_broker_name(selected_rkb), + rd_atomic32_get(&rk->rk_broker_cnt)); + + rd_kafka_rdlock(rk); + TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { + rd_kafka_op_t *rko; + + if (rd_kafka_broker_or_instance_terminating(rkb) || + RD_KAFKA_BROKER_IS_LOGICAL(rkb)) { + rd_kafka_dbg(rk, CGRP, "SHARE", + "Skipping broker %s for share fetch: " + "terminating or logical", + rd_kafka_broker_name(rkb)); + continue; + } + + rd_kafka_broker_lock(rkb); + if (rkb->rkb_share_fetch_enqueued) { + rd_kafka_broker_unlock(rkb); + rd_kafka_dbg(rk, CGRP, "SHARE", + "Unable to enqueue op on broker %s " + "because another op is already pending.", + rd_kafka_broker_name(rkb)); + continue; + } + rkb->rkb_share_fetch_enqueued = rd_true; + rd_kafka_broker_unlock(rkb); + + rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); + rko->rko_u.share_fetch.abs_timeout = + rko->rko_u.share_fetch_fanout.abs_timeout; + rko->rko_u.share_fetch.should_fetch = (rkb == selected_rkb); + rd_kafka_broker_keep(rkb); + rko->rko_u.share_fetch.target_broker = rkb; + rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); + + rd_kafka_dbg(rk, CGRP, "SHAREFETCH", + "Enqueuing share fetch op on broker %s " + "(%sfetch)", + rd_kafka_broker_name(rkb), + rko->rko_u.share_fetch.should_fetch ? "" : "no-"); + rd_kafka_q_enq(rkb->rkb_ops, rko); + } + rd_kafka_rdunlock(rk); + + RD_IF_FREE(selected_rkb, rd_kafka_broker_destroy); + + return RD_KAFKA_OP_RES_HANDLED; +} + +rd_kafka_error_t * +rd_kafka_share_consume_batch(rd_kafka_t *rk, + int timeout_ms, + rd_kafka_message_t ***rkmessages /* out */, + size_t *rkmessages_size /* out */) { + rd_kafka_cgrp_t *rkcg; + rd_kafka_q_t *rkq; + rd_kafka_op_t *rko; + rd_ts_t now = rd_clock(); + rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms); + int cnt = 0; + + if (!RD_KAFKA_IS_SHARE_CONSUMER(rk)) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, + "rd_kafka_share_consume_batch(): " + "rk is not a shared consumer"); + + // Steps: + // 0. Check if we're a part of consumer group yet or not + // 1. Check if there's anything still in the fetch queue - and if so, + // don't issue fetch ops + // 2. If not, select broker for share fetch and issue fetch ops to all + // brokers + // 3. Wait on consumer group queue for either messages or errors + + if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, + "rd_kafka_share_consume_batch(): " + "Consumer group not initialized"); + + rd_kafka_app_poll_start(rk, rkcg->rkcg_q, now, timeout_ms); + + /* If we have any pending items on the consumer queue, don't issue new + * requests, rather, deal with them first. */ + if (likely(rd_kafka_q_len(rkcg->rkcg_q) == 0)) { + rd_kafka_dbg(rk, CGRP, "SHARE", + "Issuing share fetch fanout to main thread"); + rd_kafka_op_t *fanout_rko = + rd_kafka_op_new_cb(rk, RD_KAFKA_OP_SHARE_FETCH_FANOUT, + rd_kafka_share_fetch_fanout_op); + fanout_rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout; + fanout_rko->rko_u.share_fetch_fanout.is_retry = rd_false; + fanout_rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); + + rd_kafka_q_enq(rk->rk_ops, fanout_rko); + } + + while ((rko = rd_kafka_q_pop(rkcg->rkcg_q, + rd_timeout_remains_us(abs_timeout), 0))) { + rd_kafka_op_res_t res; + + cnt++; + res = rd_kafka_poll_cb(rk, rkcg->rkcg_q, rko, + RD_KAFKA_Q_CB_RETURN, NULL); + /* Ignore anything that's not PASS or YIELD, as it's handled + * already. */ + if (res == RD_KAFKA_OP_RES_YIELD) { + // TODO: Finish handling yields. + break; + } + + if (res != RD_KAFKA_OP_RES_PASS) + continue; + + switch (rko->rko_type) { + case RD_KAFKA_OP_FETCH: + /* Messages - for now, ignore. */ + rd_kafka_dbg( + rk, CGRP, "SHARE", + "Ignoring msg at offset %d from share fetch", + rd_kafka_message_get(rko)->offset); + rd_kafka_op_destroy(rko); + break; + + default: + rd_kafka_log(rk, LOG_WARNING, "SHARE", + "Ignoring unexpected op %s in " + "rd_kafka_share_consume_batch()", + rd_kafka_op2str(rko->rko_type)); + rd_kafka_op_destroy(rko); + break; + } + } + + rd_kafka_dbg(rk, CGRP, "SHARE", + "Returning from share consume batch after %d ops", cnt); + rd_kafka_app_polled(rk, rkcg->rkcg_q); + + return NULL; +} + /** * Schedules a rebootstrap of the cluster immediately. * @@ -4331,6 +4597,17 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, res = rd_kafka_metadata_update_op(rk, rko->rko_u.metadata.mdi); break; + case RD_KAFKA_OP_SHARE_FETCH | RD_KAFKA_OP_REPLY: + res = rd_kafka_share_fetch_reply_op(rk, rko); + break; + + case RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY: + rd_kafka_assert(rk, thrd_is_current(rk->rk_thread)); + rd_kafka_dbg(rk, TOPIC, "SHAREFETCH", + "Fetch share fanout reply: %s", + rd_kafka_err2str(rko->rko_err)); + break; + default: /* If op has a callback set (e.g., OAUTHBEARER_REFRESH), * call it. */ diff --git a/src/rdkafka.h b/src/rdkafka.h index e137c01804..a214bf2d5c 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -3058,7 +3058,16 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, RD_EXPORT -rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, char *errstr, size_t errstr_size); +rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, + char *errstr, + size_t errstr_size); + +RD_EXPORT +rd_kafka_error_t * +rd_kafka_share_consume_batch(rd_kafka_t *rk, + int timeout_ms, + rd_kafka_message_t ***rkmessages /* out */, + size_t *rkmessages_size /* out */); /** * @brief Destroy Kafka handle. diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index eb8e849240..5f7f8d59f4 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3553,6 +3553,17 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { wakeup = rd_true; break; + case RD_KAFKA_OP_SHARE_FETCH: + rd_rkb_dbg(rkb, CGRP, "SHAREFETCH", + "Received SHARE_FETCH op for broker %s with " + "should_fetch = %d", + rd_kafka_broker_name(rkb), + rko->rko_u.share_fetch.should_fetch); + /* This is only temporary handling for testing to avoid crashing + * on assert - the code below will automatically enqueue a + * reply which is not the final behaviour. */ + break; + default: rd_kafka_assert(rkb->rkb_rk, !*"unhandled op type"); break; diff --git a/src/rdkafka_broker.h b/src/rdkafka_broker.h index a649b7445e..b8f7cb6e21 100644 --- a/src/rdkafka_broker.h +++ b/src/rdkafka_broker.h @@ -387,6 +387,12 @@ struct rd_kafka_broker_s { /* rd_kafka_broker_t */ /** > 0 if this broker thread is terminating */ rd_atomic32_t termination_in_progress; + + /** + * Whether a share fetch should_fetch set is enqueued on + * this broker's op queue or not. + */ + rd_bool_t rkb_share_fetch_enqueued; }; #define rd_kafka_broker_keep(rkb) rd_refcnt_add(&(rkb)->rkb_refcnt) diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 79a734f5fb..020fbb2df6 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -377,6 +377,11 @@ typedef struct rd_kafka_cgrp_s { /* Timestamp of last rebalance start */ rd_ts_t rkcg_ts_rebalance_start; + struct { + size_t last_partition_picked; /* For round-robin + * partition picking */ + + } rkcg_share; } rd_kafka_cgrp_t; diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index 9e97bf9dca..ed2a0d6bce 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -1259,4 +1259,8 @@ int rd_kafka_rebootstrap_tmr_stop(rd_kafka_t *rk); void rd_kafka_reset_any_broker_down_reported(rd_kafka_t *rk); +rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko); + #endif /* _RDKAFKA_INT_H_ */ diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index db325b11c3..f5a053abe2 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -122,8 +122,9 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:RD_KAFKA_OP_SET_TELEMETRY_BROKER", [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", - [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", - [RD_KAFKA_OP_SHARE_FETCH] = "REPLY:SHARE_FETCH", + [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", + [RD_KAFKA_OP_SHARE_FETCH] = "REPLY:SHARE_FETCH", + [RD_KAFKA_OP_SHARE_FETCH_FANOUT] = "REPLY:SHARE_FETCH_FANOUT", }; if (type & RD_KAFKA_OP_REPLY) @@ -289,6 +290,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_SHARE_FETCH] = sizeof(rko->rko_u.share_fetch), + [RD_KAFKA_OP_SHARE_FETCH_FANOUT] = + sizeof(rko->rko_u.share_fetch_fanout), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -511,6 +514,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_SHARE_FETCH: /* TODO KIP-932: Add destruction code. */ + RD_IF_FREE(rko->rko_u.share_fetch.target_broker, + rd_kafka_broker_destroy); + break; + + case RD_KAFKA_OP_SHARE_FETCH_FANOUT: + /* No dynamic resources to clean up */ break; default: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 58a248aaf1..1cf70e3197 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -191,6 +191,8 @@ typedef enum { * u.admin_request */ RD_KAFKA_OP_SHARE_FETCH, /**< broker op: Issue share fetch request if applicable. */ + RD_KAFKA_OP_SHARE_FETCH_FANOUT, /**< fanout share fetch operation */ + RD_KAFKA_OP__END } rd_kafka_op_type_t; @@ -734,8 +736,21 @@ struct rd_kafka_op_s { /** Absolute timeout left to complete this share-fetch. */ rd_ts_t abs_timeout; + + /** Target broker to which op is sent. */ + rd_kafka_broker_t *target_broker; } share_fetch; + struct { + /** Absolute timeout for share fetch fanout operation. + */ + rd_ts_t abs_timeout; + + /** Is this a retry, or the first attempt of this poll? + * Retries only have should_fetch=true request sent. */ + rd_bool_t is_retry; + } share_fetch_fanout; + } rko_u; }; From 8cfb42832fdeaa475906687cc24dad9cfca2ffe8 Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 17 Oct 2025 14:58:50 +0000 Subject: [PATCH 2/7] Start returning messages to user --- examples/consumer.c | 76 ++++++++++++++++++++++----------------------- src/rdkafka.c | 65 +++++++++----------------------------- src/rdkafka.h | 11 ++++++- 3 files changed, 63 insertions(+), 89 deletions(-) diff --git a/examples/consumer.c b/examples/consumer.c index 04ff0e43c7..c1abc50e61 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -236,14 +236,19 @@ int main(int argc, char **argv) { * since a rebalance may happen at any time. * Start polling for messages. */ + rd_kafka_message_t **rkmessages = + (rd_kafka_message_t **)malloc(sizeof(rd_kafka_message_t *) * 100); while (run) { rd_kafka_message_t *rkm = NULL; + size_t rcvd_msgs = 0; + int i; // rkm = rd_kafka_consumer_poll(rk, 100); rd_kafka_error_t *error; // fprintf(stderr, "Calling consume_batch\n"); - error = rd_kafka_share_consume_batch(rk, 1000, NULL, NULL); + error = rd_kafka_share_consume_batch(rk, 1000, rkmessages, + &rcvd_msgs); if (error) { fprintf(stderr, "%% Consume error: %s\n", rd_kafka_error_string(error)); @@ -251,45 +256,40 @@ int main(int argc, char **argv) { continue; } - if (!rkm) - continue; /* Timeout: no message within 100ms, - * try again. This short timeout allows - * checking for `run` at frequent intervals. - */ - - /* consumer_poll() will return either a proper message - * or a consumer error (rkm->err is set). */ - if (rkm->err) { - /* Consumer errors are generally to be considered - * informational as the consumer will automatically - * try to recover from all types of errors. */ - fprintf(stderr, "%% Consumer error: %s\n", - rd_kafka_message_errstr(rkm)); + fprintf(stderr, "%% Received %zu messages\n", rcvd_msgs); + for (i = 0; i < (int)rcvd_msgs; i++) { + rkm = rkmessages[i]; + + if (rkm->err) { + fprintf(stderr, "%% Consumer error: %d: %s\n", + rkm->err, rd_kafka_message_errstr(rkm)); + rd_kafka_message_destroy(rkm); + continue; + } + + /* Proper message. */ + printf("Message on %s [%" PRId32 "] at offset %" PRId64 + " (leader epoch %" PRId32 "):\n", + rd_kafka_topic_name(rkm->rkt), rkm->partition, + rkm->offset, rd_kafka_message_leader_epoch(rkm)); + + /* Print the message key. */ + if (rkm->key && is_printable(rkm->key, rkm->key_len)) + printf(" Key: %.*s\n", (int)rkm->key_len, + (const char *)rkm->key); + else if (rkm->key) + printf(" Key: (%d bytes)\n", (int)rkm->key_len); + + /* Print the message value/payload. */ + if (rkm->payload && + is_printable(rkm->payload, rkm->len)) + printf(" Value: %.*s\n", (int)rkm->len, + (const char *)rkm->payload); + else if (rkm->payload) + printf(" Value: (%d bytes)\n", (int)rkm->len); + rd_kafka_message_destroy(rkm); - continue; } - - /* Proper message. */ - printf("Message on %s [%" PRId32 "] at offset %" PRId64 - " (leader epoch %" PRId32 "):\n", - rd_kafka_topic_name(rkm->rkt), rkm->partition, - rkm->offset, rd_kafka_message_leader_epoch(rkm)); - - /* Print the message key. */ - if (rkm->key && is_printable(rkm->key, rkm->key_len)) - printf(" Key: %.*s\n", (int)rkm->key_len, - (const char *)rkm->key); - else if (rkm->key) - printf(" Key: (%d bytes)\n", (int)rkm->key_len); - - /* Print the message value/payload. */ - if (rkm->payload && is_printable(rkm->payload, rkm->len)) - printf(" Value: %.*s\n", (int)rkm->len, - (const char *)rkm->payload); - else if (rkm->payload) - printf(" Value: (%d bytes)\n", (int)rkm->len); - - rd_kafka_message_destroy(rkm); } diff --git a/src/rdkafka.c b/src/rdkafka.c index a040da0411..0c47379d70 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2893,6 +2893,7 @@ static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, rd_kafka_broker_t *selected_rkb = NULL; rd_kafka_topic_partition_list_t *partitions = rkcg->rkcg_current_assignment; + // rkcg->rkcg_toppars; /* TODO: use rkcg->rkcg_toppars instead. */ int32_t broker_id = -1; size_t i; rd_kafka_topic_partition_t *partition; @@ -2959,10 +2960,8 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, ? rd_kafka_broker_name(rko->rko_u.share_fetch.target_broker) : "none"); - rd_kafka_broker_lock(rko->rko_u.share_fetch.target_broker); rko->rko_u.share_fetch.target_broker->rkb_share_fetch_enqueued = rd_false; - rd_kafka_broker_unlock(rko->rko_u.share_fetch.target_broker); if (rko->rko_error) { rd_kafka_dbg(rk, CGRP, "SHARE", "Share fetch failed: %s", @@ -3023,7 +3022,6 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, continue; } - rd_kafka_broker_lock(rkb); if (rkb->rkb_share_fetch_enqueued) { rd_kafka_broker_unlock(rkb); rd_kafka_dbg(rk, CGRP, "SHARE", @@ -3033,7 +3031,6 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, continue; } rkb->rkb_share_fetch_enqueued = rd_true; - rd_kafka_broker_unlock(rkb); rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); rko->rko_u.share_fetch.abs_timeout = @@ -3060,14 +3057,15 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, rd_kafka_error_t * rd_kafka_share_consume_batch(rd_kafka_t *rk, int timeout_ms, - rd_kafka_message_t ***rkmessages /* out */, + rd_kafka_message_t **rkmessages /* out */, size_t *rkmessages_size /* out */) { rd_kafka_cgrp_t *rkcg; rd_kafka_q_t *rkq; rd_kafka_op_t *rko; - rd_ts_t now = rd_clock(); - rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms); - int cnt = 0; + rd_ts_t now = rd_clock(); + rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms); + int cnt = 0; + const size_t max_poll_records = 100; /* TODO: change. */ if (!RD_KAFKA_IS_SHARE_CONSUMER(rk)) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, @@ -3087,8 +3085,6 @@ rd_kafka_share_consume_batch(rd_kafka_t *rk, "rd_kafka_share_consume_batch(): " "Consumer group not initialized"); - rd_kafka_app_poll_start(rk, rkcg->rkcg_q, now, timeout_ms); - /* If we have any pending items on the consumer queue, don't issue new * requests, rather, deal with them first. */ if (likely(rd_kafka_q_len(rkcg->rkcg_q) == 0)) { @@ -3104,46 +3100,15 @@ rd_kafka_share_consume_batch(rd_kafka_t *rk, rd_kafka_q_enq(rk->rk_ops, fanout_rko); } - while ((rko = rd_kafka_q_pop(rkcg->rkcg_q, - rd_timeout_remains_us(abs_timeout), 0))) { - rd_kafka_op_res_t res; - - cnt++; - res = rd_kafka_poll_cb(rk, rkcg->rkcg_q, rko, - RD_KAFKA_Q_CB_RETURN, NULL); - /* Ignore anything that's not PASS or YIELD, as it's handled - * already. */ - if (res == RD_KAFKA_OP_RES_YIELD) { - // TODO: Finish handling yields. - break; - } - - if (res != RD_KAFKA_OP_RES_PASS) - continue; - - switch (rko->rko_type) { - case RD_KAFKA_OP_FETCH: - /* Messages - for now, ignore. */ - rd_kafka_dbg( - rk, CGRP, "SHARE", - "Ignoring msg at offset %d from share fetch", - rd_kafka_message_get(rko)->offset); - rd_kafka_op_destroy(rko); - break; - - default: - rd_kafka_log(rk, LOG_WARNING, "SHARE", - "Ignoring unexpected op %s in " - "rd_kafka_share_consume_batch()", - rd_kafka_op2str(rko->rko_type)); - rd_kafka_op_destroy(rko); - break; - } - } - - rd_kafka_dbg(rk, CGRP, "SHARE", - "Returning from share consume batch after %d ops", cnt); - rd_kafka_app_polled(rk, rkcg->rkcg_q); + /* At this point, there's no reason to deviate from what we already do + * for returning multiple messages to the user, as the orchestration + * is handled by the main thread. Later on, we needed, we might need + * a custom loop if we need any changes. */ + *rkmessages_size = rd_kafka_q_serve_rkmessages( + rkcg->rkcg_q, timeout_ms, /* Use this timeout directly as prior + operations aren't blocking, so no need to + re-convert the abs_timeout into a relative one*/ + rkmessages, max_poll_records); return NULL; } diff --git a/src/rdkafka.h b/src/rdkafka.h index a214bf2d5c..42f8bfdb9c 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -3062,11 +3062,20 @@ rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, char *errstr, size_t errstr_size); +/** + * @brief Consume a batch of messages from the share consumer instance. + * + * @param rk Share consumer instance. + * @param timeout_ms Maximum time to block waiting for messages. + * @param rkmessages Output array of messages - this must be preallocated with + * at least enough capacity for size max.poll.records. + * @param rkmessages_size Output number of messages returned in rkmessages. + */ RD_EXPORT rd_kafka_error_t * rd_kafka_share_consume_batch(rd_kafka_t *rk, int timeout_ms, - rd_kafka_message_t ***rkmessages /* out */, + rd_kafka_message_t **rkmessages /* out */, size_t *rkmessages_size /* out */); /** From e6b512a9e645c11e4f370db5132c400315e79873 Mon Sep 17 00:00:00 2001 From: Milind L Date: Wed, 22 Oct 2025 04:57:12 +0000 Subject: [PATCH 3/7] Add retry for share fetches --- src/rdkafka.c | 126 +++++++++++++++++++++++++++++-------------- src/rdkafka_broker.c | 11 ++++ src/rdkafka_op.c | 3 +- 3 files changed, 98 insertions(+), 42 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 0c47379d70..acb9351dd5 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2950,31 +2950,83 @@ static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, } rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, - rd_kafka_op_t *rko) { + rd_kafka_op_t *rko_orig) { rd_kafka_assert(rk, thrd_is_current(rk->rk_thread)); - rd_kafka_dbg( - rk, TOPIC, "SHAREFETCH", - "Fetch share reply: %s, should_fetch=%d, broker=%s", - rd_kafka_err2str(rko->rko_err), rko->rko_u.share_fetch.should_fetch, - rko->rko_u.share_fetch.target_broker - ? rd_kafka_broker_name(rko->rko_u.share_fetch.target_broker) - : "none"); - - rko->rko_u.share_fetch.target_broker->rkb_share_fetch_enqueued = + rd_kafka_dbg(rk, CGRP, "SHAREFETCH", + "Fetch share reply: %s, should_fetch=%d, broker=%s", + rd_kafka_err2str(rko_orig->rko_err), + rko_orig->rko_u.share_fetch.should_fetch, + rko_orig->rko_u.share_fetch.target_broker + ? rd_kafka_broker_name( + rko_orig->rko_u.share_fetch.target_broker) + : "none"); + + rko_orig->rko_u.share_fetch.target_broker->rkb_share_fetch_enqueued = rd_false; - if (rko->rko_error) { - rd_kafka_dbg(rk, CGRP, "SHARE", "Share fetch failed: %s", - rd_kafka_error_string(rko->rko_error)); - /* Retry fetching if there is any amount of time left. */ - // if (rko->rko_u.share_fetch.should_fetch && - // rd_timeout_remains(rko->rko_u.share_fetch.abs_timeout)) { - - // } + if (rko_orig->rko_error || rko_orig->rko_err) { + rd_kafka_dbg( + rk, CGRP, "SHARE", + "Share fetch failed: %s, timeout remains: %d, should-fetch " + "= %d", + rko_orig->rko_error + ? rd_kafka_error_name(rko_orig->rko_error) + : rd_kafka_err2name(rko_orig->rko_err), + rd_timeout_remains(rko_orig->rko_u.share_fetch.abs_timeout), + rko_orig->rko_u.share_fetch.should_fetch); + + /* Retry fetching if there is any amount of time left. For other + * errors, let them drop and be retried on the next poll. */ + /* TODO: KIP-932: Are there errors for which we should not + * retry, and rather propagate those errors to user? In that + * case, we must write to the rkcg queue. */ + if (rko_orig->rko_u.share_fetch.should_fetch && + rd_timeout_remains( + rko_orig->rko_u.share_fetch.abs_timeout)) { + + rd_kafka_op_t *rko = + rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); + rd_kafka_broker_t *rkb = + rko_orig->rko_u.share_fetch.target_broker; + rko->rko_u.share_fetch.abs_timeout = + rko_orig->rko_u.share_fetch.abs_timeout; + rko->rko_u.share_fetch.should_fetch = rd_true; + rd_kafka_broker_keep(rkb); + rko->rko_u.share_fetch.target_broker = rkb; + rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); + + rd_kafka_dbg( + rk, CGRP, "SHAREFETCH", + "Retrying share fetch op on broker %s after eror" + "(%s fetch)", + rd_kafka_broker_name(rkb), + rko->rko_u.share_fetch.should_fetch ? "should" : "should not"); + + /* TODO: KIP-932: Are there errors where we should back + * this off by running it on a timer?*/ + rd_kafka_q_enq(rkb->rkb_ops, rko); + } } return RD_KAFKA_OP_RES_HANDLED; } + +rd_kafka_op_res_t +rd_kafka_share_fetch_fanout_reply_op(rd_kafka_t *rk, rd_kafka_op_t *rko_orig) { + rd_kafka_message_t *rkm; + rd_kafka_op_t *rko; + if (!rko_orig->rko_err && !rko_orig->rko_error) + return RD_KAFKA_OP_RES_HANDLED; + + /* TODO: KIP-932: Add error handling - either retries, or user-level + * propagation, later. */ + rd_kafka_dbg( + rk, CGRP, "SHARE", "Encountered error in SHARE_FETCH_FANOUT: %s", + rko_orig->rko_err ? rd_kafka_err2name(rko_orig->rko_err) + : rd_kafka_error_name(rko_orig->rko_error)); + return RD_KAFKA_OP_RES_HANDLED; +} + /** * @locality main thread */ @@ -3011,7 +3063,7 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, rd_kafka_rdlock(rk); TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { - rd_kafka_op_t *rko; + rd_kafka_op_t *rko_sf; if (rd_kafka_broker_or_instance_terminating(rkb) || RD_KAFKA_BROKER_IS_LOGICAL(rkb)) { @@ -3032,20 +3084,22 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, } rkb->rkb_share_fetch_enqueued = rd_true; - rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); - rko->rko_u.share_fetch.abs_timeout = + rko_sf = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); + rko_sf->rko_u.share_fetch.abs_timeout = rko->rko_u.share_fetch_fanout.abs_timeout; - rko->rko_u.share_fetch.should_fetch = (rkb == selected_rkb); + rko_sf->rko_u.share_fetch.should_fetch = (rkb == selected_rkb); rd_kafka_broker_keep(rkb); - rko->rko_u.share_fetch.target_broker = rkb; - rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); + rko_sf->rko_u.share_fetch.target_broker = rkb; + rko_sf->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); rd_kafka_dbg(rk, CGRP, "SHAREFETCH", "Enqueuing share fetch op on broker %s " - "(%sfetch)", + "(%s fetch)", rd_kafka_broker_name(rkb), - rko->rko_u.share_fetch.should_fetch ? "" : "no-"); - rd_kafka_q_enq(rkb->rkb_ops, rko); + rko_sf->rko_u.share_fetch.should_fetch + ? "should" + : "should not"); + rd_kafka_q_enq(rkb->rkb_ops, rko_sf); } rd_kafka_rdunlock(rk); @@ -3072,14 +3126,6 @@ rd_kafka_share_consume_batch(rd_kafka_t *rk, "rd_kafka_share_consume_batch(): " "rk is not a shared consumer"); - // Steps: - // 0. Check if we're a part of consumer group yet or not - // 1. Check if there's anything still in the fetch queue - and if so, - // don't issue fetch ops - // 2. If not, select broker for share fetch and issue fetch ops to all - // brokers - // 3. Wait on consumer group queue for either messages or errors - if (unlikely(!(rkcg = rd_kafka_cgrp_get(rk)))) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, "rd_kafka_share_consume_batch(): " @@ -3089,7 +3135,9 @@ rd_kafka_share_consume_batch(rd_kafka_t *rk, * requests, rather, deal with them first. */ if (likely(rd_kafka_q_len(rkcg->rkcg_q) == 0)) { rd_kafka_dbg(rk, CGRP, "SHARE", - "Issuing share fetch fanout to main thread"); + "Issuing share fetch fanout to main thread with " + "abs_timeout = %lld and now=%lld", + abs_timeout, now); rd_kafka_op_t *fanout_rko = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_SHARE_FETCH_FANOUT, rd_kafka_share_fetch_fanout_op); @@ -3107,7 +3155,7 @@ rd_kafka_share_consume_batch(rd_kafka_t *rk, *rkmessages_size = rd_kafka_q_serve_rkmessages( rkcg->rkcg_q, timeout_ms, /* Use this timeout directly as prior operations aren't blocking, so no need to - re-convert the abs_timeout into a relative one*/ + re-convert the abs_timeout into a relative one.*/ rkmessages, max_poll_records); return NULL; @@ -4568,9 +4616,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, case RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY: rd_kafka_assert(rk, thrd_is_current(rk->rk_thread)); - rd_kafka_dbg(rk, TOPIC, "SHAREFETCH", - "Fetch share fanout reply: %s", - rd_kafka_err2str(rko->rko_err)); + rd_kafka_share_fetch_fanout_reply_op(rk, rko); break; default: diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 5f7f8d59f4..e130da7e96 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3562,6 +3562,17 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { /* This is only temporary handling for testing to avoid crashing * on assert - the code below will automatically enqueue a * reply which is not the final behaviour. */ + /* Insert errors randomly for testing, remove this code once + * actual errors can be tested via the mock broker. */ + // if (rd_jitter(0, 10) > 7) { + // rd_rkb_dbg(rkb, CGRP, "SHAREFETCH", + // "Injecting error! %s : %d", + // rd_kafka_broker_name(rkb), + // rko->rko_u.share_fetch.should_fetch); + + // rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__STATE); + // rko = NULL; + // } break; default: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index f5a053abe2..e15db3f35b 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -513,13 +513,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { break; case RD_KAFKA_OP_SHARE_FETCH: - /* TODO KIP-932: Add destruction code. */ RD_IF_FREE(rko->rko_u.share_fetch.target_broker, rd_kafka_broker_destroy); break; case RD_KAFKA_OP_SHARE_FETCH_FANOUT: - /* No dynamic resources to clean up */ + /* No heap-allocated resources to clean up */ break; default: From 98b5dfee1d71f1b76008626314823daafe6404c5 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 27 Oct 2025 05:53:43 +0000 Subject: [PATCH 4/7] Unify error handling and add records conf --- CONFIGURATION.md | 1 + examples/consumer.c | 2 +- src/rdkafka.c | 243 ++++++++++++++++++++++++++++---------------- src/rdkafka_cgrp.h | 7 +- src/rdkafka_conf.c | 6 +- src/rdkafka_conf.h | 5 +- src/rdkafka_int.h | 3 +- src/rdkafka_op.h | 4 - 8 files changed, 175 insertions(+), 96 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 40b7412efd..ddf795a9e2 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -154,6 +154,7 @@ offset_commit_cb | C | | enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.
*Type: boolean* check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage.
*Type: boolean* client.rack | * | | | low | A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config `broker.rack`.
*Type: string* +max.poll.records | C | 1 .. 2147483647 | 500 | low | tba description,
*Type: integer* transactional.id | P | | | high | Enables the transactional producer. The transactional.id is used to identify the same transactional producer instance across process restarts. It allows the producer to guarantee that transactions corresponding to earlier instances of the same producer have been finalized prior to starting any new transactions, and that any zombie instances are fenced off. If no transactional.id is provided, then the producer is limited to idempotent delivery (if enable.idempotence is set). Requires broker version >= 0.11.0.
*Type: string* transaction.timeout.ms | P | 1000 .. 2147483647 | 60000 | medium | The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the broker, the init_transactions() call will fail with ERR_INVALID_TRANSACTION_TIMEOUT. The transaction timeout automatically adjusts `message.timeout.ms` and `socket.timeout.ms`, unless explicitly configured in which case they must not exceed the transaction timeout (`socket.timeout.ms` must be at least 100ms lower than `transaction.timeout.ms`). This is also the default timeout value if no timeout (-1) is supplied to the transactional API methods.
*Type: integer* enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible.
*Type: boolean* diff --git a/examples/consumer.c b/examples/consumer.c index c1abc50e61..fea673b539 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -247,7 +247,7 @@ int main(int argc, char **argv) { rd_kafka_error_t *error; // fprintf(stderr, "Calling consume_batch\n"); - error = rd_kafka_share_consume_batch(rk, 1000, rkmessages, + error = rd_kafka_share_consume_batch(rk, 5000, rkmessages, &rcvd_msgs); if (error) { fprintf(stderr, "%% Consume error: %s\n", diff --git a/src/rdkafka.c b/src/rdkafka.c index acb9351dd5..0ac649da4a 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2894,11 +2894,9 @@ static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions = rkcg->rkcg_current_assignment; // rkcg->rkcg_toppars; /* TODO: use rkcg->rkcg_toppars instead. */ - int32_t broker_id = -1; size_t i; rd_kafka_topic_partition_t *partition; - if (!partitions || partitions->cnt == 0) { rd_kafka_dbg(rk, CGRP, "SHARE", "No partitions assigned to consumer, " @@ -2910,13 +2908,14 @@ static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, * has a leader. */ rd_kafka_dbg(rk, CGRP, "SHARE", "Selecting broker for share fetch from %d assigned " - "partitions, offset = %d", + "partitions, last picked index = %" PRIusz, partitions->cnt, rkcg->rkcg_share.last_partition_picked); - for (i = 0; i < partitions->cnt && !selected_rkb; i++) { + for (i = 0; i < (size_t)partitions->cnt; i++) { rd_kafka_toppar_t *rktp; rkcg->rkcg_share.last_partition_picked += 1; - if (rkcg->rkcg_share.last_partition_picked >= partitions->cnt) + if (rkcg->rkcg_share.last_partition_picked >= + (size_t)partitions->cnt) rkcg->rkcg_share.last_partition_picked = 0; partition = &partitions->elems[rkcg->rkcg_share.last_partition_picked]; @@ -2941,16 +2940,59 @@ static rd_kafka_broker_t *rd_kafka_share_select_broker(rd_kafka_t *rk, } rd_kafka_toppar_destroy(rktp); - } - rd_kafka_dbg(rk, CGRP, "SHARE", "Selected broker %s for share fetch", - selected_rkb ? rd_kafka_broker_name(selected_rkb) - : "none"); + if (selected_rkb) + break; + } return selected_rkb; } +/** + * @brief Timer callback for reenequeing SHARE_FETCH_FANOUT after a backoff. + * @locality main thread + * @locks none + */ +static void rd_kafka_share_fetch_fanout_renqueue(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_op_t *rko = arg; + rd_kafka_t *rk = rkts->rkts_rk; + + rd_kafka_dbg(rk, CGRP, "SHARE", "Re-enqueing SHARE_FETCH_FANOUT"); + rd_kafka_q_enq(rk->rk_ops, rko); +} + +/** + * @brief Enqueue a SHARE_FETCH_FANOUT op on the main queue. + * @param backoff_ms If >0 the op will be enqueued after this many milliseconds. + * Else, it will be immediate. + * @locality any thread + */ +static void rd_kafka_share_fetch_fanout_with_backoff(rd_kafka_t *rk, + rd_ts_t abs_timeout, + int backoff_ms) { + rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk); + rd_kafka_op_t *rko = rd_kafka_op_new_cb( + rk, RD_KAFKA_OP_SHARE_FETCH_FANOUT, rd_kafka_share_fetch_fanout_op); + rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout; + rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); + + if (backoff_ms > 0) + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rkcg->rkcg_share.share_fetch_fanout_tmr, + rd_true, backoff_ms * 1000, + rd_kafka_share_fetch_fanout_renqueue, rko); + else + rd_kafka_q_enq(rk->rk_ops, rko); +} + +/** + * Handles RD_KAFKA_OP_SHARE_FETCH | RD_KAFKA_OP_REPLY. + * @locality main thread + */ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, rd_kafka_op_t *rko_orig) { + rd_kafka_resp_err_t err; + rd_kafka_assert(rk, thrd_is_current(rk->rk_thread)); rd_kafka_dbg(rk, CGRP, "SHAREFETCH", "Fetch share reply: %s, should_fetch=%d, broker=%s", @@ -2964,70 +3006,91 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, rko_orig->rko_u.share_fetch.target_broker->rkb_share_fetch_enqueued = rd_false; - if (rko_orig->rko_error || rko_orig->rko_err) { - rd_kafka_dbg( - rk, CGRP, "SHARE", - "Share fetch failed: %s, timeout remains: %d, should-fetch " - "= %d", - rko_orig->rko_error - ? rd_kafka_error_name(rko_orig->rko_error) - : rd_kafka_err2name(rko_orig->rko_err), - rd_timeout_remains(rko_orig->rko_u.share_fetch.abs_timeout), - rko_orig->rko_u.share_fetch.should_fetch); - - /* Retry fetching if there is any amount of time left. For other - * errors, let them drop and be retried on the next poll. */ - /* TODO: KIP-932: Are there errors for which we should not - * retry, and rather propagate those errors to user? In that - * case, we must write to the rkcg queue. */ - if (rko_orig->rko_u.share_fetch.should_fetch && - rd_timeout_remains( - rko_orig->rko_u.share_fetch.abs_timeout)) { - - rd_kafka_op_t *rko = - rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); - rd_kafka_broker_t *rkb = - rko_orig->rko_u.share_fetch.target_broker; - rko->rko_u.share_fetch.abs_timeout = - rko_orig->rko_u.share_fetch.abs_timeout; - rko->rko_u.share_fetch.should_fetch = rd_true; - rd_kafka_broker_keep(rkb); - rko->rko_u.share_fetch.target_broker = rkb; - rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); - - rd_kafka_dbg( - rk, CGRP, "SHAREFETCH", - "Retrying share fetch op on broker %s after eror" - "(%s fetch)", - rd_kafka_broker_name(rkb), - rko->rko_u.share_fetch.should_fetch ? "should" : "should not"); - - /* TODO: KIP-932: Are there errors where we should back - * this off by running it on a timer?*/ - rd_kafka_q_enq(rkb->rkb_ops, rko); - } + if (!rko_orig->rko_err && !rko_orig->rko_error) + return RD_KAFKA_OP_RES_HANDLED; + + /* Retry fetching if there is any amount of time left. For other + * errors, let them drop and be retried on the next poll. */ + /* TODO: KIP-932: Are there errors for which we should not + * retry, and rather propagate those errors to user? In that + * case, we must write to the rkcg queue. */ + if (!rko_orig->rko_u.share_fetch.should_fetch || + !rd_timeout_remains(rko_orig->rko_u.share_fetch.abs_timeout)) + return RD_KAFKA_OP_RES_HANDLED; + + if (rko_orig->rko_error) + err = rd_kafka_error_code(rko_orig->rko_error); + + switch (err) { + /* For some errors, don't resend at all. + * This list of errors is incomplete. */ + case RD_KAFKA_RESP_ERR__TIMED_OUT: + case RD_KAFKA_RESP_ERR__DESTROY: + case RD_KAFKA_RESP_ERR__UNKNOWN_GROUP: + case RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS: /* This should not happen. */ + case RD_KAFKA_RESP_ERR__STATE: + case RD_KAFKA_RESP_ERR__AUTHENTICATION: + case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: /* Do we need more + handling for fatal + errors? */ + break; + + /* For other cases, resend to separate broker after a backoff. The + * fanout op will back us off automatically if there is no broker + * available. */ + default: + rd_kafka_share_fetch_fanout_with_backoff( + rk, rko_orig->rko_u.share_fetch.abs_timeout, + 0 /* no backoff */); + break; } return RD_KAFKA_OP_RES_HANDLED; } +/** + * Handles RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY. + * @locality main thread + */ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_reply_op(rd_kafka_t *rk, rd_kafka_op_t *rko_orig) { - rd_kafka_message_t *rkm; - rd_kafka_op_t *rko; + rd_kafka_resp_err_t err; + if (!rko_orig->rko_err && !rko_orig->rko_error) return RD_KAFKA_OP_RES_HANDLED; + err = rko_orig->rko_err; + if (rko_orig->rko_error) + err = rd_kafka_error_code(rko_orig->rko_error); + /* TODO: KIP-932: Add error handling - either retries, or user-level * propagation, later. */ rd_kafka_dbg( - rk, CGRP, "SHARE", "Encountered error in SHARE_FETCH_FANOUT: %s", - rko_orig->rko_err ? rd_kafka_err2name(rko_orig->rko_err) - : rd_kafka_error_name(rko_orig->rko_error)); + rk, CGRP, "SHARE", + "Encountered error in SHARE_FETCH_FANOUT: %s, remains: %d", + rd_kafka_err2name(err), + rd_timeout_remains(rko_orig->rko_u.share_fetch_fanout.abs_timeout)); + + switch (err) { + /* Some errors need not be retried. */ + case RD_KAFKA_RESP_ERR__DESTROY: + case RD_KAFKA_RESP_ERR__TIMED_OUT: + break; + + /* Some errors may be retried - with a constant backoff. */ + default: + rd_kafka_share_fetch_fanout_with_backoff( + rk, rko_orig->rko_u.share_fetch_fanout.abs_timeout, + /* TODO: KIP-932: Consider setting this to retry_backoff_ms + or to a constant.*/ + rk->rk_conf.retry_backoff_max_ms); + break; + } return RD_KAFKA_OP_RES_HANDLED; } /** + * Op callback for RD_KAFKA_OP_SHARE_FETCH_FANOUT. * @locality main thread */ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, @@ -3037,10 +3100,27 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, rd_kafka_error_t *error; rd_kafka_cgrp_t *rkcg = rd_kafka_cgrp_get(rk); rd_kafka_op_t *reply_rko; + rd_ts_t abs_timeout = rko->rko_u.share_fetch_fanout.abs_timeout; /* We should never have enqueued this op if we didn't have the rkcg */ rd_assert(rkcg); + if (rd_timeout_remains(abs_timeout) <= 0) { + rd_kafka_dbg(rk, CGRP, "SHARE", + "Not issuing SHARE_FETCH_FANOUT: timeout expired"); + + error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__TIMED_OUT, + "rd_kafka_share_fetch_fanout_op(): " + "Share fetch fanout op timed out"); + + reply_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH_FANOUT | + RD_KAFKA_OP_REPLY); + reply_rko->rko_error = error; + reply_rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout; + rd_kafka_replyq_enq(&rko->rko_replyq, reply_rko, 0); + return RD_KAFKA_OP_RES_HANDLED; + } + if (!(selected_rkb = rd_kafka_share_select_broker(rk, rkcg))) { error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, @@ -3049,17 +3129,17 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, reply_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY); - reply_rko->rko_error = error; + reply_rko->rko_error = error; + reply_rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout; rd_kafka_replyq_enq(&rko->rko_replyq, reply_rko, 0); return RD_KAFKA_OP_RES_HANDLED; } /* Issue fetch requests to all brokers */ rd_kafka_dbg(rk, CGRP, "SHARE", - "Selected broker %s for share fetch, issuing " - "fetch requests to all %d brokers", - rd_kafka_broker_name(selected_rkb), - rd_atomic32_get(&rk->rk_broker_cnt)); + "Selected broker %s for fetching messages, issuing fetch " + "requests to necessary brokers", + rd_kafka_broker_name(selected_rkb)); rd_kafka_rdlock(rk); TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { @@ -3085,8 +3165,7 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, rkb->rkb_share_fetch_enqueued = rd_true; rko_sf = rd_kafka_op_new(RD_KAFKA_OP_SHARE_FETCH); - rko_sf->rko_u.share_fetch.abs_timeout = - rko->rko_u.share_fetch_fanout.abs_timeout; + rko_sf->rko_u.share_fetch.abs_timeout = abs_timeout; rko_sf->rko_u.share_fetch.should_fetch = (rkb == selected_rkb); rd_kafka_broker_keep(rkb); rko_sf->rko_u.share_fetch.target_broker = rkb; @@ -3108,18 +3187,18 @@ rd_kafka_op_res_t rd_kafka_share_fetch_fanout_op(rd_kafka_t *rk, return RD_KAFKA_OP_RES_HANDLED; } -rd_kafka_error_t * -rd_kafka_share_consume_batch(rd_kafka_t *rk, - int timeout_ms, - rd_kafka_message_t **rkmessages /* out */, - size_t *rkmessages_size /* out */) { +rd_kafka_error_t *rd_kafka_share_consume_batch( + rd_kafka_t *rk, + int timeout_ms, + /* There is some benefit to making this ***rkmessages and allocating it + within this function, but on the flipside this means that it will always + be allocated on the heap. */ + rd_kafka_message_t **rkmessages /* out */, + size_t *rkmessages_size /* out */) { rd_kafka_cgrp_t *rkcg; - rd_kafka_q_t *rkq; - rd_kafka_op_t *rko; - rd_ts_t now = rd_clock(); - rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms); - int cnt = 0; - const size_t max_poll_records = 100; /* TODO: change. */ + rd_ts_t now = rd_clock(); + rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms); + size_t max_poll_records = (size_t)rk->rk_conf.share.max_poll_records; if (!RD_KAFKA_IS_SHARE_CONSUMER(rk)) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG, @@ -3136,16 +3215,10 @@ rd_kafka_share_consume_batch(rd_kafka_t *rk, if (likely(rd_kafka_q_len(rkcg->rkcg_q) == 0)) { rd_kafka_dbg(rk, CGRP, "SHARE", "Issuing share fetch fanout to main thread with " - "abs_timeout = %lld and now=%lld", - abs_timeout, now); - rd_kafka_op_t *fanout_rko = - rd_kafka_op_new_cb(rk, RD_KAFKA_OP_SHARE_FETCH_FANOUT, - rd_kafka_share_fetch_fanout_op); - fanout_rko->rko_u.share_fetch_fanout.abs_timeout = abs_timeout; - fanout_rko->rko_u.share_fetch_fanout.is_retry = rd_false; - fanout_rko->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0); - - rd_kafka_q_enq(rk->rk_ops, fanout_rko); + "abs_timeout = %" PRId64, + abs_timeout); + rd_kafka_share_fetch_fanout_with_backoff(rk, abs_timeout, + 0 /* no backoff */); } /* At this point, there's no reason to deviate from what we already do @@ -4616,7 +4689,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, case RD_KAFKA_OP_SHARE_FETCH_FANOUT | RD_KAFKA_OP_REPLY: rd_kafka_assert(rk, thrd_is_current(rk->rk_thread)); - rd_kafka_share_fetch_fanout_reply_op(rk, rko); + res = rd_kafka_share_fetch_fanout_reply_op(rk, rko); break; default: diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 020fbb2df6..8f6afedc7d 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -378,8 +378,11 @@ typedef struct rd_kafka_cgrp_s { rd_ts_t rkcg_ts_rebalance_start; struct { - size_t last_partition_picked; /* For round-robin - * partition picking */ + size_t last_partition_picked; /* For round-robin + * partition picking */ + rd_kafka_timer_t share_fetch_fanout_tmr; /**< Timer for + * share fetch + * fanout */ } rkcg_share; } rd_kafka_cgrp_t; diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index f9b5de7037..fe49950da8 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1509,8 +1509,10 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "which indicates where this client is physically located. It " "corresponds with the broker config `broker.rack`.", .sdef = ""}, - {_RK_GLOBAL | _RK_HIDDEN, "share.consumer", _RK_C_BOOL, _RK(is_share_consumer), - "tba description", 0, 1, 0}, + {_RK_GLOBAL | _RK_HIDDEN, "share.consumer", _RK_C_BOOL, + _RK(share.is_share_consumer), "tba description", 0, 1, 0}, + {_RK_GLOBAL | _RK_CONSUMER, "max.poll.records", _RK_C_INT, + _RK(share.max_poll_records), "tba description,", 1, INT_MAX, 500}, /* Global producer properties */ diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index fedfc2ecee..ea9e216efa 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -470,7 +470,10 @@ struct rd_kafka_conf_s { rd_kafkap_str_t *client_rack; - int is_share_consumer; /**< Is this a share consumer? */ + struct { + int is_share_consumer; /**< Is this a share consumer? */ + int max_poll_records; /**< Max records returned per poll */ + } share; /* * Producer configuration diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index ed2a0d6bce..c9fe9da3e1 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -95,7 +95,8 @@ typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t; #define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0) #define RD_KAFKA_IS_SHARE_CONSUMER(rk) \ - ((rk)->rk_type == RD_KAFKA_CONSUMER && (rk)->rk_conf.is_share_consumer) + ((rk)->rk_type == RD_KAFKA_CONSUMER && \ + (rk)->rk_conf.share.is_share_consumer) /** * @struct Represents a fetch position: diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 1cf70e3197..705f16b770 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -745,10 +745,6 @@ struct rd_kafka_op_s { /** Absolute timeout for share fetch fanout operation. */ rd_ts_t abs_timeout; - - /** Is this a retry, or the first attempt of this poll? - * Retries only have should_fetch=true request sent. */ - rd_bool_t is_retry; } share_fetch_fanout; } rko_u; From 39ba8411e26615a90de4cda3314f977ebc616b06 Mon Sep 17 00:00:00 2001 From: Milind L Date: Tue, 28 Oct 2025 15:12:03 +0000 Subject: [PATCH 5/7] Allow poll to exit immediately on message --- src/rdkafka.c | 2 +- src/rdkafka_queue.c | 116 ++++++++++++++++++++++++++++++++++++++++++++ src/rdkafka_queue.h | 4 ++ 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index 0ac649da4a..b9659d31b4 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -3225,7 +3225,7 @@ rd_kafka_error_t *rd_kafka_share_consume_batch( * for returning multiple messages to the user, as the orchestration * is handled by the main thread. Later on, we needed, we might need * a custom loop if we need any changes. */ - *rkmessages_size = rd_kafka_q_serve_rkmessages( + *rkmessages_size = rd_kafka_q_serve_share_rkmessages( rkcg->rkcg_q, timeout_ms, /* Use this timeout directly as prior operations aren't blocking, so no need to re-convert the abs_timeout into a relative one.*/ diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index 92eddccae9..1d25d984d3 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -869,6 +869,122 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, return cnt; } +int rd_kafka_q_serve_share_rkmessages(rd_kafka_q_t *rkq, + int timeout_ms, + rd_kafka_message_t **rkmessages, + size_t rkmessages_size) { + unsigned int cnt = 0; + TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq); + struct rd_kafka_op_tailq ctrl_msg_q = + TAILQ_HEAD_INITIALIZER(ctrl_msg_q); + rd_kafka_op_t *rko, *next; + rd_kafka_t *rk = rkq->rkq_rk; + rd_kafka_q_t *fwdq; + rd_ts_t abs_timeout; + + mtx_lock(&rkq->rkq_lock); + if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) { + /* Since the q_pop may block we need to release the parent + * queue's lock. */ + mtx_unlock(&rkq->rkq_lock); + cnt = rd_kafka_q_serve_share_rkmessages(fwdq, timeout_ms, rkmessages, + rkmessages_size); + rd_kafka_q_destroy(fwdq); + return cnt; + } + + mtx_unlock(&rkq->rkq_lock); + + abs_timeout = rd_timeout_init(timeout_ms); + + rd_kafka_app_poll_start(rk, rkq, 0, timeout_ms); + + rd_kafka_yield_thread = 0; + while (cnt < rkmessages_size) { + rd_kafka_op_res_t res; + + mtx_lock(&rkq->rkq_lock); + + while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && + !rd_kafka_q_check_yield(rkq) && + /* Only do a timed wait if no messages are ready, if we + have gotten even one message, just return with it. */ + cnt == 0 && + cnd_timedwait_abs(&rkq->rkq_cond, &rkq->rkq_lock, + abs_timeout) == thrd_success) + ; + + rd_kafka_q_mark_served(rkq); + + if (!rko) { + mtx_unlock(&rkq->rkq_lock); + break; /* Timed out */ + } + + rd_kafka_q_deq0(rkq, rko); + + mtx_unlock(&rkq->rkq_lock); + + if (rd_kafka_op_version_outdated(rko, 0)) { + /* Outdated op, put on discard queue */ + TAILQ_INSERT_TAIL(&tmpq, rko, rko_link); + continue; + } + + /* Serve non-FETCH callbacks */ + res = + rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL); + if (res == RD_KAFKA_OP_RES_KEEP || + res == RD_KAFKA_OP_RES_HANDLED) { + /* Callback served, rko is destroyed (if HANDLED). */ + continue; + } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD || + rd_kafka_yield_thread)) { + /* Yield. */ + break; + } + rd_dassert(res == RD_KAFKA_OP_RES_PASS); + + /* If this is a control messages, don't return message to + * application. Add it to a tmp queue from where we can store + * the offset and destroy the op */ + if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) { + TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link); + continue; + } + + /* Get rkmessage from rko and append to array. */ + rkmessages[cnt++] = rd_kafka_message_get(rko); + } + + /* NOTE: KIP-932: + * For a share consumer, we are not using version barriers, and ideally, + * tmpq should be empty. However, the discard code is retained as + * non-share-consumer might still be around. This assert exists to spot + * any issues as they arise during testing.*/ + rd_dassert(TAILQ_EMPTY(&tmpq)); + + /* Discard non-desired and already handled ops */ + next = TAILQ_FIRST(&tmpq); + while (next) { + rko = next; + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + } + + /* Discard ctrl msgs */ + next = TAILQ_FIRST(&ctrl_msg_q); + while (next) { + rko = next; + next = TAILQ_NEXT(next, rko_link); + rd_kafka_op_destroy(rko); + } + + rd_kafka_app_polled(rk, rkq); + + return cnt; +} + void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu) { diff --git a/src/rdkafka_queue.h b/src/rdkafka_queue.h index ff1a465770..cfed32413a 100644 --- a/src/rdkafka_queue.h +++ b/src/rdkafka_queue.h @@ -878,6 +878,10 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size); +int rd_kafka_q_serve_share_rkmessages(rd_kafka_q_t *rkq, + int timeout_ms, + rd_kafka_message_t **rkmessages, + size_t rkmessages_size); rd_kafka_resp_err_t rd_kafka_q_wait_result(rd_kafka_q_t *rkq, int timeout_ms); int rd_kafka_q_apply(rd_kafka_q_t *rkq, From 24ada5299e336d34ce45c65747ba5a7431de2b7b Mon Sep 17 00:00:00 2001 From: Milind L Date: Tue, 28 Oct 2025 15:16:25 +0000 Subject: [PATCH 6/7] Fix invalid reads and compiler warnings --- examples/consumer.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/consumer.c b/examples/consumer.c index fea673b539..3a4ad4f324 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -236,8 +236,7 @@ int main(int argc, char **argv) { * since a rebalance may happen at any time. * Start polling for messages. */ - rd_kafka_message_t **rkmessages = - (rd_kafka_message_t **)malloc(sizeof(rd_kafka_message_t *) * 100); + rd_kafka_message_t *rkmessages[500]; while (run) { rd_kafka_message_t *rkm = NULL; size_t rcvd_msgs = 0; From 279edb9dff5f73e4495f37e04a722e318cc81df6 Mon Sep 17 00:00:00 2001 From: Milind L Date: Tue, 28 Oct 2025 15:22:27 +0000 Subject: [PATCH 7/7] Add missing backoff for share fetch retries --- src/rdkafka.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/rdkafka.c b/src/rdkafka.c index b9659d31b4..6d0886a70b 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -3035,13 +3035,11 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk, errors? */ break; - /* For other cases, resend to separate broker after a backoff. The - * fanout op will back us off automatically if there is no broker - * available. */ + /* For other cases, resend to separate broker after a backoff. */ default: rd_kafka_share_fetch_fanout_with_backoff( rk, rko_orig->rko_u.share_fetch.abs_timeout, - 0 /* no backoff */); + rk->rk_conf.retry_backoff_max_ms); break; }