2020import com .rabbitmq .client .DefaultConsumer ;
2121import com .rabbitmq .client .Envelope ;
2222import com .rabbitmq .client .ShutdownSignalException ;
23+ import org .slf4j .Logger ;
24+ import org .slf4j .LoggerFactory ;
2325
2426import java .io .ByteArrayInputStream ;
2527import java .io .DataInputStream ;
3436
3537public class Consumer extends AgentBase implements Runnable {
3638
37- private ConsumerImpl q ;
39+ private static final Logger LOGGER = LoggerFactory .getLogger (Consumer .class );
40+
41+ private volatile ConsumerImpl q ;
3842 private final Channel channel ;
3943 private final String id ;
4044 private final List <String > queueNames ;
@@ -52,12 +56,15 @@ public class Consumer extends AgentBase implements Runnable {
5256
5357 private final ConsumerState state ;
5458
59+ private final Recovery .RecoveryProcess recoveryProcess ;
60+
5561 public Consumer (Channel channel , String id ,
5662 List <String > queueNames , int txSize , boolean autoAck ,
5763 int multiAckEvery , Stats stats , float rateLimit , int msgLimit ,
5864 int consumerLatencyInMicroSeconds ,
5965 TimestampProvider timestampProvider ,
60- MulticastSet .CompletionHandler completionHandler ) {
66+ MulticastSet .CompletionHandler completionHandler ,
67+ Recovery .RecoveryProcess recoveryProcess ) {
6168
6269 this .channel = channel ;
6370 this .id = id ;
@@ -97,6 +104,8 @@ public Consumer(Channel channel, String id,
97104 }
98105
99106 this .state = new ConsumerState (rateLimit );
107+ this .recoveryProcess = recoveryProcess ;
108+ this .recoveryProcess .init (this );
100109 }
101110
102111 public void run () {
@@ -124,21 +133,22 @@ private ConsumerImpl(Channel channel) {
124133 @ Override
125134 public void handleDelivery (String consumerTag , Envelope envelope , BasicProperties properties , byte [] body ) throws IOException {
126135 int currentMessageCount = state .incrementMessageCount ();
127-
128136 if (msgLimit == 0 || currentMessageCount <= msgLimit ) {
129137 long messageTimestamp = timestampExtractor .apply (properties , body );
130138 long nowTimestamp = timestampProvider .getCurrentTime ();
131139
132140 if (!autoAck ) {
133- if (multiAckEvery == 0 ) {
134- channel .basicAck (envelope .getDeliveryTag (), false );
135- } else if (currentMessageCount % multiAckEvery == 0 ) {
136- channel .basicAck (envelope .getDeliveryTag (), true );
137- }
141+ dealWithWriteOperation (() -> {
142+ if (multiAckEvery == 0 ) {
143+ channel .basicAck (envelope .getDeliveryTag (), false );
144+ } else if (currentMessageCount % multiAckEvery == 0 ) {
145+ channel .basicAck (envelope .getDeliveryTag (), true );
146+ }
147+ }, recoveryProcess );
138148 }
139149
140150 if (txSize != 0 && currentMessageCount % txSize == 0 ) {
141- channel .txCommit ();
151+ dealWithWriteOperation (() -> channel .txCommit (), recoveryProcess );
142152 }
143153
144154 long diff_time = timestampProvider .getDifference (nowTimestamp , messageTimestamp );
@@ -157,7 +167,14 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
157167
158168 @ Override
159169 public void handleShutdownSignal (String consumerTag , ShutdownSignalException sig ) {
160- countDown ();
170+ LOGGER .debug (
171+ "Consumer received shutdown signal, recovery process enabled? {}, condition to trigger connection recovery? {}" ,
172+ recoveryProcess .isEnabled (), isConnectionRecoveryTriggered (sig )
173+ );
174+ if (!recoveryProcess .isEnabled ()) {
175+ LOGGER .debug ("Counting down for consumer" );
176+ countDown ();
177+ }
161178 }
162179
163180 @ Override
@@ -179,6 +196,21 @@ private void countDown() {
179196 }
180197 }
181198
199+ @ Override
200+ public void recover (TopologyRecording topologyRecording ) {
201+ for (Map .Entry <String , String > entry : consumerTagBranchMap .entrySet ()) {
202+ TopologyRecording .RecordedQueue queue = topologyRecording .queue (entry .getValue ());
203+ try {
204+ channel .basicConsume (queue .name (), autoAck , entry .getKey (), q );
205+ } catch (IOException e ) {
206+ LOGGER .warn (
207+ "Error while recovering consumer {} on queue {} on connection {}" ,
208+ entry .getKey (), queue .name (), channel .getConnection ().getClientProvidedName (), e
209+ );
210+ }
211+ }
212+ }
213+
182214 private static class ConsumerState implements AgentState {
183215
184216 private final float rateLimit ;
0 commit comments