77 *******************************************************************************/
88package org .phoebus .applications .alarm .talk ;
99
10+ import static java .lang .Thread .sleep ;
1011import static org .phoebus .applications .alarm .AlarmSystem .logger ;
12+ import static org .phoebus .applications .alarm .AlarmSystem .nag_period_ms ;
1113
14+ import java .time .Duration ;
15+ import java .time .Instant ;
1216import java .util .Collections ;
1317import java .util .List ;
1418import java .util .Objects ;
19+ import java .util .Optional ;
1520import java .util .concurrent .CopyOnWriteArrayList ;
1621import java .util .concurrent .atomic .AtomicBoolean ;
22+ import java .util .concurrent .atomic .AtomicReference ;
1723import java .util .logging .Level ;
1824
1925import org .apache .kafka .clients .consumer .Consumer ;
@@ -41,7 +47,10 @@ public class TalkClient
4147 private final CopyOnWriteArrayList <TalkClientListener > listeners = new CopyOnWriteArrayList <>();
4248 private final AtomicBoolean running = new AtomicBoolean (true );
4349 private final Consumer <String , String > consumer ;
50+ private final Consumer <String , String > heartbeatConsumer ;
4451 private final Thread thread ;
52+ private final Thread updateHeartbeatTimestampThread ;
53+ private final Thread annunciateDisconnectionThread ;
4554
4655 /** @param server - Kafka Server host:port
4756 * @param config_name - Name of kafka config topic that the talk topic accompanies.
@@ -56,6 +65,16 @@ public TalkClient(final String server, final String config_name)
5665
5766 thread = new Thread (this ::run , "TalkClient" );
5867 thread .setDaemon (true );
68+
69+ {
70+ heartbeatConsumer = KafkaHelper .connectConsumer (server , List .of (config_name ), Collections .emptyList (), AlarmSystem .kafka_properties );
71+ updateHeartbeatTimestampThread = new Thread (() -> updateHeartbeatTimestampLoop (), "UpdateHeartbeatTimestampThread" );
72+ updateHeartbeatTimestampThread .setDaemon (false );
73+ updateHeartbeatTimestampThread .start ();
74+ annunciateDisconnectionThread = new Thread (() -> annunciateDisconnectionLoop (), "AnnunciateDisconnectionThread" );
75+ annunciateDisconnectionThread .setDaemon (false );
76+ annunciateDisconnectionThread .start ();
77+ }
5978 }
6079
6180 /** @param listener - Listener to add */
@@ -144,11 +163,62 @@ private void checkUpdates()
144163 }
145164 }
146165
166+ private Optional <Instant > nextDisconnectedAnnunciation = Optional .empty (); // When alarm server is disconnected: point in time for next annunciation of disconnection.
167+ private final Duration disconnectionAnnunciationPeriod = Duration .ofMillis (AlarmSystem .nag_period_ms );
168+ private final Duration idleTimeoutDuration = Duration .ofMillis (AlarmSystem .idle_timeout_ms ).multipliedBy (3 );
169+ private AtomicReference <Instant > lastReceivedUpdateFromAlarmServer = new AtomicReference <>(Instant .now ());
170+
171+ private void updateHeartbeatTimestampLoop () {
172+ while (running .get ()) {
173+ final ConsumerRecords <String , String > records = heartbeatConsumer .poll (100 );
174+ if (!records .isEmpty ()) {
175+ lastReceivedUpdateFromAlarmServer .set (Instant .now ());
176+ }
177+ try {
178+ sleep (1000 );
179+ } catch (InterruptedException e ) {
180+ logger .log (Level .WARNING , "updateHeartbeatTimestampLoop() was interrupted when sleeping." );
181+ }
182+ }
183+ }
184+
185+ /** Background thread loop that detects and annunciates disconnections. */
186+ private void annunciateDisconnectionLoop () {
187+ while (running .get ()) {
188+ Instant now = Instant .now ();
189+ if (Duration .between (lastReceivedUpdateFromAlarmServer .get (), now ).compareTo (idleTimeoutDuration ) > 0 ) {
190+ if (nextDisconnectedAnnunciation .isEmpty () || nextDisconnectedAnnunciation .get ().isBefore (now )) {
191+ try {
192+ for (final TalkClientListener listener : listeners ) {
193+ listener .messageReceived (SeverityLevel .UNDEFINED , true , "Alarm Server Disconnected" );
194+ }
195+ } catch (final Exception ex ) {
196+ logger .log (Level .WARNING , "Talk error for " + SeverityLevel .UNDEFINED + ", " + "Alarm Server Disconnected" , ex );
197+ }
198+ if (nag_period_ms > 0 ) {
199+ nextDisconnectedAnnunciation = Optional .of (now .plus (disconnectionAnnunciationPeriod )); // Annunciate disconnect again after nag period_ms
200+ }
201+ else {
202+ nextDisconnectedAnnunciation = Optional .of (Instant .MAX ); // When nag_period_ms == 0, don't annunciate the disconnection again.
203+ }
204+ }
205+ } else {
206+ nextDisconnectedAnnunciation = Optional .empty (); // Connection to the Alarm Server exists.
207+ }
208+ try {
209+ sleep (1000 );
210+ } catch (InterruptedException e ) {
211+ logger .log (Level .WARNING , "annunciateDisconnectionLoop() was interrupted when sleeping." );
212+ }
213+ }
214+ }
215+
147216 /** Stop client */
148217 public void shutdown ()
149218 {
150219 running .set (false );
151220 consumer .wakeup ();
221+ heartbeatConsumer .wakeup ();
152222 try
153223 {
154224 thread .join (2000 );
@@ -157,6 +227,22 @@ public void shutdown()
157227 {
158228 logger .log (Level .WARNING , "Talk client thread doesn't shut down" , ex );
159229 }
230+ try
231+ {
232+ annunciateDisconnectionThread .join (2000 );
233+ }
234+ catch (final InterruptedException ex )
235+ {
236+ logger .log (Level .WARNING , "Annunciate Disconnection from Alarm Server thread doesn't shut down" , ex );
237+ }
238+ try
239+ {
240+ updateHeartbeatTimestampThread .join (2000 );
241+ }
242+ catch (final InterruptedException ex )
243+ {
244+ logger .log (Level .WARNING , "Update Alarm Server Heartbeat thread doesn't shut down" , ex );
245+ }
160246 logger .info (thread .getName () + " shut down" );
161247 }
162248}
0 commit comments