Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
9d0bd65
K2 testing commit
k-raina Jul 23, 2025
9a46014
K2 testing commit
k-raina Jul 23, 2025
4db0be8
K2 testing commit
k-raina Jul 23, 2025
10eafca
K2 Fix ( - 81)
Ankith-Confluent Aug 11, 2025
9135051
2.11 cherrypick fix
Ankith-Confluent Aug 11, 2025
04103da
K2 Fix
Ankith-Confluent Aug 12, 2025
ba824c7
K2 fix for 0011 and 0081 for 2.11
Ankith-Confluent Aug 14, 2025
39ba005
K2 Fix
Ankith-Confluent Aug 12, 2025
8d536dc
Cherry-pick Fix K2
Ankith-Confluent Aug 18, 2025
e7520d7
k2 11 and 81 fix
Ankith-Confluent Aug 14, 2025
9655b58
k2 Fix 2.8 in latest tests
Ankith-Confluent Aug 14, 2025
f9ab564
small fix
Ankith-Confluent Aug 18, 2025
86551ff
K2 Fix for 2.7
Ankith-Confluent Aug 19, 2025
160725f
Cherrypicked fix from 2.8 latest
Ankith-Confluent Aug 14, 2025
8117fa3
small fix
Ankith-Confluent Aug 19, 2025
b6158e9
k2 Fix
Ankith-Confluent Aug 21, 2025
e0dcd29
k2 Fix
Ankith-Confluent Aug 28, 2025
b4654c8
K2 Fix
Ankith-Confluent Aug 28, 2025
d32bede
K2 Fix
Ankith-Confluent Aug 28, 2025
24b8316
K2 Fix
Ankith-Confluent Aug 29, 2025
e041181
K2 Fix
Ankith-Confluent Sep 2, 2025
4c42662
K2 Fix
Ankith-Confluent Sep 2, 2025
37bc02b
K2 Fix
Ankith-Confluent Sep 4, 2025
c4c7f76
81 Fix
Ankith-Confluent Sep 8, 2025
3854c71
minor fix 11 and 59
Ankith-Confluent Sep 12, 2025
425e075
59 clean up
Ankith-Confluent Sep 12, 2025
4ca7d6f
81 test clean up
Ankith-Confluent Sep 12, 2025
31b3c20
fix for 107 and 113
Ankith-Confluent Sep 15, 2025
a5689d2
Fix critical K2 logic bug in 0081-admin.c - inverted condition for de…
Ankith-Confluent Sep 15, 2025
eb58f2c
merged 2.8
Ankith-Confluent Sep 15, 2025
5b15cc2
Merge branch 'k2_testing_v2.8.0' into k2_testing_final
Ankith-Confluent Sep 16, 2025
4a12f0b
Merge k2_testing_v2.8.0_latest into k2_testing_final
Ankith-Confluent Sep 16, 2025
bed0e3d
Merged 2.7
Ankith-Confluent Sep 16, 2025
931c560
Merge branch 'k2_testing_v2.5.3-Latest' into k2_testing_final
Ankith-Confluent Sep 17, 2025
09f564e
Merged 2.4
Ankith-Confluent Sep 17, 2025
c972992
Merge branch 'k2_testing_v2.3.0-Latest' into k2_testing_final
Ankith-Confluent Sep 17, 2025
78a5d24
Merge branch 'k2_testing_v2.2.1-Latest' into k2_testing_final
Ankith-Confluent Sep 17, 2025
e48166c
Merge branch 'k2_testing_v2.1.1-Latest' into k2_testing_final
Ankith-Confluent Sep 18, 2025
bb55d4a
Merge branch 'k2_testing_v2.0.2-Latest' into k2_testing_final
Ankith-Confluent Sep 18, 2025
af55b2e
fix regex tests
Ankith-Confluent Sep 23, 2025
0f29e67
ACKS Fix
Ankith-Confluent Sep 23, 2025
4a6efcf
rd_sleep Fix
Ankith-Confluent Sep 23, 2025
d2dca7c
rd_sleep second fix
Ankith-Confluent Sep 23, 2025
bd7cff0
rd_sleep Fix 3
Ankith-Confluent Sep 23, 2025
be170ed
rd_sleep Fix 4
Ankith-Confluent Sep 23, 2025
300afc2
rd_sleep Fix 5
Ankith-Confluent Sep 23, 2025
741d2db
Removing test_k2_cluster for timeout sceneriaos
Ankith-Confluent Sep 24, 2025
fbb66ef
Clean up 113 112 1
Ankith-Confluent Sep 24, 2025
e1722dc
K2 related skip fix by adding a new config
Ankith-Confluent Sep 24, 2025
863de8d
Fix couple of test , 81 mainly
Ankith-Confluent Sep 24, 2025
347539d
Delete topics (0001 - 0050)
Ankith-Confluent Sep 25, 2025
7e644ce
removed delete
Ankith-Confluent Sep 26, 2025
e93a11f
SSL skips
Ankith-Confluent Sep 26, 2025
600b387
delete topics utility
Ankith-Confluent Sep 26, 2025
2522cde
minro commit
Ankith-Confluent Sep 26, 2025
24c066c
removed delete
Ankith-Confluent Sep 26, 2025
93f9d45
delete topics utility
Ankith-Confluent Sep 26, 2025
4301f9d
minor bug
Ankith-Confluent Sep 26, 2025
6a15e96
Github Copilot reviews
Ankith-Confluent Sep 29, 2025
8cea5dd
minor spelling mistake
Ankith-Confluent Sep 29, 2025
9df1716
Removed extra lines and delete topics code
Ankith-Confluent Oct 3, 2025
21538a8
removed extra lines and fixed formatting
Ankith-Confluent Oct 3, 2025
4edf3d8
small fix
Ankith-Confluent Oct 3, 2025
21661a6
Refactor partition list printing functions to improve version safety.…
Ankith-Confluent Oct 3, 2025
566cc33
Refactor sleep function calls in tests to use sleep_for for consisten…
Ankith-Confluent Oct 6, 2025
1af4193
Remove K2 cluster mode references from tests and simplify fetch confi…
Ankith-Confluent Oct 6, 2025
a7fc114
Update fetch configuration and topic creation in tests for consistenc…
Ankith-Confluent Oct 6, 2025
b4e7851
Enhance test stability by adjusting sleep durations and handling time…
Ankith-Confluent Oct 6, 2025
6f83605
Improve test robustness by enhancing partition comparison logic and a…
Ankith-Confluent Oct 7, 2025
3156906
clang-formatted the changes
Ankith-Confluent Oct 22, 2025
ff16f52
Refactor vector syntax in cooperative rebalance tests for consistency.
Ankith-Confluent Oct 22, 2025
cdcbdd9
clang-format 18 verison
Ankith-Confluent Oct 22, 2025
2a72bf6
small styling changes
Ankith-Confluent Oct 22, 2025
05946f2
Remove unnecessary topic creation call in rkt_cache test to streamlin…
Ankith-Confluent Oct 22, 2025
1054fd9
Adjust sleep duration in fast metadata refresh test to improve timing…
Ankith-Confluent Oct 22, 2025
e3a7e68
Updated timeout duration
Ankith-Confluent Oct 22, 2025
c15bb21
increased time
Ankith-Confluent Oct 22, 2025
06148f1
Refactor variable initialization
Ankith-Confluent Oct 22, 2025
d6c360e
Refactor sleep calls to test_wait_for_metadata_propagation for improv…
Ankith-Confluent Oct 29, 2025
b6e0bb1
Update test configuration comment to clarify usage of test_wait_for_m…
Ankith-Confluent Oct 29, 2025
45edb1f
Merge remote-tracking branch 'origin/master' into k2_testing_final
Ankith-Confluent Oct 29, 2025
e136b42
Refactor variable initialization and formatting for consistency acros…
Ankith-Confluent Oct 29, 2025
e869834
clang fixes
Ankith-Confluent Oct 29, 2025
1fcb564
Refactor tests to improve clarity and functionality, including adjust…
Ankith-Confluent Oct 29, 2025
9f42f6c
style fixes
Ankith-Confluent Oct 29, 2025
09dcda6
Increase wait times for ACL propagation and consumer group rebalancin…
Ankith-Confluent Oct 30, 2025
2543aa8
minor fix
Ankith-Confluent Oct 30, 2025
e411ed2
warnings fixes
Ankith-Confluent Oct 30, 2025
7162bd1
clang format
Ankith-Confluent Oct 30, 2025
64fdbc9
Build and clang issues
Ankith-Confluent Oct 30, 2025
6a9cd8f
Fixes to 146 (not required changes)
Ankith-Confluent Oct 30, 2025
a69ff7d
small fix
Ankith-Confluent Oct 30, 2025
524ab74
spacing related commit
Ankith-Confluent Oct 30, 2025
e6254e4
small spacing changes
Ankith-Confluent Oct 30, 2025
852a647
146 formatting changes
Ankith-Confluent Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/incremental_alter_configs.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ cmd_incremental_alter_configs(rd_kafka_conf_t *conf, int argc, char **argv) {
char *config_name = argv[i * 5 + 3];
char *config_value = argv[i * 5 + 4];
rd_kafka_ConfigResource_t *config;
rd_kafka_AlterConfigOpType_t op_type;
rd_kafka_AlterConfigOpType_t op_type =
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET;
rd_kafka_ResourceType_t restype =
!strcmp(restype_s, "TOPIC") ? RD_KAFKA_RESOURCE_TOPIC
: !strcmp(restype_s, "BROKER") ? RD_KAFKA_RESOURCE_BROKER
Expand Down
2 changes: 0 additions & 2 deletions examples/rdkafka_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,6 @@ int main(int argc, char **argv) {
* Producer
*/
char buf[2048];
int sendcnt = 0;

/* Set up a message delivery report callback.
* It will be called once for each message, either on successful
Expand Down Expand Up @@ -661,7 +660,6 @@ int main(int argc, char **argv) {
"%s partition %i\n",
len, rd_kafka_topic_name(rkt),
partition);
sendcnt++;
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
}
Expand Down
1 change: 1 addition & 0 deletions src/rdbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size) {
}

rd_dassert(remains == 0);
(void)remains; /* Only used in asserts */

/* Restore original size */
slice->end = orig_end;
Expand Down
9 changes: 7 additions & 2 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -5392,10 +5392,15 @@ rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
*/
rd_kafka_Uuid_t rd_kafka_Uuid_random() {
int i;
unsigned char rand_values_bytes[16] = {0};
uint64_t *rand_values_uint64 = (uint64_t *)rand_values_bytes;
union {
unsigned char bytes[16];
uint64_t uint64s[2];
} rand_values = {{0}};
unsigned char *rand_values_bytes = rand_values.bytes;
uint64_t *rand_values_uint64 = rand_values.uint64s;
unsigned char *rand_values_app;
rd_kafka_Uuid_t ret = RD_KAFKA_UUID_ZERO;

for (i = 0; i < 16; i += 2) {
uint16_t rand_uint16 = (uint16_t)rd_jitter(0, INT16_MAX - 1);
/* No need to convert endianess here because it's still only
Expand Down
3 changes: 0 additions & 3 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ static int rd_kafka_broker_bufq_timeout_scan(rd_kafka_broker_t *rkb,
int log_first_n) {
rd_kafka_buf_t *rkbuf, *tmp;
int cnt = 0;
int idx = -1;
const rd_kafka_buf_t *holb;

restart:
Expand All @@ -906,8 +905,6 @@ static int rd_kafka_broker_bufq_timeout_scan(rd_kafka_broker_t *rkb,
TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
rd_kafka_broker_state_t pre_state, post_state;

idx++;

if (likely(now && rkbuf->rkbuf_ts_timeout > now))
continue;

Expand Down
8 changes: 4 additions & 4 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1498,12 +1498,14 @@ static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {
char reason[512];
va_list ap;
char astr[128];

va_start(ap, fmt);

if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
rd_kafka_cgrp_consumer_rejoin(rkcg, fmt, ap);
va_end(ap);
return;
}

va_start(ap, fmt);
rd_vsnprintf(reason, sizeof(reason), fmt, ap);
va_end(ap);

Expand Down Expand Up @@ -1677,7 +1679,6 @@ static void rd_kafka_cooperative_protocol_adjust_assignment(
int i;
int expected_max_assignment_size;
int total_assigned = 0;
int not_revoking = 0;
size_t par_cnt = 0;
const rd_kafka_topic_partition_t *toppar;
const PartitionMemberInfo_t *pmi;
Expand Down Expand Up @@ -1743,7 +1744,6 @@ static void rd_kafka_cooperative_protocol_adjust_assignment(
toppar->partition);

total_assigned++;
not_revoking++;
}

/* For ready-to-migrate-partitions, it is safe to move them
Expand Down
8 changes: 4 additions & 4 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3404,10 +3404,10 @@ void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,
rd_kafka_buf_read_i64(rkbuf, &session_lifetime_ms);

if (session_lifetime_ms)
rd_kafka_dbg(
rk, SECURITY, "REAUTH",
"Received session lifetime %ld ms from broker",
session_lifetime_ms);
rd_kafka_dbg(rk, SECURITY, "REAUTH",
"Received session lifetime %" PRId64
" ms from broker",
session_lifetime_ms);
rd_kafka_broker_start_reauth_timer(rkb, session_lifetime_ms);
}

Expand Down
22 changes: 11 additions & 11 deletions src/rdkafka_telemetry_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,17 +522,17 @@ unit_test_telemetry_decode_error(void *opaque, const char *error, ...) {
rd_assert(!*"Failure while decoding telemetry data");
}

int unit_test_telemetry(rd_kafka_type_t rk_type,
rd_kafka_telemetry_producer_metric_name_t metric_name,
const char *expected_name,
const char *expected_description,
rd_kafka_telemetry_metric_type_t expected_type,
rd_bool_t is_double,
rd_bool_t is_per_broker,
void (*set_metric_value)(rd_kafka_t *,
rd_kafka_broker_t *),
int64_t expected_value_int,
double expected_value_double) {
int unit_test_telemetry(
rd_kafka_type_t rk_type,
int metric_name, /* Accepts both producer and consumer metric enums */
const char *expected_name,
const char *expected_description,
rd_kafka_telemetry_metric_type_t expected_type,
rd_bool_t is_double,
rd_bool_t is_per_broker,
void (*set_metric_value)(rd_kafka_t *, rd_kafka_broker_t *),
int64_t expected_value_int,
double expected_value_double) {
rd_kafka_t *rk = rd_calloc(1, sizeof(*rk));
rwlock_init(&rk->rk_lock);
rd_kafka_conf_t *conf = rd_kafka_conf_new();
Expand Down
2 changes: 1 addition & 1 deletion src/rdmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,5 +504,5 @@ int unittest_map(void) {
fails += unittest_untyped_map();
fails += unittest_typed_map();
fails += unittest_typed_map2();
return 0;
return fails;
}
3 changes: 0 additions & 3 deletions src/snappy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1437,13 +1437,11 @@ static inline int sn_compress(struct snappy_env *env, struct source *reader,
struct sink *writer)
{
int err;
size_t written = 0;
int N = available(reader);
char ulength[kmax32];
char *p = varint_encode32(ulength, N);

append(writer, ulength, p - ulength);
written += (p - ulength);

while (N > 0) {
/* Get next block to compress (without copying if possible) */
Expand Down Expand Up @@ -1500,7 +1498,6 @@ static inline int sn_compress(struct snappy_env *env, struct source *reader,
char *end = compress_fragment(fragment, fragment_size,
dest, table, table_size);
append(writer, dest, end - dest);
written += (end - dest);

N -= num_to_read;
skip(reader, pending_advance);
Expand Down
7 changes: 5 additions & 2 deletions tests/0001-multiobj.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ int main_0001_multiobj(int argc, char **argv) {

test_conf_init(&conf, &topic_conf, 30);

if (!topic)
if (!topic) {
topic = test_mk_topic_name("0001", 0);
test_create_topic_if_auto_create_disabled(NULL, topic,
-1);
}

TIMING_START(&t_full, "full create-produce-destroy cycle");
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
Expand Down Expand Up @@ -92,7 +95,7 @@ int main_0001_multiobj(int argc, char **argv) {

/* Topic is created on the first iteration. */
if (i > 0)
TIMING_ASSERT(&t_full, 0, 999);
TIMING_ASSERT(&t_full, 0, tmout_multip(999));
else
/* Allow metadata propagation. */
rd_sleep(1);
Expand Down
10 changes: 8 additions & 2 deletions tests/0002-unkpart.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ static void do_test_unkpart(void) {
int i;
int fails = 0;
const struct rd_kafka_metadata *metadata;
const char *topic;

TEST_SAY(_C_BLU "%s\n" _C_CLR, __FUNCTION__);

Expand All @@ -94,7 +95,10 @@ static void do_test_unkpart(void) {
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0002", 0), topic_conf);
topic = test_mk_topic_name("0002", 0);
test_create_topic_if_auto_create_disabled(rk, topic, 3);

rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
Expand Down Expand Up @@ -199,7 +203,9 @@ static void do_test_unkpart_timeout_nobroker(void) {
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

test_create_topic_if_auto_create_disabled(NULL, topic, 3);
rkt = rd_kafka_topic_new(rk, topic, NULL);

err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
Expand Down
6 changes: 5 additions & 1 deletion tests/0003-msgmaxsize.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ int main_0003_msgmaxsize(int argc, char **argv) {
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
const char *topic;

static const struct {
ssize_t keylen;
Expand Down Expand Up @@ -108,7 +109,10 @@ int main_0003_msgmaxsize(int argc, char **argv) {
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0003", 0), topic_conf);
topic = test_mk_topic_name("0003", 0);
test_create_topic_if_auto_create_disabled(NULL, topic, -1);
rkt = rd_kafka_topic_new(rk, topic, topic_conf);

if (!rkt)
TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));

Expand Down
Loading