@@ -593,6 +593,8 @@ impl ConsumerStateInner<Consuming> {
593593 let decoder = self . decoder . clone ( ) ;
594594 let log_namespace = self . log_namespace ;
595595 let mut out = self . out . clone ( ) ;
596+ let fetch_wait_max_ms = self . config . fetch_wait_max_ms ;
597+ let socket_timeout = self . config . socket_timeout_ms ;
596598
597599 let ( end_tx, mut end_signal) = oneshot:: channel :: < ( ) > ( ) ;
598600
@@ -607,6 +609,14 @@ impl ConsumerStateInner<Consuming> {
607609
608610 let mut status = PartitionConsumerStatus :: NormalExit ;
609611
612+ // Track the last successfully committed offset for this partition.
613+ // Initialize to None - will be set to the first message's offset - 1 when we consume the first message.
614+ // This ensures we can always seek back, even for the first message.
615+ let mut last_committed_offset: Option < i64 > = None ;
616+
617+ // Track if we need to seek back due to a rejected message
618+ let mut need_seek_back = false ;
619+
610620 loop {
611621 tokio:: select!(
612622 // Make sure to handle the acknowledgement stream before new messages to prevent
@@ -620,11 +630,32 @@ impl ConsumerStateInner<Consuming> {
620630 } ,
621631
622632 ack = ack_stream. next( ) => match ack {
623- Some ( ( status, entry) ) => {
624- if status == BatchStatus :: Delivered
625- && let Err ( error) = consumer. store_offset( & entry. topic, entry. partition, entry. offset) {
626- emit!( KafkaOffsetUpdateError { error } ) ;
633+ Some ( ( ack_status, entry) ) => {
634+ match ack_status {
635+ BatchStatus :: Delivered => {
636+ // Message was successfully delivered, commit the offset
637+ match consumer. store_offset( & entry. topic, entry. partition, entry. offset) {
638+ Err ( error) => {
639+ emit!( KafkaOffsetUpdateError { error } ) ;
640+ }
641+ Ok ( _) => {
642+ // Track the last successfully committed offset
643+ last_committed_offset = Some ( entry. offset) ;
644+ // Clear the seek-back flag since we successfully committed
645+ need_seek_back = false ;
646+ }
647+ }
627648 }
649+ BatchStatus :: Errored | BatchStatus :: Rejected => {
650+ // Message failed to deliver - do NOT commit offset
651+ // Mark that we need to seek back to retry this message
652+ need_seek_back = true ;
653+ debug!(
654+ "Message delivery failed for {}:{}:{}, will retry" ,
655+ & entry. topic, entry. partition, entry. offset
656+ ) ;
657+ }
658+ }
628659 }
629660 None if finalizer. is_none( ) => {
630661 debug!( "Acknowledgement stream complete for partition {}:{}." , & tp. 0 , tp. 1 ) ;
@@ -635,7 +666,7 @@ impl ConsumerStateInner<Consuming> {
635666 }
636667 } ,
637668
638- message = messages. next( ) , if finalizer. is_some( ) => match message {
669+ message = messages. next( ) , if finalizer. is_some( ) && !need_seek_back => match message {
639670 None => unreachable!( "MessageStream never calls Ready(None)" ) ,
640671 Some ( Err ( error) ) => match error {
641672 rdkafka:: error:: KafkaError :: PartitionEOF ( partition) if exit_eof => {
@@ -646,6 +677,12 @@ impl ConsumerStateInner<Consuming> {
646677 _ => emit!( KafkaReadError { error } ) ,
647678 } ,
648679 Some ( Ok ( msg) ) => {
680+ // Initialize last_committed_offset on first message
681+ // Set it to offset - 1 so we can seek back to this message if needed
682+ if last_committed_offset. is_none( ) {
683+ last_committed_offset = Some ( msg. offset( ) . saturating_sub( 1 ) ) ;
684+ }
685+
649686 emit!( KafkaBytesReceived {
650687 byte_size: msg. payload_len( ) ,
651688 protocol: "tcp" ,
@@ -655,13 +692,75 @@ impl ConsumerStateInner<Consuming> {
655692 parse_message( msg, decoder. clone( ) , & keys, & mut out, acknowledgements, & finalizer, log_namespace) . await ;
656693 }
657694 } ,
695+
696+ // Handle seeking back when we had a rejection
697+ // Use fetch_wait_max_ms as the retry delay - this aligns with Kafka's polling interval
698+ _ = tokio:: time:: sleep( fetch_wait_max_ms) , if need_seek_back && acknowledgements => {
699+ need_seek_back = !Self :: seek_to_retry_offset(
700+ & consumer,
701+ & tp,
702+ last_committed_offset,
703+ socket_timeout,
704+ ) ;
705+ } ,
658706 )
659707 }
660708 ( tp, status)
661709 } . instrument ( self . consumer_state . span . clone ( ) ) ) ;
662710 ( end_tx, handle)
663711 }
664712
713+ /// Attempt to seek back to retry a rejected message.
714+ ///
715+ /// When acknowledgements are enabled and a message delivery fails (e.g., due to auth errors),
716+ /// this method seeks the consumer back to the failed message's offset to retry delivery.
717+ ///
718+ /// # Arguments
719+ /// * `consumer` - The Kafka consumer
720+ /// * `tp` - The topic-partition tuple
721+ /// * `last_committed_offset` - The last successfully committed offset (if any)
722+ /// * `timeout` - Network timeout for the seek operation
723+ ///
724+ /// # Returns
725+ /// `true` if seek was successful or not needed, `false` if seek failed and should be retried
726+ fn seek_to_retry_offset (
727+ consumer : & StreamConsumer < KafkaSourceContext > ,
728+ tp : & TopicPartition ,
729+ last_committed_offset : Option < i64 > ,
730+ timeout : Duration ,
731+ ) -> bool {
732+ use rdkafka:: topic_partition_list:: Offset ;
733+
734+ match last_committed_offset {
735+ Some ( offset) => {
736+ // Seek to the next message after the last committed offset
737+ let seek_offset = offset + 1 ;
738+
739+ match consumer. seek ( & tp. 0 , tp. 1 , Offset :: Offset ( seek_offset) , timeout) {
740+ Ok ( _) => {
741+ debug ! (
742+ "Seeked back to offset {} for {}:{} to retry rejected message" ,
743+ seek_offset, & tp. 0 , tp. 1
744+ ) ;
745+ true
746+ }
747+ Err ( error) => {
748+ warn ! (
749+ "Failed to seek back to offset {} for {}:{}: {:?}. Will retry." ,
750+ seek_offset, & tp. 0 , tp. 1 , error
751+ ) ;
752+ false
753+ }
754+ }
755+ }
756+ None => {
757+ // This should not happen since we initialize offset on first message,
758+ // but handle it gracefully
759+ true
760+ }
761+ }
762+ }
763+
665764 /// Consume self, and return a "Draining" ConsumerState, along with a Future
666765 /// representing a drain deadline, based on max_drain_ms
667766 fn begin_drain (
@@ -970,6 +1069,7 @@ async fn parse_message(
9701069 event
9711070 }
9721071 } ) ;
1072+
9731073 match out. send_event_stream ( & mut stream) . await {
9741074 Err ( _) => {
9751075 emit ! ( StreamClosedError { count } ) ;
@@ -1670,6 +1770,73 @@ mod integration_test {
16701770 send_receive ( true , |n| n >= 2 , 2 , LogNamespace :: Vector ) . await ;
16711771 }
16721772
1773+ /// Test that the Kafka source properly seeks back and retries messages when they are rejected.
1774+ /// This test verifies the fix for the issue where rejected messages would cause offset commits
1775+ /// to be skipped, but the consumer wouldn't seek back to retry them.
1776+ ///
1777+ /// The test:
1778+ /// 1. Sends 5 messages to Kafka
1779+ /// 2. Rejects the 3rd message on first attempt
1780+ /// 3. Verifies that Vector seeks back and retries the message
1781+ /// 4. Confirms all 5 messages are eventually received and committed
1782+ #[ tokio:: test]
1783+ async fn seeks_back_on_rejected_message ( ) {
1784+ const SEND_COUNT : usize = 5 ;
1785+
1786+ let topic = format ! ( "test-topic-{}" , random_string( 10 ) ) ;
1787+ let group_id = format ! ( "test-group-{}" , random_string( 10 ) ) ;
1788+ let config = make_config ( & topic, & group_id, LogNamespace :: Legacy , None ) ;
1789+
1790+ // Send 5 messages to Kafka
1791+ send_events ( topic. clone ( ) , 1 , SEND_COUNT ) . await ;
1792+
1793+ // Reject the 3rd message (index 2) on first attempt, then accept it on retry
1794+ let attempt_count = std:: sync:: Arc :: new ( std:: sync:: Mutex :: new ( 0 ) ) ;
1795+ let attempt_count_clone = attempt_count. clone ( ) ;
1796+
1797+ let error_fn = move |n : usize | {
1798+ if n == 2 {
1799+ let mut count = attempt_count_clone. lock ( ) . unwrap ( ) ;
1800+ * count += 1 ;
1801+ // Reject on first attempt, accept on retry
1802+ * count == 1
1803+ } else {
1804+ false
1805+ }
1806+ } ;
1807+
1808+ let events = assert_source_compliance ( & [ "protocol" , "topic" , "partition" ] , async move {
1809+ let ( tx, rx) = SourceSender :: new_test_errors ( error_fn) ;
1810+ let ( trigger_shutdown, shutdown_done) =
1811+ spawn_kafka ( tx, config, true , false , LogNamespace :: Legacy ) ;
1812+
1813+ // Collect all messages - should get all 5 even though one was rejected initially
1814+ let events = collect_n ( rx, SEND_COUNT ) . await ;
1815+
1816+ tokio:: task:: yield_now ( ) . await ;
1817+ drop ( trigger_shutdown) ;
1818+ shutdown_done. await ;
1819+
1820+ events
1821+ } )
1822+ . await ;
1823+
1824+ // Verify we received all 5 messages
1825+ assert_eq ! (
1826+ events. len( ) ,
1827+ SEND_COUNT ,
1828+ "Should receive all messages after retry"
1829+ ) ;
1830+
1831+ // Verify the offset was committed for all messages (including the retried one)
1832+ let offset = fetch_tpl_offset ( & group_id, & topic, 0 ) ;
1833+ assert_eq ! (
1834+ offset,
1835+ Offset :: from_raw( SEND_COUNT as i64 ) ,
1836+ "Offset should be committed for all messages including retried ones"
1837+ ) ;
1838+ }
1839+
16731840 async fn send_receive (
16741841 acknowledgements : bool ,
16751842 error_at : impl Fn ( usize ) -> bool ,
0 commit comments