3737import java .util .Map ;
3838import java .util .HashMap ;
3939import java .util .concurrent .TimeoutException ;
40+ import java .util .concurrent .ScheduledExecutorService ;
41+ import java .util .concurrent .Executors ;
42+ import java .util .concurrent .TimeUnit ;
4043
4144import com .rabbitmq .client .AMQP ;
4245import com .rabbitmq .client .Address ;
@@ -89,7 +92,7 @@ public static Map<String, Object> defaultClientProperties() {
8992 new Version (AMQP .PROTOCOL .MAJOR , AMQP .PROTOCOL .MINOR );
9093
9194 /** Initialization parameters */
92- private final ConnectionFactory factory ;
95+ private final ConnectionFactory _factory ;
9396
9497 /** The special channel 0 */
9598 private final AMQChannel _channel0 = new AMQChannel (this , 0 ) {
@@ -122,6 +125,9 @@ public static Map<String, Object> defaultClientProperties() {
122125 /** Flag indicating whether the client received Connection.Close message from the broker */
123126 private boolean _brokerInitiatedShutdown = false ;
124127
128+ /** Manages heartbeat sending for this connection */
129+ private final HeartbeatSender _heartbeatSender ;
130+
125131 /**
126132 * Protected API - respond, in the driver thread, to a ShutdownSignal.
127133 * @param channel the channel to disconnect
@@ -138,12 +144,6 @@ public void ensureIsOpen()
138144 }
139145 }
140146
141- /**
142- * Timestamp of last time we wrote a frame - used for deciding when to
143- * send a heartbeat
144- */
145- private volatile long _lastActivityTime = Long .MAX_VALUE ;
146-
147147 /**
148148 * Count of socket-timeouts that have happened without any incoming frames
149149 */
@@ -210,7 +210,8 @@ public AMQConnection(ConnectionFactory factory,
210210 _requestedHeartbeat = factory .getRequestedHeartbeat ();
211211 _clientProperties = new HashMap <String , Object >(factory .getClientProperties ());
212212
213- this .factory = factory ;
213+ _factory = factory ;
214+ _heartbeatSender = new HeartbeatSender (frameHandler );
214215 _frameHandler = frameHandler ;
215216 _running = true ;
216217 _frameMax = 0 ;
@@ -288,17 +289,17 @@ public void start()
288289 }
289290
290291 int channelMax =
291- negotiatedMaxValue (factory .getRequestedChannelMax (),
292+ negotiatedMaxValue (_factory .getRequestedChannelMax (),
292293 connTune .getChannelMax ());
293294 _channelManager = new ChannelManager (channelMax );
294295
295296 int frameMax =
296- negotiatedMaxValue (factory .getRequestedFrameMax (),
297+ negotiatedMaxValue (_factory .getRequestedFrameMax (),
297298 connTune .getFrameMax ());
298299 setFrameMax (frameMax );
299300
300301 int heartbeat =
301- negotiatedMaxValue (factory .getRequestedHeartbeat (),
302+ negotiatedMaxValue (_factory .getRequestedHeartbeat (),
302303 connTune .getHeartbeat ());
303304 setHeartbeat (heartbeat );
304305
@@ -349,10 +350,12 @@ public int getHeartbeat() {
349350 */
350351 public void setHeartbeat (int heartbeat ) {
351352 try {
353+ _heartbeatSender .setHeartbeat (heartbeat );
354+ _heartbeat = heartbeat ;
355+
352356 // Divide by four to make the maximum unwanted delay in
353357 // sending a timeout be less than a quarter of the
354358 // timeout setting.
355- _heartbeat = heartbeat ;
356359 _frameHandler .setTimeout (heartbeat * 1000 / 4 );
357360 } catch (SocketException se ) {
358361 // should do more here?
@@ -395,7 +398,7 @@ public Frame readFrame() throws IOException {
395398 */
396399 public void writeFrame (Frame f ) throws IOException {
397400 _frameHandler .writeFrame (f );
398- _lastActivityTime = System . nanoTime ();
401+ _heartbeatSender . signalActivity ();
399402 }
400403
401404 private static int negotiatedMaxValue (int clientValue , int serverValue ) {
@@ -416,7 +419,7 @@ private class MainLoop extends Thread {
416419 try {
417420 while (_running ) {
418421 Frame frame = readFrame ();
419- maybeSendHeartbeat ();
422+
420423 if (frame != null ) {
421424 _missedHeartbeats = 0 ;
422425 if (frame .type == AMQP .FRAME_HEARTBEAT ) {
@@ -459,25 +462,6 @@ private class MainLoop extends Thread {
459462 }
460463 }
461464
462- private static final long NANOS_IN_SECOND = 1000 * 1000 * 1000 ;
463-
464- /**
465- * Private API - Checks lastActivityTime and heartbeat, sending a
466- * heartbeat frame if conditions are right.
467- */
468- public void maybeSendHeartbeat () throws IOException {
469- if (_heartbeat == 0 ) {
470- // No heartbeating.
471- return ;
472- }
473-
474- long now = System .nanoTime ();
475- if (now > (_lastActivityTime + (_heartbeat * NANOS_IN_SECOND ))) {
476- _lastActivityTime = now ;
477- writeFrame (new Frame (AMQP .FRAME_HEARTBEAT , 0 ));
478- }
479- }
480-
481465 /**
482466 * Private API - Called when a frame-read operation times out. Checks to
483467 * see if too many heartbeats have been missed, and if so, throws
@@ -557,7 +541,7 @@ public void handleConnectionClose(Command closeCommand) {
557541 } catch (IOException ioe ) {
558542 Utility .emptyStatement ();
559543 }
560- _heartbeat = 0 ; // Do not try to send heartbeats after CloseOk
544+ _heartbeatSender . shutdown () ; // Do not try to send heartbeats after CloseOk
561545 _brokerInitiatedShutdown = true ;
562546 Thread scw = new SocketCloseWait (sse );
563547 scw .setName ("AMQP Connection Closing Monitor " +
@@ -607,6 +591,10 @@ public ShutdownSignalException shutdown(Object reason,
607591 if (isOpen ())
608592 _shutdownCause = sse ;
609593 }
594+
595+ // stop any heartbeating
596+ _heartbeatSender .shutdown ();
597+
610598 _channel0 .processShutdownSignal (sse , !initiatedByApplication , notifyRpc );
611599 _channelManager .handleSignal (sse );
612600 return sse ;
0 commit comments