Skip to content

Commit 8c7aabb

Browse files
committed
[KIP-848] Added chaos and performance testing scripts
1 parent 01ec279 commit 8c7aabb

18 files changed

+1069
-55
lines changed

examples/rdkafka_performance.c

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,16 @@ static int exit_after = 0;
6767
static int exit_eof = 0;
6868
static FILE *stats_fp;
6969
static int dr_disp_div;
70-
static int verbosity = 1;
71-
static int latency_mode = 0;
72-
static FILE *latency_fp = NULL;
73-
static int msgcnt = -1;
74-
static int incremental_mode = 0;
75-
static int partition_cnt = 0;
76-
static int eof_cnt = 0;
77-
static int with_dr = 1;
78-
static int read_hdrs = 0;
70+
static int verbosity = 1;
71+
static int latency_mode = 0;
72+
static FILE *latency_fp = NULL;
73+
static int msgcnt = -1;
74+
static int incremental_mode = 0;
75+
static int partition_cnt = 0;
76+
static int eof_cnt = 0;
77+
static int with_dr = 1;
78+
static int read_hdrs = 0;
79+
static int is_group_protocol_classic = 1;
7980

8081

8182
static void stop(int sig) {
@@ -887,14 +888,13 @@ int main(int argc, char **argv) {
887888
* Try other values with: ... -X queued.min.messages=1000
888889
*/
889890
rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
890-
rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
891891
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);
892892

893893
topics = rd_kafka_topic_partition_list_new(1);
894894

895895
while ((opt = getopt(argc, argv,
896896
"PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
897-
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
897+
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:g:")) != -1) {
898898
switch (opt) {
899899
case 'G':
900900
if (rd_kafka_conf_set(conf, "group.id", optarg, errstr,
@@ -922,6 +922,16 @@ int main(int argc, char **argv) {
922922
case 'b':
923923
brokers = optarg;
924924
break;
925+
case 'g':
926+
if (rd_kafka_conf_set(conf, "group.protocol", optarg,
927+
errstr, sizeof(errstr)) !=
928+
RD_KAFKA_CONF_OK) {
929+
fprintf(stderr, "%% %s\n", errstr);
930+
exit(1);
931+
}
932+
is_group_protocol_classic =
933+
strcmp(optarg, "classic") == 0;
934+
break;
925935
case 's':
926936
msgsize = atoi(optarg);
927937
break;
@@ -1142,6 +1152,8 @@ int main(int argc, char **argv) {
11421152
" -p <num> Partition (defaults to random). "
11431153
"Multiple partitions are allowed in -C consumer mode.\n"
11441154
" -M Print consumer interval stats\n"
1155+
" -g <protocol> Group rebalance protocol to use. classic "
1156+
"or consumer (defaults to classic)\n"
11451157
" -b <brokers> Broker address list (host[:port],..)\n"
11461158
" -s <size> Message size (producer)\n"
11471159
" -k <key> Message key (producer)\n"
@@ -1201,6 +1213,11 @@ int main(int argc, char **argv) {
12011213
exit(1);
12021214
}
12031215

1216+
if (is_group_protocol_classic) {
1217+
/** Session timeout is moved to broker side on the new consumer
1218+
* group rebalance protocols */
1219+
rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
1220+
}
12041221

12051222
dispintvl *= 1000; /* us */
12061223

tests/0045-subscribe_update.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,8 @@ static void do_test_regex_many_mock(const char *assignment_strategy,
474474
/* Wait for an assignment to let the consumer catch up on
475475
* all rebalancing. */
476476
if (i % await_assignment_every == await_assignment_every - 1)
477-
test_consumer_wait_assignment(rk, rd_true /*poll*/);
477+
test_consumer_wait_assignment(rk, rd_true /*poll*/,
478+
1000);
478479
else if (!lots_of_topics)
479480
rd_usleep(100 * 1000, NULL);
480481
}
@@ -783,7 +784,7 @@ static void do_test_resubscribe_with_regex() {
783784

784785
/* Subscribe to regex ^.*topic_regex.* and topic_a literal */
785786
TEST_SAY("Subscribing to regex ^.*topic_regex.* and topic_a\n");
786-
test_consumer_subscribe_multi(rk, 2, "^.*topic_regex.*", topic_a);
787+
test_consumer_subscribe_multi_va(rk, 2, "^.*topic_regex.*", topic_a);
787788
/* Wait for assignment */
788789
if (test_consumer_group_protocol_classic()) {
789790
await_assignment("Assignment for topic1, topic2 and topic_a",

tests/0081-admin.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4212,7 +4212,7 @@ static void do_test_DeleteConsumerGroupOffsets(const char *what,
42124212

42134213
if (sub_consumer) {
42144214
TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
4215-
test_consumer_wait_assignment(consumer, rd_true);
4215+
test_consumer_wait_assignment(consumer, rd_true, 1000);
42164216
}
42174217

42184218
/* Commit some offsets */
@@ -4488,7 +4488,7 @@ static void do_test_AlterConsumerGroupOffsets(const char *what,
44884488
if (sub_consumer) {
44894489
TEST_CALL_ERR__(
44904490
rd_kafka_subscribe(consumer, subscription));
4491-
test_consumer_wait_assignment(consumer, rd_true);
4491+
test_consumer_wait_assignment(consumer, rd_true, 1000);
44924492
}
44934493
}
44944494

@@ -4769,7 +4769,7 @@ static void do_test_ListConsumerGroupOffsets(const char *what,
47694769

47704770
if (sub_consumer) {
47714771
TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
4772-
test_consumer_wait_assignment(consumer, rd_true);
4772+
test_consumer_wait_assignment(consumer, rd_true, 1000);
47734773
}
47744774

47754775
/* Commit some offsets */

tests/0102-static_group_rebalance.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ static void do_test_static_membership_mock(
759759

760760
TEST_SAY("Subscribing consumer 1 again\n");
761761
test_consumer_subscribe(consumer1, topic);
762-
test_consumer_wait_assignment(consumer1, rd_false);
762+
test_consumer_wait_assignment(consumer1, rd_false, 1000);
763763

764764
next_generation_id1 = consumer_generation_id(consumer1);
765765
next_generation_id2 = consumer_generation_id(consumer2);

tests/0103-transactions.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ static void do_test_basic_producer_txn(rd_bool_t enable_compression) {
165165
/* Wait for assignment to make sure consumer is fetching messages
166166
* below, so we can use the poll_no_msgs() timeout to
167167
* determine that messages were indeed aborted. */
168-
test_consumer_wait_assignment(c, rd_true);
168+
test_consumer_wait_assignment(c, rd_true, 1000);
169169

170170
/* Init transactions */
171171
TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000));
@@ -1039,7 +1039,7 @@ static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) {
10391039
test_conf_set(c_conf, "enable.auto.commit", "false");
10401040
c = test_create_consumer(topic, NULL, c_conf, NULL);
10411041
test_consumer_subscribe(c, topic);
1042-
test_consumer_wait_assignment(c, rd_false);
1042+
test_consumer_wait_assignment(c, rd_false, 1000);
10431043

10441044
TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
10451045

tests/0107-topic_recreate.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) {
193193

194194
/* Start consumer */
195195
test_consumer_subscribe(consumer, topic);
196-
test_consumer_wait_assignment(consumer, rd_true);
196+
test_consumer_wait_assignment(consumer, rd_true, 1000);
197197

198198
mtx_lock(&value_mtx);
199199
value = "before";

tests/0113-cooperative_rebalance.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3346,7 +3346,7 @@ static void x_incremental_rebalances(void) {
33463346
/* First consumer joins group */
33473347
TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
33483348
test_consumer_subscribe(c[0], topic);
3349-
test_consumer_wait_assignment(c[0], rd_true /*poll*/);
3349+
test_consumer_wait_assignment(c[0], rd_true /*poll*/, 1000);
33503350
test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0,
33513351
topic, 1, topic, 2, topic, 3, topic, 4, topic,
33523352
5, NULL);
@@ -3355,7 +3355,7 @@ static void x_incremental_rebalances(void) {
33553355
/* Second consumer joins group */
33563356
TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
33573357
test_consumer_subscribe(c[1], topic);
3358-
test_consumer_wait_assignment(c[1], rd_true /*poll*/);
3358+
test_consumer_wait_assignment(c[1], rd_true /*poll*/, 1000);
33593359
rd_sleep(3);
33603360
if (test_consumer_group_protocol_classic()) {
33613361
test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3,
@@ -3372,7 +3372,7 @@ static void x_incremental_rebalances(void) {
33723372
/* Third consumer joins group */
33733373
TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
33743374
test_consumer_subscribe(c[2], topic);
3375-
test_consumer_wait_assignment(c[2], rd_true /*poll*/);
3375+
test_consumer_wait_assignment(c[2], rd_true /*poll*/, 1000);
33763376
rd_sleep(3);
33773377
if (test_consumer_group_protocol_classic()) {
33783378
test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4,

tests/0120-asymmetric_subscription.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ static void do_test_asymmetric(const char *assignor, const char *bootstraps) {
100100

101101
/* Await assignments for all consumers */
102102
for (i = 0; i < _C_CNT; i++)
103-
test_consumer_wait_assignment(c[i], rd_true);
103+
test_consumer_wait_assignment(c[i], rd_true, 1000);
104104

105105
/* All have assignments, grab them. */
106106
for (i = 0; i < _C_CNT; i++) {

tests/0122-buffer_cleaning_after_rebalance.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ static void do_test_consume_batch(const char *strategy) {
164164
c2 = test_create_consumer(topic, NULL, conf, NULL);
165165

166166
test_consumer_subscribe(c1, topic);
167-
test_consumer_wait_assignment(c1, rd_false);
167+
test_consumer_wait_assignment(c1, rd_false, 1000);
168168

169169
/* Create generic consume queue */
170170
rkq1 = rd_kafka_queue_get_consumer(c1);
@@ -183,7 +183,7 @@ static void do_test_consume_batch(const char *strategy) {
183183
TEST_FAIL("Failed to create thread for %s", "C1.PRE");
184184

185185
test_consumer_subscribe(c2, topic);
186-
test_consumer_wait_assignment(c2, rd_false);
186+
test_consumer_wait_assignment(c2, rd_false, 1000);
187187

188188
thrd_join(thread_id, NULL);
189189

tests/0132-strategy_ordering.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ static void do_test_strategy_ordering(const char *assignor,
147147

148148
/* Await assignments for all consumers */
149149
for (i = 0; i < _C_CNT; i++) {
150-
test_consumer_wait_assignment(c[i], rd_true);
150+
test_consumer_wait_assignment(c[i], rd_true, 1000);
151151
}
152152

153153
if (!strcmp(expected_assignor, "range"))

0 commit comments

Comments
 (0)