66import java .time .Instant ;
77import java .time .temporal .ChronoUnit ;
88import java .util .Properties ;
9+ import java .util .UUID ;
910import java .util .concurrent .CountDownLatch ;
10- import java .util .concurrent .TimeUnit ;
1111import java .util .logging .Level ;
1212import java .util .regex .Matcher ;
1313import java .util .regex .Pattern ;
1919import org .apache .kafka .streams .KeyValue ;
2020import org .apache .kafka .streams .StreamsBuilder ;
2121import org .apache .kafka .streams .StreamsConfig ;
22+ import org .apache .kafka .streams .kstream .Branched ;
2223import org .apache .kafka .streams .kstream .Consumed ;
2324import org .apache .kafka .streams .kstream .KStream ;
25+ import org .apache .kafka .streams .kstream .Named ;
2426import org .apache .kafka .streams .kstream .Transformer ;
2527import org .apache .kafka .streams .kstream .TransformerSupplier ;
2628import org .apache .kafka .streams .processor .ProcessorContext ;
2729import org .apache .kafka .streams .processor .TimestampExtractor ;
30+ import org .phoebus .applications .alarm .AlarmSystemConstants ;
2831import org .phoebus .applications .alarm .client .KafkaHelper ;
2932import org .phoebus .applications .alarm .messages .AlarmConfigMessage ;
3033import org .phoebus .applications .alarm .messages .AlarmMessage ;
@@ -71,14 +74,19 @@ public void run() {
7174 Properties kafkaProps = KafkaHelper .loadPropsFromFile (props .getProperty ("kafka_properties" ,"" ));
7275 kafkaProps .put (StreamsConfig .APPLICATION_ID_CONFIG , "streams-" +topic +"-alarm-messages" );
7376
74- if (props .containsKey (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG )){
75- kafkaProps .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG ,
76- props .get (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG ));
77- } else {
78- kafkaProps .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , "localhost:9092" );
79- }
80-
81-
77+ kafkaProps .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG ,
78+ props .getOrDefault (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , "localhost:9092" ));
79+
80+ // API requires for Consumer to be in a group.
81+ // Each alarm client must receive all updates,
82+ // cannot balance updates across a group
83+ // --> Use unique group for each client
84+ final String group_id = "Alarm-" + UUID .randomUUID ();
85+ kafkaProps .put ("group.id" , group_id );
86+
87+ AlarmSystemConstants .logger .fine (kafkaProps .getProperty ("group.id" ) + " subscribes to "
88+ + kafkaProps .get (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG ) + " for " + topic );
89+
8290 final String indexDateSpanUnits = props .getProperty ("date_span_units" );
8391 final boolean useDatedIndexNames = Boolean .parseBoolean (props .getProperty ("use_dated_index_names" ));
8492
@@ -112,14 +120,15 @@ public long extract(ConsumerRecord<Object, Object> record, long previousTimestam
112120 return new KeyValue <String , AlarmMessage >(key , value );
113121 });
114122
115- @ SuppressWarnings ("unchecked" )
116- KStream <String , AlarmMessage >[] alarmBranches = alarms .branch ((k ,v ) -> k .startsWith ("state" ),
117- (k ,v ) -> k .startsWith ("config" ),
118- (k ,v ) -> false
119- );
120-
121- processAlarmStateStream (alarmBranches [0 ], props );
122- processAlarmConfigurationStream (alarmBranches [1 ], props );
123+ alarms .split (Named .as ("alarm-" ))
124+ .branch ((k , v ) -> k .startsWith ("state" ),
125+ Branched .withConsumer (alarmStateStream -> processAlarmStateStream (alarmStateStream )))
126+ .branch ((k , v ) -> k .startsWith ("config" ),
127+ Branched .withConsumer (alarmConfigStream -> processAlarmConfigurationStream (alarmConfigStream )))
128+ .defaultBranch (Branched .withConsumer (stream -> {
129+ // No-op for unknown alarm message types
130+ logger .warning ("Unknown alarm message type for key: " + stream .toString ());
131+ }));
123132
124133 final KafkaStreams streams = new KafkaStreams (builder .build (), kafkaProps );
125134 final CountDownLatch latch = new CountDownLatch (1 );
@@ -143,7 +152,7 @@ public void run() {
143152 System .exit (0 );
144153 }
145154
146- private void processAlarmStateStream (KStream <String , AlarmMessage > alarmStateBranch , Properties props ) {
155+ private void processAlarmStateStream (KStream <String , AlarmMessage > alarmStateBranch ) {
147156
148157 KStream <String , AlarmStateMessage > transformedAlarms = alarmStateBranch
149158 .transform (new TransformerSupplier <String , AlarmMessage , KeyValue <String , AlarmStateMessage >>() {
@@ -193,7 +202,7 @@ public void close() {
193202
194203 }
195204
196- private void processAlarmConfigurationStream (KStream <String , AlarmMessage > alarmConfigBranch , Properties props ) {
205+ private void processAlarmConfigurationStream (KStream <String , AlarmMessage > alarmConfigBranch ) {
197206 KStream <String , AlarmConfigMessage > alarmConfigMessages = alarmConfigBranch .transform (new TransformerSupplier <String , AlarmMessage , KeyValue <String ,AlarmConfigMessage >>() {
198207
199208 @ Override
0 commit comments