2929import java .util .Set ;
3030import java .util .TreeSet ;
3131import java .util .concurrent .ConcurrentHashMap ;
32+ import java .util .concurrent .atomic .AtomicBoolean ;
3233import java .util .concurrent .atomic .AtomicInteger ;
3334import java .util .concurrent .locks .Lock ;
3435import java .util .concurrent .locks .ReentrantLock ;
@@ -114,13 +115,13 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
114115 */
115116 public static final String REMAINING_RECORDS = KafkaHeaders .PREFIX + "remainingRecords" ;
116117
117- private final Lock lock = new ReentrantLock ();
118-
119118 private final ConsumerFactory <K , V > consumerFactory ;
120119
121120 private final KafkaAckCallbackFactory <K , V > ackCallbackFactory ;
122121
123- private final Lock consumerMonitor = new ReentrantLock ();
122+ private final Lock receiveLock = new ReentrantLock ();
123+
124+ private final Lock consumerLock = new ReentrantLock ();
124125
125126 private final Map <TopicPartition , Set <KafkaAckInfo <K , V >>> inflightRecords = new ConcurrentHashMap <>();
126127
@@ -136,26 +137,26 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
136137
137138 private final Duration pollTimeout ;
138139
140+ private final AtomicBoolean running = new AtomicBoolean ();
141+
142+ private final AtomicBoolean pausing = new AtomicBoolean ();
143+
144+ private final AtomicBoolean paused = new AtomicBoolean ();
145+
146+ private final AtomicBoolean stopped = new AtomicBoolean ();
147+
139148 private RecordMessageConverter messageConverter = new MessagingMessageConverter ();
140149
141150 private Class <?> payloadType ;
142151
143152 private boolean rawMessageHeader ;
144153
145- private boolean running ;
146-
147154 private Duration closeTimeout = Duration .ofSeconds (DEFAULT_CLOSE_TIMEOUT );
148155
149156 private volatile Consumer <K , V > consumer ;
150157
151- private volatile boolean pausing ;
152-
153- private volatile boolean paused ;
154-
155158 private volatile Iterator <ConsumerRecord <K , V >> recordsIterator ;
156159
157- private volatile boolean stopped ;
158-
159160 public volatile boolean newAssignment ; // NOSONAR - direct access from inner
160161
161162 /**
@@ -391,104 +392,74 @@ private boolean maxPollStringGtr1(Object maxPoll) {
391392
392393 @ Override
393394 public boolean isRunning () {
394- this .lock .lock ();
395- try {
396- return this .running ;
397- }
398- finally {
399- this .lock .unlock ();
400- }
395+ return this .running .get ();
401396 }
402397
403398 @ Override
404399 public void start () {
405- this .lock .lock ();
406- try {
407- this .running = true ;
408- this .stopped = false ;
409- }
410- finally {
411- this .lock .unlock ();
400+ if (this .running .compareAndSet (false , true )) {
401+ this .stopped .set (false );
412402 }
413403 }
414404
415405 @ Override
416406 public void stop () {
417- this .lock .lock ();
418- try {
407+ if (this .running .compareAndSet (true , false )) {
419408 stopConsumer ();
420- this .running = false ;
421- this .stopped = true ;
422- }
423- finally {
424- this .lock .unlock ();
409+ this .stopped .set (true );
425410 }
426411 }
427412
428413 @ Override
429414 public void pause () {
430- this .lock .lock ();
431- try {
432- this .pausing = true ;
433- }
434- finally {
435- this .lock .unlock ();
436- }
415+ this .pausing .set (true );
437416 }
438417
439418 @ Override
440419 public void resume () {
441- this .lock .lock ();
442- try {
443- this .pausing = false ;
444- }
445- finally {
446- this .lock .unlock ();
447- }
420+ this .pausing .set (false );
448421 }
449422
450423 @ Override
451424 public boolean isPaused () {
452- return this .paused ;
425+ return this .paused . get () ;
453426 }
454427
455428 @ Override // NOSONAR - not so complex
456429 protected Object doReceive () {
457- this .lock .lock ();
430+ this .receiveLock .lock ();
458431 try {
459432
460- if (this .stopped ) {
433+ if (this .stopped . get () ) {
461434 this .logger .debug ("Message source is stopped; no records will be returned" );
462435 return null ;
463436 }
464437 if (this .consumer == null ) {
465438 createConsumer ();
466- this .running = true ;
467439 }
468- if (this .pausing && !this .paused && !this .assignedPartitions .isEmpty ()) {
440+ if (this .pausing . get () && !this .paused . get () && !this .assignedPartitions .isEmpty ()) {
469441 this .consumer .pause (this .assignedPartitions );
470- this .paused = true ;
442+ this .paused . set ( true ) ;
471443 }
472- else if (this .paused && !this .pausing ) {
444+ else if (this .paused . get () && !this .pausing . get () ) {
473445 this .consumer .resume (this .assignedPartitions );
474- this .paused = false ;
446+ this .paused . set ( false ) ;
475447 }
476- if (this .paused && this .recordsIterator == null ) {
448+ if (this .paused . get () && this .recordsIterator == null ) {
477449 this .logger .debug ("Consumer is paused; no records will be returned" );
478450 }
479- ConsumerRecord <K , V > record = pollRecord ();
480-
481- return record != null
482- ? recordToMessage (record )
483- : null ;
484451 }
485452 finally {
486- this .lock .unlock ();
453+ this .receiveLock .unlock ();
487454 }
455+
456+ ConsumerRecord <K , V > record = pollRecord ();
457+
458+ return record != null ? recordToMessage (record ) : null ;
488459 }
489460
490461 protected void createConsumer () {
491- this .consumerMonitor .lock ();
462+ this .consumerLock .lock ();
492463 try {
493464 this .consumer = this .consumerFactory .createConsumer (this .consumerProperties .getGroupId (),
494465 this .consumerProperties .getClientId (), null , this .consumerProperties .getKafkaConsumerProperties ());
@@ -510,7 +481,7 @@ else if (partitions != null) {
510481 }
511482 }
512483 finally {
513- this .consumerMonitor .unlock ();
484+ this .consumerLock .unlock ();
514485 }
515486 }
516487
@@ -568,7 +539,7 @@ private ConsumerRecord<K, V> pollRecord() {
568539 return nextRecord ();
569540 }
570541 else {
571- this .consumerMonitor .lock ();
542+ this .consumerLock .lock ();
572543 try {
573544 try {
574545 ConsumerRecords <K , V > records = this .consumer
@@ -593,7 +564,7 @@ private ConsumerRecord<K, V> pollRecord() {
593564 }
594565 }
595566 finally {
596- this .consumerMonitor .unlock ();
567+ this .consumerLock .unlock ();
597568 }
598569 }
599570 }
@@ -641,26 +612,27 @@ private Object recordToMessage(ConsumerRecord<K, V> record) {
641612
642613 @ Override
643614 public void destroy () {
644- this .lock .lock ();
615+ this .receiveLock .lock ();
645616 try {
646617 stopConsumer ();
647618 }
648619 finally {
649- this .lock .unlock ();
620+ this .receiveLock .unlock ();
650621 }
651622 }
652623
653624 private void stopConsumer () {
654- this .consumerMonitor .lock ();
625+ this .consumerLock .lock ();
655626 try {
656627 if (this .consumer != null ) {
628+ this .consumer .wakeup ();
657629 this .consumer .close (this .closeTimeout );
658630 this .consumer = null ;
659631 this .assignedPartitions .clear ();
660632 }
661633 }
662634 finally {
663- this .consumerMonitor .unlock ();
635+ this .consumerLock .unlock ();
664636 }
665637 }
666638
@@ -702,7 +674,7 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
702674 @ Override
703675 public void onPartitionsAssigned (Collection <TopicPartition > partitions ) {
704676 KafkaMessageSource .this .assignedPartitions .addAll (partitions );
705- if (KafkaMessageSource .this .paused ) {
677+ if (KafkaMessageSource .this .paused . get () ) {
706678 KafkaMessageSource .this .consumer .pause (KafkaMessageSource .this .assignedPartitions );
707679 KafkaMessageSource .this .logger .warn ("Paused consumer resumed by Kafka due to rebalance; "
708680 + "consumer paused again, so the initial poll() will never return any records" );
@@ -940,7 +912,7 @@ public class KafkaAckInfoImpl implements KafkaAckInfo<K, V> { // NOSONAR - no eq
940912
941913 @ Override
942914 public Object getConsumerMonitor () {
943- return KafkaMessageSource .this .consumerMonitor ;
915+ return KafkaMessageSource .this .consumerLock ;
944916 }
945917
946918 @ Override
0 commit comments