Skip to content

Commit 519d44b

Browse files
authored
Fix data race when a buffer queue is being initialized instead of being reset (#4718)
A data race happened when emptying buffers of a failing broker, in its thread, with the statistics callback in main thread gathering the buffer counts. Solved by resetting the atomic counters instead of initializing them. Happening since 1.x Closes #4522
1 parent 3137946 commit 519d44b

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,20 @@ librdkafka v2.11.0 is a feature release:
44

55
* [KIP-1102](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code) Enable clients to rebootstrap based on timeout or error code (#4981).
66
* Fix for poll ratio calculation in case the queues are forwarded (#5017).
7+
* Fix data race when buffer queues are being reset instead of being
8+
initialized (#4718).
79

810

911
## Fixes
1012

13+
### General fixes
14+
15+
* Issues: #4522.
16+
A data race happened when emptying buffers of a failing broker, in its thread,
17+
with the statistics callback in main thread gathering the buffer counts.
18+
Solved by resetting the atomic counters instead of initializing them.
19+
Happening since 1.x (#4718).
20+
1121
### Telemetry fixes
1222

1323
* Issues: #5109

src/rdkafka_buf.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ void rd_kafka_bufq_init(rd_kafka_bufq_t *rkbufq) {
241241
rd_atomic32_init(&rkbufq->rkbq_msg_cnt, 0);
242242
}
243243

244+
static void rd_kafka_bufq_reset(rd_kafka_bufq_t *rkbufq) {
245+
TAILQ_INIT(&rkbufq->rkbq_bufs);
246+
rd_atomic32_set(&rkbufq->rkbq_cnt, 0);
247+
rd_atomic32_set(&rkbufq->rkbq_msg_cnt, 0);
248+
}
249+
244250
/**
245251
* Concat all buffers from 'src' to tail of 'dst'
246252
*/
@@ -249,7 +255,7 @@ void rd_kafka_bufq_concat(rd_kafka_bufq_t *dst, rd_kafka_bufq_t *src) {
249255
(void)rd_atomic32_add(&dst->rkbq_cnt, rd_atomic32_get(&src->rkbq_cnt));
250256
(void)rd_atomic32_add(&dst->rkbq_msg_cnt,
251257
rd_atomic32_get(&src->rkbq_msg_cnt));
252-
rd_kafka_bufq_init(src);
258+
rd_kafka_bufq_reset(src);
253259
}
254260

255261
/**

0 commit comments

Comments
 (0)