Skip to content

Commit f4d2475

Browse files
committed
Trigger a metadata cache refresh when the current assignment is not the same as next_target_assignment
1 parent acec9a5 commit f4d2475

File tree

5 files changed

+166
-2
lines changed

5 files changed

+166
-2
lines changed

src/rdkafka_cgrp.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2925,13 +2925,23 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
29252925
rd_kafka_Uuid_t request_topic_id =
29262926
rd_kafka_topic_partition_get_topic_id(
29272927
&assignment->elems[i]);
2928+
int partition = assignment->elems[i].partition;
2929+
rd_bool_t partition_found = rd_false;
29282930

29292931
rd_kafka_rdlock(rk);
29302932
rkmce =
29312933
rd_kafka_metadata_cache_find_by_id(rk, request_topic_id, 1);
29322934

2933-
if (rkmce)
2935+
if (rkmce) {
29342936
topic_name = rd_strdup(rkmce->rkmce_mtopic.topic);
2937+
//Check if partition exists in metadata
2938+
for (int j = 0; j < rkmce->rkmce_mtopic.partition_cnt; j++) {
2939+
if (rkmce->rkmce_mtopic.partitions[j].id == partition) {
2940+
partition_found = rd_true;
2941+
break;
2942+
}
2943+
}
2944+
}
29352945
rd_kafka_rdunlock(rk);
29362946

29372947
if (unlikely(!topic_name)) {
@@ -2941,14 +2951,25 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
29412951
topic_name = rd_strdup(rktpar->topic);
29422952
}
29432953

2944-
if (likely(topic_name != NULL)) {
2954+
// If topic name is found and partition exists in metadata
2955+
if (likely(topic_name != NULL) && partition_found) {
29452956
rd_kafka_topic_partition_list_add_with_topic_name_and_id(
29462957
assignment_with_metadata, request_topic_id,
29472958
topic_name, assignment->elems[i].partition);
29482959
rd_free(topic_name);
29492960
continue;
29502961
}
29512962

2963+
if (!partition_found)
2964+
rd_kafka_dbg(
2965+
rkcg->rkcg_rk, CGRP, "HEARTBEAT",
2966+
"Partition assigned to this consumer is not "
2967+
"present in cached metadata for topic id: %s. "
2968+
"This may indicate that the topic's partition "
2969+
"count has increased and metadata needs to be "
2970+
"refreshed. ",
2971+
rd_kafka_Uuid_base64str(&request_topic_id));
2972+
29522973
if (missing_topic_ids) {
29532974
if (unlikely(!*missing_topic_ids))
29542975
*missing_topic_ids =

tests/0154-metadata_refresh.c

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* librdkafka - Apache Kafka C library
3+
*
4+
* Copyright (c) 2025, Confluent Inc.
5+
* All rights reserved.
6+
*
7+
* Redistribution and use in source and binary forms, with or without
8+
* modification, are permitted provided that the following conditions are met:
9+
*
10+
* 1. Redistributions of source code must retain the above copyright notice,
11+
* this list of conditions and the following disclaimer.
12+
* 2. Redistributions in binary form must reproduce the above copyright notice,
13+
* this list of conditions and the following disclaimer in the documentation
14+
* and/or other materials provided with the distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26+
* POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
#include <stdio.h>
30+
#include <string.h>
31+
#include "test.h"
32+
#include "rdkafka.h"
33+
34+
/**
35+
* Integration test for KIP-848 partition metadata refresh:
36+
* - Create topic with 2 partitions
37+
* - Start consumer group and verify initial assignment
38+
* - Increase partition count to 4
39+
* - Reset log tracking variables after partition creation
40+
* - Wait for HeartbeatRequest, HeartbeatResponse, and metadata refresh logs
41+
* - Assert that metadata refresh is triggered for new partitions
42+
*/
43+
44+
// Flags recording which of the expected log lines have been observed so far,
// in the order: heartbeat request -> heartbeat response -> "partition not
// present in cached metadata" log.
// They are set by test_metadata_log_cb(), cleared by reset_log_tracking()
// and polled by wait_for_metadata_refresh_log().
// NOTE(review): the log callback presumably runs on a librdkafka thread;
// `volatile` alone is not a synchronization primitive - confirm this is
// acceptable for these one-way flags.
45+
static volatile int seen_heartbeat_req = 0;
46+
static volatile int seen_heartbeat_resp = 0;
47+
static volatile int seen_metadata_log = 0;
48+
49+
/**
 * @brief Clear all three log-sequence flags so that only log lines seen
 *        after this call are taken into account.
 */
static void reset_log_tracking(void) {
50+
seen_heartbeat_req = 0;
51+
seen_heartbeat_resp = 0;
52+
seen_metadata_log = 0;
53+
}
54+
55+
static void wait_for_metadata_refresh_log(int timeout_ms) {
56+
int elapsed = 0;
57+
while (elapsed < timeout_ms && !seen_metadata_log) {
58+
rd_usleep(500 * 1000, NULL); // 500 ms
59+
elapsed += 500;
60+
}
61+
TEST_ASSERT(seen_heartbeat_req, "Expected HeartbeatRequest log not seen after partition creation");
62+
TEST_ASSERT(seen_heartbeat_resp, "Expected HeartbeatResponse log not seen after partition creation");
63+
TEST_ASSERT(seen_metadata_log, "Expected metadata refresh log not seen after partition creation and heartbeat");
64+
}
65+
66+
// Custom log callback to capture and process librdkafka logs
67+
static void test_metadata_log_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
68+
if (strstr(buf, "Sent ConsumerGroupHeartbeatRequest")) {
69+
seen_heartbeat_req = 1;
70+
}
71+
if (seen_heartbeat_req && strstr(buf, "Received ConsumerGroupHeartbeatResponse")) {
72+
seen_heartbeat_resp = 1;
73+
}
74+
if (seen_heartbeat_resp && strstr(buf, "Partition assigned to this consumer is not present in cached metadata")) {
75+
seen_metadata_log = 1;
76+
}
77+
}
78+
79+
static rd_kafka_t *create_consumer(const char *topic, const char *group, void (*log_cb)(const rd_kafka_t *, int, const char *, const char *)) {
80+
rd_kafka_conf_t *conf;
81+
test_conf_init(&conf, NULL, 60);
82+
test_conf_set(conf, "group.id", group);
83+
test_conf_set(conf, "auto.offset.reset", "earliest");
84+
test_conf_set(conf, "debug", "cgrp, protocol");
85+
rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb);
86+
rd_kafka_t *consumer = test_create_consumer(topic, NULL, conf, NULL);
87+
return consumer;
88+
}
89+
90+
static void setup_and_run_metadata_refresh_test(void) {
91+
const char *topic = test_mk_topic_name("cgrp_metadata", 1);
92+
int initial_partitions = 2;
93+
int new_partitions = 4;
94+
rd_kafka_t *c1, *c2, *rk;
95+
const char *group = "grp_metadata";
96+
97+
SUB_TEST_QUICK();
98+
99+
TEST_SAY("Creating topic %s with %d partitions\n", topic, initial_partitions);
100+
test_create_topic(NULL, topic, initial_partitions, 1);
101+
102+
TEST_SAY("Creating consumers\n");
103+
c1 = create_consumer(topic, group, test_metadata_log_cb);
104+
c2 = create_consumer(topic, group, test_metadata_log_cb);
105+
106+
rk = test_create_handle(RD_KAFKA_PRODUCER, NULL);
107+
108+
TEST_SAY("Subscribing to topic %s\n", topic);
109+
test_consumer_subscribe(c1, topic);
110+
test_consumer_subscribe(c2, topic);
111+
112+
// Wait for initial assignment
113+
test_consumer_wait_assignment(c1, rd_false);
114+
test_consumer_wait_assignment(c2, rd_false);
115+
116+
// Create new partitions
117+
TEST_SAY("Increasing partition count to %d\n", new_partitions);
118+
test_create_partitions(rk, topic, new_partitions);
119+
120+
// Reset log tracking variables to only consider logs after partition creation
121+
reset_log_tracking();
122+
123+
// Wait for expected logs for up to 10 seconds
124+
wait_for_metadata_refresh_log(10000);
125+
126+
TEST_SAY("Closing consumers\n");
127+
test_consumer_close(c1);
128+
test_consumer_close(c2);
129+
rd_kafka_destroy(c1);
130+
rd_kafka_destroy(c2);
131+
132+
SUB_TEST_PASS();
133+
}
134+
135+
/**
 * @brief Test entry point: runs the metadata refresh test unless
 *        test_consumer_group_protocol_classic() reports true.
 *
 * @param argc Unused.
 * @param argv Unused.
 *
 * @returns 0 in all cases.
 */
int main_0154_metadata_refresh(int argc, char **argv) {
        (void)argc;
        (void)argv;

        if (test_consumer_group_protocol_classic())
                return 0;

        setup_and_run_metadata_refresh_test();
        return 0;
}

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ set(
143143
0151-purge-brokers.c
144144
0152-rebootstrap.c
145145
0153-memberid.c
146+
0154-metadata_refresh.c
146147
8000-idle.cpp
147148
8001-fetch_from_follower_mock_manual.c
148149
test.c

tests/test.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ _TEST_DECL(0150_telemetry_mock);
270270
_TEST_DECL(0151_purge_brokers_mock);
271271
_TEST_DECL(0152_rebootstrap_local);
272272
_TEST_DECL(0153_memberid);
273+
_TEST_DECL(0154_metadata_refresh);
273274

274275
/* Manual tests */
275276
_TEST_DECL(8000_idle);
@@ -536,6 +537,7 @@ struct test tests[] = {
536537
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
537538
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
538539
_TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
540+
_TEST(0154_metadata_refresh, 0, TEST_BRKVER(0, 11, 0, 0)),
539541

540542

541543
/* Manual tests */

win32/tests/tests.vcxproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@
233233
<ClCompile Include="..\..\tests\0151-purge-brokers.c" />
234234
<ClCompile Include="..\..\tests\0152-rebootstrap.c" />
235235
<ClCompile Include="..\..\tests\0153-memberid.c" />
236+
<ClCompile Include="..\..\tests\0154-metadata_refresh.c" />
236237
<ClCompile Include="..\..\tests\8000-idle.cpp" />
237238
<ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" />
238239
<ClCompile Include="..\..\tests\test.c" />

0 commit comments

Comments
 (0)