Skip to content

Commit 519d44b

Browse files
authored
Fix data race when a buffer queue is being initialized instead of being reset (confluentinc#4718)
A data race happened when emptying buffers of a failing broker, in its thread, with the statistics callback in main thread gathering the buffer counts. Solved by resetting the atomic counters instead of initializing them. Happening since 1.x Closes confluentinc#4522
1 parent 3137946 commit 519d44b

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,20 @@ librdkafka v2.11.0 is a feature release:
44

55
* [KIP-1102](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code) Enable clients to rebootstrap based on timeout or error code (#4981).
66
* Fix for poll ratio calculation in case the queues are forwarded (#5017).
7+
* Fix data race when buffer queues are being reset instead of being
8+
initialized (#4718).
79

810

911
## Fixes
1012

13+
### General fixes
14+
15+
* Issues: #4522.
16+
A data race happened when emptying buffers of a failing broker, in its thread,
17+
with the statistics callback in main thread gathering the buffer counts.
18+
Solved by resetting the atomic counters instead of initializing them.
19+
Happening since 1.x (#4718).
20+
1121
### Telemetry fixes
1222

1323
* Issues: #5109

src/rdkafka_buf.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
241241
rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
242242
}
243243

244+
static void rd_kafka_bufq_reset(rd_kafka_bufq_t *rkbufq) {
245+
TAILQ_INIT(&rkbufq->rkbq_bufs);
246+
rd_atomic32_set(&rkbufq->rkbq_cnt, 0);
247+
rd_atomic32_set(&rkbufq->rkbq_msg_cnt, 0);
248+
}
249+
244250
/**
245251
* Concat all buffers from 'src' to tail of 'dst'
246252
*/
@@ -249,7 +255,7 @@ void rd_kafka_bufq_concat(rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
249255
(void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
250256
(void)rd_atomic32_add(&dst->rkbq_msg_cnt,
251257
rd_atomic32_get(&src->rkbq_msg_cnt));
252-
rd_kafka_bufq_init(src);
258+
rd_kafka_bufq_reset(src);
253259
}
254260

255261
/**

0 commit comments

Comments
 (0)