diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs
index 4ee4745bcc8a3..d3761115a73e4 100644
--- a/src/sources/kafka.rs
+++ b/src/sources/kafka.rs
@@ -593,6 +593,8 @@ impl ConsumerStateInner {
         let decoder = self.decoder.clone();
         let log_namespace = self.log_namespace;
         let mut out = self.out.clone();
+        let fetch_wait_max_ms = self.config.fetch_wait_max_ms;
+        let socket_timeout = self.config.socket_timeout_ms;
 
         let (end_tx, mut end_signal) = oneshot::channel::<()>();
 
@@ -607,6 +609,14 @@ impl ConsumerStateInner {
 
             let mut status = PartitionConsumerStatus::NormalExit;
 
+            // Track the last successfully committed offset for this partition.
+            // Initialized to None; set to the first message's offset - 1 when we consume the
+            // first message, so we can always seek back, even for the very first message.
+            let mut last_committed_offset: Option<i64> = None;
+
+            // Track whether we need to seek back due to a rejected message.
+            let mut need_seek_back = false;
+
             loop {
                 tokio::select!(
                     // Make sure to handle the acknowledgement stream before new messages to prevent
@@ -620,11 +630,32 @@ impl ConsumerStateInner {
                     },
 
                     ack = ack_stream.next() => match ack {
-                        Some((status, entry)) => {
-                            if status == BatchStatus::Delivered
-                                && let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
-                                emit!(KafkaOffsetUpdateError { error });
+                        Some((ack_status, entry)) => {
+                            match ack_status {
+                                BatchStatus::Delivered => {
+                                    // Message was successfully delivered; commit the offset.
+                                    match consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
+                                        Err(error) => {
+                                            emit!(KafkaOffsetUpdateError { error });
+                                        }
+                                        Ok(_) => {
+                                            // Track the last successfully committed offset.
+                                            last_committed_offset = Some(entry.offset);
+                                            // Clear the seek-back flag since we successfully committed.
+                                            need_seek_back = false;
+                                        }
+                                    }
                             }
+                                BatchStatus::Errored | BatchStatus::Rejected => {
+                                    // Message failed to deliver; do NOT commit the offset.
+                                    // Mark that we need to seek back to retry this message.
+                                    need_seek_back = true;
+                                    debug!(
+                                        "Message delivery failed for {}:{}:{}, will retry",
+                                        &entry.topic, entry.partition, entry.offset
+                                    );
+                                }
+                            }
                         }
                         None if finalizer.is_none() => {
                             debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
@@ -635,7 +666,7 @@ impl ConsumerStateInner {
                         }
                     },
 
-                    message = messages.next(), if finalizer.is_some() => match message {
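+                    // While a seek back is pending, stop pulling new messages from this partition
+                    // so the rejected message is re-fetched in order once the seek succeeds.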
+                    message = messages.next(), if finalizer.is_some() && !need_seek_back => match message {
                         None => unreachable!("MessageStream never calls Ready(None)"),
                         Some(Err(error)) => match error {
                             rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
@@ -646,6 +677,12 @@ impl ConsumerStateInner {
                             _ => emit!(KafkaReadError { error }),
                         },
                         Some(Ok(msg)) => {
+                            // Initialize last_committed_offset on the first message.
+                            // Set it to offset - 1 so we can seek back to this message if needed.
+                            if last_committed_offset.is_none() {
+                                last_committed_offset = Some(msg.offset().saturating_sub(1));
+                            }
+
                             emit!(KafkaBytesReceived {
                                 byte_size: msg.payload_len(),
                                 protocol: "tcp",
@@ -655,6 +692,17 @@ impl ConsumerStateInner {
                             parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
                         }
                     },
+
+                    // Handle seeking back when we had a rejection.
+                    // Use fetch_wait_max_ms as the retry delay; this aligns with Kafka's polling interval.
+                    _ = tokio::time::sleep(fetch_wait_max_ms), if need_seek_back && acknowledgements => {
+                        need_seek_back = !Self::seek_to_retry_offset(
+                            &consumer,
+                            &tp,
+                            last_committed_offset,
+                            socket_timeout,
+                        );
+                    },
                 )
             }
             (tp, status)
@@ -662,6 +710,57 @@ impl ConsumerStateInner {
         (end_tx, handle)
     }
 
+    /// Attempt to seek back to retry a rejected message.
+    ///
+    /// When acknowledgements are enabled and a message delivery fails (e.g., due to auth errors),
+    /// this method seeks the consumer back to the failed message's offset to retry delivery.
+    ///
+    /// # Arguments
+    /// * `consumer` - The Kafka consumer
+    /// * `tp` - The topic-partition tuple
+    /// * `last_committed_offset` - The last successfully committed offset (if any)
+    /// * `timeout` - Network timeout for the seek operation
+    ///
+    /// # Returns
+    /// `true` if the seek succeeded or was not needed, `false` if it failed and should be retried
+    fn seek_to_retry_offset(
+        consumer: &StreamConsumer,
+        tp: &TopicPartition,
+        last_committed_offset: Option<i64>,
+        timeout: Duration,
+    ) -> bool {
+        use rdkafka::topic_partition_list::Offset;
+
+        match last_committed_offset {
+            Some(offset) => {
+                // Seek to the next message after the last committed offset.
+                let seek_offset = offset + 1;
+
+                match consumer.seek(&tp.0, tp.1, Offset::Offset(seek_offset), timeout) {
+                    Ok(_) => {
+                        debug!(
+                            "Seeked back to offset {} for {}:{} to retry rejected message",
+                            seek_offset, &tp.0, tp.1
+                        );
+                        true
+                    }
+                    Err(error) => {
+                        warn!(
+                            "Failed to seek back to offset {} for {}:{}: {:?}. Will retry.",
+                            seek_offset, &tp.0, tp.1, error
+                        );
+                        false
+                    }
+                }
+            }
+            None => {
+                // This should not happen since we initialize the offset on the first message,
+                // but handle it gracefully.
+                true
+            }
+        }
+    }
+
     /// Consume self, and return a "Draining" ConsumerState, along with a Future
     /// representing a drain deadline, based on max_drain_ms
     fn begin_drain(
@@ -970,6 +1069,7 @@ async fn parse_message(
                 event
             }
         });
+
         match out.send_event_stream(&mut stream).await {
             Err(_) => {
                 emit!(StreamClosedError { count });
@@ -1670,6 +1770,73 @@ mod integration_test {
         send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
     }
 
+    /// Test that the Kafka source properly seeks back and retries messages when they are rejected.
+    /// This test verifies the fix for the issue where rejected messages would cause offset commits
+    /// to be skipped, but the consumer wouldn't seek back to retry them.
+    ///
+    /// The test:
+    /// 1. Sends 5 messages to Kafka
+    /// 2. Rejects the 3rd message on the first attempt
+    /// 3. Verifies that Vector seeks back and retries the message
+    /// 4. Confirms all 5 messages are eventually received and committed
+    #[tokio::test]
+    async fn seeks_back_on_rejected_message() {
+        const SEND_COUNT: usize = 5;
+
+        let topic = format!("test-topic-{}", random_string(10));
+        let group_id = format!("test-group-{}", random_string(10));
+        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
+
+        // Send 5 messages to Kafka.
+        send_events(topic.clone(), 1, SEND_COUNT).await;
+
+        // Reject the 3rd message (index 2) on the first attempt, then accept it on retry.
+        let attempt_count = std::sync::Arc::new(std::sync::Mutex::new(0));
+        let attempt_count_clone = attempt_count.clone();
+
+        let error_fn = move |n: usize| {
+            if n == 2 {
+                let mut count = attempt_count_clone.lock().unwrap();
+                *count += 1;
+                // Reject on the first attempt, accept on retry.
+                *count == 1
+            } else {
+                false
+            }
+        };
+
+        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
+            let (tx, rx) = SourceSender::new_test_errors(error_fn);
+            let (trigger_shutdown, shutdown_done) =
+                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
+
+            // Collect all messages; we should get all 5 even though one was rejected initially.
+            let events = collect_n(rx, SEND_COUNT).await;
+
+            tokio::task::yield_now().await;
+            drop(trigger_shutdown);
+            shutdown_done.await;
+
+            events
+        })
+        .await;
+
+        // Verify we received all 5 messages.
+        assert_eq!(
+            events.len(),
+            SEND_COUNT,
+            "Should receive all messages after retry"
+        );
+
+        // Verify the offset was committed for all messages (including the retried one).
+        let offset = fetch_tpl_offset(&group_id, &topic, 0);
+        assert_eq!(
+            offset,
+            Offset::from_raw(SEND_COUNT as i64),
+            "Offset should be committed for all messages including retried ones"
+        );
+    }
+
     async fn send_receive(
         acknowledgements: bool,
         error_at: impl Fn(usize) -> bool,