@@ -80,6 +80,18 @@ public class AlarmServerMain implements ServerModelListener
8080 "\t restart - Re-load alarm configuration and restart.\n " +
8181 "\t shutdown - Shut alarm server down and exit.\n " ;
8282
83+ /**
84+ * Ensure that the required Kafka topics exist and are correctly configured.
85+ * <p>
86+ * Creates and configures the main alarm topic (compacted) and command/talk topics (deleted).
87+ * For more details on alarm topic configuration, see:
88+ * Refer to <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>
89+ *
90+ * @param server Kafka server
91+ * @param topic Base topic name
92+ * @param kafka_props_file Extra Kafka properties file
93+ * @throws Exception
94+ */
8395 private static void ensureKafkaTopics (String server , String topic , String kafka_props_file ) throws Exception {
8496 try (AdminClient admin = AdminClient .create (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , server ))) {
8597 Set <String > topics = admin .listTopics ().names ().get (60 , TimeUnit .SECONDS );
@@ -101,6 +113,13 @@ private static void ensureKafkaTopics(String server, String topic, String kafka_
101113 }
102114 }
103115
116+ /**
117+ * Create topics
118+ *
119+ * @param admin Admin client
120+ * @param topic Topic name
121+ * @throws Exception
122+ */
104123 private static void createTopic (AdminClient admin , String topic ) throws Exception {
105124 NewTopic newTopic = new NewTopic (topic , 1 , (short ) 1 );
106125 try {
@@ -115,28 +134,48 @@ private static void createTopic(AdminClient admin, String topic) throws Exceptio
115134 }
116135 }
117136
137+ /**
138+ * Configure topic for alarm state storage with compaction to retain latest state.
139+ * For configuration information, see:
140+ * <p>
141+ * Refer to <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>
142+ *
143+ * @param admin Admin client
144+ * @param topic Topic name
145+ * @throws Exception
146+ */
118147 private static void setCompactedConfig (AdminClient admin , String topic ) throws Exception {
119148 ConfigResource resource = new ConfigResource (ConfigResource .Type .TOPIC , topic );
120149 List <AlterConfigOp > configOps = List .of (
121- new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "compact" ), AlterConfigOp .OpType .SET ),
122- new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
123- new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
124- new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET )
150+ new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "compact" ), AlterConfigOp .OpType .SET ),
151+ new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
152+ new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
153+ new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET )
125154 );
126155 admin .incrementalAlterConfigs (Map .of (resource , configOps )).all ().get ();
127156 logger .info ("Set compacted config for topic: " + topic );
128157 }
129158
159+ /**
160+ * Configure topic for command/talk messages with time-based deletion.
161+ * For configuration information, see:
162+ *
163+ * Refer to <a href="https://github.com/ControlSystemStudio/phoebus/tree/master/app/alarm#configure-alarm-topics">Configure Alarm Topics</a>
164+ *
165+ * @param admin Admin client
166+ * @param topic Topic name
167+ * @throws Exception
168+ */
130169 private static void setDeletedConfig (AdminClient admin , String topic ) throws Exception {
131170 ConfigResource resource = new ConfigResource (ConfigResource .Type .TOPIC , topic );
132171 List <AlterConfigOp > configOps = List .of (
133- new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "delete" ), AlterConfigOp .OpType .SET ),
134- new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
135- new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
136- new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET ),
137- new AlterConfigOp (new ConfigEntry ("retention.ms" , "20000" ), AlterConfigOp .OpType .SET ),
138- new AlterConfigOp (new ConfigEntry ("delete.retention.ms" , "1000" ), AlterConfigOp .OpType .SET ),
139- new AlterConfigOp (new ConfigEntry ("file.delete.delay.ms" , "1000" ), AlterConfigOp .OpType .SET )
172+ new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "delete" ), AlterConfigOp .OpType .SET ),
173+ new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
174+ new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
175+ new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET ),
176+ new AlterConfigOp (new ConfigEntry ("retention.ms" , "20000" ), AlterConfigOp .OpType .SET ),
177+ new AlterConfigOp (new ConfigEntry ("delete.retention.ms" , "1000" ), AlterConfigOp .OpType .SET ),
178+ new AlterConfigOp (new ConfigEntry ("file.delete.delay.ms" , "1000" ), AlterConfigOp .OpType .SET )
140179 );
141180 admin .incrementalAlterConfigs (Map .of (resource , configOps )).all ().get ();
142181 logger .info ("Set deleted config for topic: " + topic );
0 commit comments