From df259b018049c348ab8c801f6d7f391caf389bd2 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 13 Oct 2025 19:19:08 +0530 Subject: [PATCH 1/7] Update CreateTopics Api --- src/rdkafka.h | 39 +++++++++++++++++++++++-- src/rdkafka_admin.c | 68 ++++++++++++++++++++++++++++++++++++++----- src/rdkafka_admin.h | 13 +++++++++ src/rdkafka_aux.c | 29 ++++++++++++++++++ src/rdkafka_aux.h | 9 +++++- src/rdkafka_request.c | 42 +++++++++++++++++++++----- src/rdkafka_request.h | 4 +++ tests/test.c | 12 ++++++++ 8 files changed, 196 insertions(+), 20 deletions(-) diff --git a/src/rdkafka.h b/src/rdkafka.h index 57cf08bad1..5ae9a58802 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -6989,6 +6989,42 @@ rd_kafka_topic_result_error_string(const rd_kafka_topic_result_t *topicres); RD_EXPORT const char * rd_kafka_topic_result_name(const rd_kafka_topic_result_t *topicres); + +/*! Apache Kafka configuration entry. */ +typedef struct rd_kafka_ConfigEntry_s rd_kafka_ConfigEntry_t; + +/** + * @returns the error code for the topic configuration operation + */ +RD_EXPORT const rd_kafka_resp_err_t rd_kafka_topic_result_topic_config_error( + const rd_kafka_topic_result_t *topicres); + +/** + * @returns the number of partitions for the given topic result, + * or -1 if not applicable. + */ +RD_EXPORT const int32_t +rd_kafka_topic_result_num_partitions(const rd_kafka_topic_result_t *topicres); + +/** + * @returns the replication factor for the given topic result, + * or -1 if not applicable. + */ +RD_EXPORT const int16_t rd_kafka_topic_result_replication_factor( + const rd_kafka_topic_result_t *topicres); + +/** + * @returns the configuration entries for the given topic result, + * or NULL if not applicable. + * @param cntp Pointer to variable that will be set to the number + * of entries in the returned array. + * @remark lifetime of the returned array and entries is the same as the + * \p topicres. + */ +RD_EXPORT const rd_kafka_ConfigEntry_t ** +rd_kafka_topic_result_configs(const rd_kafka_topic_result_t *topicres, + size_t *cntp); + /** * @brief Group result provides per-group operation result information. * @@ -7802,9 +7838,6 @@ RD_EXPORT const char * rd_kafka_ConfigSource_name(rd_kafka_ConfigSource_t confsource); -/*! Apache Kafka configuration entry. */ -typedef struct rd_kafka_ConfigEntry_s rd_kafka_ConfigEntry_t; - /** * @returns the configuration property name */ diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index b2671f3c82..87a05f3363 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -2069,6 +2069,23 @@ rd_kafka_resp_err_t rd_kafka_NewTopic_set_config(rd_kafka_NewTopic_t *new_topic, } +static int rd_kafkap_CreateTopics_result_tags_parse( + rd_kafka_buf_t *rkbuf, + uint64_t tagtype, + uint64_t taglen, + rd_kafka_resp_err_t *topic_config_err) { + switch (tagtype) { + case 0: /* TopicConfigErrorCode */ + if (rd_kafka_buf_read_TopicConfigErrorCode( + rkbuf, topic_config_err) == -1) + goto err_parse; + return 1; + default: + return 0; + } +err_parse: + return -1; +} /** * @brief Parse CreateTopicsResponse and create ADMIN_RESULT op. @@ -2084,16 +2101,17 @@ rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req, rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_op_t *rko_result = NULL; int32_t topic_cnt; + int16_t ApiVersion = rd_kafka_buf_ApiVersion(reply); int i; - if (rd_kafka_buf_ApiVersion(reply) >= 2) { + if (ApiVersion >= 2) { int32_t Throttle_Time; rd_kafka_buf_read_i32(reply, &Throttle_Time); rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time); } /* #topics */ - rd_kafka_buf_read_i32(reply, &topic_cnt); + rd_kafka_buf_read_arraycnt(reply, &topic_cnt, RD_KAFKAP_TOPICS_MAX); if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args)) rd_kafka_buf_parse_fail( @@ -2116,12 +2134,13 @@ rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req, char *this_errstr = NULL; rd_kafka_topic_result_t *terr; rd_kafka_NewTopic_t skel; - int orig_pos; + int orig_pos, j; + int32_t configs_cnt; rd_kafka_buf_read_str(reply, &ktopic); rd_kafka_buf_read_i16(reply, &error_code); - if (rd_kafka_buf_ApiVersion(reply) >= 1) + if (ApiVersion >= 1) rd_kafka_buf_read_str(reply, &error_msg); /* For non-blocking CreateTopicsRequests the broker @@ -2150,6 +2169,39 @@ rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req, RD_KAFKAP_STR_LEN(&ktopic), error_code, this_errstr); + if (ApiVersion >= 5) { + /*Number of partitions*/ + rd_kafka_buf_read_i32(reply, &terr->num_partitions); + /*Replication factor*/ + rd_kafka_buf_read_i16(reply, &terr->replication_factor); + rd_kafka_buf_read_arraycnt(reply, &configs_cnt, + RD_KAFKAP_CONFIGS_MAX); + for (j = 0; j < configs_cnt; j++) { + rd_kafkap_str_t name, value; + rd_kafka_ConfigEntry_t *entry; + + rd_kafka_buf_read_str(reply, &name); + rd_kafka_buf_read_str(reply, &value); + + entry = rd_kafka_ConfigEntry_new0( + name.str, RD_KAFKAP_STR_LEN(&name), + value.str, RD_KAFKAP_STR_LEN(&value)); + + rd_kafka_buf_read_bool(reply, + &entry->a.is_readonly); + rd_kafka_buf_read_i8(reply, &entry->a.source); + rd_kafka_buf_read_bool(reply, + &entry->a.is_sensitive); + rd_kafka_buf_skip_tags(reply); + + rd_list_add(&terr->configs, entry); + } + + rd_kafka_buf_read_tags( + reply, rd_kafkap_CreateTopics_result_tags_parse, + &terr->topic_config_err); + } + /* As a convenience to the application we insert topic result * in the same order as they were requested. The broker * does not maintain ordering unfortunately. */ @@ -2763,10 +2815,10 @@ static void rd_kafka_ConfigEntry_free(void *ptr) { * @param value Config entry value, or NULL * @param value_len Length of value, or -1 to use strlen() */ -static rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name, - size_t name_len, - const char *value, - size_t value_len) { +rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name, + size_t name_len, + const char *value, + size_t value_len) { rd_kafka_ConfigEntry_t *entry; if (!name) diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index c84849ea66..c705ac7224 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -236,6 +236,19 @@ struct rd_kafka_ConfigEntry_s { rd_list_t synonyms; /**< Type (rd_kafka_configEntry *) */ }; +/** + * @brief Create new ConfigEntry + * + * @param name Config entry name + * @param name_len Length of name, or -1 to use strlen() + * @param value Config entry value, or NULL + * @param value_len Length of value, or -1 to use strlen() + */ +rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name, + size_t name_len, + const char *value, + size_t value_len); + /** * @brief A cluster ConfigResource constisting of: * - resource type (BROKER, TOPIC) diff --git a/src/rdkafka_aux.c b/src/rdkafka_aux.c index 7d5ccb5b2f..890514ab44 100644 --- a/src/rdkafka_aux.c +++ b/src/rdkafka_aux.c @@ -47,6 +47,30 @@ rd_kafka_topic_result_name(const rd_kafka_topic_result_t *topicres) { return topicres->topic; } +const rd_kafka_resp_err_t rd_kafka_topic_result_topic_config_error( + const rd_kafka_topic_result_t *topicres) { + return topicres->topic_config_err; +} + +const int32_t +rd_kafka_topic_result_num_partitions(const rd_kafka_topic_result_t *topicres) { + return topicres->num_partitions; +} + +const int16_t rd_kafka_topic_result_replication_factor( + const rd_kafka_topic_result_t *topicres) { + return topicres->replication_factor; +} + +const rd_kafka_ConfigEntry_t ** +rd_kafka_topic_result_configs(const rd_kafka_topic_result_t *topicres, + size_t *cntp) { + if (!*cntp) + return NULL; + *cntp = rd_list_cnt(&topicres->configs); + return (const rd_kafka_ConfigEntry_t **)topicres->configs.rl_elems; +} + /** * @brief Create new topic_result (single allocation). * @@ -82,6 +106,10 @@ rd_kafka_topic_result_t *rd_kafka_topic_result_new(const char *topic, terr->errstr = NULL; } + terr->num_partitions = -1; + terr->replication_factor = -1; + rd_list_init(&terr->configs, 0, NULL); + return terr; } @@ -90,6 +118,7 @@ rd_kafka_topic_result_t *rd_kafka_topic_result_new(const char *topic, * @brief Destroy topic_result */ void rd_kafka_topic_result_destroy(rd_kafka_topic_result_t *terr) { + rd_list_destroy(&terr->configs); rd_free(terr); } diff --git a/src/rdkafka_aux.h b/src/rdkafka_aux.h index 340fcf708d..0b2d51aaf4 100644 --- a/src/rdkafka_aux.h +++ b/src/rdkafka_aux.h @@ -46,7 +46,14 @@ struct rd_kafka_topic_result_s { char *topic; /**< Points to data */ rd_kafka_resp_err_t err; /**< Error code */ char *errstr; /**< Points to data after topic, unless NULL */ - char data[1]; /**< topic followed by errstr */ + rd_kafka_resp_err_t topic_config_err; /**< Error code for topic config, + * or 0 if not applicable. */ + int32_t num_partitions; /**< Number of partitions, or -1 if + * not applicable. */ + int16_t replication_factor; /**< Replication factor, or -1 if + * not applicable. */ + rd_list_t configs; /**< Type (rd_kafka_ConfigEntry_t *) */ + char data[1]; /**< topic followed by errstr */ }; void rd_kafka_topic_result_destroy(rd_kafka_topic_result_t *terr); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 663a07eae3..47d32098d7 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -583,6 +583,24 @@ int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf, return -1; } +/** + * @brief Read TopicConfigErrorCode from \p rkbuf. + * + * @param rkbuf buffer to read from + * @param topic_config_err is the TopicConfigErrorCode to populate. + * + * @return 1 on success, else -1 on parse error. + */ +int rd_kafka_buf_read_TopicConfigErrorCode( + rd_kafka_buf_t *rkbuf, + rd_kafka_resp_err_t *topic_config_err) { + const int log_decode_errors = LOG_ERR; + rd_kafka_buf_read_i16(rkbuf, topic_config_err); + rd_kafka_buf_skip_tags(rkbuf); + return 1; +err_parse: + return -1; +} /** * @brief Send FindCoordinatorRequest. @@ -4957,7 +4975,7 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb, } ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_CreateTopics, 0, 4, &features); + rkb, RD_KAFKAP_CreateTopics, 0, 5, &features); if (ApiVersion == -1) { rd_snprintf(errstr, errstr_size, "Topic Admin API (KIP-4) not supported " @@ -4975,12 +4993,12 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1, - 4 + (rd_list_cnt(new_topics) * 200) + - 4 + 1); + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_CreateTopics, 1, + 4 + (rd_list_cnt(new_topics) * 200) + 4 + 1, ApiVersion >= 5); /* #topics */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics)); + rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(new_topics)); while ((newt = rd_list_elem(new_topics, i++))) { int partition; @@ -5030,7 +5048,8 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb, } /* #replica_assignment */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas)); + rd_kafka_buf_write_arraycnt(rkbuf, + rd_list_cnt(&newt->replicas)); /* Replicas per partition, see rdkafka_admin.[ch] * for how these are constructed. */ @@ -5046,24 +5065,31 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb, /* partition */ rd_kafka_buf_write_i32(rkbuf, partition); /* #replicas */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas)); + rd_kafka_buf_write_arraycnt(rkbuf, + rd_list_cnt(replicas)); for (ri = 0; ri < rd_list_cnt(replicas); ri++) { /* replica */ rd_kafka_buf_write_i32( rkbuf, rd_list_get_int32(replicas, ri)); } + + rd_kafka_buf_write_tags_empty(rkbuf); } /* #config_entries */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config)); + rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(&newt->config)); RD_LIST_FOREACH(entry, &newt->config, ei) { /* config_name */ rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); /* config_value (nullable) */ rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); + /* Skip Tags */ + rd_kafka_buf_write_tags_empty(rkbuf); } + + rd_kafka_buf_write_tags_empty(rkbuf); } /* timeout */ diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaaf..17ba6922cf 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -190,6 +190,10 @@ int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf, int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf, rd_kafkap_NodeEndpoints_t *NodeEndpoints); +int rd_kafka_buf_read_TopicConfigErrorCode( + rd_kafka_buf_t *rkbuf, + rd_kafka_resp_err_t *topic_config_err); + rd_kafka_resp_err_t rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, diff --git a/tests/test.c b/tests/test.c index 42e525a9cc..84798f9272 100644 --- a/tests/test.c +++ b/tests/test.c @@ -5186,6 +5186,18 @@ void test_admin_create_topic(rd_kafka_t *use_rk, rd_kafka_topic_result_name(terr[0]), rd_kafka_topic_result_error_string(terr[0])); + TEST_ASSERT(rd_kafka_topic_result_num_partitions(terr[0]) == + partition_cnt, + "Topic %s expected %d partitions, got %d", + rd_kafka_topic_result_name(terr[0]), partition_cnt, + rd_kafka_topic_result_num_partitions(terr[0])); + + TEST_ASSERT(rd_kafka_topic_result_replication_factor(terr[0]) == + replication_factor, + "Topic %s expected replication factor %d, got %d", + rd_kafka_topic_result_name(terr[0]), replication_factor, + rd_kafka_topic_result_replication_factor(terr[0])); + rd_kafka_event_destroy(rkev); rd_kafka_queue_destroy(rkqu); From 2ad91f4cf2a7ba2bb1e03478dbb61b16a7c1a49b Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 14 Oct 2025 01:26:18 +0530 Subject: [PATCH 2/7] resolve failing build --- src/rdkafka.h | 1 + src/rdkafka_request.c | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rdkafka.h b/src/rdkafka.h index 5ae9a58802..edaa9d5836 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -7016,6 +7016,7 @@ RD_EXPORT const int16_t rd_kafka_topic_result_replication_factor( /** * @returns the configuration entries for the given topic result, * or NULL if not applicable. + * @param topicres Topic result object. * @param cntp Pointer to variable that will be set to the number * of entries in the returned array. * @remark lifetime of the returned array and entries is the same as the diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 47d32098d7..276f10eb8c 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -595,7 +595,9 @@ int rd_kafka_buf_read_TopicConfigErrorCode( rd_kafka_buf_t *rkbuf, rd_kafka_resp_err_t *topic_config_err) { const int log_decode_errors = LOG_ERR; - rd_kafka_buf_read_i16(rkbuf, topic_config_err); + int16_t err_code; + rd_kafka_buf_read_i16(rkbuf, &err_code); + *topic_config_err = err_code; rd_kafka_buf_skip_tags(rkbuf); return 1; err_parse: From 2f3f8c19f630d7d76ab0a9d0f0f39b8562c4b663 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 14 Oct 2025 01:34:01 +0530 Subject: [PATCH 3/7] remove cntp check --- src/rdkafka_aux.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/rdkafka_aux.c b/src/rdkafka_aux.c index 890514ab44..d2334f061e 100644 --- a/src/rdkafka_aux.c +++ b/src/rdkafka_aux.c @@ -65,8 +65,6 @@ const int16_t rd_kafka_topic_result_replication_factor( const rd_kafka_ConfigEntry_t ** rd_kafka_topic_result_configs(const rd_kafka_topic_result_t *topicres, size_t *cntp) { - if (!*cntp) - return NULL; *cntp = rd_list_cnt(&topicres->configs); return (const rd_kafka_ConfigEntry_t **)topicres->configs.rl_elems; } From 3a9b524b2d5fd7820f81715f08a49929ce46c763 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 14 Oct 2025 12:15:47 +0530 Subject: [PATCH 4/7] Tests correction --- tests/0091-max_poll_interval_timeout.c | 4 --- tests/test.c | 48 +++++++++++++++++++------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/tests/0091-max_poll_interval_timeout.c b/tests/0091-max_poll_interval_timeout.c index e915bb8624..81bf8055de 100644 --- a/tests/0091-max_poll_interval_timeout.c +++ b/tests/0091-max_poll_interval_timeout.c @@ -204,8 +204,6 @@ static void do_test_with_assign(const char *topic) { test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, topic, 2, 1, 5000); - test_conf_set(conf, "session.timeout.ms", "6000"); test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/); @@ -249,8 +247,6 @@ static void do_test_no_poll(const char *topic) { test_conf_init(&conf, NULL, 60); - test_create_topic_wait_exists(NULL, topic, 2, 1, 5000); - test_conf_set(conf, "session.timeout.ms", "6000"); test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/); diff --git a/tests/test.c b/tests/test.c index 84798f9272..37bc464842 100644 --- a/tests/test.c +++ b/tests/test.c @@ -100,7 +100,7 @@ static const char *test_states[] = { #define _TEST_DECL(NAME) extern int main_##NAME(int, char **) #define _TEST(NAME, FLAGS, ...) \ - { .name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__ } + {.name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__} /** @@ -5186,17 +5186,41 @@ void test_admin_create_topic(rd_kafka_t *use_rk, rd_kafka_topic_result_name(terr[0]), rd_kafka_topic_result_error_string(terr[0])); - TEST_ASSERT(rd_kafka_topic_result_num_partitions(terr[0]) == - partition_cnt, - "Topic %s expected %d partitions, got %d", - rd_kafka_topic_result_name(terr[0]), partition_cnt, - rd_kafka_topic_result_num_partitions(terr[0])); - - TEST_ASSERT(rd_kafka_topic_result_replication_factor(terr[0]) == - replication_factor, - "Topic %s expected replication factor %d, got %d", - rd_kafka_topic_result_name(terr[0]), replication_factor, - rd_kafka_topic_result_replication_factor(terr[0])); + if (rd_kafka_topic_result_topic_config_error(terr[0]) == + RD_KAFKA_RESP_ERR_NO_ERROR) { + printf( + "Topic %s created with %d partitions and " + "replication factor %d and topic config err: %d\n", + rd_kafka_topic_result_name(terr[0]), + rd_kafka_topic_result_num_partitions(terr[0]), + rd_kafka_topic_result_replication_factor(terr[0]), + rd_kafka_topic_result_topic_config_error(terr[0])); + TEST_ASSERT(rd_kafka_topic_result_num_partitions(terr[0]) == + partition_cnt, + "Topic %s expected %d partitions, got %d", + rd_kafka_topic_result_name(terr[0]), partition_cnt, + rd_kafka_topic_result_num_partitions(terr[0])); + + if (replication_factor == -1) { + // When replication_factor is -1, it uses the broker + // default + TEST_ASSERT( + rd_kafka_topic_result_replication_factor(terr[0]) > + 0, + "Topic %s expected positive replication factor, " + "got %d", + rd_kafka_topic_result_name(terr[0]), + rd_kafka_topic_result_replication_factor(terr[0])); + } else { + TEST_ASSERT( + rd_kafka_topic_result_replication_factor(terr[0]) == + replication_factor, + "Topic %s expected replication factor %d, got %d", + rd_kafka_topic_result_name(terr[0]), + replication_factor, + rd_kafka_topic_result_replication_factor(terr[0])); + } + } rd_kafka_event_destroy(rkev); From 34791f4ac563e5207efe69f97ee25df6271f4b55 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 14 Oct 2025 12:42:31 +0530 Subject: [PATCH 5/7] reversing unintended change --- tests/test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test.c b/tests/test.c index 37bc464842..3c40f75383 100644 --- a/tests/test.c +++ b/tests/test.c @@ -100,7 +100,7 @@ static const char *test_states[] = { #define _TEST_DECL(NAME) extern int main_##NAME(int, char **) #define _TEST(NAME, FLAGS, ...) \ - {.name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__} + { .name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__ } /** From e156cd239b379d11eefff129a94e0e85f9bc205b Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 14 Oct 2025 14:49:53 +0530 Subject: [PATCH 6/7] add extra condition to partition check --- tests/test.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/test.c b/tests/test.c index 3c40f75383..b3962434c3 100644 --- a/tests/test.c +++ b/tests/test.c @@ -100,7 +100,7 @@ static const char *test_states[] = { #define _TEST_DECL(NAME) extern int main_##NAME(int, char **) #define _TEST(NAME, FLAGS, ...) \ - { .name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__ } + {.name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__} /** @@ -5187,14 +5187,9 @@ void test_admin_create_topic(rd_kafka_t *use_rk, rd_kafka_topic_result_error_string(terr[0])); if (rd_kafka_topic_result_topic_config_error(terr[0]) == - RD_KAFKA_RESP_ERR_NO_ERROR) { - printf( - "Topic %s created with %d partitions and " - "replication factor %d and topic config err: %d\n", - rd_kafka_topic_result_name(terr[0]), - rd_kafka_topic_result_num_partitions(terr[0]), - rd_kafka_topic_result_replication_factor(terr[0]), - rd_kafka_topic_result_topic_config_error(terr[0])); + RD_KAFKA_RESP_ERR_NO_ERROR && + rd_kafka_topic_result_error(terr[0]) != + RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS) { TEST_ASSERT(rd_kafka_topic_result_num_partitions(terr[0]) == partition_cnt, "Topic %s expected %d partitions, got %d", From d50544d35756be4ada28d93118bd202cfa8fc988 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Tue, 14 Oct 2025 14:51:27 +0530 Subject: [PATCH 7/7] remove unintended style change --- tests/test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test.c b/tests/test.c index b3962434c3..bfd2d72d83 100644 --- a/tests/test.c +++ b/tests/test.c @@ -100,7 +100,7 @@ static const char *test_states[] = { #define _TEST_DECL(NAME) extern int main_##NAME(int, char **) #define _TEST(NAME, FLAGS, ...) \ - {.name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__} + { .name = #NAME, .mainfunc = main_##NAME, .flags = FLAGS, __VA_ARGS__ } /**