1818import java .util .Set ;
1919import java .util .concurrent .ConcurrentHashMap ;
2020import java .util .concurrent .SynchronousQueue ;
21+ import java .util .concurrent .TimeUnit ;
2122import java .util .concurrent .atomic .AtomicInteger ;
2223import java .util .logging .Level ;
2324import java .util .logging .LogManager ;
2425import java .util .prefs .Preferences ;
2526
27+ import org .apache .kafka .clients .admin .AdminClient ;
28+ import org .apache .kafka .clients .admin .AdminClientConfig ;
29+ import org .apache .kafka .clients .admin .AlterConfigOp ;
30+ import org .apache .kafka .clients .admin .ConfigEntry ;
31+ import org .apache .kafka .clients .admin .ListTopicsResult ;
32+ import org .apache .kafka .clients .admin .NewTopic ;
2633import org .phoebus .applications .alarm .AlarmSystemConstants ;
2734import org .phoebus .applications .alarm .client .ClientState ;
2835import org .phoebus .applications .alarm .model .AlarmTreeItem ;
3542import org .phoebus .util .shell .CommandShell ;
3643
3744import com .fasterxml .jackson .databind .JsonNode ;
45+ import org .apache .kafka .common .config .ConfigResource ;
3846
3947/** Alarm Server
4048 * @author Kay Kasemir
@@ -72,6 +80,107 @@ public class AlarmServerMain implements ServerModelListener
7280 "\t restart - Re-load alarm configuration and restart.\n " +
7381 "\t shutdown - Shut alarm server down and exit.\n " ;
7482
83+ /**
84+ * Ensure that the required Kafka topics exist and are correctly configured.
85+ * <p>
86+ * Creates and configures the main alarm topic (compacted) and command/talk topics (deleted).
87+ * For more details on alarm topic configuration, see:
88+ * Refer to <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>
89+ *
90+ * @param server Kafka server
91+ * @param topic Base topic name
92+ * @param kafka_props_file Extra Kafka properties file
93+ * @throws Exception
94+ */
95+ private static void ensureKafkaTopics (String server , String topic , String kafka_props_file ) throws Exception {
96+ try (AdminClient admin = AdminClient .create (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , server ))) {
97+ Set <String > topics = admin .listTopics ().names ().get (60 , TimeUnit .SECONDS );
98+ // Compacted topic
99+ String compactedTopic = topic ;
100+ if (!topics .contains (compactedTopic )) {
101+ createTopic (admin , compactedTopic );
102+ }
103+ setCompactedConfig (admin , compactedTopic );
104+
105+ // Deleted topics
106+ for (String suffix : List .of ("Command" , "Talk" )) {
107+ String deletedTopic = topic + suffix ;
108+ if (!topics .contains (deletedTopic )) {
109+ createTopic (admin , deletedTopic );
110+ }
111+ setDeletedConfig (admin , deletedTopic );
112+ }
113+ }
114+ }
115+
116+ /**
117+ * Create topics
118+ *
119+ * @param admin Admin client
120+ * @param topic Topic name
121+ * @throws Exception
122+ */
123+ private static void createTopic (AdminClient admin , String topic ) throws Exception {
124+ NewTopic newTopic = new NewTopic (topic , 1 , (short ) 1 );
125+ try {
126+ admin .createTopics (List .of (newTopic )).all ().get ();
127+ logger .info ("Created topic: " + topic );
128+ } catch (Exception e ) {
129+ if (e .getCause () instanceof org .apache .kafka .common .errors .TopicExistsException ) {
130+ logger .info ("Topic already exists: " + topic );
131+ } else {
132+ throw e ;
133+ }
134+ }
135+ }
136+
137+ /**
138+ * Configure topic for alarm state storage with compaction to retain latest state.
139+ * For configuration information, see:
140+ * <p>
141+ * Refer to <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>
142+ *
143+ * @param admin Admin client
144+ * @param topic Topic name
145+ * @throws Exception
146+ */
147+ private static void setCompactedConfig (AdminClient admin , String topic ) throws Exception {
148+ ConfigResource resource = new ConfigResource (ConfigResource .Type .TOPIC , topic );
149+ List <AlterConfigOp > configOps = List .of (
150+ new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "compact" ), AlterConfigOp .OpType .SET ),
151+ new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
152+ new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
153+ new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET )
154+ );
155+ admin .incrementalAlterConfigs (Map .of (resource , configOps )).all ().get ();
156+ logger .info ("Set compacted config for topic: " + topic );
157+ }
158+
159+ /**
160+ * Configure topic for command/talk messages with time-based deletion.
161+ * For configuration information, see:
162+ *
163+ * Refer to <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>
164+ *
165+ * @param admin Admin client
166+ * @param topic Topic name
167+ * @throws Exception
168+ */
169+ private static void setDeletedConfig (AdminClient admin , String topic ) throws Exception {
170+ ConfigResource resource = new ConfigResource (ConfigResource .Type .TOPIC , topic );
171+ List <AlterConfigOp > configOps = List .of (
172+ new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "delete" ), AlterConfigOp .OpType .SET ),
173+ new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
174+ new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
175+ new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET ),
176+ new AlterConfigOp (new ConfigEntry ("retention.ms" , "20000" ), AlterConfigOp .OpType .SET ),
177+ new AlterConfigOp (new ConfigEntry ("delete.retention.ms" , "1000" ), AlterConfigOp .OpType .SET ),
178+ new AlterConfigOp (new ConfigEntry ("file.delete.delay.ms" , "1000" ), AlterConfigOp .OpType .SET )
179+ );
180+ admin .incrementalAlterConfigs (Map .of (resource , configOps )).all ().get ();
181+ logger .info ("Set deleted config for topic: " + topic );
182+ }
183+
75184 private AlarmServerMain (final String server , final String config , final boolean use_shell , final String kafka_props_file )
76185 {
77186 logger .info ("Server: " + server );
@@ -85,6 +194,10 @@ private AlarmServerMain(final String server, final String config, final boolean
85194 boolean run = true ;
86195 while (run )
87196 {
197+ logger .info ("Verify topics exists and are correctly configured..." );
198+ // Create/verify topics before using Kafka
199+ ensureKafkaTopics (server , config , kafka_props_file );
200+
88201 logger .info ("Fetching past alarm states..." );
89202 final AlarmStateInitializer init = new AlarmStateInitializer (server , config , kafka_props_file );
90203 if (init .awaitCompleteStates ())
0 commit comments