diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 1b1e19141..17016454f 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2155,10 +2155,10 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, /* Epoch */ rd_kafka_buf_read_i16(rkbuf, &pid.epoch); /* #Topics */ - rd_kafka_buf_read_i32(rkbuf, &TopicsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); /* Response: #Results */ - rd_kafka_buf_write_i32(resp, TopicsCnt); + rd_kafka_buf_write_arraycnt(resp, TopicsCnt); /* Inject error */ all_err = rd_kafka_mock_next_request_error(mconn, resp); @@ -2183,9 +2183,10 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_kstr(resp, &Topic); /* #Partitions */ - rd_kafka_buf_read_i32(rkbuf, &PartsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartsCnt, + RD_KAFKAP_PARTITIONS_MAX); /* Response: #Partitions */ - rd_kafka_buf_write_i32(resp, PartsCnt); + rd_kafka_buf_write_arraycnt(resp, PartsCnt); mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); @@ -2206,7 +2207,13 @@ rd_kafka_mock_handle_AddPartitionsToTxn(rd_kafka_mock_connection_t *mconn, /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); + + rd_kafka_buf_write_tags_empty(resp); } + + rd_kafka_buf_skip_tags(rkbuf); + + rd_kafka_buf_write_tags_empty(resp); } rd_kafka_mock_connection_send_response(mconn, resp); @@ -3013,12 +3020,12 @@ const struct rd_kafka_mock_api_handler [RD_KAFKAP_LeaveGroup] = {0, 4, 4, rd_kafka_mock_handle_LeaveGroup}, [RD_KAFKAP_SyncGroup] = {0, 4, 4, rd_kafka_mock_handle_SyncGroup}, [RD_KAFKAP_AddPartitionsToTxn] = - {0, 1, -1, rd_kafka_mock_handle_AddPartitionsToTxn}, - [RD_KAFKAP_AddOffsetsToTxn] = {0, 1, -1, + {0, 3, 3, rd_kafka_mock_handle_AddPartitionsToTxn}, + [RD_KAFKAP_AddOffsetsToTxn] = {0, 3, 3, rd_kafka_mock_handle_AddOffsetsToTxn}, [RD_KAFKAP_TxnOffsetCommit] = {0, 3, 3, rd_kafka_mock_handle_TxnOffsetCommit}, - [RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn}, + [RD_KAFKAP_EndTxn] = {0, 3, 3, rd_kafka_mock_handle_EndTxn}, [RD_KAFKAP_OffsetForLeaderEpoch] = {2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch}, [RD_KAFKAP_ConsumerGroupHeartbeat] = diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 26ffe4622..4ed850858 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -6285,7 +6285,7 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, int TopicCnt = 0, PartCnt = 0; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 0, NULL); + rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 3, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "AddPartitionsToTxnRequest (KIP-98) not supported " @@ -6294,8 +6294,8 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500); + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500, ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, transactional_id, -1); @@ -6305,15 +6305,16 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_i16(rkbuf, pid.epoch); /* Topics/partitions array (count updated later) */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); + of_TopicCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); TAILQ_FOREACH(rktp, rktps, rktp_txnlink) { if (last_rkt != rktp->rktp_rkt) { if (last_rkt) { /* Update last topic's partition count field */ - rd_kafka_buf_update_i32(rkbuf, of_PartCnt, - PartCnt); + rd_kafka_buf_finalize_arraycnt( + rkbuf, of_PartCnt, PartCnt); + rd_kafka_buf_write_tags_empty(rkbuf); of_PartCnt = -1; } @@ -6321,7 +6322,7 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_kstr(rkbuf, rktp->rktp_rkt->rkt_topic); /* Partition count, updated later */ - of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); + of_PartCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); PartCnt = 0; TopicCnt++; @@ -6334,9 +6335,12 @@ rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, } /* Update last partition and topic count fields */ - if (of_PartCnt != -1) - rd_kafka_buf_update_i32(rkbuf, (size_t)of_PartCnt, PartCnt); - rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt); + if (of_PartCnt != -1) { + rd_kafka_buf_finalize_arraycnt(rkbuf, (size_t)of_PartCnt, + PartCnt); + rd_kafka_buf_write_tags_empty(rkbuf); + } + rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicCnt, TopicCnt); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -6373,7 +6377,7 @@ rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion = 0; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 0, NULL); + rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 3, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "AddOffsetsToTxnRequest (KIP-98) not supported " @@ -6382,8 +6386,8 @@ rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddOffsetsToTxn, 1, 100); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_AddOffsetsToTxn, + 1, 100, ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, transactional_id, -1); @@ -6428,7 +6432,7 @@ rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion = 0; ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_EndTxn, - 0, 1, NULL); + 0, 3, NULL); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "EndTxnRequest (KIP-98) not supported " @@ -6437,7 +6441,8 @@ rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_EndTxn, 1, 500); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_EndTxn, 1, 500, + ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, transactional_id, -1); diff --git a/src/rdkafka_txnmgr.c b/src/rdkafka_txnmgr.c index 019a3b80c..1bac74489 100644 --- a/src/rdkafka_txnmgr.c +++ b/src/rdkafka_txnmgr.c @@ -842,7 +842,7 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, rd_kafka_buf_read_throttle_time(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &TopicCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicCnt, RD_KAFKAP_TOPICS_MAX); while (TopicCnt-- > 0) { rd_kafkap_str_t Topic; @@ -851,7 +851,8 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, rd_bool_t request_error = rd_false; rd_kafka_buf_read_str(rkbuf, &Topic); - rd_kafka_buf_read_i32(rkbuf, &PartCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartCnt, + RD_KAFKAP_PARTITIONS_MAX); rkt = rd_kafka_topic_find0(rk, &Topic); if (rkt) @@ -865,6 +866,7 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, rd_kafka_buf_read_i32(rkbuf, &Partition); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_skip_tags(rkbuf); if (rkt) rktp = rd_kafka_toppar_get(rkt, Partition, @@ -979,6 +981,8 @@ static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, break; /* Request-level error seen, bail out */ } + rd_kafka_buf_skip_tags(rkbuf); + if (rkt) { rd_kafka_topic_rdunlock(rkt); rd_kafka_topic_destroy0(rkt);