@@ -2900,12 +2900,17 @@ public void nack(long sleep) {
29002900 "nack() can only be called on the consumer thread" );
29012901 Assert .isTrue (sleep >= 0 , "sleep cannot be negative" );
29022902 ListenerConsumer .this .nackSleep = sleep ;
2903- ListenerConsumer .this .deferredOffsets .forEach ((part , recs ) -> recs .clear ());
2903+ synchronized (ListenerConsumer .this ) {
2904+ if (ListenerConsumer .this .offsetsInThisBatch != null ) {
2905+ ListenerConsumer .this .offsetsInThisBatch .forEach ((part , recs ) -> recs .clear ());
2906+ ListenerConsumer .this .deferredOffsets .forEach ((part , recs ) -> recs .clear ());
2907+ }
2908+ }
29042909 }
29052910
29062911 @ Override
29072912 public String toString () {
2908- return "Acknowledgment for " + this .record ;
2913+ return "Acknowledgment for " + ListenerUtils . recordToString ( this .record , true ) ;
29092914 }
29102915
29112916 }
@@ -2944,6 +2949,12 @@ public void nack(int index, long sleep) {
29442949 Assert .isTrue (index >= 0 && index < this .records .count (), "index out of bounds" );
29452950 ListenerConsumer .this .nackIndex = index ;
29462951 ListenerConsumer .this .nackSleep = sleep ;
2952+ synchronized (ListenerConsumer .this ) {
2953+ if (ListenerConsumer .this .offsetsInThisBatch != null ) {
2954+ ListenerConsumer .this .offsetsInThisBatch .forEach ((part , recs ) -> recs .clear ());
2955+ ListenerConsumer .this .deferredOffsets .forEach ((part , recs ) -> recs .clear ());
2956+ }
2957+ }
29472958 }
29482959
29492960 @ Override
0 commit comments