From a52d29cc222416812983ab56b5795faba42b46f1 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 14 Jul 2025 17:08:34 +0530 Subject: [PATCH 1/3] Prototype code to obtain assignments --- examples/consumer.c | 23 ++ src/rdkafka.c | 43 ++++ src/rdkafka.h | 3 + src/rdkafka_cgrp.c | 316 ++++++++++++++++++++++++++++ src/rdkafka_conf.c | 3 + src/rdkafka_conf.h | 2 + src/rdkafka_int.h | 2 + src/rdkafka_proto.h | 1 + src/rdkafka_protocol.h | 1 + src/rdkafka_request.c | 94 +++++++++ src/rdkafka_request.h | 11 + tests/0154-share_consumer.c | 71 +++++++ tests/CMakeLists.txt | 1 + tests/broker_version_tests.py | 4 +- tests/interactive_broker_version.py | 2 +- tests/test.c | 2 + win32/tests/tests.vcxproj | 1 + 17 files changed, 577 insertions(+), 3 deletions(-) create mode 100644 tests/0154-share_consumer.c diff --git a/examples/consumer.c b/examples/consumer.c index f621a9db35..8f33edd69c 100644 --- a/examples/consumer.c +++ b/examples/consumer.c @@ -149,6 +149,29 @@ int main(int argc, char **argv) { return 1; } + if (rd_kafka_conf_set(conf, "share.consumer", "true", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + + if (rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + + if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + /* * Create consumer instance. * diff --git a/src/rdkafka.c b/src/rdkafka.c index c6f89ad469..0904bb77ac 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -2839,6 +2839,49 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, return NULL; } +rd_kafka_t *rd_kafka_share_consumer_new( + rd_kafka_conf_t *conf, char *errstr, size_t errstr_size) { + rd_kafka_t *rk; + char errstr_internal[512]; + rd_kafka_conf_res_t res; + + if (conf == NULL) { + rd_snprintf(errstr, errstr_size, + "rd_kafka_share_consumer_new(): " + "conf argument must not be NULL"); + return NULL; + } + + res = rd_kafka_conf_set(conf, "share.consumer", "true", errstr_internal, + sizeof(errstr_internal)); + if (res != RD_KAFKA_CONF_OK) { + rd_snprintf(errstr, errstr_size, + "rd_kafka_share_consumer_new(): " + "Failed to set share.consumer=true: %s", + errstr_internal); + return NULL; + } + + + res = rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr_internal, + sizeof(errstr_internal)); + if (res != RD_KAFKA_CONF_OK) { + rd_snprintf(errstr, errstr_size, + "rd_kafka_share_consumer_new(): " + "Failed to set group.protocol=consumer: %s", + errstr_internal); + return NULL; + } + + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, errstr_size); + if (!rk) { + /* If rd_kafka_new() failed it will have set the last error + * and filled out errstr, so we don't need to do that here. */ + return NULL; + } + return rk; +} + /** * Schedules a rebootstrap of the cluster immediately. * diff --git a/src/rdkafka.h b/src/rdkafka.h index 57cf08bad1..e137c01804 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -3057,6 +3057,9 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, size_t errstr_size); +RD_EXPORT +rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, char *errstr, size_t errstr_size); + /** * @brief Destroy Kafka handle. * diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index d6348ba52d..e462e6843a 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -3369,6 +3369,304 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk, } } +void rd_kafka_cgrp_handle_ShareGroupHeartbeat(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + const int log_decode_errors = LOG_ERR; + int16_t error_code = 0; + int actions = 0; + rd_kafkap_str_t error_str = RD_KAFKAP_STR_INITIALIZER_EMPTY; + rd_kafkap_str_t member_id; + int32_t member_epoch; + int32_t heartbeat_interval_ms; + int8_t are_assignments_present; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) + return; + + rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT); + + if (rd_kafka_cgrp_will_leave(rkcg)) + err = RD_KAFKA_RESP_ERR__OUTDATED; + if (err) + goto err; + + rd_kafka_buf_read_throttle_time(rkbuf); + rd_kafka_buf_read_i16(rkbuf, &error_code); + rd_kafka_buf_read_str(rkbuf, &error_str); + + if (error_code) { + err = error_code; + goto err; + } + + rd_kafka_buf_read_str(rkbuf, &member_id); + if (!RD_KAFKAP_STR_IS_NULL(&member_id)) { + rd_kafka_cgrp_set_member_id(rkcg, member_id.str); + } + + rd_kafka_buf_read_i32(rkbuf, &member_epoch); + rkcg->rkcg_generation_id = member_epoch; + + rd_kafka_buf_read_i32(rkbuf, &heartbeat_interval_ms); + if (heartbeat_interval_ms > 0) { + rkcg->rkcg_heartbeat_intvl_ms = heartbeat_interval_ms; + } + + rd_kafka_buf_read_i8(rkbuf, &are_assignments_present); + + if (are_assignments_present == 1) { + rd_kafka_topic_partition_list_t *assigned_topic_partitions; + const rd_kafka_topic_partition_field_t assignments_fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + assigned_topic_partitions = rd_kafka_buf_read_topic_partitions( + rkbuf, rd_true, rd_false /* Don't use Topic Name */, 0, + assignments_fields); + + rd_kafka_dbg( + rk, CGRP, "HEARTBEAT", + "ShareGroupHeartbeat response received " + "assigned_topic_partitions size %d", + assigned_topic_partitions->cnt); + + if (rd_kafka_is_dbg(rk, CGRP)) { + char assigned_topic_partitions_str[512] = "NULL"; + + if (assigned_topic_partitions) { + rd_kafka_topic_partition_list_str( + assigned_topic_partitions, + assigned_topic_partitions_str, + sizeof(assigned_topic_partitions_str), 0); + } + + rd_kafka_dbg( + rk, CGRP, "HEARTBEAT", + "ShareGroupHeartbeat response received target " + "assignment \"%s\"", + assigned_topic_partitions_str); + } + + if (assigned_topic_partitions) { + RD_IF_FREE(rkcg->rkcg_next_target_assignment, + rd_kafka_topic_partition_list_destroy); + rkcg->rkcg_next_target_assignment = NULL; + if (rd_kafka_cgrp_consumer_is_new_assignment_different( + rkcg, assigned_topic_partitions)) { + rkcg->rkcg_next_target_assignment = + assigned_topic_partitions; + } else { + rd_kafka_topic_partition_list_destroy( + assigned_topic_partitions); + assigned_topic_partitions = NULL; + } + } + } + + if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY && + (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) && + rkcg->rkcg_target_assignment) { + if (rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK) { + if (rkcg->rkcg_current_assignment) + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_current_assignment); + rkcg->rkcg_current_assignment = + rd_kafka_topic_partition_list_copy( + rkcg->rkcg_target_assignment); + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_target_assignment); + rkcg->rkcg_target_assignment = NULL; + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK; + + if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) { + char rkcg_current_assignment_str[512] = "NULL"; + + rd_kafka_topic_partition_list_str( + rkcg->rkcg_current_assignment, + rkcg_current_assignment_str, + sizeof(rkcg_current_assignment_str), 0); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Target assignment acked, new " + "current assignment " + " \"%s\"", + rkcg_current_assignment_str); + } + } else if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) { + /* We've finished reconciliation but we weren't + * sending an ack, need to send a new HB with the ack. + */ + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "not subscribed anymore"); + } + } + + + if (rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING && + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) { + /* TODO: Check if this should be done only for the + * steady state? + */ + rd_kafka_assignment_serve(rk); + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING; + } + + if (rkcg->rkcg_next_target_assignment) { + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) { + rd_kafka_cgrp_consumer_next_target_assignment_request_metadata( + rk, rkb); + } else { + /* Consumer left the group sending an HB request + * while this one was in-flight. */ + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_target_assignment); + rkcg->rkcg_next_target_assignment = NULL; + } + } + + if (rd_kafka_cgrp_consumer_subscription_preconditions_met(rkcg)) + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "send new subscription"); + + rkcg->rkcg_consumer_flags &= + ~RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION & + ~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + rd_kafka_cgrp_maybe_clear_heartbeat_failed_err(rkcg); + rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rkcg->rkcg_expedite_heartbeat_retries = 0; + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + + return; + +err_parse: + err = rkbuf->rkbuf_err; +err: + rkcg->rkcg_last_heartbeat_err = err; + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + switch (err) { + case RD_KAFKA_RESP_ERR__DESTROY: + /* quick cleanup */ + return; + + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ShareGroupHeartbeat failed due to coordinator (%s) " + "loading in progress: %s: " + "retrying", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + actions = RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: + case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR__TRANSPORT: + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ShareGroupHeartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "ShareGroupHeartbeat failed due to: %s: " + "will rejoin the group", + rd_kafka_err2str(err)); + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN; + return; + + case RD_KAFKA_RESP_ERR_INVALID_REQUEST: + case RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION: + case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE: + case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: + actions = RD_KAFKA_ERR_ACTION_FATAL; + break; + + default: + actions = rd_kafka_err_action( + rkb, err, request, + + RD_KAFKA_ERR_ACTION_SPECIAL, + RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, + + RD_KAFKA_ERR_ACTION_END); + break; + } + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_set_fatal_error( + rkcg->rkcg_rk, err, + "ShareGroupHeartbeat fatal error: %s", + rd_kafka_err2str(err)); + rd_kafka_cgrp_revoke_all_rejoin_maybe( + rkcg, rd_true, /*assignments lost*/ + rd_true, /*initiating*/ + "Fatal error in ShareGroupHeartbeat API response"); + return; + } + + if (!rkcg->rkcg_heartbeat_intvl_ms) { + /* When an error happens on first HB, it should be always + * retried, unless fatal, to avoid entering a tight loop + * and to use exponential backoff. */ + actions |= RD_KAFKA_ERR_ACTION_RETRY; + } + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST; + rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); + /* If coordinator changes, HB will be expedited. */ + } + + if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) { + rd_ts_t min_error_interval = + RD_MAX(rkcg->rkcg_heartbeat_intvl_ms * 1000, + /* default group.consumer.heartbeat.interval.ms */ + 5000000); + if (rkcg->rkcg_last_err != err || + (rd_clock() > + rkcg->rkcg_ts_last_err + min_error_interval)) { + rd_kafka_cgrp_set_last_err(rkcg, err); + rd_kafka_consumer_err( + rkcg->rkcg_q, rd_kafka_broker_id(rkb), err, 0, NULL, + NULL, err, + "ShareGroupHeartbeat failed: %s%s%.*s", + rd_kafka_err2str(err), + RD_KAFKAP_STR_LEN(&error_str) ? ": " : "", + RD_KAFKAP_STR_PR(&error_str)); + } + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && + !rd_kafka_cgrp_will_leave(rkcg) && + rd_kafka_buf_retry(rkb, request)) { + /* Retry */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + } +} /** * @brief Handle Heartbeat response. @@ -6150,6 +6448,14 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg, } rkcg->rkcg_expedite_heartbeat_retries++; + + if (RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk)) { + rd_kafka_ShareGroupHeartbeatRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id, + member_epoch, rkcg_client_rack, rkcg_subscription_topics, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_ShareGroupHeartbeat, NULL); + return; + } + rd_kafka_ConsumerGroupHeartbeatRequest( rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id, member_epoch, rkcg_group_instance_id, rkcg_client_rack, @@ -6211,6 +6517,15 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { "member fenced - rejoining"); } + /* There should be no fencing, hence no rejoining - these asserts are to test only, we don't actually need them. */ + rd_dassert(!(RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk) && + (rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN))); + rd_dassert(!(RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk) && + (rkcg->rkcg_consumer_flags & + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE))); + + switch (rkcg->rkcg_join_state) { case RD_KAFKA_CGRP_JOIN_STATE_INIT: rkcg->rkcg_consumer_flags &= @@ -6300,6 +6615,7 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg, /* If member is leaving, new subscription * will be applied after the leave * ConsumerGroupHeartbeat */ + /* MILIND: how is new subscription applied after heartbeat, check it. */ if (!rd_kafka_cgrp_will_leave(rkcg)) rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg); } else { diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index 1f8bbf106b..f9b5de7037 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -1509,6 +1509,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "which indicates where this client is physically located. It " "corresponds with the broker config `broker.rack`.", .sdef = ""}, + {_RK_GLOBAL | _RK_HIDDEN, "share.consumer", _RK_C_BOOL, _RK(is_share_consumer), + "tba description", 0, 1, 0}, + /* Global producer properties */ {_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "transactional.id", _RK_C_STR, diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h index 92e5193eb7..fedfc2ecee 100644 --- a/src/rdkafka_conf.h +++ b/src/rdkafka_conf.h @@ -470,6 +470,8 @@ struct rd_kafka_conf_s { rd_kafkap_str_t *client_rack; + int is_share_consumer; /**< Is this a share consumer? */ + /* * Producer configuration */ diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h index d8370ff599..9e97bf9dca 100644 --- a/src/rdkafka_int.h +++ b/src/rdkafka_int.h @@ -94,6 +94,8 @@ typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t; #define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0) +#define RD_KAFKA_IS_SHARE_CONSUMER(rk) \ + ((rk)->rk_type == RD_KAFKA_CONSUMER && (rk)->rk_conf.is_share_consumer) /** * @struct Represents a fetch position: diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h index 02565ecb3b..b5b52ac146 100644 --- a/src/rdkafka_proto.h +++ b/src/rdkafka_proto.h @@ -175,6 +175,7 @@ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) { [RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribe", [RD_KAFKAP_GetTelemetrySubscriptions] = "GetTelemetrySubscriptions", [RD_KAFKAP_PushTelemetry] = "PushTelemetry", + [RD_KAFKAP_ShareGroupHeartbeat] = "ShareGroupHeartbeat", }; static RD_TLS char ret[64]; diff --git a/src/rdkafka_protocol.h b/src/rdkafka_protocol.h index 19190e1447..28c246dd9c 100644 --- a/src/rdkafka_protocol.h +++ b/src/rdkafka_protocol.h @@ -120,6 +120,7 @@ #define RD_KAFKAP_GetTelemetrySubscriptions 71 #define RD_KAFKAP_PushTelemetry 72 #define RD_KAFKAP_AssignReplicasToDirs 73 +#define RD_KAFKAP_ShareGroupHeartbeat 76 #define RD_KAFKAP__NUM 74 diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 663a07eae3..94c01e5964 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -2442,7 +2442,101 @@ void rd_kafka_ConsumerGroupHeartbeatRequest( rd_kafkap_str_destroy(subscribed_topic_regex_to_send); } +void rd_kafka_ShareGroupHeartbeatRequest( + rd_kafka_broker_t *rkb, + const rd_kafkap_str_t *group_id, + const rd_kafkap_str_t *member_id, + int32_t member_epoch, + const rd_kafkap_str_t *rack_id, + const rd_kafka_topic_partition_list_t *subscribed_topics, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + size_t rkbuf_size = 0; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_ShareGroupHeartbeat, 1, 1, &features); + + rd_rkb_dbg(rkb, CGRP, "SHAREHEARTBEAT", + "ShareGroupHeartbeat version %d for group \"%s\", member id " + "\"%s\", topic count = %d", + ApiVersion, group_id ? group_id->str : "NULL", + member_id ? member_id->str : "NULL", + subscribed_topics ? subscribed_topics->cnt : -1); + + if (ApiVersion == -1) { + rd_kafka_cgrp_coord_dead(rkb->rkb_rk->rk_cgrp, + RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, + "ShareGroupHeartbeatRequest not " + "supported by broker"); + return; + } + + // debug log all the fields + if (rd_rkb_is_dbg(rkb, CGRP)) { + char subscribed_topics_str[512] = "NULL"; + if (subscribed_topics) { + rd_kafka_topic_partition_list_str( + subscribed_topics, subscribed_topics_str, + sizeof(subscribed_topics_str), 0); + } + rd_rkb_dbg(rkb, CGRP, "SHAREHEARTBEAT", + "ShareGroupHeartbeat of group id \"%s\", " + "member id \"%s\", member epoch %d, rack id \"%s\"" + ", subscribed topics \"%s\"", + group_id ? group_id->str : "NULL", + member_id ? member_id->str : "NULL", member_epoch, + rack_id ? rack_id->str : "NULL", + subscribed_topics_str); + } + + if (group_id) + rkbuf_size += RD_KAFKAP_STR_SIZE(group_id); + if (member_id) + rkbuf_size += RD_KAFKAP_STR_SIZE(member_id); + rkbuf_size += 4; /* MemberEpoch */ + if (rack_id) + rkbuf_size += RD_KAFKAP_STR_SIZE(rack_id); + if (subscribed_topics) { + rkbuf_size += + ((subscribed_topics->cnt * (4 + 50)) + 4 /* array size */); + } + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_ShareGroupHeartbeat, 1, rkbuf_size, rd_true); + + rd_kafka_buf_write_kstr(rkbuf, group_id); + rd_kafka_buf_write_kstr(rkbuf, member_id); + rd_kafka_buf_write_i32(rkbuf, member_epoch); + rd_kafka_buf_write_kstr(rkbuf, rack_id); + if (subscribed_topics) { + int topics_cnt = subscribed_topics->cnt; + + /* write Topics */ + rd_kafka_buf_write_arraycnt(rkbuf, topics_cnt); + while (--topics_cnt >= 0) { + if (rd_rkb_is_dbg(rkb, CGRP)) + rd_rkb_dbg( + rkb, CGRP, "SHAREHEARTBEAT", + "ShareGroupHeartbeat subscribed " + "topic %s", + subscribed_topics->elems[topics_cnt].topic); + rd_kafka_buf_write_str( + rkbuf, subscribed_topics->elems[topics_cnt].topic, + -1); + } + } else { + rd_kafka_buf_write_arraycnt(rkbuf, -1); + } + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, features); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); +} /** * @brief Construct and send ListGroupsRequest to \p rkb diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaaf..12b2bcbd5f 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -400,6 +400,17 @@ void rd_kafka_ConsumerGroupHeartbeatRequest( rd_kafka_resp_cb_t *resp_cb, void *opaque); +void rd_kafka_ShareGroupHeartbeatRequest( + rd_kafka_broker_t *rkb, + const rd_kafkap_str_t *group_id, + const rd_kafkap_str_t *member_id, + int32_t member_epoch, + const rd_kafkap_str_t *rack_id, + const rd_kafka_topic_partition_list_t *subscribed_topics, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); + rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, const rd_list_t *topics, rd_list_t *topic_ids, diff --git a/tests/0154-share_consumer.c b/tests/0154-share_consumer.c new file mode 100644 index 0000000000..d906eaf836 --- /dev/null +++ b/tests/0154-share_consumer.c @@ -0,0 +1,71 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2025, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +int main_0154_share_consumer(int argc, char **argv) { + char errstr[512]; + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + rd_kafka_topic_partition_list_t *topics; + char *topic = "test-topic";// test_mk_topic_name("0154-share-consumer", 0); + char *group = "test-group-0"; + + test_create_topic_wait_exists(NULL, topic, 3, -1, 60 * 1000); + rd_sleep(5); + + test_produce_msgs_easy(topic, 0, 0, 2); + + TEST_SAY("Creating share consumer\n"); + test_conf_init(&conf, NULL, 60); + rd_kafka_conf_set(conf, "share.consumer", "true", NULL, 0); + rd_kafka_conf_set(conf, "group.protocol", "consumer", NULL, 0); + rd_kafka_conf_set(conf, "group.id", group, NULL, 0); + rd_kafka_conf_set(conf, "debug", "cgrp,protocol,conf", NULL, 0); + + // rk = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr)); + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) { + TEST_FAIL("Failed to create share consumer: %s\n", errstr); + } + + topics = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); + rd_kafka_subscribe(rk, topics); + rd_kafka_topic_partition_list_destroy(topics); + + TEST_SAY("Share consumer created successfully\n"); + + rd_kafka_consumer_poll(rk, 65000); + + TEST_SAY("Destroying consumer\n"); + + /* Clean up */ + rd_kafka_destroy(rk); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e509092873..b5e6f0bfaf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -143,6 +143,7 @@ set( 0151-purge-brokers.c 0152-rebootstrap.c 0153-memberid.c + 0154-share-consumer.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/broker_version_tests.py b/tests/broker_version_tests.py index c451e02471..1a0de874b1 100755 --- a/tests/broker_version_tests.py +++ b/tests/broker_version_tests.py @@ -31,7 +31,7 @@ def test_it(version, deploy=True, conf={}, rdkconf={}, tests=None, """ cluster = LibrdkafkaTestCluster(version, conf, - num_brokers=int(conf.get('broker_cnt', 3)), + num_brokers=int(conf.get('broker_cnt', 1)), debug=debug, scenario=scenario, kraft=kraft) @@ -175,7 +175,7 @@ def handle_report(report, version, suite): '--brokers', dest='broker_cnt', type=int, - default=3, + default=1, help='Number of Kafka brokers') parser.add_argument('--ssl', dest='ssl', action='store_true', default=False, diff --git a/tests/interactive_broker_version.py b/tests/interactive_broker_version.py index acddc872fd..3f2ffe7aae 100755 --- a/tests/interactive_broker_version.py +++ b/tests/interactive_broker_version.py @@ -25,7 +25,7 @@ def version_as_number(version): def test_version(version, cmd=None, deploy=True, conf={}, debug=False, exec_cnt=1, - root_path='tmp', broker_cnt=3, scenario='default', + root_path='tmp', broker_cnt=1, scenario='default', kraft=False): """ @brief Create, deploy and start a Kafka cluster using Kafka \\p version diff --git a/tests/test.c b/tests/test.c index 86205dd5de..d0f9dafa68 100644 --- a/tests/test.c +++ b/tests/test.c @@ -270,6 +270,7 @@ _TEST_DECL(0150_telemetry_mock); _TEST_DECL(0151_purge_brokers_mock); _TEST_DECL(0152_rebootstrap_local); _TEST_DECL(0153_memberid); +_TEST_DECL(0154_share_consumer); /* Manual tests */ _TEST_DECL(8000_idle); @@ -536,6 +537,7 @@ struct test tests[] = { _TEST(0151_purge_brokers_mock, TEST_F_LOCAL), _TEST(0152_rebootstrap_local, TEST_F_LOCAL), _TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)), + _TEST(0154_share_consumer, 0, TEST_BRKVER(0, 4, 0, 0)), /* Manual tests */ diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index c00bc84418..03d5825219 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -233,6 +233,7 @@ + From aed60439d98f782b83ca405d1af1124ca8bb3bf5 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 6 Oct 2025 04:59:06 +0000 Subject: [PATCH 2/3] Add broker-op for share fetching --- src/rdkafka_op.c | 2 ++ src/rdkafka_op.h | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 5dbbf9c9d4..469128cf9e 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -123,6 +123,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", + [RD_KAFKA_OP_SHARE_FETCH] = "REPLY:SHARE_FETCH", }; if (type & RD_KAFKA_OP_REPLY) @@ -287,6 +288,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.telemetry_broker), [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_SHARE_FETCH] = _RD_KAFKA_OP_EMPTY, }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index e79309aa02..d75a0f23d1 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -189,6 +189,8 @@ typedef enum { RD_KAFKA_OP_ELECTLEADERS, /**< Admin: * ElectLeaders * u.admin_request */ + RD_KAFKA_OP_SHARE_FETCH, /**< broker op: Issue share fetch request if + applicable. */ RD_KAFKA_OP__END } rd_kafka_op_type_t; From 24acb72692087fc72069bea0b0d4dd54856b5758 Mon Sep 17 00:00:00 2001 From: Milind L Date: Mon, 13 Oct 2025 07:39:27 +0000 Subject: [PATCH 3/3] Add fields to op --- src/rdkafka_op.c | 6 +++++- src/rdkafka_op.h | 10 ++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 469128cf9e..db325b11c3 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -288,7 +288,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.telemetry_broker), [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), - [RD_KAFKA_OP_SHARE_FETCH] = _RD_KAFKA_OP_EMPTY, + [RD_KAFKA_OP_SHARE_FETCH] = sizeof(rko->rko_u.share_fetch), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -509,6 +509,10 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { rd_kafka_broker_destroy); break; + case RD_KAFKA_OP_SHARE_FETCH: + /* TODO KIP-932: Add destruction code. */ + break; + default: break; } diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index d75a0f23d1..58a248aaf1 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -726,6 +726,16 @@ struct rd_kafka_op_s { void (*cb)(rd_kafka_t *rk, void *rkb); } terminated; + struct { + /** Whether this broker should share-fetch nonzero + * messages. */ + rd_bool_t should_fetch; + + /** Absolute timeout left to complete this share-fetch. + */ + rd_ts_t abs_timeout; + } share_fetch; + } rko_u; };