Skip to content

Commit c0a4917

Browse files
committed
Move test
1 parent 1ce15dd commit c0a4917

File tree

6 files changed

+124
-163
lines changed

6 files changed

+124
-163
lines changed

src/rdkafka_cgrp.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2914,7 +2914,7 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
29142914
rd_kafka_cgrp_t *rkcg,
29152915
rd_kafka_topic_partition_list_t *assignment,
29162916
rd_list_t **missing_topic_ids) {
2917-
int i;
2917+
int i, j;
29182918
rd_kafka_t *rk = rkcg->rkcg_rk;
29192919
rd_kafka_topic_partition_list_t *assignment_with_metadata =
29202920
rd_kafka_topic_partition_list_new(assignment->cnt);
@@ -2935,8 +2935,8 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
29352935
if (rkmce) {
29362936
topic_name = rd_strdup(rkmce->rkmce_mtopic.topic);
29372937
// Check if partition exists in metadata
2938-
for (int j = 0; j < rkmce->rkmce_mtopic.partition_cnt;
2939-
j++) {
2938+
int pcnt = rkmce->rkmce_mtopic.partition_cnt;
2939+
for (j = 0; j < pcnt; j++) {
29402940
if (rkmce->rkmce_mtopic.partitions[j].id ==
29412941
partition) {
29422942
partition_found = rd_true;

tests/0147-consumer_group_consumer_mock.c

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
*/
2828

2929
#include "test.h"
30+
#include "rdkafka.h"
3031

3132
#include "../src/rdkafka_proto.h"
3233

3334
#include <stdarg.h>
35+
#include <stdio.h>
36+
#include <string.h>
3437

3538

3639
/**
@@ -928,6 +931,122 @@ static void do_test_quick_unsubscribe_tests(void) {
928931
}
929932
}
930933

934+
/**
935+
* Integration test for KIP-848 partition metadata refresh:
936+
* - Create topic with 2 partitions
937+
* - Start consumer group and verify initial assignment
938+
* - Increase partition count to 4
939+
* - Reset log tracking variables after partition creation
940+
* - Wait for HeartbeatRequest, HeartbeatResponse, and metadata refresh logs
941+
* - Assert that metadata refresh is triggered for new partitions
942+
*/
943+
944+
// Globals to track log sequence
945+
static volatile int seen_heartbeat_req = 0;
946+
static volatile int seen_heartbeat_resp = 0;
947+
static volatile int seen_metadata_log = 0;
948+
949+
static void reset_log_tracking(void) {
950+
seen_heartbeat_req = 0;
951+
seen_heartbeat_resp = 0;
952+
seen_metadata_log = 0;
953+
}
954+
955+
static void wait_for_metadata_refresh_log(int timeout_ms) {
956+
int elapsed = 0;
957+
while (elapsed < timeout_ms && !seen_metadata_log) {
958+
rd_usleep(500 * 1000, NULL); // 500 ms
959+
elapsed += 500;
960+
}
961+
TEST_ASSERT(
962+
seen_heartbeat_req,
963+
"Expected HeartbeatRequest log not seen after partition creation");
964+
TEST_ASSERT(
965+
seen_heartbeat_resp,
966+
"Expected HeartbeatResponse log not seen after partition creation");
967+
TEST_ASSERT(seen_metadata_log,
968+
"Expected metadata refresh log not seen after partition "
969+
"creation and heartbeat");
970+
}
971+
972+
// Custom log callback to capture and process librdkafka logs
973+
static void test_metadata_log_cb(const rd_kafka_t *rk,
974+
int level,
975+
const char *fac,
976+
const char *buf) {
977+
if (strstr(buf, "Sent ConsumerGroupHeartbeatRequest")) {
978+
seen_heartbeat_req = 1;
979+
}
980+
if (seen_heartbeat_req &&
981+
strstr(buf, "Received ConsumerGroupHeartbeatResponse")) {
982+
seen_heartbeat_resp = 1;
983+
}
984+
if (seen_heartbeat_resp &&
985+
strstr(buf,
986+
"Partition assigned to this consumer is not present in "
987+
"cached metadata")) {
988+
seen_metadata_log = 1;
989+
}
990+
}
991+
992+
static rd_kafka_t *create_consumers(const char *group) {
993+
rd_kafka_conf_t *conf;
994+
test_conf_init(&conf, NULL, 60);
995+
test_conf_set(conf, "group.id", group);
996+
test_conf_set(conf, "auto.offset.reset", "earliest");
997+
test_conf_set(conf, "debug", "cgrp, protocol");
998+
rd_kafka_conf_set_log_cb(conf, test_metadata_log_cb);
999+
rd_kafka_t *consumer = test_create_consumer(group, NULL, conf, NULL);
1000+
return consumer;
1001+
}
1002+
1003+
static void do_test_setup_and_run_metadata_refresh_test(void) {
1004+
const char *topic = test_mk_topic_name("cgrp_metadata", 1);
1005+
int initial_partitions = 2;
1006+
int new_partitions = 4;
1007+
rd_kafka_t *c1, *c2, *rk;
1008+
const char *group = "grp_metadata";
1009+
1010+
SUB_TEST_QUICK();
1011+
1012+
TEST_SAY("Creating topic %s with %d partitions\n", topic,
1013+
initial_partitions);
1014+
test_create_topic(NULL, topic, initial_partitions, 1);
1015+
1016+
TEST_SAY("Creating consumers\n");
1017+
c1 = create_consumers(group);
1018+
c2 = create_consumers(group);
1019+
1020+
rk = test_create_handle(RD_KAFKA_PRODUCER, NULL);
1021+
1022+
TEST_SAY("Subscribing to topic %s\n", topic);
1023+
test_consumer_subscribe(c1, topic);
1024+
test_consumer_subscribe(c2, topic);
1025+
1026+
// Wait for initial assignment
1027+
test_consumer_wait_assignment(c1, rd_false);
1028+
test_consumer_wait_assignment(c2, rd_false);
1029+
1030+
// Create new partitions
1031+
TEST_SAY("Increasing partition count to %d\n", new_partitions);
1032+
test_create_partitions(rk, topic, new_partitions);
1033+
1034+
// Reset log tracking variables to only consider logs after partition
1035+
// creation
1036+
reset_log_tracking();
1037+
1038+
// Wait for expected logs for up to 10 seconds
1039+
wait_for_metadata_refresh_log(10000);
1040+
1041+
TEST_SAY("Closing consumers\n");
1042+
test_consumer_close(c1);
1043+
test_consumer_close(c2);
1044+
rd_kafka_destroy(c1);
1045+
rd_kafka_destroy(c2);
1046+
1047+
SUB_TEST_PASS();
1048+
}
1049+
9311050
int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
9321051
TEST_SKIP_MOCK_CLUSTER(0);
9331052

@@ -948,5 +1067,7 @@ int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
9481067

9491068
do_test_quick_unsubscribe_tests();
9501069

1070+
do_test_setup_and_run_metadata_refresh_test();
1071+
9511072
return 0;
9521073
}

tests/0154-metadata_refresh.c

Lines changed: 0 additions & 156 deletions
This file was deleted.

tests/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ set(
143143
0151-purge-brokers.c
144144
0152-rebootstrap.c
145145
0153-memberid.c
146-
0154-metadata_refresh.c
147146
8000-idle.cpp
148147
8001-fetch_from_follower_mock_manual.c
149148
test.c

tests/test.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ _TEST_DECL(0150_telemetry_mock);
270270
_TEST_DECL(0151_purge_brokers_mock);
271271
_TEST_DECL(0152_rebootstrap_local);
272272
_TEST_DECL(0153_memberid);
273-
_TEST_DECL(0154_metadata_refresh);
274273

275274
/* Manual tests */
276275
_TEST_DECL(8000_idle);
@@ -537,7 +536,6 @@ struct test tests[] = {
537536
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
538537
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
539538
_TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
540-
_TEST(0154_metadata_refresh, 0, TEST_BRKVER(0, 11, 0, 0)),
541539

542540

543541
/* Manual tests */

win32/tests/tests.vcxproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@
233233
<ClCompile Include="..\..\tests\0151-purge-brokers.c" />
234234
<ClCompile Include="..\..\tests\0152-rebootstrap.c" />
235235
<ClCompile Include="..\..\tests\0153-memberid.c" />
236-
<ClCompile Include="..\..\tests\0154-metadata_refresh.c" />
237236
<ClCompile Include="..\..\tests\8000-idle.cpp" />
238237
<ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" />
239238
<ClCompile Include="..\..\tests\test.c" />

0 commit comments

Comments
 (0)