Skip to content

Commit b9a1072

Browse files
authored
[KIP-848] Tests for: dev_kip848_fix_fast_subscribe_or_unsubscribe (confluentinc#5026)
1 parent 8c83a55 commit b9a1072

File tree

5 files changed

+241
-1
lines changed

5 files changed

+241
-1
lines changed

src/rd.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,12 @@ static RD_INLINE RD_UNUSED char *rd_strndup(const char *s, size_t len) {
227227
#define RD_ARRAYSIZE(A) RD_ARRAY_SIZE(A)
228228
#define RD_SIZEOF(TYPE, MEMBER) sizeof(((TYPE *)NULL)->MEMBER)
229229
#define RD_OFFSETOF(TYPE, MEMBER) ((size_t) & (((TYPE *)NULL)->MEMBER))
230+
/** Array foreach */
231+
#define RD_ARRAY_FOREACH_INDEX(ELEM, ARRAY, INDEX) \
232+
for ((INDEX = 0, (ELEM) = (ARRAY)[INDEX]); \
233+
INDEX < RD_ARRAY_SIZE(ARRAY); \
234+
(ELEM) = \
235+
(++INDEX < RD_ARRAY_SIZE(ARRAY) ? (ARRAY)[INDEX] : (ELEM)))
230236

231237
/**
232238
* Returns the 'I'th array element from static sized array 'A'

tests/0022-consume_batch.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,9 @@ static void do_test_consume_batch_non_existent_topic(void) {
259259

260260
int main_0022_consume_batch(int argc, char **argv) {
261261
do_test_consume_batch();
262-
/* FIXME: this must be implemented in KIP-848 for compatibility. */
262+
/* Subscribing to a non-existent topic doesn't generate a
263+
* UNKNOWN_TOPIC_OR_PART error with `consumer` group rebalance protocol.
264+
*/
263265
if (test_consumer_group_protocol_classic()) {
264266
do_test_consume_batch_non_existent_topic();
265267
}

tests/0045-subscribe_update.c

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,108 @@ static void do_test_resubscribe_with_regex() {
821821
SUB_TEST_PASS();
822822
}
823823

824+
/**
825+
* @brief Create many topics and apply several subscription
826+
* updates, unsubscribing and re-subscribing too.
827+
* After changing some subscriptions verifies that the assignment
828+
* corresponds to last one.
829+
*
830+
* @param with_rebalance_cb Use a rebalance callback to perform the assignment.
831+
* It needs to poll the consumer when awaiting for the
832+
* assignment in this case.
833+
*/
834+
static void do_test_subscribe_many_updates(rd_bool_t with_rebalance_cb) {
835+
#define TOPIC_CNT 100
836+
char *topics[TOPIC_CNT] = {0};
837+
char *topic;
838+
char *group;
839+
size_t i;
840+
rd_kafka_t *rk;
841+
rd_kafka_conf_t *conf;
842+
const int partition_cnt = 4;
843+
844+
SUB_TEST("%s", with_rebalance_cb ? "with rebalance callback"
845+
: "without rebalance callback");
846+
847+
RD_ARRAY_FOREACH_INDEX(topic, topics, i) {
848+
char topic_i[17];
849+
rd_snprintf(topic_i, sizeof(topic_i), "topic%" PRIusz, i);
850+
topics[i] = rd_strdup(test_mk_topic_name(topic_i, 1));
851+
};
852+
group = topics[0];
853+
854+
test_conf_init(&conf, NULL, 60);
855+
if (with_rebalance_cb)
856+
rd_kafka_conf_set_rebalance_cb(conf, test_rebalance_cb);
857+
rk = test_create_consumer(group, NULL, conf, NULL);
858+
859+
TEST_SAY("Creating %d topics\n", TOPIC_CNT);
860+
TEST_CALL_ERR__(test_CreateTopics_simple(rk, NULL, topics, TOPIC_CNT,
861+
partition_cnt, NULL));
862+
test_wait_topic_exists(rk, topics[TOPIC_CNT - 1], 5000);
863+
/* Give the cluster some more time to propagate metadata
864+
* for TOPICS_CNT topics */
865+
rd_sleep(1);
866+
867+
RD_ARRAY_FOREACH_INDEX(topic, topics, i) {
868+
const int max_subscription_size = 5;
869+
size_t j;
870+
int k;
871+
int subscription_size =
872+
RD_MIN(max_subscription_size, TOPIC_CNT - i);
873+
int expected_assignment_cnt = subscription_size * partition_cnt;
874+
rd_kafka_topic_partition_list_t *expected_assignment = NULL;
875+
876+
rd_kafka_topic_partition_list_t *subscription =
877+
rd_kafka_topic_partition_list_new(subscription_size);
878+
rd_bool_t check_expected_assignment =
879+
(i % 5 == 0 || i == TOPIC_CNT - 1);
880+
rd_bool_t do_unsubscribe = i % 7 == 0;
881+
882+
if (check_expected_assignment)
883+
expected_assignment = rd_kafka_topic_partition_list_new(
884+
expected_assignment_cnt);
885+
886+
for (j = i; j < i + subscription_size; j++) {
887+
rd_kafka_topic_partition_list_add(
888+
subscription, topics[j], RD_KAFKA_PARTITION_UA);
889+
/* We unsubscribe every 7 iteration and
890+
* we check assignments every 5 iteration so
891+
* at 7 * 5 we unsubscribe and check that assignment
892+
* is empty. */
893+
if (check_expected_assignment && !do_unsubscribe)
894+
for (k = 0; k < partition_cnt; k++)
895+
rd_kafka_topic_partition_list_add(
896+
expected_assignment, topics[j], k);
897+
}
898+
TEST_CALL_ERR__(rd_kafka_subscribe(rk, subscription));
899+
rd_kafka_topic_partition_list_destroy(subscription);
900+
901+
if (do_unsubscribe)
902+
TEST_CALL_ERR__(rd_kafka_unsubscribe(rk));
903+
if (check_expected_assignment) {
904+
test_consumer_wait_assignment_topic_partition_list(
905+
rk,
906+
/* poll when we have a rebalance callback */
907+
with_rebalance_cb, expected_assignment, 30000);
908+
rd_kafka_topic_partition_list_destroy(
909+
expected_assignment);
910+
}
911+
};
912+
913+
TEST_CALL_ERR__(
914+
test_DeleteTopics_simple(rk, NULL, topics, TOPIC_CNT, NULL));
915+
RD_ARRAY_FOREACH_INDEX(topic, topics, i) {
916+
rd_free(topic);
917+
};
918+
919+
test_consumer_close(rk);
920+
rd_kafka_destroy(rk);
921+
922+
SUB_TEST_PASS();
923+
#undef TOPIC_CNT
924+
}
925+
824926
int main_0045_subscribe_update(int argc, char **argv) {
825927

826928
if (!test_can_create_topics(1))
@@ -864,6 +966,12 @@ int main_0045_resubscribe_with_regex(int argc, char **argv) {
864966
return 0;
865967
}
866968

969+
int main_0045_subscribe_many_updates(int argc, char **argv) {
970+
do_test_subscribe_many_updates(rd_false);
971+
do_test_subscribe_many_updates(rd_true);
972+
return 0;
973+
}
974+
867975
int main_0045_subscribe_update_racks_mock(int argc, char **argv) {
868976
int use_replica_rack = 0;
869977
int use_client_rack = 0;

tests/test.c

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ _TEST_DECL(0045_subscribe_update_non_exist_and_partchange);
158158
_TEST_DECL(0045_subscribe_update_mock);
159159
_TEST_DECL(0045_subscribe_update_racks_mock);
160160
_TEST_DECL(0045_resubscribe_with_regex);
161+
_TEST_DECL(0045_subscribe_many_updates);
161162
_TEST_DECL(0046_rkt_cache);
162163
_TEST_DECL(0047_partial_buf_tmout);
163164
_TEST_DECL(0048_partitioner);
@@ -388,6 +389,7 @@ struct test tests[] = {
388389
_TEST(0045_subscribe_update_mock, TEST_F_LOCAL),
389390
_TEST(0045_subscribe_update_racks_mock, TEST_F_LOCAL),
390391
_TEST(0045_resubscribe_with_regex, 0, TEST_BRKVER(0, 9, 0, 0)),
392+
_TEST(0045_subscribe_many_updates, 0, TEST_BRKVER(0, 10, 2, 0)),
391393
_TEST(0046_rkt_cache, TEST_F_LOCAL),
392394
_TEST(0047_partial_buf_tmout, TEST_F_KNOWN_ISSUE),
393395
_TEST(0048_partitioner,
@@ -3114,6 +3116,106 @@ void test_consumer_verify_assignment0(const char *func,
31143116
rd_kafka_topic_partition_list_destroy(assignment);
31153117
}
31163118

3119+
/**
3120+
* @brief Verify that the consumer's assignment matches the expected assignment.
3121+
* passed as a topic partition list in \p expected_assignment .
3122+
*/
3123+
rd_bool_t test_consumer_verify_assignment_topic_partition_list0(
3124+
const char *func,
3125+
int line,
3126+
rd_kafka_t *rk,
3127+
const rd_kafka_topic_partition_list_t *expected_assignment) {
3128+
rd_kafka_topic_partition_list_t *assignment,
3129+
*expected_assignment_copy = NULL;
3130+
rd_kafka_resp_err_t err;
3131+
int i;
3132+
rd_bool_t ret = rd_true;
3133+
3134+
if ((err = rd_kafka_assignment(rk, &assignment)))
3135+
TEST_FAIL("%s:%d: Failed to get assignment for %s: %s", func,
3136+
line, rd_kafka_name(rk), rd_kafka_err2str(err));
3137+
3138+
TEST_SAYL(4, "%s assignment (%d partition(s)):\n", rd_kafka_name(rk),
3139+
assignment->cnt);
3140+
for (i = 0; i < assignment->cnt; i++)
3141+
TEST_SAYL(4, " %s [%" PRId32 "]\n", assignment->elems[i].topic,
3142+
assignment->elems[i].partition);
3143+
3144+
if (assignment->cnt != expected_assignment->cnt) {
3145+
ret = rd_false;
3146+
goto done;
3147+
}
3148+
3149+
expected_assignment_copy =
3150+
rd_kafka_topic_partition_list_copy(expected_assignment);
3151+
rd_kafka_topic_partition_list_sort(assignment, NULL, NULL);
3152+
rd_kafka_topic_partition_list_sort(expected_assignment_copy, NULL,
3153+
NULL);
3154+
3155+
for (i = 0; i < assignment->cnt; i++) {
3156+
if (strcmp(assignment->elems[i].topic,
3157+
expected_assignment_copy->elems[i].topic) ||
3158+
assignment->elems[i].partition !=
3159+
expected_assignment_copy->elems[i].partition) {
3160+
ret = rd_false;
3161+
goto done;
3162+
}
3163+
}
3164+
3165+
done:
3166+
RD_IF_FREE(expected_assignment_copy,
3167+
rd_kafka_topic_partition_list_destroy);
3168+
rd_kafka_topic_partition_list_destroy(assignment);
3169+
return ret;
3170+
}
3171+
3172+
/**
3173+
* @brief Wait until the consumer's assignment matches the expected assignment.
3174+
* passed as a topic partition list in \p expected_assignment .
3175+
* Polling if \p do_poll is true, otherwise sleeps.
3176+
* Until \p timeout_ms milliseconds.
3177+
*/
3178+
void test_consumer_wait_assignment_topic_partition_list0(
3179+
const char *func,
3180+
int line,
3181+
rd_kafka_t *rk,
3182+
rd_bool_t do_poll,
3183+
const rd_kafka_topic_partition_list_t *expected_assignment,
3184+
int timeout_ms) {
3185+
int i;
3186+
rd_ts_t end = test_clock() + timeout_ms * 1000;
3187+
rd_bool_t verified = rd_false;
3188+
3189+
TEST_SAY("Verifying assignment\n");
3190+
TEST_SAYL(4, "%s expected assignment (%d partition(s)):\n",
3191+
rd_kafka_name(rk), expected_assignment->cnt);
3192+
for (i = 0; i < expected_assignment->cnt; i++)
3193+
TEST_SAYL(4, " %s [%" PRId32 "]\n",
3194+
expected_assignment->elems[i].topic,
3195+
expected_assignment->elems[i].partition);
3196+
3197+
do {
3198+
verified =
3199+
test_consumer_verify_assignment_topic_partition_list0(
3200+
func, line, rk, expected_assignment);
3201+
if (verified)
3202+
break;
3203+
3204+
if (do_poll)
3205+
test_consumer_poll_once(rk, NULL, 100);
3206+
else
3207+
rd_usleep(100 * 1000, NULL);
3208+
3209+
} while (test_clock() < end);
3210+
3211+
if (!verified) {
3212+
TEST_FAIL(
3213+
"%s:%d: Expected assignment not found in %s's "
3214+
"assignment within timeout %d",
3215+
func, line, rd_kafka_name(rk), timeout_ms);
3216+
}
3217+
TEST_SAY("Verified assignment\n");
3218+
}
31173219

31183220

31193221
/**

tests/test.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,28 @@ void test_consumer_verify_assignment0(const char *func,
676676
test_consumer_verify_assignment0(__FUNCTION__, __LINE__, rk, \
677677
fail_immediately, __VA_ARGS__)
678678

679+
rd_bool_t test_consumer_verify_assignment_topic_partition_list0(
680+
const char *func,
681+
int line,
682+
rd_kafka_t *rk,
683+
const rd_kafka_topic_partition_list_t *expected_assignment);
684+
#define test_consumer_verify_assignment_topic_partition_list( \
685+
rk, expected_assignment) \
686+
test_consumer_verify_assignment_topic_partition_list0( \
687+
__FUNCTION__, __LINE__, rk, expected_assignment)
688+
689+
void test_consumer_wait_assignment_topic_partition_list0(
690+
const char *func,
691+
int line,
692+
rd_kafka_t *rk,
693+
rd_bool_t do_poll,
694+
const rd_kafka_topic_partition_list_t *expected_assignment,
695+
int timeout_ms);
696+
#define test_consumer_wait_assignment_topic_partition_list( \
697+
rk, do_poll, expected_assignment, timeout_ms) \
698+
test_consumer_wait_assignment_topic_partition_list0( \
699+
__FUNCTION__, __LINE__, rk, do_poll, expected_assignment, \
700+
timeout_ms)
679701

680702
void test_consumer_assign(const char *what,
681703
rd_kafka_t *rk,

0 commit comments

Comments
 (0)