From 97bfdfd6c081f25a0299fb4cfca1fcd1bec5e928 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 19 Aug 2025 04:11:39 +0530 Subject: [PATCH 1/4] Update Apis to flex --- src/rdkafka_request.c | 35 ++++++++++++++++++++--------------- src/rdkafka_txnmgr.c | 8 ++++++-- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 26ffe46227..35282ada4c 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -6285,7 +6285,7 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, int TopicCnt = 0, PartCnt = 0; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 0, NULL); + rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 3, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "AddPartitionsToTxnRequest (KIP-98) not supported " @@ -6294,8 +6294,8 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500); + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500, ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, transactional_id, -1); @@ -6305,15 +6305,16 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_i16(rkbuf, pid.epoch); /* Topics/partitions array (count updated later) */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); + of_TopicCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); TAILQ_FOREACH(rktp, rktps, rktp_txnlink) { if (last_rkt != rktp->rktp_rkt) { if (last_rkt) { /* Update last topic's partition count field */ - rd_kafka_buf_update_i32(rkbuf, of_PartCnt, - PartCnt); + rd_kafka_buf_finalize_arraycnt( + rkbuf, of_PartCnt, PartCnt); + rd_kafka_buf_write_tags_empty(rkbuf); of_PartCnt = -1; } @@ -6321,7 +6322,7 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_kstr(rkbuf, rktp->rktp_rkt->rkt_topic); /* Partition count, updated later */ - of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); + of_PartCnt = rd_kafka_buf_write_arraycnt(rkbuf, 0); PartCnt = 0; TopicCnt++; @@ -6334,9 +6335,12 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, } /* Update last partition and topic count fields */ - if (of_PartCnt != -1) - rd_kafka_buf_update_i32(rkbuf, (size_t)of_PartCnt, PartCnt); - rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt); + if (of_PartCnt != -1) { + rd_kafka_buf_finalize_arraycnt(rkbuf, (size_t)of_PartCnt, + PartCnt); + rd_kafka_buf_write_tags_empty(rkbuf); + } + rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicCnt, TopicCnt); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -6373,7 +6377,7 @@ rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion = 0; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 0, NULL); + rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 3, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "AddOffsetsToTxnRequest (KIP-98) not supported " @@ -6382,8 +6386,8 @@ rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddOffsetsToTxn, 1, 100); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_AddOffsetsToTxn, + 1, 100, ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, transactional_id, -1); @@ -6428,7 +6432,7 @@ rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion = 0; ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_EndTxn, - 0, 1, NULL); + 0, 3, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "EndTxnRequest (KIP-98) not supported " @@ -6437,7 +6441,8 @@ rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_EndTxn, 1, 500); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_EndTxn, 1, 500, + ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, transactional_id, -1); diff --git a/src/rdkafka_txnmgr.c b/src/rdkafka_txnmgr.c index 019a3b80c6..1bac74489c 100644 --- a/src/rdkafka_txnmgr.c +++ b/src/rdkafka_txnmgr.c @@ -842,7 +842,7 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, rd_kafka_buf_read_throttle_time(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &TopicCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicCnt, RD_KAFKAP_TOPICS_MAX); while (TopicCnt-- > 0) { rd_kafkap_str_t Topic; @@ -851,7 +851,8 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, rd_bool_t request_error = rd_false; rd_kafka_buf_read_str(rkbuf, &Topic); - rd_kafka_buf_read_i32(rkbuf, &PartCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartCnt, + RD_KAFKAP_PARTITIONS_MAX); rkt = rd_kafka_topic_find0(rk, &Topic); if (rkt) @@ -865,6 +866,7 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, rd_kafka_buf_read_i32(rkbuf, &Partition); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_skip_tags(rkbuf); if (rkt) rktp = rd_kafka_toppar_get(rkt, Partition, @@ -979,6 +981,8 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, break; /* Request-level error seen, bail out */ } + rd_kafka_buf_skip_tags(rkbuf); + if (rkt) { rd_kafka_topic_rdunlock(rkt); rd_kafka_topic_destroy0(rkt); From 67a035fddedacca44ad239cc8bb9409715faaa09 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Fri, 22 Aug 2025 20:23:59 +0530 Subject: [PATCH 2/4] Mock handlers api update --- src/rdkafka_mock_handlers.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 1b1e191411..56281b4434 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2207,6 +2207,10 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); } + + rd_kafka_buf_skip_tags(rkbuf); + + rd_kafka_buf_write_tags_empty(resp); } rd_kafka_mock_connection_send_response(mconn, resp); @@ -3013,12 +3017,12 @@ const struct rd_kafka_mock_api_handler [RD_KAFKAP_LeaveGroup] = {0, 4, 4, rd_kafka_mock_handle_LeaveGroup}, [RD_KAFKAP_SyncGroup] = {0, 4, 4, rd_kafka_mock_handle_SyncGroup}, [RD_KAFKAP_AddPartitionsToTxn] = - {0, 1, -1, rd_kafka_mock_handle_AddPartitionsToTxn}, - [RD_KAFKAP_AddOffsetsToTxn] = {0, 1, -1, + {0, 3, 3, rd_kafka_mock_handle_AddPartitionsToTxn}, + [RD_KAFKAP_AddOffsetsToTxn] = {0, 3, 3, rd_kafka_mock_handle_AddOffsetsToTxn}, [RD_KAFKAP_TxnOffsetCommit] = {0, 3, 3, rd_kafka_mock_handle_TxnOffsetCommit}, - [RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn}, + [RD_KAFKAP_EndTxn] = {0, 3, 3, rd_kafka_mock_handle_EndTxn}, [RD_KAFKAP_OffsetForLeaderEpoch] = {2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, [RD_KAFKAP_ConsumerGroupHeartbeat] = From 049e68d32f5d74cdd0764678cf75f7bf27922c59 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Fri, 22 Aug 2025 21:48:57 +0530 Subject: [PATCH 3/4] Mock handling --- src/rdkafka_mock_handlers.c | 10 ++++++---- src/rdkafka_request.c | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 56281b4434..af219ecc3a 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2155,10 +2155,10 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, /* Epoch */ rd_kafka_buf_read_i16(rkbuf, &pid.epoch); /* #Topics */ - rd_kafka_buf_read_i32(rkbuf, &TopicsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); /* Response: #Results */ - rd_kafka_buf_write_i32(resp, TopicsCnt); + rd_kafka_buf_write_arraycnt(resp, TopicsCnt); /* Inject error */ all_err = rd_kafka_mock_next_request_error(mconn, resp); @@ -2183,9 +2183,9 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_kstr(resp, &Topic); /* #Partitions */ - rd_kafka_buf_read_i32(rkbuf, &PartsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartsCnt, RD_KAFKAP_PARTITIONS_MAX); /* Response: #Partitions */ - rd_kafka_buf_write_i32(resp, PartsCnt); + rd_kafka_buf_write_arraycnt(resp, PartsCnt); mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); @@ -2206,6 +2206,8 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); + + rd_kafka_buf_write_tags_empty(resp); } rd_kafka_buf_skip_tags(rkbuf); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 35282ada4c..4ed8508580 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -6322,7 +6322,7 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_kstr(rkbuf, rktp->rktp_rkt->rkt_topic); /* Partition count, updated later */ - of_PartCnt = rd_kafka_buf_write_arraycnt(rkbuf, 0); + of_PartCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); PartCnt = 0; TopicCnt++; From a97d1b7fd2a979455d99881a97a6b873a7beb1b3 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Fri, 22 Aug 2025 22:02:42 +0530 Subject: [PATCH 4/4] Style fix --- src/rdkafka_mock_handlers.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index af219ecc3a..17016454f0 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2183,7 +2183,8 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_kstr(resp, &Topic); /* #Partitions */ - rd_kafka_buf_read_arraycnt(rkbuf, &PartsCnt, RD_KAFKAP_PARTITIONS_MAX); + rd_kafka_buf_read_arraycnt(rkbuf, &PartsCnt, + RD_KAFKAP_PARTITIONS_MAX); /* Response: #Partitions */ rd_kafka_buf_write_arraycnt(resp, PartsCnt);