From bb21530b9ea07a45a9baa1bee4d8908ecc083d7b Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 14:34:26 +0530 Subject: [PATCH 01/10] Fix share session open session workflow --- src/rdkafka_mock.h | 15 +++++++++++++++ src/rdkafka_mock_handlers.c | 38 ++++++++++++++++++++++--------------- src/rdkafka_mock_int.h | 5 +++++ src/rdkafka_mock_sharegrp.c | 17 +++++++++++++++++ 4 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 56223043db..10758611d5 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -717,6 +717,21 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster, int max_size); +/** + * @brief Set the maximum number of fetch sessions allowed in a share group. + * + * New sessions attempted via ShareFetch with epoch 0 when the group + * is at capacity will receive SHARE_SESSION_LIMIT_REACHED. + * + * Default is 2000 (per KIP-932 group.share.max.share.sessions). + * + * @param mcluster Mock cluster instance. + * @param max_fetch_sessions Maximum fetch sessions allowed. 0 = unlimited. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_max_fetch_sessions( + rd_kafka_mock_cluster_t *mcluster, + int max_fetch_sessions); + /** * @brief Set a manual target assignment for a sharegroup. * diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index be3f23070f..106db56f5d 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3964,16 +3964,35 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, mtx_lock(&mcluster->lock); sgrp = rd_kafka_mock_sharegroup_get(mcluster, &GroupId); + /* epoch=0 (full fetch / new session) must not + * contain acknowledgements. Check BEFORE + * session_validate to avoid destroying an existing + * session for a malformed request. 
*/ + if (SessionEpoch == 0 && rd_list_cnt(&ack_entries) > 0) { + rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", + "ShareFetch: rejecting epoch=0 request " + "with %d ack(s) (INVALID_REQUEST)", + rd_list_cnt(&ack_entries)); + err = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + } + /* Common validation: member check, session lookup, * epoch -1 close, epoch > 0 validation. */ - err = rd_kafka_mock_sgrp_session_validate( - sgrp, &MemberId, mconn->broker->id, SessionEpoch, &session, - "ShareFetch"); + if (!err) + err = rd_kafka_mock_sgrp_session_validate( + sgrp, &MemberId, mconn->broker->id, SessionEpoch, + &session, "ShareFetch"); if (!err && SessionEpoch == 0) { /* Open a new session (or reuse if one already exists * for this member on this broker). */ - if (!session) { + if (!session && sgrp->max_fetch_sessions > 0 && + sgrp->fetch_session_cnt >= + sgrp->max_fetch_sessions) { + /* Session cache is full. */ + err = + RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED; + } else if (!session) { session = rd_calloc(1, sizeof(*session)); session->member_id = RD_KAFKAP_STR_DUP(&MemberId); @@ -4092,17 +4111,6 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, } } - /* epoch=0 (full fetch / new session) must not - * contain acknowledgements. */ - if (!err && SessionEpoch == 0 && - rd_list_cnt(&ack_entries) > 0) { - rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", - "ShareFetch: rejecting epoch=0 request " - "with %d ack(s) (INVALID_REQUEST)", - rd_list_cnt(&ack_entries)); - err = RD_KAFKA_RESP_ERR_INVALID_REQUEST; - } - /* Apply piggy-backed acknowledgements (implicit ack) * before acquiring new records. 
This processes the * AcknowledgementBatches sent by the client for records diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 2e53910792..c30a8ca356 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -258,6 +258,8 @@ typedef struct rd_kafka_mock_sharegroup_s { int isolation_level; /**< Share isolation level */ int max_size; /**< Max members allowed. * 0 = unlimited (default). */ + int max_fetch_sessions; /**< Max fetch sessions allowed. + * 0 = unlimited (default 2000). */ } rd_kafka_mock_sharegroup_t; /** @@ -643,6 +645,9 @@ struct rd_kafka_mock_cluster_s { /** Max members allowed in share group (KIP 932). * 0 = unlimited. */ int sharegroup_max_size; + /** Max fetch sessions per share group (KIP 932). + * 0 = unlimited. */ + int sharegroup_max_fetch_sessions; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ diff --git a/src/rdkafka_mock_sharegrp.c b/src/rdkafka_mock_sharegrp.c index 55098e8366..451f8e5b92 100644 --- a/src/rdkafka_mock_sharegrp.c +++ b/src/rdkafka_mock_sharegrp.c @@ -59,6 +59,7 @@ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) { mcluster->defaults.sharegroup_record_lock_duration_ms = 0; mcluster->defaults.sharegroup_max_size = 0; mcluster->defaults.sharegroup_isolation_level = 0; + mcluster->defaults.sharegroup_max_fetch_sessions = 2000; } /** @@ -114,6 +115,8 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, mcluster->defaults.sharegroup_record_lock_duration_ms; mshgrp->isolation_level = mcluster->defaults.sharegroup_isolation_level; mshgrp->max_size = mcluster->defaults.sharegroup_max_size; + mshgrp->max_fetch_sessions = + mcluster->defaults.sharegroup_max_fetch_sessions; rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr, 1000 * 1000 /* 1s */, @@ -778,6 +781,20 @@ void rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster, mtx_unlock(&mcluster->lock); } +/** + * @brief Set the maximum number of fetch sessions allowed 
in a share group. + */ +void rd_kafka_mock_sharegroup_set_max_fetch_sessions( + rd_kafka_mock_cluster_t *mcluster, + int max_fetch_sessions) { + rd_kafka_mock_sharegroup_t *mshgrp; + mtx_lock(&mcluster->lock); + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) + mshgrp->max_fetch_sessions = max_fetch_sessions; + mcluster->defaults.sharegroup_max_fetch_sessions = max_fetch_sessions; + mtx_unlock(&mcluster->lock); +} + /** * @brief Destroy share fetch session. */ From 1d3f59163660928d08875a3c7c3cdd15be789d6b Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 14:55:23 +0530 Subject: [PATCH 02/10] Add error handling for ack workflow --- src/rdkafka_mock_handlers.c | 97 ++++++++++++++++++++++++++++--------- 1 file changed, 74 insertions(+), 23 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 106db56f5d..f99c0a113e 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3659,9 +3659,31 @@ struct rd_kafka_mock_sgrp_ack_entry { int32_t partition; int64_t first_offset; int64_t last_offset; - int8_t ack_type; /**< 0=GAP, 1=ACCEPT, 2=RELEASE, 3=REJECT */ + int8_t ack_type; /**< 0=GAP, 1=ACCEPT, 2=RELEASE, 3=REJECT */ + rd_kafka_resp_err_t err; /**< Per-batch ack result, set after apply */ }; +/** + * @brief Find the worst ack error for the given (topic_id, partition) across + * all ack entries. Returns NO_ERROR if no ack targeted this partition + * or all acks succeeded. 
+ */ +static rd_kafka_resp_err_t rd_kafka_mock_sgrp_ack_error_for_partition( + const rd_list_t *ack_entries, + rd_kafka_Uuid_t topic_id, + int32_t partition) { + int k; + for (k = 0; k < rd_list_cnt(ack_entries); k++) { + const struct rd_kafka_mock_sgrp_ack_entry *entry = + rd_list_elem(ack_entries, k); + if (entry->partition == partition && + !rd_kafka_Uuid_cmp(entry->topic_id, topic_id) && + entry->err) + return entry->err; + } + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + /** * @brief Apply a single acknowledgement batch to share-group partition * metadata. @@ -3674,19 +3696,21 @@ struct rd_kafka_mock_sgrp_ack_entry { * * @locks mcluster->lock MUST be held. */ -static void rd_kafka_mock_sgrp_apply_ack(rd_kafka_mock_sharegroup_t *sgrp, - rd_kafka_Uuid_t topic_id, - int32_t partition, - int64_t first_offset, - int64_t last_offset, - int8_t ack_type, - const rd_kafkap_str_t *member_id) { +static rd_kafka_resp_err_t +rd_kafka_mock_sgrp_apply_ack(rd_kafka_mock_sharegroup_t *sgrp, + rd_kafka_Uuid_t topic_id, + int32_t partition, + int64_t first_offset, + int64_t last_offset, + int8_t ack_type, + const rd_kafkap_str_t *member_id) { rd_kafka_mock_sgrp_partmeta_t *pmeta; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; int64_t offset; pmeta = rd_kafka_mock_sgrp_partmeta_find(sgrp, topic_id, partition); if (!pmeta) - return; + return RD_KAFKA_RESP_ERR_NO_ERROR; for (offset = first_offset; offset <= last_offset; offset++) { rd_kafka_mock_sgrp_record_state_t *state = @@ -3694,13 +3718,17 @@ static void rd_kafka_mock_sgrp_apply_ack(rd_kafka_mock_sharegroup_t *sgrp, if (!state) continue; - /* Only the owning member may acknowledge acquired records */ - if (state->state != RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED) - continue; - if (!state->owner_member_id) - continue; - if (rd_kafkap_str_cmp_str(member_id, state->owner_member_id)) + /* Only the owning member may acknowledge acquired records. 
+ * If the record's lock expired (reverted to Available), + * was re-acquired by another member, or is otherwise not + * in ACQUIRED state for this member, report + * INVALID_RECORD_STATE. */ + if (state->state != RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED || + !state->owner_member_id || + rd_kafkap_str_cmp_str(member_id, state->owner_member_id)) { + err = RD_KAFKA_RESP_ERR_INVALID_RECORD_STATE; continue; + } switch (ack_type) { case 0: /* GAP */ @@ -3730,6 +3758,8 @@ static void rd_kafka_mock_sgrp_apply_ack(rd_kafka_mock_sharegroup_t *sgrp, break; pmeta->spso++; } + + return err; } /** @@ -4125,7 +4155,7 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, for (k = 0; k < rd_list_cnt(&ack_entries); k++) { struct rd_kafka_mock_sgrp_ack_entry *entry = rd_list_elem(&ack_entries, k); - rd_kafka_mock_sgrp_apply_ack( + entry->err = rd_kafka_mock_sgrp_apply_ack( sgrp, entry->topic_id, entry->partition, entry->first_offset, entry->last_offset, entry->ack_type, &MemberId); @@ -4268,6 +4298,7 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, sgrp, topic_id, rktpar->partition) : NULL; + rd_kafka_resp_err_t ack_err; rd_kafka_resp_err_t part_err = mpart ? 
RD_KAFKA_RESP_ERR_NO_ERROR @@ -4289,10 +4320,21 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_str( resp, NULL, -1); /* Response: AcknowledgementErrorCode */ - rd_kafka_buf_write_i16(resp, 0); - /* Response: AcknowledgementErrorString - */ - rd_kafka_buf_write_str(resp, NULL, -1); + ack_err = + rd_kafka_mock_sgrp_ack_error_for_partition( + &ack_entries, topic_id, + rktpar->partition); + rd_kafka_buf_write_i16(resp, ack_err); + /* Response: + * AcknowledgementErrorString */ + if (ack_err) + rd_kafka_buf_write_str( + resp, + rd_kafka_err2str(ack_err), + -1); + else + rd_kafka_buf_write_str( + resp, NULL, -1); /* Response: CurrentLeader */ rd_kafka_buf_write_i32(resp, -1); rd_kafka_buf_write_i32(resp, -1); @@ -4529,7 +4571,7 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, for (k = 0; k < rd_list_cnt(&ack_entries); k++) { struct rd_kafka_mock_sgrp_ack_entry *entry = rd_list_elem(&ack_entries, k); - rd_kafka_mock_sgrp_apply_ack( + entry->err = rd_kafka_mock_sgrp_apply_ack( sgrp, entry->topic_id, entry->partition, entry->first_offset, entry->last_offset, entry->ack_type, &MemberId); @@ -4613,7 +4655,9 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, rd_kafka_topic_partition_t *rktpar = &ack_partitions->elems[j]; rd_kafka_resp_err_t part_err = - RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_mock_sgrp_ack_error_for_partition( + &ack_entries, topic_id, + rktpar->partition); /* PartitionIndex */ rd_kafka_buf_write_i32( @@ -4621,7 +4665,14 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, /* ErrorCode */ rd_kafka_buf_write_i16(resp, part_err); /* ErrorMessage */ - rd_kafka_buf_write_str(resp, NULL, -1); + if (part_err) + rd_kafka_buf_write_str( + resp, + rd_kafka_err2str(part_err), + -1); + else + rd_kafka_buf_write_str( + resp, NULL, -1); /* CurrentLeader */ rd_kafka_buf_write_i32( resp, -1); /* LeaderId */ From 
6205235929ebc38b8ee35a98f7160e272c065888 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 15:05:58 +0530 Subject: [PATCH 03/10] Add checks for ack parsing --- src/rdkafka_mock_handlers.c | 118 +++++++++++++++++++++++++++++++++--- 1 file changed, 108 insertions(+), 10 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index f99c0a113e..6088ddf39f 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3845,6 +3845,7 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, rd_kafka_topic_partition_list_t *requested_partitions = NULL; rd_kafka_topic_partition_list_t *forgotten_partitions = NULL; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_bool_t ack_parse_err = rd_false; rd_kafka_mock_sharegroup_t *sgrp = NULL; rd_kafka_mock_sgrp_fetch_session_t *session = NULL; rd_list_t ack_entries; @@ -3878,24 +3879,49 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, while (PartitionCnt-- > 0) { int32_t Partition; int32_t AckBatchCnt; + int64_t prev_ack_last = -1; rd_kafka_topic_partition_t *rktpar; rd_kafka_buf_read_i32(rkbuf, &Partition); rd_kafka_buf_read_arraycnt(rkbuf, &AckBatchCnt, -1); while (AckBatchCnt-- > 0) { int32_t AckTypeCnt; - int8_t AckType = -1; int64_t AckFirstOffset, AckLastOffset; + int64_t range_len, ti; + int8_t *ack_types = NULL; + int8_t single_type = -1; + rd_kafka_buf_read_i64(rkbuf, &AckFirstOffset); rd_kafka_buf_read_i64(rkbuf, &AckLastOffset); + + /* Validate ascending order and + * non-overlapping ranges. 
*/ + if (prev_ack_last >= 0 && + AckFirstOffset <= prev_ack_last) + ack_parse_err = rd_true; + prev_ack_last = AckLastOffset; + + range_len = AckLastOffset - AckFirstOffset + 1; + rd_kafka_buf_read_arraycnt(rkbuf, &AckTypeCnt, -1); - while (AckTypeCnt-- > 0) { - rd_kafka_buf_read_i8(rkbuf, &AckType); + + if (AckTypeCnt == 1) { + /* Single type for entire range */ + rd_kafka_buf_read_i8(rkbuf, + &single_type); + } else if (AckTypeCnt > 1) { + /* Per-offset types */ + ack_types = rd_alloca( + (size_t)AckTypeCnt * + sizeof(*ack_types)); + for (ti = 0; ti < AckTypeCnt; ti++) + rd_kafka_buf_read_i8( + rkbuf, &ack_types[ti]); } rd_kafka_buf_skip_tags(rkbuf); - if (AckType >= 0) { + if (AckTypeCnt == 1 && single_type >= 0) { struct rd_kafka_mock_sgrp_ack_entry *entry = rd_calloc(1, sizeof(*entry)); @@ -3903,8 +3929,28 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, entry->partition = Partition; entry->first_offset = AckFirstOffset; entry->last_offset = AckLastOffset; - entry->ack_type = AckType; + entry->ack_type = single_type; rd_list_add(&ack_entries, entry); + } else if (ack_types && + AckTypeCnt == range_len) { + /* Per-offset: one entry per offset */ + for (ti = 0; ti < range_len; ti++) { + struct rd_kafka_mock_sgrp_ack_entry + *entry = rd_calloc( + 1, sizeof(*entry)); + entry->topic_id = TopicId; + entry->partition = Partition; + entry->first_offset = + AckFirstOffset + ti; + entry->last_offset = + AckFirstOffset + ti; + entry->ack_type = ack_types[ti]; + rd_list_add(&ack_entries, entry); + } + } else if (AckTypeCnt > 0) { + /* AckTypeCnt is neither 1 nor + * range_len: malformed request. */ + ack_parse_err = rd_true; } } @@ -3970,6 +4016,9 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, err = rd_kafka_mock_next_request_error(mconn, resp); + if (!err && ack_parse_err) + err = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + if (!err) { int64_t remaining_records = MaxRecords > 0 ? 
(int64_t)MaxRecords : -1; @@ -4454,6 +4503,7 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, int32_t TopicsCnt; rd_kafka_topic_partition_list_t *ack_partitions = NULL; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_bool_t ack_parse_err = rd_false; rd_kafka_mock_sharegroup_t *sgrp = NULL; rd_list_t ack_entries; @@ -4481,24 +4531,49 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, while (PartitionCnt-- > 0) { int32_t Partition; int32_t AckBatchCnt; + int64_t prev_ack_last = -1; rd_kafka_topic_partition_t *rktpar; rd_kafka_buf_read_i32(rkbuf, &Partition); rd_kafka_buf_read_arraycnt(rkbuf, &AckBatchCnt, -1); while (AckBatchCnt-- > 0) { int32_t AckTypeCnt; - int8_t AckType = -1; int64_t AckFirstOffset, AckLastOffset; + int64_t range_len, ti; + int8_t *ack_types = NULL; + int8_t single_type = -1; + rd_kafka_buf_read_i64(rkbuf, &AckFirstOffset); rd_kafka_buf_read_i64(rkbuf, &AckLastOffset); + + /* Validate ascending order and + * non-overlapping ranges. 
*/ + if (prev_ack_last >= 0 && + AckFirstOffset <= prev_ack_last) + ack_parse_err = rd_true; + prev_ack_last = AckLastOffset; + + range_len = AckLastOffset - AckFirstOffset + 1; + rd_kafka_buf_read_arraycnt(rkbuf, &AckTypeCnt, -1); - while (AckTypeCnt-- > 0) { - rd_kafka_buf_read_i8(rkbuf, &AckType); + + if (AckTypeCnt == 1) { + /* Single type for entire range */ + rd_kafka_buf_read_i8(rkbuf, + &single_type); + } else if (AckTypeCnt > 1) { + /* Per-offset types */ + ack_types = rd_alloca( + (size_t)AckTypeCnt * + sizeof(*ack_types)); + for (ti = 0; ti < AckTypeCnt; ti++) + rd_kafka_buf_read_i8( + rkbuf, &ack_types[ti]); } rd_kafka_buf_skip_tags(rkbuf); - if (AckType >= 0) { + if (AckTypeCnt == 1 && single_type >= 0) { struct rd_kafka_mock_sgrp_ack_entry *entry = rd_calloc(1, sizeof(*entry)); @@ -4506,8 +4581,28 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, entry->partition = Partition; entry->first_offset = AckFirstOffset; entry->last_offset = AckLastOffset; - entry->ack_type = AckType; + entry->ack_type = single_type; rd_list_add(&ack_entries, entry); + } else if (ack_types && + AckTypeCnt == range_len) { + /* Per-offset: one entry per offset */ + for (ti = 0; ti < range_len; ti++) { + struct rd_kafka_mock_sgrp_ack_entry + *entry = rd_calloc( + 1, sizeof(*entry)); + entry->topic_id = TopicId; + entry->partition = Partition; + entry->first_offset = + AckFirstOffset + ti; + entry->last_offset = + AckFirstOffset + ti; + entry->ack_type = ack_types[ti]; + rd_list_add(&ack_entries, entry); + } + } else if (AckTypeCnt > 0) { + /* AckTypeCnt is neither 1 nor + * range_len: malformed request. 
*/ + ack_parse_err = rd_true; } } @@ -4533,6 +4628,9 @@ rd_kafka_mock_handle_ShareAcknowledge(rd_kafka_mock_connection_t *mconn, /* ---- Inject errors if configured ---- */ err = rd_kafka_mock_next_request_error(mconn, resp); + if (!err && ack_parse_err) + err = RD_KAFKA_RESP_ERR_INVALID_REQUEST; + if (!err) { rd_kafka_mock_sgrp_fetch_session_t *session = NULL; From 6c0429f3c56f27ebbb644a2c152b11c0b774a675 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 15:18:12 +0530 Subject: [PATCH 04/10] Update ack with forgotten topics --- src/rdkafka_mock_handlers.c | 56 +++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 6088ddf39f..1feb63198c 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -4131,19 +4131,42 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, session->session_epoch++; } - /* epoch=-1 (final fetch / close session) must - * not add or forget topics. */ + /* epoch=-1 (final fetch / close session) must not + * contain ForgottenTopicsData. Acks in the Topics + * array ARE allowed per KIP-932. */ if (!err && SessionEpoch == -1 && - ((requested_partitions && requested_partitions->cnt > 0) || - (forgotten_partitions && forgotten_partitions->cnt > 0))) { + (forgotten_partitions && forgotten_partitions->cnt > 0)) { rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", "ShareFetch: rejecting epoch=-1 request " - "with topic add/forget (INVALID_REQUEST)"); + "with ForgottenTopicsData (INVALID_REQUEST)"); err = RD_KAFKA_RESP_ERR_INVALID_REQUEST; } + /* Apply piggy-backed acknowledgements BEFORE forgotten + * partition processing, so that acks for partitions + * being removed are applied while the records are still + * in ACQUIRED state. 
*/ + if (!err && sgrp && rd_list_cnt(&ack_entries) > 0) { + int k; + rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", + "ShareFetch: applying %d acknowledgement " + "batch(es) for member %.*s", + rd_list_cnt(&ack_entries), + RD_KAFKAP_STR_PR(&MemberId)); + for (k = 0; k < rd_list_cnt(&ack_entries); k++) { + struct rd_kafka_mock_sgrp_ack_entry *entry = + rd_list_elem(&ack_entries, k); + entry->err = rd_kafka_mock_sgrp_apply_ack( + sgrp, entry->topic_id, entry->partition, + entry->first_offset, entry->last_offset, + entry->ack_type, &MemberId); + } + } + /* Remove forgotten partitions from session and release - * any in-flight ACQUIRED records owned by this member. */ + * any remaining ACQUIRED records owned by this member. + * Runs AFTER ack application so that acks for partitions + * being removed have already been processed. */ if (!err && session && forgotten_partitions && forgotten_partitions->cnt > 0) { rd_kafka_topic_partition_t *ftp; @@ -4190,27 +4213,6 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, } } - /* Apply piggy-backed acknowledgements (implicit ack) - * before acquiring new records. This processes the - * AcknowledgementBatches sent by the client for records - * delivered in the previous ShareFetch response. 
*/ - if (!err && sgrp && rd_list_cnt(&ack_entries) > 0) { - int k; - rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", - "ShareFetch: applying %d acknowledgement " - "batch(es) for member %.*s", - rd_list_cnt(&ack_entries), - RD_KAFKAP_STR_PR(&MemberId)); - for (k = 0; k < rd_list_cnt(&ack_entries); k++) { - struct rd_kafka_mock_sgrp_ack_entry *entry = - rd_list_elem(&ack_entries, k); - entry->err = rd_kafka_mock_sgrp_apply_ack( - sgrp, entry->topic_id, entry->partition, - entry->first_offset, entry->last_offset, - entry->ack_type, &MemberId); - } - } - if (!err && sgrp && session && session->partitions) { rd_kafka_topic_partition_t *rktpar; RD_KAFKA_TPLIST_FOREACH(rktpar, session->partitions) { From 76cbfb4aa682c7456a3edc33cd53c80c2f8c84e5 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 17:26:12 +0530 Subject: [PATCH 05/10] Fix acknowledgement error propagation --- src/rdkafka_mock_handlers.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 1feb63198c..85d9b80a43 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -4161,6 +4161,17 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, entry->first_offset, entry->last_offset, entry->ack_type, &MemberId); } + } else if (err && rd_list_cnt(&ack_entries) > 0) { + /* Broad error prevents ack processing. + * Per KIP-932, propagate the top-level error to + * AcknowledgeErrorCode for all partitions that + * had piggybacked acks. 
*/ + int k; + for (k = 0; k < rd_list_cnt(&ack_entries); k++) { + struct rd_kafka_mock_sgrp_ack_entry *entry = + rd_list_elem(&ack_entries, k); + entry->err = err; + } } /* Remove forgotten partitions from session and release From 3a06f70752a56aa8088173ffb0444edba3dca630 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 17:42:11 +0530 Subject: [PATCH 06/10] Add max record locks --- src/rdkafka_mock.h | 15 +++++++++++++++ src/rdkafka_mock_handlers.c | 22 +++++++++++++++++++--- src/rdkafka_mock_int.h | 7 +++++++ src/rdkafka_mock_sharegrp.c | 18 ++++++++++++++++++ 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 10758611d5..8709d9a8e5 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -732,6 +732,21 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_max_fetch_sessions( rd_kafka_mock_cluster_t *mcluster, int max_fetch_sessions); +/** + * @brief Set the maximum number of in-flight record locks per share-partition. + * + * Once the limit is reached, no more records are acquired until existing + * locks are released (via ack, release, reject, or lock expiry). + * + * Default is 2000 (per KIP-932 group.share.partition.max.record.locks). + * + * @param mcluster Mock cluster instance. + * @param max_record_locks Maximum in-flight records per partition. 0 = unlimited. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_max_record_locks( + rd_kafka_mock_cluster_t *mcluster, + int max_record_locks); + /** * @brief Set a manual target assignment for a sharegroup. 
* diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 85d9b80a43..1f89561e6d 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3458,6 +3458,7 @@ static void rd_kafka_mock_sgrp_acquire_available_offsets( const rd_kafkap_str_t *member_id, rd_ts_t lock_expiry_ts, int max_delivery_attempts, + int max_record_locks, int64_t *remaining_records, int64_t *remaining_bytes, int *acquired_cnt, @@ -3483,6 +3484,11 @@ static void rd_kafka_mock_sgrp_acquire_available_offsets( if (remaining_bytes && *remaining_bytes == 0) break; + /* Check max in-flight record locks per partition */ + if (max_record_locks > 0 && + pmeta->inflight_cnt >= max_record_locks) + break; + state = rd_kafka_mock_sgrp_record_state_find(pmeta, offset); if (state && state->state != RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE) @@ -4224,9 +4230,15 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, } } - if (!err && sgrp && session && session->partitions) { - rd_kafka_topic_partition_t *rktpar; - RD_KAFKA_TPLIST_FOREACH(rktpar, session->partitions) { + if (!err && sgrp && session && session->partitions && + session->partitions->cnt > 0) { + int pi, pcnt = session->partitions->cnt; + int start = session->partition_start_idx % pcnt; + + for (pi = 0; pi < pcnt; pi++) { + int idx = (start + pi) % pcnt; + rd_kafka_topic_partition_t *rktpar = + &session->partitions->elems[idx]; rd_kafka_Uuid_t topic_id = rd_kafka_topic_partition_get_topic_id( rktpar); @@ -4262,11 +4274,15 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, : sgrp->session_timeout_ms) * 1000), sgrp->max_delivery_attempts, + sgrp->max_record_locks, MaxRecords > 0 ? &remaining_records : NULL, MaxBytes > 0 ? 
&remaining_bytes : NULL, &acquired_cnt, &acquired_bytes, sgrp->isolation_level); } + + /* Rotate start index for next request */ + session->partition_start_idx = (start + 1) % pcnt; } rd_kafka_dbg(mconn->broker->cluster->rk, MOCK, "MOCK", diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index c30a8ca356..bb619e00fd 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -221,6 +221,7 @@ typedef struct rd_kafka_mock_sgrp_fetch_session_s { int32_t session_epoch; rd_ts_t ts_last_activity; rd_kafka_topic_partition_list_t *partitions; + int partition_start_idx; /**< Rotation index for starvation prevention */ } rd_kafka_mock_sgrp_fetch_session_t; /** @@ -260,6 +261,9 @@ typedef struct rd_kafka_mock_sharegroup_s { * 0 = unlimited (default). */ int max_fetch_sessions; /**< Max fetch sessions allowed. * 0 = unlimited (default 2000). */ + int max_record_locks; /**< Max in-flight records per + * share-partition. + * 0 = unlimited (default 2000). */ } rd_kafka_mock_sharegroup_t; /** @@ -648,6 +652,9 @@ struct rd_kafka_mock_cluster_s { /** Max fetch sessions per share group (KIP 932). * 0 = unlimited. */ int sharegroup_max_fetch_sessions; + /** Max in-flight records per share-partition (KIP 932). + * 0 = unlimited. 
*/ + int sharegroup_max_record_locks; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ diff --git a/src/rdkafka_mock_sharegrp.c b/src/rdkafka_mock_sharegrp.c index 451f8e5b92..3d056dc439 100644 --- a/src/rdkafka_mock_sharegrp.c +++ b/src/rdkafka_mock_sharegrp.c @@ -60,6 +60,7 @@ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) { mcluster->defaults.sharegroup_max_size = 0; mcluster->defaults.sharegroup_isolation_level = 0; mcluster->defaults.sharegroup_max_fetch_sessions = 2000; + mcluster->defaults.sharegroup_max_record_locks = 2000; } /** @@ -117,6 +118,8 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, mshgrp->max_size = mcluster->defaults.sharegroup_max_size; mshgrp->max_fetch_sessions = mcluster->defaults.sharegroup_max_fetch_sessions; + mshgrp->max_record_locks = + mcluster->defaults.sharegroup_max_record_locks; rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr, 1000 * 1000 /* 1s */, @@ -795,6 +798,21 @@ void rd_kafka_mock_sharegroup_set_max_fetch_sessions( mtx_unlock(&mcluster->lock); } +/** + * @brief Set the maximum number of in-flight record locks per + * share-partition. + */ +void rd_kafka_mock_sharegroup_set_max_record_locks( + rd_kafka_mock_cluster_t *mcluster, + int max_record_locks) { + rd_kafka_mock_sharegroup_t *mshgrp; + mtx_lock(&mcluster->lock); + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) + mshgrp->max_record_locks = max_record_locks; + mcluster->defaults.sharegroup_max_record_locks = max_record_locks; + mtx_unlock(&mcluster->lock); +} + /** * @brief Destroy share fetch session. 
*/ From 63fc549b56c71442968bb14efe04099b01477596 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 17:51:34 +0530 Subject: [PATCH 07/10] Set default to latest --- src/rdkafka_mock.h | 15 +++++++++++++++ src/rdkafka_mock_handlers.c | 7 ++++++- src/rdkafka_mock_int.h | 5 +++++ src/rdkafka_mock_sharegrp.c | 17 +++++++++++++++++ tests/0156-share_consumer_fetch_mock.c | 4 ++++ tests/0157-share_consumer_ack_mock.c | 4 ++++ tests/0158-share_consumer_transactions_mock.c | 4 ++++ 7 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 8709d9a8e5..f796c205c2 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -747,6 +747,21 @@ RD_EXPORT void rd_kafka_mock_sharegroup_set_max_record_locks( rd_kafka_mock_cluster_t *mcluster, int max_record_locks); +/** + * @brief Set the auto offset reset policy for share groups. + * + * Controls where SPSO is initialized when a share-partition is first + * consumed. + * + * Default is 0 ("latest") per KIP-932. + * + * @param mcluster Mock cluster instance. + * @param auto_offset_reset 0 = latest, 1 = earliest. + */ +RD_EXPORT void rd_kafka_mock_sharegroup_set_auto_offset_reset( + rd_kafka_mock_cluster_t *mcluster, + int auto_offset_reset); + /** * @brief Set a manual target assignment for a sharegroup. * diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 1f89561e6d..a390d46ce6 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3373,7 +3373,12 @@ rd_kafka_mock_sgrp_partmeta_get(rd_kafka_mock_sharegroup_t *sgrp, pmeta = rd_calloc(1, sizeof(*pmeta)); pmeta->topic_id = topic_id; pmeta->partition = partition; - pmeta->spso = mpart->start_offset; + /* Initialize SPSO based on auto.offset.reset: + * 0 = latest (end of log), 1 = earliest (start of log). 
*/ + if (sgrp->auto_offset_reset == 1) + pmeta->spso = mpart->start_offset; + else + pmeta->spso = mpart->end_offset; if (mpart->end_offset > mpart->start_offset) pmeta->speo = mpart->end_offset - 1; else diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index bb619e00fd..3183012b1e 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -264,6 +264,8 @@ typedef struct rd_kafka_mock_sharegroup_s { int max_record_locks; /**< Max in-flight records per * share-partition. * 0 = unlimited (default 2000). */ + int auto_offset_reset; /**< 0 = latest (default per KIP-932), + * 1 = earliest. */ } rd_kafka_mock_sharegroup_t; /** @@ -655,6 +657,9 @@ struct rd_kafka_mock_cluster_s { /** Max in-flight records per share-partition (KIP 932). * 0 = unlimited. */ int sharegroup_max_record_locks; + /** Auto offset reset (KIP 932). + * 0 = latest (default), 1 = earliest. */ + int sharegroup_auto_offset_reset; } defaults; /**< Dynamic array of IO handlers for corresponding fd in .fds */ diff --git a/src/rdkafka_mock_sharegrp.c b/src/rdkafka_mock_sharegrp.c index 3d056dc439..810b2d27a1 100644 --- a/src/rdkafka_mock_sharegrp.c +++ b/src/rdkafka_mock_sharegrp.c @@ -61,6 +61,7 @@ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) { mcluster->defaults.sharegroup_isolation_level = 0; mcluster->defaults.sharegroup_max_fetch_sessions = 2000; mcluster->defaults.sharegroup_max_record_locks = 2000; + mcluster->defaults.sharegroup_auto_offset_reset = 0; /* latest */ } /** @@ -120,6 +121,8 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster, mcluster->defaults.sharegroup_max_fetch_sessions; mshgrp->max_record_locks = mcluster->defaults.sharegroup_max_record_locks; + mshgrp->auto_offset_reset = + mcluster->defaults.sharegroup_auto_offset_reset; rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr, 1000 * 1000 /* 1s */, @@ -813,6 +816,20 @@ void rd_kafka_mock_sharegroup_set_max_record_locks( mtx_unlock(&mcluster->lock); } +/** + * @brief 
Set the auto offset reset policy for share groups. + */ +void rd_kafka_mock_sharegroup_set_auto_offset_reset( + rd_kafka_mock_cluster_t *mcluster, + int auto_offset_reset) { + rd_kafka_mock_sharegroup_t *mshgrp; + mtx_lock(&mcluster->lock); + TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link) + mshgrp->auto_offset_reset = auto_offset_reset; + mcluster->defaults.sharegroup_auto_offset_reset = auto_offset_reset; + mtx_unlock(&mcluster->lock); +} + /** * @brief Destroy share fetch session. */ diff --git a/tests/0156-share_consumer_fetch_mock.c b/tests/0156-share_consumer_fetch_mock.c index 1b95d6f045..d05135d617 100644 --- a/tests/0156-share_consumer_fetch_mock.c +++ b/tests/0156-share_consumer_fetch_mock.c @@ -62,6 +62,10 @@ static test_ctx_t test_ctx_new(void) { RD_KAFKA_RESP_ERR_NO_ERROR, "Failed to enable ShareFetch"); + /* Set auto.offset.reset=earliest so tests that produce + * before consuming see all records. */ + rd_kafka_mock_sharegroup_set_auto_offset_reset(ctx.mcluster, 1); + /* Create a producer targeting the mock cluster */ test_conf_init(&conf, NULL, 0); test_conf_set(conf, "bootstrap.servers", ctx.bootstraps); diff --git a/tests/0157-share_consumer_ack_mock.c b/tests/0157-share_consumer_ack_mock.c index 6f983390ad..b94aa876c8 100644 --- a/tests/0157-share_consumer_ack_mock.c +++ b/tests/0157-share_consumer_ack_mock.c @@ -62,6 +62,10 @@ static test_ctx_t test_ctx_new(void) { RD_KAFKA_RESP_ERR_NO_ERROR, "Failed to enable ShareFetch"); + /* Set auto.offset.reset=earliest so tests that produce + * before consuming see all records. 
*/ + rd_kafka_mock_sharegroup_set_auto_offset_reset(ctx.mcluster, 1); + /* Create a producer targeting the mock cluster */ test_conf_init(&conf, NULL, 0); test_conf_set(conf, "bootstrap.servers", ctx.bootstraps); diff --git a/tests/0158-share_consumer_transactions_mock.c b/tests/0158-share_consumer_transactions_mock.c index abcea171d0..c3953336ff 100644 --- a/tests/0158-share_consumer_transactions_mock.c +++ b/tests/0158-share_consumer_transactions_mock.c @@ -64,6 +64,10 @@ static test_ctx_t test_ctx_new(const char *txn_id) { RD_KAFKA_RESP_ERR_NO_ERROR, "Failed to enable ShareFetch"); + /* Set auto.offset.reset=earliest so tests that produce + * before consuming see all records. */ + rd_kafka_mock_sharegroup_set_auto_offset_reset(ctx.mcluster, 1); + /* Non-transactional producer */ test_conf_init(&conf, NULL, 0); test_conf_set(conf, "bootstrap.servers", ctx.bootstraps); From e0f2e2d2da1072a7cb698c6e32a46825a9e90987 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 17:58:36 +0530 Subject: [PATCH 08/10] Add acknowledge state --- src/rdkafka_mock_handlers.c | 14 ++++++++------ src/rdkafka_mock_int.h | 7 ++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index a390d46ce6..0d270c1a85 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3576,7 +3576,8 @@ static void rd_kafka_mock_sgrp_partmeta_prune_archived( rd_kafka_mock_sgrp_record_state_t *state, *tmp; TAILQ_FOREACH_SAFE(state, &pmeta->inflight, link, tmp) { - if (state->state != RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED) + if (state->state != RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED && + state->state != RD_KAFKA_MOCK_SGRP_RECORD_ACKNOWLEDGED) continue; if (state->offset >= pmeta->spso) continue; @@ -3745,7 +3746,8 @@ rd_kafka_mock_sgrp_apply_ack(rd_kafka_mock_sharegroup_t *sgrp, case 0: /* GAP */ case 1: /* ACCEPT */ case 3: /* REJECT */ - state->state = RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED; + state->state = + 
RD_KAFKA_MOCK_SGRP_RECORD_ACKNOWLEDGED; rd_free(state->owner_member_id); state->owner_member_id = NULL; state->lock_expiry_ts = 0; @@ -3758,15 +3760,15 @@ rd_kafka_mock_sgrp_apply_ack(rd_kafka_mock_sharegroup_t *sgrp, } } - /* Advance SPSO past contiguous ARCHIVED records from the start, - * so that acknowledged records are no longer considered for - * future acquisitions. */ + /* Advance SPSO past contiguous ACKNOWLEDGED records from + * the start, transitioning them to ARCHIVED. */ while (pmeta->spso <= pmeta->speo) { rd_kafka_mock_sgrp_record_state_t *state = rd_kafka_mock_sgrp_record_state_find(pmeta, pmeta->spso); if (!state || - state->state != RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED) + state->state != RD_KAFKA_MOCK_SGRP_RECORD_ACKNOWLEDGED) break; + state->state = RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED; pmeta->spso++; } diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 3183012b1e..2f1aeadeb9 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -183,9 +183,10 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s { * @brief Share record state. 
*/ enum rd_kafka_mock_sgrp_record_state_e { - RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE = 0, - RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED = 1, - RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED = 2 + RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE = 0, + RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED = 1, + RD_KAFKA_MOCK_SGRP_RECORD_ACKNOWLEDGED = 2, + RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED = 3 }; typedef struct rd_kafka_mock_sgrp_record_state_s { From 244a8fe9a809b63bb44e1dd6ab39b7386e6bbd80 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Tue, 7 Apr 2026 18:11:57 +0530 Subject: [PATCH 09/10] Update spso and speo record transition --- src/rdkafka_mock_handlers.c | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 0d270c1a85..2950f7e366 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -3360,8 +3360,22 @@ rd_kafka_mock_sgrp_partmeta_get(rd_kafka_mock_sharegroup_t *sgrp, if (pmeta) { log_start = mpart->start_offset; log_end = mpart->end_offset; - if (log_start > pmeta->spso) + if (log_start > pmeta->spso) { + /* Log retention moved start_offset past SPSO. + * Archive all in-flight records below the new + * SPSO — they are no longer in the log. 
*/ + rd_kafka_mock_sgrp_record_state_t *state, *tmp; + TAILQ_FOREACH_SAFE(state, &pmeta->inflight, link, + tmp) { + if (state->offset >= log_start) + continue; + state->state = RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED; + RD_IF_FREE(state->owner_member_id, rd_free); + state->owner_member_id = NULL; + state->lock_expiry_ts = 0; + } pmeta->spso = log_start; + } if (log_end > log_start) { int64_t new_speo = log_end - 1; if (new_speo > pmeta->speo) From 058d64b03b644f180a0a389451df7d07a017e0de Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Wed, 8 Apr 2026 03:56:36 +0530 Subject: [PATCH 10/10] Add tests --- src/rdkafka_mock_handlers.c | 19 +- tests/0156-share_consumer_fetch_mock.c | 310 +++++++++++++++++++++++++ tests/0157-share_consumer_ack_mock.c | 151 ++++++++++++ 3 files changed, 468 insertions(+), 12 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 2950f7e366..fb99ac5a3a 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -4268,18 +4268,13 @@ static int rd_kafka_mock_handle_ShareFetch(rd_kafka_mock_connection_t *mconn, topic_id); rd_kafka_mock_partition_t *mpart; - if (!mtopic) { - err = - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - break; - } - - mpart = rd_kafka_mock_partition_find( - mtopic, rktpar->partition); - if (!mpart) { - err = - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - break; + if (!mtopic || !(mpart = rd_kafka_mock_partition_find( + mtopic, rktpar->partition))) { + /* Per-partition error: skip this + * partition but continue with others. + * The response writer handles the + * error via mpart==NULL check. 
*/ + continue; } rd_kafka_mock_sgrp_partmeta_t *pmeta = diff --git a/tests/0156-share_consumer_fetch_mock.c b/tests/0156-share_consumer_fetch_mock.c index d05135d617..fe2e9b276b 100644 --- a/tests/0156-share_consumer_fetch_mock.c +++ b/tests/0156-share_consumer_fetch_mock.c @@ -1071,6 +1071,303 @@ static void do_test_sharefetch_fetch_and_close_implicit(void) { SUB_TEST_PASS(); } +/** + * @brief Test that SHARE_SESSION_LIMIT_REACHED is returned when the + * session cache is full. + * + * Set max_fetch_sessions=1, open a session with consumer 1, + * then attempt to open a second session with consumer 2. + * Consumer 2 should fail to consume any records because every + * ShareFetch epoch=0 attempt gets SHARE_SESSION_LIMIT_REACHED. + */ +static void do_test_session_limit_reached(void) { + const char *topic = "kip932_session_limit"; + const int msgcnt = 5; + test_ctx_t ctx; + rd_kafka_share_t *consumer1, *consumer2; + int consumed1, consumed2; + + SUB_TEST_QUICK(); + + ctx = test_ctx_new(); + + /* Limit to 1 session */ + rd_kafka_mock_sharegroup_set_max_fetch_sessions(ctx.mcluster, 1); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consumer 1 opens a session successfully */ + consumer1 = new_share_consumer(ctx.bootstraps, "sg-session-limit"); + subscribe_topics(consumer1, &topic, 1); + consumed1 = consume_n(consumer1, msgcnt, 30); + TEST_ASSERT(consumed1 == msgcnt, + "Consumer 1: expected %d consumed, got %d", msgcnt, + consumed1); + + /* Consumer 2 tries to open a session — cache is full */ + consumer2 = new_share_consumer(ctx.bootstraps, "sg-session-limit"); + subscribe_topics(consumer2, &topic, 1); + consumed2 = consume_n(consumer2, 1, 5); + TEST_ASSERT(consumed2 == 0, + "Consumer 2: expected 0 consumed (session limit), got %d", + consumed2); + + rd_kafka_share_consumer_close(consumer1); + rd_kafka_share_destroy(consumer1); 
+
+        rd_kafka_share_consumer_close(consumer2);
+        rd_kafka_share_destroy(consumer2);
+        test_ctx_destroy(&ctx);
+
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that the consumer recovers from injected
+ *        SHARE_SESSION_LIMIT_REACHED errors on session open.
+ *
+ *        Pushes SHARE_SESSION_LIMIT_REACHED errors so the client's
+ *        ShareFetch epoch=0 (session open) attempts fail and are retried.
+ *        Once the injected errors are exhausted the session opens and all
+ *        records are consumed, validating that the client retries session
+ *        establishment and recovers when the error clears. (The client
+ *        never piggybacks acks on epoch=0 requests — that would be a
+ *        protocol violation — so INVALID_REQUEST is not exercised here.)
+ */
+static void do_test_session_limit_recovery(void) {
+        const char *topic = "kip932_session_limit_recovery";
+        const int msgcnt  = 5;
+        test_ctx_t ctx    = test_ctx_new();
+        rd_kafka_share_t *consumer;
+        int consumed;
+
+        SUB_TEST_QUICK();
+
+        TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) ==
+                        RD_KAFKA_RESP_ERR_NO_ERROR,
+                    "Failed to create mock topic");
+        produce_messages(ctx.producer, topic, msgcnt);
+
+        /* Push 3 SHARE_SESSION_LIMIT_REACHED errors, then let it succeed */
+        rd_kafka_mock_push_request_errors(
+            ctx.mcluster, RD_KAFKAP_ShareFetch, 3,
+            RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED,
+            RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED,
+            RD_KAFKA_RESP_ERR_SHARE_SESSION_LIMIT_REACHED);
+
+        consumer = new_share_consumer(ctx.bootstraps, "sg-session-recovery");
+        subscribe_topics(consumer, &topic, 1);
+        consumed = consume_n(consumer, msgcnt, 30);
+
+        rd_kafka_share_consumer_close(consumer);
+        rd_kafka_share_destroy(consumer);
+        test_ctx_destroy(&ctx);
+
+        TEST_ASSERT(consumed == msgcnt,
+                    "Expected %d consumed after recovery, got %d", msgcnt,
+                    consumed);
+
+        SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test that max_record_locks limits the number of in-flight
+ *        records per share-partition.
+ *
+ *        Produce 10 records, set max_record_locks=3. 
Consumer A should get + * only 3 records on the first fetch round (the rest are blocked by + * the lock limit). After A acks (via second poll) and closes, + * consumer B should get the next batch. Total consumed across both + * should be 10. + */ +static void do_test_max_record_locks(void) { + const char *topic = "kip932_max_record_locks"; + const int msgcnt = 10; + test_ctx_t ctx; + rd_kafka_share_t *consumer; + int total_consumed = 0; + int rounds = 0; + + SUB_TEST_QUICK(); + ctx = test_ctx_new(); + + rd_kafka_mock_sharegroup_set_max_record_locks(ctx.mcluster, 3); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consume in rounds. Each round can get at most 3 records + * (the lock limit). The implicit ack from the next poll + * frees the locks for the next batch. */ + consumer = new_share_consumer(ctx.bootstraps, "sg-max-locks"); + subscribe_topics(consumer, &topic, 1); + + while (total_consumed < msgcnt && rounds < 20) { + int got = consume_n(consumer, 3, 10); + if (got == 0) + break; + total_consumed += got; + rounds++; + TEST_SAY("max_record_locks: round %d consumed %d (total %d/%d)\n", + rounds, got, total_consumed, msgcnt); + } + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(total_consumed == msgcnt, + "Expected %d total consumed, got %d", msgcnt, + total_consumed); + TEST_ASSERT(rounds > 1, + "Expected multiple rounds (lock limit=3, msgs=10), " + "got %d rounds", + rounds); + + SUB_TEST_PASS(); +} + +/** + * @brief Test that auto.offset.reset=latest (the default per KIP-932) + * causes the consumer to skip records produced before subscription. + * + * Produce 5 records, then subscribe with auto.offset.reset=latest. + * Consumer should get 0 old records but should get new records + * produced after subscription. 
+ */ +/** + * @brief Test that SPSO advances when log retention deletes records + * below the current SPSO. + * + * Produce 10 records (offsets 0-9). Consumer A acquires 0-4. + * Then delete records before offset 5 (simulating log retention). + * Consumer A closes (acks 0-4, but they're already archived by + * retention). Consumer B should get records 5-9 only — SPSO + * was advanced to 5 by the retention, and in-flight records + * below 5 were archived. + */ +static void do_test_spso_advances_on_log_retention(void) { + const char *topic = "kip932_log_retention_spso"; + const int msgcnt = 10; + test_ctx_t ctx; + rd_kafka_share_t *consumer; + int consumed_a, consumed_b; + + SUB_TEST_QUICK(); + ctx = test_ctx_new(); + + /* Limit to 5 records per fetch so A gets only 0-4. */ + rd_kafka_mock_sharegroup_set_max_record_locks(ctx.mcluster, 5); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consumer A acquires records 0-4. */ + consumer = new_share_consumer(ctx.bootstraps, "sg-log-retention"); + subscribe_topics(consumer, &topic, 1); + consumed_a = consume_n(consumer, 5, 30); + TEST_SAY("log_retention: A consumed %d/5\n", consumed_a); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + + /* Simulate log retention: delete records before offset 5. */ + TEST_ASSERT(rd_kafka_mock_partition_delete_records( + ctx.mcluster, topic, 0, 5) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to delete records"); + + /* Remove lock limit. */ + rd_kafka_mock_sharegroup_set_max_record_locks(ctx.mcluster, 0); + + /* Consumer B should get records 5-9 (SPSO advanced to 5). 
*/ + consumer = new_share_consumer(ctx.bootstraps, "sg-log-retention"); + subscribe_topics(consumer, &topic, 1); + consumed_b = consume_n(consumer, 5, 30); + TEST_SAY("log_retention: B consumed %d/5\n", consumed_b); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_a == 5, + "A: expected 5 consumed, got %d", consumed_a); + TEST_ASSERT(consumed_b == 5, + "B: expected 5 consumed, got %d", consumed_b); + + SUB_TEST_PASS(); +} + +static void do_test_auto_offset_reset_latest(void) { + const char *topic = "kip932_offset_reset_latest"; + const int msgcnt = 5; + test_ctx_t ctx; + rd_kafka_share_t *consumer; + int consumed; + + SUB_TEST_QUICK(); + + /* Create a fresh context — do NOT set auto_offset_reset=earliest + * (the default is "latest" per KIP-932). */ + memset(&ctx, 0, sizeof(ctx)); + ctx.mcluster = test_mock_cluster_new(3, &ctx.bootstraps); + TEST_ASSERT(rd_kafka_mock_set_apiversion( + ctx.mcluster, RD_KAFKAP_ShareGroupHeartbeat, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to enable ShareGroupHeartbeat"); + TEST_ASSERT(rd_kafka_mock_set_apiversion(ctx.mcluster, + RD_KAFKAP_ShareFetch, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to enable ShareFetch"); + /* Intentionally NOT calling set_auto_offset_reset — default is latest */ + { + rd_kafka_conf_t *conf; + char errstr[512]; + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", ctx.bootstraps); + ctx.producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, + sizeof(errstr)); + TEST_ASSERT(ctx.producer != NULL, + "Failed to create producer: %s", errstr); + } + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + + /* Produce records BEFORE subscribing */ + produce_messages(ctx.producer, topic, msgcnt); + + /* Subscribe — SPSO should start at end of log (latest) */ + consumer = new_share_consumer(ctx.bootstraps, 
"sg-offset-latest"); + subscribe_topics(consumer, &topic, 1); + consumed = consume_n(consumer, 1, 5); + TEST_SAY("offset_reset_latest: consumed %d (expected 0)\n", consumed); + TEST_ASSERT(consumed == 0, + "Expected 0 consumed with latest offset reset, got %d", + consumed); + + /* Produce new records AFTER subscription — these should be visible */ + produce_messages(ctx.producer, topic, msgcnt); + consumed = consume_n(consumer, msgcnt, 30); + TEST_SAY("offset_reset_latest: consumed %d new records (expected %d)\n", + consumed, msgcnt); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed == msgcnt, + "Expected %d new records consumed, got %d", msgcnt, + consumed); + + SUB_TEST_PASS(); +} + int main_0156_share_consumer_fetch_mock(int argc, char **argv) { TEST_SKIP_MOCK_CLUSTER(0); @@ -1114,5 +1411,18 @@ int main_0156_share_consumer_fetch_mock(int argc, char **argv) { do_test_sharefetch_fetch_disconnected(); do_test_sharefetch_fetch_and_close_implicit(); + /* Session management */ + do_test_session_limit_reached(); + do_test_session_limit_recovery(); + + /* Record lock limits */ + do_test_max_record_locks(); + + /* Offset reset */ + do_test_auto_offset_reset_latest(); + + /* Log retention */ + do_test_spso_advances_on_log_retention(); + return 0; } diff --git a/tests/0157-share_consumer_ack_mock.c b/tests/0157-share_consumer_ack_mock.c index b94aa876c8..186278790c 100644 --- a/tests/0157-share_consumer_ack_mock.c +++ b/tests/0157-share_consumer_ack_mock.c @@ -1342,6 +1342,153 @@ static void do_test_coordinator_failover_ack_recovery(void) { SUB_TEST_PASS(); } +/** + * @brief Test that ack validation returns INVALID_RECORD_STATE when the + * record's lock has expired and was re-acquired by another consumer. + * + * Consumer A acquires records, locks expire, consumer B re-acquires and + * successfully acks. 
The records should not be redelivered a third time + * (consumer B's ack succeeded because the mock broker now correctly + * reports INVALID_RECORD_STATE for stale acks and honours valid ones). + */ +static void do_test_ack_after_lock_expiry_redelivers(void) { + const char *topic = "kip932_ack_invalid_state"; + const int msgcnt = 3; + test_ctx_t ctx; + rd_kafka_share_t *consumer_a, *consumer_b, *consumer_c; + int consumed_a, consumed_b, consumed_c; + + SUB_TEST(); + ctx = test_ctx_new(); + + /* Short lock duration so locks expire quickly. */ + rd_kafka_mock_sharegroup_set_record_lock_duration(ctx.mcluster, 200); + rd_kafka_mock_sharegroup_set_session_timeout(ctx.mcluster, 10000); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consumer A acquires records. */ + consumer_a = + new_share_consumer(ctx.bootstraps, "sg-ack-invalid-state"); + subscribe_topics(consumer_a, &topic, 1); + consumed_a = consume_n(consumer_a, msgcnt, 50); + TEST_SAY("ack_invalid_state: A consumed %d/%d\n", consumed_a, msgcnt); + + /* Inject RTT so A's ack is delayed past lock expiry. */ + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 1, 3000); + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 2, 3000); + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 3, 3000); + + /* Wait for lock to expire. */ + rd_usleep(800 * 1000, NULL); + + /* Destroy A without close — ack never delivered. */ + rd_kafka_share_destroy(consumer_a); + + /* Clear RTT. */ + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 1, 0); + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 2, 0); + rd_kafka_mock_broker_set_rtt(ctx.mcluster, 3, 0); + + /* Consumer B re-acquires (locks expired) and acks implicitly + * by doing a second poll (which piggybacks the ack on the next + * ShareFetch). 
*/ + consumer_b = + new_share_consumer(ctx.bootstraps, "sg-ack-invalid-state"); + subscribe_topics(consumer_b, &topic, 1); + consumed_b = consume_n(consumer_b, msgcnt, 50); + TEST_SAY("ack_invalid_state: B consumed %d/%d\n", consumed_b, msgcnt); + + /* Second poll triggers implicit ack for B's records. */ + consume_n(consumer_b, 1, 3); + + rd_kafka_share_consumer_close(consumer_b); + rd_kafka_share_destroy(consumer_b); + + /* Consumer C should get 0 records — B's ack succeeded. */ + consumer_c = + new_share_consumer(ctx.bootstraps, "sg-ack-invalid-state"); + subscribe_topics(consumer_c, &topic, 1); + consumed_c = consume_n(consumer_c, 1, 5); + TEST_SAY("ack_invalid_state: C consumed %d (expected 0)\n", consumed_c); + + rd_kafka_share_consumer_close(consumer_c); + rd_kafka_share_destroy(consumer_c); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_a == msgcnt, + "A: expected %d consumed, got %d", msgcnt, consumed_a); + TEST_ASSERT(consumed_b == msgcnt, + "B: expected %d consumed, got %d", msgcnt, consumed_b); + TEST_ASSERT(consumed_c == 0, + "C: expected 0 consumed (B acked), got %d", consumed_c); + SUB_TEST_PASS(); +} + +/** + * @brief Test that two consumers can sequentially acquire, ack, and + * advance SPSO without interference. + * + * Consumer A acquires records 0-2 and acks them (implicit ack via + * second poll). Consumer B then gets records 3-5 (not 0-2, since + * those were acked and SPSO advanced). Validates that the ack + * error handling doesn't interfere with normal ack flow. + */ +static void do_test_ack_success_advances_spso(void) { + const char *topic = "kip932_ack_spso_advance"; + const int msgcnt = 6; + test_ctx_t ctx; + rd_kafka_share_t *consumer; + int consumed_a, consumed_b; + + SUB_TEST(); + ctx = test_ctx_new(); + + TEST_ASSERT(rd_kafka_mock_topic_create(ctx.mcluster, topic, 1, 1) == + RD_KAFKA_RESP_ERR_NO_ERROR, + "Failed to create mock topic"); + produce_messages(ctx.producer, topic, msgcnt); + + /* Consumer A acquires first batch. 
MaxRecords in the client + * defaults to a large value, so it will get all 6. We use + * the max_record_locks limit to cap acquisition at 3. */ + rd_kafka_mock_sharegroup_set_max_record_locks(ctx.mcluster, 3); + + consumer = new_share_consumer(ctx.bootstraps, "sg-ack-spso"); + subscribe_topics(consumer, &topic, 1); + consumed_a = consume_n(consumer, 3, 30); + TEST_SAY("ack_spso: A consumed %d/3\n", consumed_a); + + /* Second poll triggers implicit ack for A's records. */ + consume_n(consumer, 1, 3); + + /* Close A cleanly. */ + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + + /* Remove lock limit so B can get remaining records. */ + rd_kafka_mock_sharegroup_set_max_record_locks(ctx.mcluster, 0); + + /* Consumer B should get records 3-5 (SPSO advanced past 0-2). */ + consumer = new_share_consumer(ctx.bootstraps, "sg-ack-spso"); + subscribe_topics(consumer, &topic, 1); + consumed_b = consume_n(consumer, 3, 30); + TEST_SAY("ack_spso: B consumed %d/3\n", consumed_b); + + rd_kafka_share_consumer_close(consumer); + rd_kafka_share_destroy(consumer); + test_ctx_destroy(&ctx); + + TEST_ASSERT(consumed_a == 3, + "A: expected 3 consumed, got %d", consumed_a); + TEST_ASSERT(consumed_b == 3, + "B: expected 3 consumed, got %d", consumed_b); + SUB_TEST_PASS(); +} + /* =================================================================== * Test runner * =================================================================== */ @@ -1377,5 +1524,9 @@ int main_0157_share_consumer_ack_mock(int argc, char **argv) { do_test_empty_topic_no_ack_side_effects(); do_test_coordinator_failover_ack_recovery(); + /* Ack validation */ + do_test_ack_after_lock_expiry_redelivers(); + do_test_ack_success_advances_spso(); + return 0; }