Skip to content

Commit 1510d2b

Browse files
authored
[KIP-714] Fix idle ratio calculation for non-forwarded queues (confluentinc#5017)
* Fix for devel assert "rkq->rkq_ts_last_poll_end >= rkq->rkq_ts_last_poll_start" in test 0056. Partition queues may not be forwarded to the main poll queue, so these timestamps must be per-queue instead of per-instance. We avoid checking whether "app polled" should be called for internal poll calls, where we are sure it is not a consume call; we also avoid checking it if "app polled" was already called by a dedicated consume function.
1 parent 2af6749 commit 1510d2b

File tree

5 files changed

+191
-77
lines changed

5 files changed

+191
-77
lines changed

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,23 @@
1+
# librdkafka v2.11.0
2+
3+
librdkafka v2.11.0 is a feature release:
4+
5+
* Fix for poll ratio calculation in case the queues are not forwarded (#5017).
6+
7+
8+
## Fixes
9+
10+
### Telemetry fixes
11+
12+
* Issues: #5109
13+
Fix for poll ratio calculation in case the queues are not forwarded.
14+
Poll ratio is now calculated per-queue instead of per-instance and
15+
which avoids calculation problems caused by sharing the same
16+
field.
17+
Happens since 2.6.0 (#5017).
18+
19+
20+
121
# librdkafka v2.10.1
222

323
librdkafka v2.10.1 is a maintenance release:

src/rdkafka.c

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3218,12 +3218,12 @@ static rd_kafka_op_res_t rd_kafka_consume_callback0(
32183218
struct consume_ctx ctx = {.consume_cb = consume_cb, .opaque = opaque};
32193219
rd_kafka_op_res_t res;
32203220

3221-
rd_kafka_app_poll_start(rkq->rkq_rk, 0, timeout_ms);
3221+
rd_kafka_app_poll_start(rkq->rkq_rk, rkq, 0, timeout_ms);
32223222

32233223
res = rd_kafka_q_serve(rkq, timeout_ms, max_cnt, RD_KAFKA_Q_CB_RETURN,
32243224
rd_kafka_consume_cb, &ctx);
32253225

3226-
rd_kafka_app_polled(rkq->rkq_rk);
3226+
rd_kafka_app_polled(rkq->rkq_rk, rkq);
32273227

32283228
return res;
32293229
}
@@ -3289,7 +3289,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
32893289
rd_ts_t now = rd_clock();
32903290
rd_ts_t abs_timeout = rd_timeout_init0(now, timeout_ms);
32913291

3292-
rd_kafka_app_poll_start(rk, now, timeout_ms);
3292+
rd_kafka_app_poll_start(rk, rkq, now, timeout_ms);
32933293

32943294
rd_kafka_yield_thread = 0;
32953295
while ((
@@ -3306,7 +3306,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
33063306
/* Callback called rd_kafka_yield(), we must
33073307
* stop dispatching the queue and return. */
33083308
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
3309-
rd_kafka_app_polled(rk);
3309+
rd_kafka_app_polled(rk, rkq);
33103310
return NULL;
33113311
}
33123312

@@ -3318,7 +3318,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
33183318
/* Timeout reached with no op returned. */
33193319
rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__TIMED_OUT,
33203320
ETIMEDOUT);
3321-
rd_kafka_app_polled(rk);
3321+
rd_kafka_app_polled(rk, rkq);
33223322
return NULL;
33233323
}
33243324

@@ -3333,7 +3333,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
33333333

33343334
rd_kafka_set_last_error(0, 0);
33353335

3336-
rd_kafka_app_polled(rk);
3336+
rd_kafka_app_polled(rk, rkq);
33373337

33383338
return rkmessage;
33393339
}
@@ -3980,10 +3980,10 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
39803980
cb_type == RD_KAFKA_Q_CB_FORCE_RETURN)
39813981
return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
39823982
else {
3983-
rk->rk_ts_last_poll_end = rd_clock();
3984-
struct consume_ctx ctx = {.consume_cb =
3985-
rk->rk_conf.consume_cb,
3986-
.opaque = rk->rk_conf.opaque};
3983+
rkq->rkq_ts_last_poll_end = rd_clock();
3984+
struct consume_ctx ctx = {.consume_cb =
3985+
rk->rk_conf.consume_cb,
3986+
.opaque = rk->rk_conf.opaque};
39873987

39883988
return rd_kafka_consume_cb(rk, rkq, rko, cb_type, &ctx);
39893989
}
@@ -4213,17 +4213,19 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
42134213
int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms) {
42144214
int r;
42154215

4216-
r = rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK,
4217-
rd_kafka_poll_cb, NULL);
4216+
r = rd_kafka_q_serve_maybe_consume(rk->rk_rep, timeout_ms, 0,
4217+
RD_KAFKA_Q_CB_CALLBACK,
4218+
rd_kafka_poll_cb, NULL);
42184219
return r;
42194220
}
42204221

42214222

42224223
rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
42234224
rd_kafka_op_t *rko;
42244225

4225-
rko = rd_kafka_q_pop_serve(rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0,
4226-
RD_KAFKA_Q_CB_EVENT, rd_kafka_poll_cb, NULL);
4226+
rko = rd_kafka_q_pop_serve_maybe_consume(
4227+
rkqu->rkqu_q, rd_timeout_us(timeout_ms), 0, RD_KAFKA_Q_CB_EVENT,
4228+
rd_kafka_poll_cb, NULL);
42274229

42284230

42294231
if (!rko)
@@ -4235,8 +4237,9 @@ rd_kafka_event_t *rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms) {
42354237
int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms) {
42364238
int r;
42374239

4238-
r = rd_kafka_q_serve(rkqu->rkqu_q, timeout_ms, 0,
4239-
RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);
4240+
r = rd_kafka_q_serve_maybe_consume(rkqu->rkqu_q, timeout_ms, 0,
4241+
RD_KAFKA_Q_CB_CALLBACK,
4242+
rd_kafka_poll_cb, NULL);
42404243
return r;
42414244
}
42424245

src/rdkafka_int.h

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -392,16 +392,7 @@ struct rd_kafka_s {
392392
* to avoid reaching
393393
* max.poll.interval.ms. during that time
394394
* frame. Only relevant for consumer. */
395-
rd_ts_t rk_ts_last_poll_start; /**< Timestamp of last application
396-
* consumer_poll() call start
397-
* Only relevant for consumer.
398-
* Not an atomic as Kafka consumer
399-
* isn't thread safe. */
400-
rd_ts_t rk_ts_last_poll_end; /**< Timestamp of last application
401-
* consumer_poll() call end
402-
* Only relevant for consumer.
403-
* Not an atomic as Kafka consumer
404-
* isn't thread safe. */
395+
405396
/* First fatal error. */
406397
struct {
407398
rd_atomic32_t err; /**< rd_kafka_resp_err_t */
@@ -1179,29 +1170,31 @@ static RD_INLINE RD_UNUSED int rd_kafka_max_poll_exceeded(rd_kafka_t *rk) {
11791170
* @locality any
11801171
* @locks none
11811172
*/
1182-
static RD_INLINE RD_UNUSED void
1183-
rd_kafka_app_poll_start(rd_kafka_t *rk, rd_ts_t now, rd_bool_t is_blocking) {
1173+
static RD_INLINE RD_UNUSED void rd_kafka_app_poll_start(rd_kafka_t *rk,
1174+
rd_kafka_q_t *rkq,
1175+
rd_ts_t now,
1176+
rd_bool_t is_blocking) {
11841177
if (rk->rk_type != RD_KAFKA_CONSUMER)
11851178
return;
11861179

11871180
if (!now)
11881181
now = rd_clock();
11891182
if (is_blocking)
11901183
rd_atomic64_set(&rk->rk_ts_last_poll, INT64_MAX);
1191-
if (rk->rk_ts_last_poll_end) {
1184+
if (rkq->rkq_ts_last_poll_end) {
11921185
int64_t poll_idle_ratio = 0;
1193-
rd_ts_t poll_interval = now - rk->rk_ts_last_poll_start;
1186+
rd_ts_t poll_interval = now - rkq->rkq_ts_last_poll_start;
11941187
if (poll_interval) {
1195-
rd_ts_t idle_interval =
1196-
rk->rk_ts_last_poll_end - rk->rk_ts_last_poll_start;
1188+
rd_ts_t idle_interval = rkq->rkq_ts_last_poll_end -
1189+
rkq->rkq_ts_last_poll_start;
11971190
poll_idle_ratio =
11981191
idle_interval * 1000000 / poll_interval;
11991192
}
12001193
rd_avg_add(
12011194
&rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio,
12021195
poll_idle_ratio);
1203-
rk->rk_ts_last_poll_start = now;
1204-
rk->rk_ts_last_poll_end = 0;
1196+
rkq->rkq_ts_last_poll_start = now;
1197+
rkq->rkq_ts_last_poll_end = 0;
12051198
}
12061199
}
12071200

@@ -1213,7 +1206,8 @@ rd_kafka_app_poll_start(rd_kafka_t *rk, rd_ts_t now, rd_bool_t is_blocking) {
12131206
* @locality any
12141207
* @locks none
12151208
*/
1216-
static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
1209+
static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk,
1210+
rd_kafka_q_t *rkq) {
12171211
if (rk->rk_type == RD_KAFKA_CONSUMER) {
12181212
rd_ts_t now = rd_clock();
12191213
rd_atomic64_set(&rk->rk_ts_last_poll, now);
@@ -1226,11 +1220,10 @@ static RD_INLINE RD_UNUSED void rd_kafka_app_polled(rd_kafka_t *rk) {
12261220
rk->rk_cgrp,
12271221
"app polled after poll interval exceeded");
12281222
}
1229-
if (!rk->rk_ts_last_poll_end)
1230-
rk->rk_ts_last_poll_end = now;
1231-
/* Disabled until #5017 is merged
1232-
* rd_dassert(rk->rk_ts_last_poll_end >=
1233-
* rk->rk_ts_last_poll_start); */
1223+
if (!rkq->rkq_ts_last_poll_end)
1224+
rkq->rkq_ts_last_poll_end = now;
1225+
rd_dassert(rkq->rkq_ts_last_poll_end >=
1226+
rkq->rkq_ts_last_poll_start);
12341227
}
12351228
}
12361229

0 commit comments

Comments
 (0)