1818import java .util .Set ;
1919import java .util .concurrent .ConcurrentHashMap ;
2020import java .util .concurrent .SynchronousQueue ;
21+ import java .util .concurrent .TimeUnit ;
2122import java .util .concurrent .atomic .AtomicInteger ;
2223import java .util .logging .Level ;
2324import java .util .logging .LogManager ;
2425import java .util .prefs .Preferences ;
2526
27+ import org .apache .kafka .clients .admin .AdminClient ;
28+ import org .apache .kafka .clients .admin .AdminClientConfig ;
29+ import org .apache .kafka .clients .admin .AlterConfigOp ;
30+ import org .apache .kafka .clients .admin .ConfigEntry ;
31+ import org .apache .kafka .clients .admin .ListTopicsResult ;
32+ import org .apache .kafka .clients .admin .NewTopic ;
2633import org .phoebus .applications .alarm .AlarmSystemConstants ;
2734import org .phoebus .applications .alarm .client .ClientState ;
2835import org .phoebus .applications .alarm .model .AlarmTreeItem ;
3542import org .phoebus .util .shell .CommandShell ;
3643
3744import com .fasterxml .jackson .databind .JsonNode ;
45+ import org .apache .kafka .common .config .ConfigResource ;
46+ import org .slf4j .Logger ;
47+ import org .slf4j .LoggerFactory ;
3848
3949/** Alarm Server
4050 * @author Kay Kasemir
4151 */
4252@ SuppressWarnings ("nls" )
4353public class AlarmServerMain implements ServerModelListener
4454{
55+ private static final Logger log = LoggerFactory .getLogger (AlarmServerMain .class );
4556 private final SynchronousQueue <Boolean > restart = new SynchronousQueue <>();
4657
4758 private volatile ServerModel model ;
@@ -72,6 +83,68 @@ public class AlarmServerMain implements ServerModelListener
7283 "\t restart - Re-load alarm configuration and restart.\n " +
7384 "\t shutdown - Shut alarm server down and exit.\n " ;
7485
86+ private static void ensureKafkaTopics (String server , String topic , String kafka_props_file ) throws Exception {
87+ try (AdminClient admin = AdminClient .create (Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , server ))) {
88+ Set <String > topics = admin .listTopics ().names ().get (60 , TimeUnit .SECONDS );
89+ // Compacted topic
90+ String compactedTopic = topic ;
91+ if (!topics .contains (compactedTopic )) {
92+ createTopic (admin , compactedTopic );
93+ }
94+ setCompactedConfig (admin , compactedTopic );
95+
96+ // Deleted topics
97+ for (String suffix : List .of ("Command" , "Talk" )) {
98+ String deletedTopic = topic + suffix ;
99+ if (!topics .contains (deletedTopic )) {
100+ createTopic (admin , deletedTopic );
101+ }
102+ setDeletedConfig (admin , deletedTopic );
103+ }
104+ }
105+ }
106+
107+ private static void createTopic (AdminClient admin , String topic ) throws Exception {
108+ NewTopic newTopic = new NewTopic (topic , 1 , (short ) 1 );
109+ try {
110+ admin .createTopics (List .of (newTopic )).all ().get ();
111+ logger .info ("Created topic: " + topic );
112+ } catch (Exception e ) {
113+ if (e .getCause () instanceof org .apache .kafka .common .errors .TopicExistsException ) {
114+ logger .info ("Topic already exists: " + topic );
115+ } else {
116+ throw e ;
117+ }
118+ }
119+ }
120+
121+ private static void setCompactedConfig (AdminClient admin , String topic ) throws Exception {
122+ ConfigResource resource = new ConfigResource (ConfigResource .Type .TOPIC , topic );
123+ List <AlterConfigOp > configOps = List .of (
124+ new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "compact" ), AlterConfigOp .OpType .SET ),
125+ new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
126+ new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
127+ new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET )
128+ );
129+ admin .incrementalAlterConfigs (Map .of (resource , configOps )).all ().get ();
130+ logger .info ("Set compacted config for topic: " + topic );
131+ }
132+
133+ private static void setDeletedConfig (AdminClient admin , String topic ) throws Exception {
134+ ConfigResource resource = new ConfigResource (ConfigResource .Type .TOPIC , topic );
135+ List <AlterConfigOp > configOps = List .of (
136+ new AlterConfigOp (new ConfigEntry ("cleanup.policy" , "delete" ), AlterConfigOp .OpType .SET ),
137+ new AlterConfigOp (new ConfigEntry ("segment.ms" , "10000" ), AlterConfigOp .OpType .SET ),
138+ new AlterConfigOp (new ConfigEntry ("min.cleanable.dirty.ratio" , "0.01" ), AlterConfigOp .OpType .SET ),
139+ new AlterConfigOp (new ConfigEntry ("min.compaction.lag.ms" , "1000" ), AlterConfigOp .OpType .SET ),
140+ new AlterConfigOp (new ConfigEntry ("retention.ms" , "20000" ), AlterConfigOp .OpType .SET ),
141+ new AlterConfigOp (new ConfigEntry ("delete.retention.ms" , "1000" ), AlterConfigOp .OpType .SET ),
142+ new AlterConfigOp (new ConfigEntry ("file.delete.delay.ms" , "1000" ), AlterConfigOp .OpType .SET )
143+ );
144+ admin .incrementalAlterConfigs (Map .of (resource , configOps )).all ().get ();
145+ logger .info ("Set deleted config for topic: " + topic );
146+ }
147+
75148 private AlarmServerMain (final String server , final String config , final boolean use_shell , final String kafka_props_file )
76149 {
77150 logger .info ("Server: " + server );
@@ -85,6 +158,10 @@ private AlarmServerMain(final String server, final String config, final boolean
85158 boolean run = true ;
86159 while (run )
87160 {
161+ logger .info ("Verify topics exists and are correctly configured..." );
162+ // Create/verify topics before using Kafka
163+ ensureKafkaTopics (server , config , kafka_props_file );
164+
88165 logger .info ("Fetching past alarm states..." );
89166 final AlarmStateInitializer init = new AlarmStateInitializer (server , config , kafka_props_file );
90167 if (init .awaitCompleteStates ())
0 commit comments