Skip to content

Commit a4a050b

Browse files
authored
[KIP-848] fix generated memberid uniqueness (confluentinc#5101)
1 parent 99b03e8 commit a4a050b

File tree

5 files changed

+143
-1
lines changed

5 files changed

+143
-1
lines changed

src/rdrand.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "rdrand.h"
3131
#include "rdtime.h"
3232
#include "tinycthread.h"
33+
#include "rdmurmur2.h"
3334

3435
int rd_jitter(int low, int high) {
3536
int rand_num;
@@ -40,8 +41,17 @@ int rd_jitter(int low, int high) {
4041
if (unlikely(seed == 0)) {
4142
struct timeval tv;
4243
rd_gettimeofday(&tv, NULL);
43-
seed = (unsigned int)(tv.tv_usec / 1000);
44+
seed = (unsigned int)(tv.tv_usec);
4445
seed ^= (unsigned int)(intptr_t)thrd_current();
46+
47+
/* When many threads are created at the same time and the
48+
* thread id is different only by a few bits it's possible that
49+
* `rand_r`, that is initially multiplying by `1103515245`,
50+
* truncates the variable bits and uses the same seed for
51+
* different threads. By applying `murmur2` we ensure that seed
52+
* variability is distributed across various bits at different
53+
* positions. */
54+
seed = (unsigned int)rd_murmur2(&seed, sizeof(seed));
4555
}
4656

4757
rand_num = rand_r(&seed);

tests/0153-memberid.c

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* librdkafka - Apache Kafka C library
3+
*
4+
* Copyright (c) 2025, Confluent Inc.
5+
* All rights reserved.
6+
*
7+
* Redistribution and use in source and binary forms, with or without
8+
* modification, are permitted provided that the following conditions are met:
9+
*
10+
* 1. Redistributions of source code must retain the above copyright notice,
11+
* this list of conditions and the following disclaimer.
12+
* 2. Redistributions in binary form must reproduce the above copyright notice,
13+
* this list of conditions and the following disclaimer in the documentation
14+
* and/or other materials provided with the distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26+
* POSSIBILITY OF SUCH DAMAGE.
27+
*/
28+
29+
#include "test.h"
30+
typedef struct consumer_s {
31+
const char *group_id;
32+
char *memberid;
33+
} consumer_t;
34+
35+
static int
36+
is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
37+
if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
38+
return 0;
39+
return 1;
40+
}
41+
42+
static int consumer_thread(void *arg) {
43+
rd_kafka_conf_t *conf;
44+
rd_kafka_t *consumer;
45+
consumer_t *consumer_args = arg;
46+
47+
test_curr->is_fatal_cb = is_fatal_cb;
48+
49+
test_conf_init(&conf, NULL, 60);
50+
51+
consumer =
52+
test_create_consumer(consumer_args->group_id, NULL, conf, NULL);
53+
54+
consumer_args->memberid = rd_kafka_memberid(consumer);
55+
56+
test_consumer_close(consumer);
57+
rd_kafka_destroy(consumer);
58+
test_curr->is_fatal_cb = NULL;
59+
return 0;
60+
}
61+
62+
void do_test_unique_memberid() {
63+
int i;
64+
int j;
65+
int have_only_unique_memberids = 1;
66+
const char *group_id = test_mk_topic_name("0153-memberid", 1);
67+
68+
#define CONSUMER_CNT 500
69+
thrd_t thread_id[CONSUMER_CNT];
70+
consumer_t consumer_args[CONSUMER_CNT];
71+
72+
SUB_TEST_QUICK();
73+
74+
for (i = 0; i < CONSUMER_CNT; i++) {
75+
consumer_args[i].group_id = group_id;
76+
consumer_args[i].memberid = NULL;
77+
thrd_create(&thread_id[i], consumer_thread, &consumer_args[i]);
78+
}
79+
80+
for (i = 0; i < CONSUMER_CNT; i++) {
81+
thrd_join(thread_id[i], NULL);
82+
}
83+
84+
for (i = 0; i < CONSUMER_CNT; i++) {
85+
if (have_only_unique_memberids) {
86+
for (j = i + 1; j < CONSUMER_CNT; j++) {
87+
if (strcmp(consumer_args[i].memberid,
88+
consumer_args[j].memberid) == 0) {
89+
TEST_SAY(
90+
"Consumer %d has the same member "
91+
"ID as consumer %d: %s\n",
92+
i, j, consumer_args[i].memberid);
93+
have_only_unique_memberids = 0;
94+
break;
95+
}
96+
}
97+
}
98+
rd_free(consumer_args[i].memberid);
99+
}
100+
101+
if (have_only_unique_memberids) {
102+
TEST_SAY("All %d consumers have unique member IDs\n",
103+
CONSUMER_CNT);
104+
} else {
105+
TEST_FAIL("Not all consumers have unique member IDs\n");
106+
}
107+
108+
SUB_TEST_PASS();
109+
}
110+
111+
int main_0153_memberid(int argc, char **argv) {
112+
113+
if (test_consumer_group_protocol_classic()) {
114+
TEST_SKIP(
115+
"Member ID is not generated on the client side in classic "
116+
"protocol, skipping test");
117+
return 0;
118+
}
119+
120+
if (!strcmp(test_mode, "valgrind")) {
121+
TEST_SKIP("Test is too heavy for valgrind, skipping it");
122+
return 0;
123+
}
124+
125+
do_test_unique_memberid();
126+
127+
return 0;
128+
}

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ set(
142142
0150-telemetry_mock.c
143143
0151-purge-brokers.c
144144
0152-rebootstrap.c
145+
0153-memberid.c
145146
8000-idle.cpp
146147
8001-fetch_from_follower_mock_manual.c
147148
test.c

tests/test.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ _TEST_DECL(0149_broker_same_host_port_mock);
269269
_TEST_DECL(0150_telemetry_mock);
270270
_TEST_DECL(0151_purge_brokers_mock);
271271
_TEST_DECL(0152_rebootstrap_local);
272+
_TEST_DECL(0153_memberid);
272273

273274
/* Manual tests */
274275
_TEST_DECL(8000_idle);
@@ -534,6 +535,7 @@ struct test tests[] = {
534535
_TEST(0150_telemetry_mock, 0),
535536
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
536537
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
538+
_TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
537539

538540

539541
/* Manual tests */

win32/tests/tests.vcxproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@
232232
<ClCompile Include="..\..\tests\0150-telemetry_mock.c" />
233233
<ClCompile Include="..\..\tests\0151-purge-brokers.c" />
234234
<ClCompile Include="..\..\tests\0152-rebootstrap.c" />
235+
<ClCompile Include="..\..\tests\0153-memberid.c" />
235236
<ClCompile Include="..\..\tests\8000-idle.cpp" />
236237
<ClCompile Include="..\..\tests\8001-fetch_from_follower_mock_manual.c" />
237238
<ClCompile Include="..\..\tests\test.c" />

0 commit comments

Comments
 (0)