11/*
2- * Copyright 2018-2022 the original author or authors.
2+ * Copyright 2018-2023 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
3030import java .util .TreeSet ;
3131import java .util .concurrent .ConcurrentHashMap ;
3232import java .util .concurrent .atomic .AtomicInteger ;
33+ import java .util .concurrent .locks .Lock ;
34+ import java .util .concurrent .locks .ReentrantLock ;
3335import java .util .regex .Pattern ;
3436import java .util .stream .Collectors ;
3537
9597 * @author Mark Norkin
9698 * @author Artem Bilan
9799 * @author Anshul Mehra
100+ * @author Christian Tzolov
98101 *
99102 * @since 5.4
100103 *
@@ -111,11 +114,13 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
111114 */
112115 public static final String REMAINING_RECORDS = KafkaHeaders .PREFIX + "remainingRecords" ;
113116
117+ private final Lock lock = new ReentrantLock ();
118+
114119 private final ConsumerFactory <K , V > consumerFactory ;
115120
116121 private final KafkaAckCallbackFactory <K , V > ackCallbackFactory ;
117122
118- private final Object consumerMonitor = new Object ();
123+ private final Lock consumerMonitor = new ReentrantLock ();
119124
120125 private final Map <TopicPartition , Set <KafkaAckInfo <K , V >>> inflightRecords = new ConcurrentHashMap <>();
121126
@@ -385,31 +390,61 @@ private boolean maxPollStringGtr1(Object maxPoll) {
385390 }
386391
387392 @ Override
388- public synchronized boolean isRunning () {
389- return this .running ;
393+ public boolean isRunning () {
394+ this .lock .lock ();
395+ try {
396+ return this .running ;
397+ }
398+ finally {
399+ this .lock .unlock ();
400+ }
390401 }
391402
392403 @ Override
393- public synchronized void start () {
394- this .running = true ;
395- this .stopped = false ;
404+ public void start () {
405+ this .lock .lock ();
406+ try {
407+ this .running = true ;
408+ this .stopped = false ;
409+ }
410+ finally {
411+ this .lock .unlock ();
412+ }
396413 }
397414
398415 @ Override
399- public synchronized void stop () {
400- stopConsumer ();
401- this .running = false ;
402- this .stopped = true ;
416+ public void stop () {
417+ this .lock .lock ();
418+ try {
419+ stopConsumer ();
420+ this .running = false ;
421+ this .stopped = true ;
422+ }
423+ finally {
424+ this .lock .unlock ();
425+ }
403426 }
404427
405428 @ Override
406- public synchronized void pause () {
407- this .pausing = true ;
429+ public void pause () {
430+ this .lock .lock ();
431+ try {
432+ this .pausing = true ;
433+ }
434+ finally {
435+ this .lock .unlock ();
436+ }
408437 }
409438
410439 @ Override
411- public synchronized void resume () {
412- this .pausing = false ;
440+ public void resume () {
441+ this .lock .lock ();
442+ try {
443+ this .pausing = false ;
444+ }
445+ finally {
446+ this .lock .unlock ();
447+ }
413448 }
414449
415450 @ Override
@@ -418,35 +453,43 @@ public boolean isPaused() {
418453 }
419454
420455 @ Override // NOSONAR - not so complex
421- protected synchronized Object doReceive () {
422- if (this .stopped ) {
423- this .logger .debug ("Message source is stopped; no records will be returned" );
424- return null ;
425- }
426- if (this .consumer == null ) {
427- createConsumer ();
428- this .running = true ;
429- }
430- if (this .pausing && !this .paused && this .assignedPartitions .size () > 0 ) {
431- this .consumer .pause (this .assignedPartitions );
432- this .paused = true ;
433- }
434- else if (this .paused && !this .pausing ) {
435- this .consumer .resume (this .assignedPartitions );
436- this .paused = false ;
456+ protected Object doReceive () {
457+ this .lock .lock ();
458+ try {
459+
460+ if (this .stopped ) {
461+ this .logger .debug ("Message source is stopped; no records will be returned" );
462+ return null ;
463+ }
464+ if (this .consumer == null ) {
465+ createConsumer ();
466+ this .running = true ;
467+ }
468+ if (this .pausing && !this .paused && !this .assignedPartitions .isEmpty ()) {
469+ this .consumer .pause (this .assignedPartitions );
470+ this .paused = true ;
471+ }
472+ else if (this .paused && !this .pausing ) {
473+ this .consumer .resume (this .assignedPartitions );
474+ this .paused = false ;
475+ }
476+ if (this .paused && this .recordsIterator == null ) {
477+ this .logger .debug ("Consumer is paused; no records will be returned" );
478+ }
479+ ConsumerRecord <K , V > record = pollRecord ();
480+
481+ return record != null
482+ ? recordToMessage (record )
483+ : null ;
437484 }
438- if ( this . paused && this . recordsIterator == null ) {
439- this .logger . debug ( "Consumer is paused; no records will be returned" );
485+ finally {
486+ this .lock . unlock ( );
440487 }
441- ConsumerRecord <K , V > record = pollRecord ();
442-
443- return record != null
444- ? recordToMessage (record )
445- : null ;
446488 }
447489
448490 protected void createConsumer () {
449- synchronized (this .consumerMonitor ) {
491+ this .consumerMonitor .lock ();
492+ try {
450493 this .consumer = this .consumerFactory .createConsumer (this .consumerProperties .getGroupId (),
451494 this .consumerProperties .getClientId (), null , this .consumerProperties .getKafkaConsumerProperties ());
452495
@@ -466,6 +509,9 @@ else if (partitions != null) {
466509 rebalanceCallback );
467510 }
468511 }
512+ finally {
513+ this .consumerMonitor .unlock ();
514+ }
469515 }
470516
471517 private void assignAndSeekPartitions (TopicPartitionOffset [] partitions ) {
@@ -522,7 +568,8 @@ private ConsumerRecord<K, V> pollRecord() {
522568 return nextRecord ();
523569 }
524570 else {
525- synchronized (this .consumerMonitor ) {
571+ this .consumerMonitor .lock ();
572+ try {
526573 try {
527574 ConsumerRecords <K , V > records = this .consumer
528575 .poll (this .assignedPartitions .isEmpty () ? this .assignTimeout : this .pollTimeout );
@@ -545,6 +592,9 @@ private ConsumerRecord<K, V> pollRecord() {
545592 return null ;
546593 }
547594 }
595+ finally {
596+ this .consumerMonitor .unlock ();
597+ }
548598 }
549599 }
550600
@@ -590,18 +640,28 @@ private Object recordToMessage(ConsumerRecord<K, V> record) {
590640 }
591641
592642 @ Override
593- public synchronized void destroy () {
594- stopConsumer ();
643+ public void destroy () {
644+ this .lock .lock ();
645+ try {
646+ stopConsumer ();
647+ }
648+ finally {
649+ this .lock .unlock ();
650+ }
595651 }
596652
597653 private void stopConsumer () {
598- synchronized (this .consumerMonitor ) {
654+ this .consumerMonitor .lock ();
655+ try {
599656 if (this .consumer != null ) {
600657 this .consumer .close (this .closeTimeout );
601658 this .consumer = null ;
602659 this .assignedPartitions .clear ();
603660 }
604661 }
662+ finally {
663+ this .consumerMonitor .unlock ();
664+ }
605665 }
606666
607667 private class IntegrationConsumerRebalanceListener implements ConsumerRebalanceListener {
0 commit comments