177 changes: 172 additions & 5 deletions src/sources/kafka.rs
@@ -593,6 +593,8 @@ impl ConsumerStateInner<Consuming> {
let decoder = self.decoder.clone();
let log_namespace = self.log_namespace;
let mut out = self.out.clone();
let fetch_wait_max_ms = self.config.fetch_wait_max_ms;
let socket_timeout = self.config.socket_timeout_ms;

let (end_tx, mut end_signal) = oneshot::channel::<()>();

@@ -607,6 +609,14 @@ impl ConsumerStateInner<Consuming> {

let mut status = PartitionConsumerStatus::NormalExit;

// Track the last successfully committed offset for this partition.
// Starts as None and is set to (first message's offset - 1) when the first message is consumed.
// This ensures we can always seek back, even to the very first message.
let mut last_committed_offset: Option<i64> = None;

// Track if we need to seek back due to a rejected message
let mut need_seek_back = false;

loop {
tokio::select!(
// Make sure to handle the acknowledgement stream before new messages to prevent
@@ -620,11 +630,32 @@ impl ConsumerStateInner<Consuming> {
},

ack = ack_stream.next() => match ack {
-Some((status, entry)) => {
-    if status == BatchStatus::Delivered
-        && let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
-        emit!(KafkaOffsetUpdateError { error });
Some((ack_status, entry)) => {
match ack_status {
BatchStatus::Delivered => {
// Message was successfully delivered; store its offset so it gets committed
match consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
Err(error) => {
emit!(KafkaOffsetUpdateError { error });
}
Ok(_) => {
// Track the last successfully committed offset
last_committed_offset = Some(entry.offset);
// Clear the seek-back flag since we successfully committed
need_seek_back = false;
}
}
}
BatchStatus::Errored | BatchStatus::Rejected => {
// Message failed to deliver - do NOT commit offset
// Mark that we need to seek back to retry this message
need_seek_back = true;
debug!(
"Message delivery failed for {}:{}:{}, will retry",
&entry.topic, entry.partition, entry.offset
);
}
}
}
None if finalizer.is_none() => {
debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
@@ -635,7 +666,7 @@ impl ConsumerStateInner<Consuming> {
}
},

-message = messages.next(), if finalizer.is_some() => match message {
message = messages.next(), if finalizer.is_some() && !need_seek_back => match message {
None => unreachable!("MessageStream never calls Ready(None)"),
Some(Err(error)) => match error {
rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
@@ -646,6 +677,12 @@ impl ConsumerStateInner<Consuming> {
_ => emit!(KafkaReadError { error }),
},
Some(Ok(msg)) => {
// Initialize last_committed_offset on first message
// Set it to offset - 1 so we can seek back to this message if needed
if last_committed_offset.is_none() {
last_committed_offset = Some(msg.offset().saturating_sub(1));
}

emit!(KafkaBytesReceived {
byte_size: msg.payload_len(),
protocol: "tcp",
@@ -655,13 +692,75 @@ impl ConsumerStateInner<Consuming> {
parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
}
},

// Handle seeking back after a rejection.
// fetch_wait_max_ms is used as the retry delay, which aligns with Kafka's polling interval.
_ = tokio::time::sleep(fetch_wait_max_ms), if need_seek_back && acknowledgements => {
need_seek_back = !Self::seek_to_retry_offset(
&consumer,
&tp,
last_committed_offset,
socket_timeout,
);
},
)
}
(tp, status)
}.instrument(self.consumer_state.span.clone()));
(end_tx, handle)
}
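The select loop above gates the message branch on `!need_seek_back` and only arms the retry timer while a seek-back is pending. Below is a minimal, self-contained sketch of that gating pattern (assuming only `tokio` with full features; the channel, offsets, and rejection rule are stand-ins for illustration, not Vector or rdkafka APIs):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Stand-in for the Kafka message stream: offsets 0..3.
    let (tx, mut rx) = mpsc::channel::<i64>(8);
    tokio::spawn(async move {
        for offset in 0..3 {
            let _ = tx.send(offset).await;
        }
    });

    let mut need_seek_back = false;
    let mut delivered = Vec::new();

    loop {
        tokio::select! {
            // Message branch is disabled while a seek-back is pending,
            // mirroring `if finalizer.is_some() && !need_seek_back` above.
            msg = rx.recv(), if !need_seek_back => match msg {
                Some(offset) => {
                    // Pretend the first delivery attempt of offset 1 is rejected.
                    if offset == 1 && !delivered.contains(&1) {
                        need_seek_back = true;
                    } else {
                        delivered.push(offset);
                    }
                }
                None => break,
            },
            // Retry branch only runs while a seek-back is pending; the real
            // source seeks the consumer here, this sketch just "redelivers".
            _ = tokio::time::sleep(Duration::from_millis(10)), if need_seek_back => {
                delivered.push(1);
                need_seek_back = false;
            }
        }
    }

    assert_eq!(delivered, vec![0, 1, 2]);
}
```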

/// Attempt to seek back to retry a rejected message.
///
/// When acknowledgements are enabled and a message delivery fails (e.g., due to auth errors),
/// this method seeks the consumer back to the failed message's offset to retry delivery.
///
/// # Arguments
/// * `consumer` - The Kafka consumer
/// * `tp` - The topic-partition tuple
/// * `last_committed_offset` - The last successfully committed offset (if any)
/// * `timeout` - Network timeout for the seek operation
///
/// # Returns
/// `true` if seek was successful or not needed, `false` if seek failed and should be retried
fn seek_to_retry_offset(
consumer: &StreamConsumer<KafkaSourceContext>,
tp: &TopicPartition,
last_committed_offset: Option<i64>,
timeout: Duration,
) -> bool {
use rdkafka::topic_partition_list::Offset;

match last_committed_offset {
Some(offset) => {
// Seek to the next message after the last committed offset
let seek_offset = offset + 1;

match consumer.seek(&tp.0, tp.1, Offset::Offset(seek_offset), timeout) {
Ok(_) => {
debug!(
"Seeked back to offset {} for {}:{} to retry rejected message",
seek_offset, &tp.0, tp.1
);
true
}
Err(error) => {
warn!(
"Failed to seek back to offset {} for {}:{}: {:?}. Will retry.",
seek_offset, &tp.0, tp.1, error
);
false
}
}
}
None => {
// This should not happen since we initialize offset on first message,
// but handle it gracefully
true
}
}
}
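For reference, the offset arithmetic this helper relies on, as a standalone sketch (the function name `retry_target` is hypothetical and not part of this change):

```rust
// Mirrors the `Some(offset)` arm above: the seek target is the message
// immediately after the last committed offset.
fn retry_target(last_committed_offset: i64) -> i64 {
    last_committed_offset + 1
}

fn main() {
    let first_offset = 42_i64;
    // The tracker is primed with `first message's offset - 1`, so a rejection
    // of the very first message seeks right back to it.
    assert_eq!(retry_target(first_offset.saturating_sub(1)), first_offset);
    // Offsets 42..=44 committed, 45 rejected: seek back to 45.
    assert_eq!(retry_target(44), 45);
}
```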

/// Consume self, and return a "Draining" ConsumerState, along with a Future
/// representing a drain deadline, based on max_drain_ms
fn begin_drain(
@@ -970,6 +1069,7 @@ async fn parse_message(
event
}
});

match out.send_event_stream(&mut stream).await {
Err(_) => {
emit!(StreamClosedError { count });
@@ -1670,6 +1770,73 @@ mod integration_test {
send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
}

/// Test that the Kafka source properly seeks back and retries messages when they are rejected.
/// This test verifies the fix for the issue where rejected messages would cause offset commits
/// to be skipped, but the consumer wouldn't seek back to retry them.
///
/// The test:
/// 1. Sends 5 messages to Kafka
/// 2. Rejects the 3rd message on first attempt
/// 3. Verifies that Vector seeks back and retries the message
/// 4. Confirms all 5 messages are eventually received and committed
#[tokio::test]
async fn seeks_back_on_rejected_message() {
const SEND_COUNT: usize = 5;

let topic = format!("test-topic-{}", random_string(10));
let group_id = format!("test-group-{}", random_string(10));
let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);

// Send 5 messages to Kafka
send_events(topic.clone(), 1, SEND_COUNT).await;

// Reject the 3rd message (index 2) on first attempt, then accept it on retry
let attempt_count = std::sync::Arc::new(std::sync::Mutex::new(0));
let attempt_count_clone = attempt_count.clone();

let error_fn = move |n: usize| {
if n == 2 {
let mut count = attempt_count_clone.lock().unwrap();
*count += 1;
// Reject on first attempt, accept on retry
*count == 1
} else {
false
}
};

let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
let (tx, rx) = SourceSender::new_test_errors(error_fn);
let (trigger_shutdown, shutdown_done) =
spawn_kafka(tx, config, true, false, LogNamespace::Legacy);

// Collect all messages - should get all 5 even though one was rejected initially
let events = collect_n(rx, SEND_COUNT).await;

tokio::task::yield_now().await;
drop(trigger_shutdown);
shutdown_done.await;

events
})
.await;

// Verify we received all 5 messages
assert_eq!(
events.len(),
SEND_COUNT,
"Should receive all messages after retry"
);

// Verify the offset was committed for all messages (including the retried one)
let offset = fetch_tpl_offset(&group_id, &topic, 0);
assert_eq!(
offset,
Offset::from_raw(SEND_COUNT as i64),
"Offset should be committed for all messages including retried ones"
);
}
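As a design note only: the one-shot rejection predicate could also be written without a `Mutex`, using an atomic flag. A possible lock-free variant is sketched below; the helper name `reject_once_at` is hypothetical and the `Arc<Mutex<_>>` version above is what this change ships.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Returns a predicate that reports an error for message index `reject_index`
// exactly once, then never again -- the same behavior as the Mutex-based
// closure in the test above.
fn reject_once_at(reject_index: usize) -> impl Fn(usize) -> bool {
    let already_rejected = AtomicBool::new(false);
    move |n| n == reject_index && !already_rejected.swap(true, Ordering::SeqCst)
}

fn main() {
    let error_fn = reject_once_at(2);
    assert!(!error_fn(0));
    assert!(error_fn(2));  // first attempt at index 2 is rejected
    assert!(!error_fn(2)); // the retry goes through
}
```

The atomic flag avoids locking, while the `Mutex` counter in the test additionally records how many attempts were made, which can be handy for extra assertions.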

async fn send_receive(
acknowledgements: bool,
error_at: impl Fn(usize) -> bool,