
Commit 459d4a1

in_kafka: improve offset commit timing to avoid data loss in edge cases
This patch refactors the Kafka input plugin’s offset commit logic when enable.auto.commit is disabled. Previously, offsets were committed immediately after each individual message was processed, even before it was flushed to the Fluent Bit pipeline. This introduced a small risk window where, in the event of a crash or forced shutdown, messages could be acknowledged (via Kafka commit) without being fully ingested, resulting in potential data loss under extreme conditions.

This patch moves the offset commit logic to occur only after a successful batch flush (flb_input_log_append). This ensures that:

- offsets are only committed if messages were actually encoded and flushed;
- we avoid committing unprocessed data, thereby improving correctness;
- we reduce the number of commits, improving efficiency in batch mode.

This change aligns commit timing with actual ingestion and eliminates the window of inconsistency between processing and commit acknowledgment.

Signed-off-by: Eduardo Silva <[email protected]>
1 parent 1eefbf3 commit 459d4a1
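For context, the pattern this commit adopts is the standard at-least-once consumer loop: poll and buffer a batch, flush it downstream, and only then commit offsets back to Kafka. Below is a minimal sketch of that pattern using plain librdkafka, not the plugin code itself; encode_message() and flush_batch() are hypothetical stand-ins for the plugin's event encoder and flb_input_log_append().

/* Sketch of the commit-after-flush (at-least-once) pattern with librdkafka.
 * encode_message() and flush_batch() are hypothetical stand-ins for the
 * plugin's encoder and flb_input_log_append(). */
#include <librdkafka/rdkafka.h>

static void encode_message(const rd_kafka_message_t *rkm); /* hypothetical */
static int flush_batch(void);                              /* hypothetical */

static int consume_and_commit(rd_kafka_t *rk, int max_msgs)
{
    int i;
    rd_kafka_message_t *rkm;

    for (i = 0; i < max_msgs; i++) {
        rkm = rd_kafka_consumer_poll(rk, 100);   /* 100 ms poll timeout */
        if (rkm == NULL) {
            break;                               /* no more messages for now */
        }
        if (rkm->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
            encode_message(rkm);                 /* buffer it, do not commit yet */
        }
        rd_kafka_message_destroy(rkm);
        /* No commit here: a crash at this point loses nothing, because the
         * uncommitted offsets will simply be re-consumed on restart. */
    }

    if (flush_batch() != 0) {
        return -1;   /* flush failed: keep offsets so the batch is retried */
    }

    /* The batch is safely in the pipeline; acknowledge it to Kafka now.
     * NULL commits the current assignment's positions; the final argument
     * 0 makes the commit synchronous. */
    return rd_kafka_commit(rk, NULL, 0) == RD_KAFKA_RESP_ERR_NO_ERROR ? 0 : -1;
}

The trade-off is the usual one for at-least-once delivery: a crash between the flush and the commit can replay already-ingested messages, but no acknowledged message is ever lost.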

File tree

1 file changed: +18 -10 lines changed

plugins/in_kafka/in_kafka.c

Lines changed: 18 additions & 10 deletions
@@ -157,6 +157,7 @@ static int in_kafka_collect(struct flb_input_instance *ins,
                             struct flb_config *config, void *in_context)
 {
     int ret;
+    int append_ret;
     struct flb_in_kafka_config *ctx = in_context;
     rd_kafka_message_t *rkm;
 
@@ -200,12 +201,6 @@ static int in_kafka_collect(struct flb_input_instance *ins,
 
         rd_kafka_message_destroy(rkm);
 
-        if (!ctx->enable_auto_commit) {
-            if (ret == FLB_EVENT_ENCODER_SUCCESS) {
-                rd_kafka_commit(ctx->kafka.rk, NULL, 0);
-            }
-        }
-
         /* Break from the loop when reaching the limit of polling if available */
         if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED &&
             ctx->log_encoder->output_length > ctx->polling_threshold + 512) {
@@ -215,11 +210,24 @@ static int in_kafka_collect(struct flb_input_instance *ins,
 
     if (ret == FLB_EVENT_ENCODER_SUCCESS) {
         if (ctx->log_encoder->output_length > 0) {
-            flb_input_log_append(ins, NULL, 0,
-                                 ctx->log_encoder->output_buffer,
-                                 ctx->log_encoder->output_length);
+            append_ret = flb_input_log_append(ins, NULL, 0,
+                                              ctx->log_encoder->output_buffer,
+                                              ctx->log_encoder->output_length);
+
+            if (append_ret == 0) {
+                if (!ctx->enable_auto_commit) {
+                    rd_kafka_commit(ctx->kafka.rk, NULL, 0);
+                }
+                ret = 0;
+            }
+            else {
+                flb_plg_error(ins, "failed to append records");
+                ret = -1;
+            }
+        }
+        else {
+            ret = 0;
         }
-        ret = 0;
     }
     else {
         flb_plg_error(ins, "Error encoding record : %d", ret);
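As a usage note, the new code path only runs when auto commit is disabled. In Fluent Bit that is typically done by passing the librdkafka property through the plugin configuration; the snippet below is a plausible example, assuming the in_kafka plugin forwards rdkafka.* keys to librdkafka (broker and topic names are placeholders).

[INPUT]
    name                        kafka
    brokers                     kafka-broker:9092
    topics                      app-logs
    rdkafka.enable.auto.commit  false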
