Skip to content

Commit 04ee64c

Browse files
committed
Requested changes
1 parent e38cc1e commit 04ee64c

File tree

2 files changed

+10
-22
lines changed

2 files changed

+10
-22
lines changed

src/rdkafka_cgrp.c

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2935,42 +2935,32 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
29352935
if (rkmce) {
29362936
topic_name = rd_strdup(rkmce->rkmce_mtopic.topic);
29372937
// Check if partition exists in metadata
2938-
int pcnt = rkmce->rkmce_mtopic.partition_cnt;
2939-
for (j = 0; j < pcnt; j++) {
2940-
if (rkmce->rkmce_mtopic.partitions[j].id ==
2941-
partition) {
2942-
partition_found = rd_true;
2943-
break;
2944-
}
2945-
}
2938+
int pcnt = rkmce->rkmce_mtopic.partition_cnt;
2939+
partition_found = (partition < pcnt);
29462940
}
29472941
rd_kafka_rdunlock(rk);
29482942

2949-
if (unlikely(!topic_name)) {
2943+
if (!topic_name) {
29502944
rktpar = rd_kafka_topic_partition_list_find_topic_by_id(
29512945
rkcg->rkcg_current_assignment, request_topic_id);
29522946
if (rktpar)
29532947
topic_name = rd_strdup(rktpar->topic);
29542948
}
29552949

29562950
// If topic name is found and partition exists in metadata
2957-
if (likely(topic_name != NULL) && partition_found) {
2951+
if (topic_name != NULL && partition_found) {
29582952
rd_kafka_topic_partition_list_add_with_topic_name_and_id(
29592953
assignment_with_metadata, request_topic_id,
29602954
topic_name, assignment->elems[i].partition);
29612955
rd_free(topic_name);
29622956
continue;
29632957
}
29642958

2965-
if (!partition_found)
2966-
rd_kafka_dbg(
2967-
rkcg->rkcg_rk, CGRP, "HEARTBEAT",
2968-
"Partition assigned to this consumer is not "
2969-
"present in cached metadata for topic id: %s. "
2970-
"This may indicate that the topic's partition "
2971-
"count has increased and metadata needs to be "
2972-
"refreshed. ",
2973-
rd_kafka_Uuid_base64str(&request_topic_id));
2959+
if (!partition_found && topic_name != NULL)
2960+
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
2961+
"Found new partition for topic %s. "
2962+
"Updating metadata.",
2963+
topic_name);
29742964

29752965
if (missing_topic_ids) {
29762966
if (unlikely(!*missing_topic_ids))

tests/0147-consumer_group_consumer_mock.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -982,9 +982,7 @@ static void test_metadata_log_cb(const rd_kafka_t *rk,
982982
seen_heartbeat_resp = 1;
983983
}
984984
if (seen_heartbeat_resp &&
985-
strstr(buf,
986-
"Partition assigned to this consumer is not present in "
987-
"cached metadata")) {
985+
strstr(buf, "Found new partition for topic")) {
988986
seen_metadata_log = 1;
989987
}
990988
}

0 commit comments

Comments
 (0)