Skip to content

Commit a7ca500

Browse files
committed
flush() could return >0 prior to the specified timeout
1 parent 743ba51 commit a7ca500

File tree

1 file changed

+7
-12
lines changed

1 file changed

+7
-12
lines changed

src/confluent_kafka/src/Producer.c

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -374,27 +374,22 @@ static PyObject *Producer_poll (Handle *self, PyObject *args,
374374
static PyObject *Producer_flush (Handle *self, PyObject *args,
375375
PyObject *kwargs) {
376376
double tmout = -1;
377-
int qlen;
377+
int qlen = 0;
378378
static char *kws[] = { "timeout", NULL };
379-
#if RD_KAFKA_VERSION >= 0x00090300
379+
rd_kafka_resp_err_t err;
380380
CallState cs;
381-
#endif
382381

383382
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
384383
return NULL;
385384

386-
#if RD_KAFKA_VERSION >= 0x00090300
387385
CallState_begin(self, &cs);
388-
rd_kafka_flush(self->rk, cfl_timeout_ms(tmout));
386+
err = rd_kafka_flush(self->rk, cfl_timeout_ms(tmout));
389387
if (!CallState_end(self, &cs))
390388
return NULL;
391-
qlen = rd_kafka_outq_len(self->rk);
392-
#else
393-
while ((qlen = rd_kafka_outq_len(self->rk)) > 0) {
394-
if (Producer_poll0(self, 500) == -1)
395-
return NULL;
396-
}
397-
#endif
389+
390+
if (err) /* Get the queue length on error (timeout) */
391+
qlen = rd_kafka_outq_len(self->rk);
392+
398393
return cfl_PyInt_FromInt(qlen);
399394
}
400395

0 commit comments

Comments
 (0)