Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -6989,6 +6989,43 @@ rd_kafka_topic_result_error_string(const rd_kafka_topic_result_t *topicres);
RD_EXPORT const char *
rd_kafka_topic_result_name(const rd_kafka_topic_result_t *topicres);


/*! Apache Kafka configuration entry. */
typedef struct rd_kafka_ConfigEntry_s rd_kafka_ConfigEntry_t;

/**
* @returns the error code for the topic configuration operation
*/
RD_EXPORT const rd_kafka_resp_err_t rd_kafka_topic_result_topic_config_error(
const rd_kafka_topic_result_t *topicres);

/**
* @returns the number of partitions for the given topic result,
* or -1 if not applicable.
*/
RD_EXPORT const int32_t
rd_kafka_topic_result_num_partitions(const rd_kafka_topic_result_t *topicres);

/**
* @returns the replication factor for the given topic result,
* or -1 if not applicable.
*/
RD_EXPORT const int16_t rd_kafka_topic_result_replication_factor(
const rd_kafka_topic_result_t *topicres);

/**
* @returns the configuration entries for the given topic result,
* or NULL if not applicable.
* @param topicres Topic result object.
* @param cntp Pointer to variable that will be set to the number
* of entries in the returned array.
* @remark lifetime of the returned array and entries is the same as the
* \p topicres.
*/
RD_EXPORT const rd_kafka_ConfigEntry_t **
rd_kafka_topic_result_configs(const rd_kafka_topic_result_t *topicres,
size_t *cntp);

/**
* @brief Group result provides per-group operation result information.
*
Expand Down Expand Up @@ -7802,9 +7839,6 @@ RD_EXPORT const char *
rd_kafka_ConfigSource_name(rd_kafka_ConfigSource_t confsource);


/*! Apache Kafka configuration entry. */
typedef struct rd_kafka_ConfigEntry_s rd_kafka_ConfigEntry_t;

/**
* @returns the configuration property name
*/
Expand Down
68 changes: 60 additions & 8 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2069,6 +2069,23 @@ rd_kafka_resp_err_t rd_kafka_NewTopic_set_config(rd_kafka_NewTopic_t *new_topic,
}


static int rd_kafkap_CreateTopics_result_tags_parse(
rd_kafka_buf_t *rkbuf,
uint64_t tagtype,
uint64_t taglen,
rd_kafka_resp_err_t *topic_config_err) {
switch (tagtype) {
case 0: /* TopicConfigErrorCode */
if (rd_kafka_buf_read_TopicConfigErrorCode(
rkbuf, topic_config_err) == -1)
goto err_parse;
return 1;
default:
return 0;
}
err_parse:
return -1;
}

/**
* @brief Parse CreateTopicsResponse and create ADMIN_RESULT op.
Expand All @@ -2084,16 +2101,17 @@ rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_t *rk = rkb->rkb_rk;
rd_kafka_op_t *rko_result = NULL;
int32_t topic_cnt;
int16_t ApiVersion = rd_kafka_buf_ApiVersion(reply);
int i;

if (rd_kafka_buf_ApiVersion(reply) >= 2) {
if (ApiVersion >= 2) {
int32_t Throttle_Time;
rd_kafka_buf_read_i32(reply, &Throttle_Time);
rd_kafka_op_throttle_time(rkb, rk->rk_rep, Throttle_Time);
}

/* #topics */
rd_kafka_buf_read_i32(reply, &topic_cnt);
rd_kafka_buf_read_arraycnt(reply, &topic_cnt, RD_KAFKAP_TOPICS_MAX);

if (topic_cnt > rd_list_cnt(&rko_req->rko_u.admin_request.args))
rd_kafka_buf_parse_fail(
Expand All @@ -2116,12 +2134,13 @@ rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req,
char *this_errstr = NULL;
rd_kafka_topic_result_t *terr;
rd_kafka_NewTopic_t skel;
int orig_pos;
int orig_pos, j;
int32_t configs_cnt;

rd_kafka_buf_read_str(reply, &ktopic);
rd_kafka_buf_read_i16(reply, &error_code);

if (rd_kafka_buf_ApiVersion(reply) >= 1)
if (ApiVersion >= 1)
rd_kafka_buf_read_str(reply, &error_msg);

/* For non-blocking CreateTopicsRequests the broker
Expand Down Expand Up @@ -2150,6 +2169,39 @@ rd_kafka_CreateTopicsResponse_parse(rd_kafka_op_t *rko_req,
RD_KAFKAP_STR_LEN(&ktopic),
error_code, this_errstr);

if (ApiVersion >= 5) {
/*Number of partitions*/
rd_kafka_buf_read_i32(reply, &terr->num_partitions);
/*Replication factor*/
rd_kafka_buf_read_i16(reply, &terr->replication_factor);
rd_kafka_buf_read_arraycnt(reply, &configs_cnt,
RD_KAFKAP_CONFIGS_MAX);
for (j = 0; j < configs_cnt; j++) {
rd_kafkap_str_t name, value;
rd_kafka_ConfigEntry_t *entry;

rd_kafka_buf_read_str(reply, &name);
rd_kafka_buf_read_str(reply, &value);

entry = rd_kafka_ConfigEntry_new0(
name.str, RD_KAFKAP_STR_LEN(&name),
value.str, RD_KAFKAP_STR_LEN(&value));

rd_kafka_buf_read_bool(reply,
&entry->a.is_readonly);
rd_kafka_buf_read_i8(reply, &entry->a.source);
rd_kafka_buf_read_bool(reply,
&entry->a.is_sensitive);
rd_kafka_buf_skip_tags(reply);

rd_list_add(&terr->configs, entry);
}

rd_kafka_buf_read_tags(
reply, rd_kafkap_CreateTopics_result_tags_parse,
&terr->topic_config_err);
}

/* As a convenience to the application we insert topic result
* in the same order as they were requested. The broker
* does not maintain ordering unfortunately. */
Expand Down Expand Up @@ -2763,10 +2815,10 @@ static void rd_kafka_ConfigEntry_free(void *ptr) {
* @param value Config entry value, or NULL
* @param value_len Length of value, or -1 to use strlen()
*/
static rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name,
size_t name_len,
const char *value,
size_t value_len) {
rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name,
size_t name_len,
const char *value,
size_t value_len) {
rd_kafka_ConfigEntry_t *entry;

if (!name)
Expand Down
13 changes: 13 additions & 0 deletions src/rdkafka_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,19 @@ struct rd_kafka_ConfigEntry_s {
rd_list_t synonyms; /**< Type (rd_kafka_configEntry *) */
};

/**
* @brief Create new ConfigEntry
*
* @param name Config entry name
* @param name_len Length of name, or -1 to use strlen()
* @param value Config entry value, or NULL
* @param value_len Length of value, or -1 to use strlen()
*/
rd_kafka_ConfigEntry_t *rd_kafka_ConfigEntry_new0(const char *name,
size_t name_len,
const char *value,
size_t value_len);

/**
* @brief A cluster ConfigResource constisting of:
* - resource type (BROKER, TOPIC)
Expand Down
27 changes: 27 additions & 0 deletions src/rdkafka_aux.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@ rd_kafka_topic_result_name(const rd_kafka_topic_result_t *topicres) {
return topicres->topic;
}

const rd_kafka_resp_err_t rd_kafka_topic_result_topic_config_error(
const rd_kafka_topic_result_t *topicres) {
return topicres->topic_config_err;
}

const int32_t
rd_kafka_topic_result_num_partitions(const rd_kafka_topic_result_t *topicres) {
return topicres->num_partitions;
}

const int16_t rd_kafka_topic_result_replication_factor(
const rd_kafka_topic_result_t *topicres) {
return topicres->replication_factor;
}

const rd_kafka_ConfigEntry_t **
rd_kafka_topic_result_configs(const rd_kafka_topic_result_t *topicres,
size_t *cntp) {
*cntp = rd_list_cnt(&topicres->configs);
return (const rd_kafka_ConfigEntry_t **)topicres->configs.rl_elems;
}

/**
* @brief Create new topic_result (single allocation).
*
Expand Down Expand Up @@ -82,6 +104,10 @@ rd_kafka_topic_result_t *rd_kafka_topic_result_new(const char *topic,
terr->errstr = NULL;
}

terr->num_partitions = -1;
terr->replication_factor = -1;
rd_list_init(&terr->configs, 0, NULL);

return terr;
}

Expand All @@ -90,6 +116,7 @@ rd_kafka_topic_result_t *rd_kafka_topic_result_new(const char *topic,
* @brief Destroy topic_result
*/
void rd_kafka_topic_result_destroy(rd_kafka_topic_result_t *terr) {
rd_list_destroy(&terr->configs);
rd_free(terr);
}

Expand Down
9 changes: 8 additions & 1 deletion src/rdkafka_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ struct rd_kafka_topic_result_s {
char *topic; /**< Points to data */
rd_kafka_resp_err_t err; /**< Error code */
char *errstr; /**< Points to data after topic, unless NULL */
char data[1]; /**< topic followed by errstr */
rd_kafka_resp_err_t topic_config_err; /**< Error code for topic config,
* or 0 if not applicable. */
int32_t num_partitions; /**< Number of partitions, or -1 if
* not applicable. */
int16_t replication_factor; /**< Replication factor, or -1 if
* not applicable. */
rd_list_t configs; /**< Type (rd_kafka_ConfigEntry_t *) */
char data[1]; /**< topic followed by errstr */
};

void rd_kafka_topic_result_destroy(rd_kafka_topic_result_t *terr);
Expand Down
44 changes: 36 additions & 8 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,26 @@ int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
return -1;
}

/**
* @brief Read TopicConfigErrorCode from \p rkbuf.
*
* @param rkbuf buffer to read from
* @param topic_config_err is the TopicConfigErrorCode to populate.
*
* @return 1 on success, else -1 on parse error.
*/
int rd_kafka_buf_read_TopicConfigErrorCode(
rd_kafka_buf_t *rkbuf,
rd_kafka_resp_err_t *topic_config_err) {
const int log_decode_errors = LOG_ERR;
Copy link

Copilot AI Oct 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable log_decode_errors is declared but never used in this function. It should be removed or used in error handling.

Suggested change
const int log_decode_errors = LOG_ERR;

Copilot uses AI. Check for mistakes.
int16_t err_code;
rd_kafka_buf_read_i16(rkbuf, &err_code);
*topic_config_err = err_code;
rd_kafka_buf_skip_tags(rkbuf);
return 1;
err_parse:
return -1;
}

/**
* @brief Send FindCoordinatorRequest.
Expand Down Expand Up @@ -4957,7 +4977,7 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
}

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_CreateTopics, 0, 4, &features);
rkb, RD_KAFKAP_CreateTopics, 0, 5, &features);
if (ApiVersion == -1) {
rd_snprintf(errstr, errstr_size,
"Topic Admin API (KIP-4) not supported "
Expand All @@ -4975,12 +4995,12 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
}

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1,
4 + (rd_list_cnt(new_topics) * 200) +
4 + 1);
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_CreateTopics, 1,
4 + (rd_list_cnt(new_topics) * 200) + 4 + 1, ApiVersion >= 5);

/* #topics */
rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics));
rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(new_topics));

while ((newt = rd_list_elem(new_topics, i++))) {
int partition;
Expand Down Expand Up @@ -5030,7 +5050,8 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
}

/* #replica_assignment */
rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas));
rd_kafka_buf_write_arraycnt(rkbuf,
rd_list_cnt(&newt->replicas));

/* Replicas per partition, see rdkafka_admin.[ch]
* for how these are constructed. */
Expand All @@ -5046,24 +5067,31 @@ rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
/* partition */
rd_kafka_buf_write_i32(rkbuf, partition);
/* #replicas */
rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas));
rd_kafka_buf_write_arraycnt(rkbuf,
rd_list_cnt(replicas));

for (ri = 0; ri < rd_list_cnt(replicas); ri++) {
/* replica */
rd_kafka_buf_write_i32(
rkbuf, rd_list_get_int32(replicas, ri));
}

rd_kafka_buf_write_tags_empty(rkbuf);
}

/* #config_entries */
rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config));
rd_kafka_buf_write_arraycnt(rkbuf, rd_list_cnt(&newt->config));

RD_LIST_FOREACH(entry, &newt->config, ei) {
/* config_name */
rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
/* config_value (nullable) */
rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);
/* Skip Tags */
rd_kafka_buf_write_tags_empty(rkbuf);
}

rd_kafka_buf_write_tags_empty(rkbuf);
}

/* timeout */
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ int rd_kafka_buf_read_CurrentLeader(rd_kafka_buf_t *rkbuf,
int rd_kafka_buf_read_NodeEndpoints(rd_kafka_buf_t *rkbuf,
rd_kafkap_NodeEndpoints_t *NodeEndpoints);

int rd_kafka_buf_read_TopicConfigErrorCode(
rd_kafka_buf_t *rkbuf,
rd_kafka_resp_err_t *topic_config_err);


rd_kafka_resp_err_t
rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
Expand Down
4 changes: 0 additions & 4 deletions tests/0091-max_poll_interval_timeout.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ static void do_test_with_assign(const char *topic) {

test_conf_init(&conf, NULL, 60);

test_create_topic_wait_exists(NULL, topic, 2, 1, 5000);

test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);

Expand Down Expand Up @@ -249,8 +247,6 @@ static void do_test_no_poll(const char *topic) {

test_conf_init(&conf, NULL, 60);

test_create_topic_wait_exists(NULL, topic, 2, 1, 5000);

test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "max.poll.interval.ms", "7000" /*7s*/);

Expand Down
Loading