diff --git a/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java b/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java index 5771419a6d..997205e1a7 100644 --- a/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java +++ b/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java @@ -34,7 +34,7 @@ private static synchronized void initializeAlarmClient(String config) if (!alarmModels.containsKey(config)) { logger.log(Level.CONFIG, "Creating a alarm client for config : + " + config + " in the alarm datasource"); - AlarmClient model = new AlarmClient(AlarmSystem.server, config); + AlarmClient model = new AlarmClient(AlarmSystem.server, config, AlarmSystem.kafka_properties); model.addListener(new AlarmClientDatasourceListener(config)); model.start(); alarmModels.put(config, model); diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java index 5b5efcf79a..9d1d47e8e1 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java @@ -63,6 +63,9 @@ public class AlarmSystem /** Kafka Server host:port */ @Preference public static String server; + /** Kafka settings file */ + @Preference public static String kafka_properties; + /** Name of alarm tree root * *

Default name from preferences. diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java index 6464815b43..f8f51459f4 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java @@ -94,8 +94,9 @@ public class AlarmClient /** @param server Kafka Server host:port * @param config_name Name of alarm tree root + * @param kafka_properties_file File to load additional kafka properties from */ - public AlarmClient(final String server, final String config_name) + public AlarmClient(final String server, final String config_name, final String kafka_properties_file) { Objects.requireNonNull(server); Objects.requireNonNull(config_name); @@ -105,8 +106,8 @@ public AlarmClient(final String server, final String config_name) root = new AlarmClientNode(null, config_name); final List topics = List.of(config_topic); - consumer = KafkaHelper.connectConsumer(server, topics, topics); - producer = KafkaHelper.connectProducer(server); + consumer = KafkaHelper.connectConsumer(server, topics, topics, kafka_properties_file); + producer = KafkaHelper.connectProducer(server, kafka_properties_file); thread = new Thread(this::run, "AlarmClientModel " + config_name); thread.setDaemon(true); diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java index c0fa1c5f72..d3dcf4362c 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java @@ -9,10 +9,13 @@ import static org.phoebus.applications.alarm.AlarmSystem.logger; +import java.io.FileInputStream; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.logging.Level; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -51,24 +54,28 @@ public class KafkaHelper * @param kafka_servers Servers to read * @param topics Topics to which to subscribe * @param from_beginning Topics to read from the beginning + * @param properties_file File name to load additional settings for the kafka consumer * @return {@link Consumer} */ - public static Consumer connectConsumer(final String kafka_servers, final List topics, final List from_beginning) + public static Consumer connectConsumer(final String kafka_servers, final List topics, final List from_beginning, final String properties_file) { - final Properties props = new Properties(); - props.put("bootstrap.servers", kafka_servers); - // API requires for Consumer to be in a group. - // Each alarm client must receive all updates, - // cannot balance updates across a group - // --> Use unique group for each client - final String group_id = "Alarm-" + UUID.randomUUID(); - props.put("group.id", group_id); + Properties kafka_props = loadPropsFromFile(properties_file); + kafka_props.put("bootstrap.servers", kafka_servers); - logger.fine(group_id + " subscribes to " + kafka_servers + " for " + topics); + if (!kafka_props.containsKey("group.id")){ + // API requires for Consumer to be in a group. + // Each alarm client must receive all updates, + // cannot balance updates across a group + // --> Use unique group for each client + final String group_id = "Alarm-" + UUID.randomUUID(); + kafka_props.put("group.id", group_id); + } + + logger.fine(kafka_props.getProperty("group.id") + " subscribes to " + kafka_servers + " for " + topics); // Read key, value as string final Deserializer deserializer = new StringDeserializer(); - final Consumer consumer = new KafkaConsumer<>(props, deserializer, deserializer); + final Consumer consumer = new KafkaConsumer<>(kafka_props, deserializer, deserializer); // Rewind whenever assigned to partition final ConsumerRebalanceListener crl = new ConsumerRebalanceListener() @@ -101,19 +108,20 @@ public void onPartitionsRevoked(final Collection parts) /** Create producer for alarm information * @param kafka_servers Kafka servers + * @param properties_file File name to load additional settings for the kafka producer * @return {@link Producer} */ - public static Producer connectProducer(final String kafka_servers) + public static Producer connectProducer(final String kafka_servers, final String properties_file) { - final Properties props = new Properties(); - props.put("bootstrap.servers", kafka_servers); + Properties kafka_props = loadPropsFromFile(properties_file); + kafka_props.put("bootstrap.servers", kafka_servers); // Collect messages for 20ms until sending them out as a batch - props.put("linger.ms", 20); + kafka_props.put("linger.ms", 20); // Write String key, value final Serializer serializer = new StringSerializer(); - final Producer producer = new KafkaProducer<>(props, serializer, serializer); + final Producer producer = new KafkaProducer<>(kafka_props, serializer, serializer); return producer; } @@ -123,22 +131,43 @@ public static Producer connectProducer(final String kafka_server * @param kafka_servers - Sever to connect to. * @param topics List of topics to aggregate. * @param aggregate_topic - Name of topic to aggregate to. + * @param kafka_props File name to load additional settings for the kafka stream * @return aggregate_stream - KafkaStreams * @author Evan Smith */ - public static KafkaStreams aggregateTopics(String kafka_servers, List topics, String aggregate_topic) + public static KafkaStreams aggregateTopics(String kafka_servers, List topics, String aggregate_topic, final String properties_file) { - final Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + Properties kafka_props = loadPropsFromFile(properties_file); + kafka_props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term"); + kafka_props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers); + kafka_props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + kafka_props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); // Aggregate the topics by mapping the topic key value pairs one to one into the aggregate topic. builder.stream(topics).mapValues(pair -> pair).to(aggregate_topic); - return new KafkaStreams(builder.build(), props); + return new KafkaStreams(builder.build(), kafka_props); + } + + + /** + * Load properties from the given file path. Path may be blank or null + * resulting in a properties object without entries. + * @param filePath Full path to properties file + * @return properties - the properties loaded from file + */ + static public Properties loadPropsFromFile(String filePath) { + logger.fine("loading file from path: " + filePath); + Properties properties = new Properties(); + if(filePath != null && !filePath.isBlank()){ + try(FileInputStream file = new FileInputStream(filePath);){ + properties.load(file); + } catch(IOException e) { + logger.log(Level.SEVERE, "failed to load kafka properties", e); + } + } + return properties; } } diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/talk/TalkClient.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/talk/TalkClient.java index cfa1ae020e..31e956ecfb 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/talk/TalkClient.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/talk/TalkClient.java @@ -52,7 +52,7 @@ public TalkClient(final String server, final String config_name) Objects.requireNonNull(config_name); final List topics = List.of(config_name + AlarmSystem.TALK_TOPIC_SUFFIX); - consumer = KafkaHelper.connectConsumer(server, topics, Collections.emptyList()); + consumer = KafkaHelper.connectConsumer(server, topics, Collections.emptyList(), AlarmSystem.kafka_properties); thread = new Thread(this::run, "TalkClient"); thread.setDaemon(true); diff --git a/app/alarm/model/src/main/resources/alarm_preferences.properties b/app/alarm/model/src/main/resources/alarm_preferences.properties index 92bb569d4f..f6df69f5b2 100644 --- a/app/alarm/model/src/main/resources/alarm_preferences.properties +++ b/app/alarm/model/src/main/resources/alarm_preferences.properties @@ -5,6 +5,9 @@ # Kafka Server host:port server=localhost:9092 +# A file to configure the properites of kafka clients +kafka_properties= + # Name of alarm tree root config_name=Accelerator diff --git a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmClientModelDemo.java b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmClientModelDemo.java index 86e55eec33..a55bf47ddc 100644 --- a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmClientModelDemo.java +++ b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmClientModelDemo.java @@ -24,7 +24,7 @@ public class AlarmClientModelDemo @Test public void testClientModel() throws Exception { - final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT); + final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE); client.start(); TimeUnit.SECONDS.sleep(4); diff --git a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmDemoSettings.java b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmDemoSettings.java index 8e0787c260..77e846f452 100644 --- a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmDemoSettings.java +++ b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmDemoSettings.java @@ -15,4 +15,5 @@ public class AlarmDemoSettings { public static final String SERVERS = "localhost:9092"; public static final String ROOT = "Accelerator"; + public static final String KAFKA_PROPERTIES_FILE = ""; } diff --git a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmModelSnapshotDemo.java b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmModelSnapshotDemo.java index 5e1262fdd6..45d8c0daed 100644 --- a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmModelSnapshotDemo.java +++ b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmModelSnapshotDemo.java @@ -24,7 +24,7 @@ public class AlarmModelSnapshotDemo public void testAlarmModelWriter() throws Exception { // Get alarm configuration - final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT); + final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE); System.out.println("Wait for stable configuration, i.e. no changes for 4 seconds..."); diff --git a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/EnabledDeserializerTest.java b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/EnabledDeserializerTest.java index 5756c4020e..8cf74fa481 100644 --- a/app/alarm/model/src/test/java/org/phoebus/applications/alarm/EnabledDeserializerTest.java +++ b/app/alarm/model/src/test/java/org/phoebus/applications/alarm/EnabledDeserializerTest.java @@ -1,4 +1,4 @@ -package src.test.java.org.phoebus.applications.alarm; +package org.phoebus.applications.alarm; import org.junit.Test; import java.io.IOException; diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/area/AlarmAreaInstance.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/area/AlarmAreaInstance.java index 23474b280c..817156bbbf 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/area/AlarmAreaInstance.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/area/AlarmAreaInstance.java @@ -71,7 +71,7 @@ private Node create(final URI input) throws Exception try { - client = new AlarmClient(server, config_name); + client = new AlarmClient(server, config_name, AlarmSystem.kafka_properties); final AlarmAreaView area_view = new AlarmAreaView(client); client.start(); diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableInstance.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableInstance.java index b1a0da189b..f509c7aba9 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableInstance.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableInstance.java @@ -78,7 +78,7 @@ private Node create(final URI input) throws Exception try { - client = new AlarmClient(server, config_name); + client = new AlarmClient(server, config_name, AlarmSystem.kafka_properties); table = new AlarmTableUI(client); mediator = new AlarmTableMediator(client, table); client.addListener(mediator); diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/AlarmTreeInstance.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/AlarmTreeInstance.java index 33029d8849..a05ee7bb6a 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/AlarmTreeInstance.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/AlarmTreeInstance.java @@ -76,7 +76,7 @@ private Node create(final URI input) throws Exception try { - client = new AlarmClient(server, config_name); + client = new AlarmClient(server, config_name, AlarmSystem.kafka_properties); final AlarmTreeView tree_view = new AlarmTreeView(client); client.start(); diff --git a/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmAreaUIDemo.java b/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmAreaUIDemo.java index b23abd95b4..3d1df0f824 100644 --- a/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmAreaUIDemo.java +++ b/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmAreaUIDemo.java @@ -23,7 +23,7 @@ public class AlarmAreaUIDemo extends ApplicationWrapper @Override public void start(final Stage stage) throws Exception { - final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator"); + final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator", ""); final AlarmAreaView area_view = new AlarmAreaView(client); final Scene scene = new Scene(area_view, 600, 800); diff --git a/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTableDemo.java b/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTableDemo.java index 7aa3bf0dc0..67c6a300c7 100644 --- a/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTableDemo.java +++ b/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTableDemo.java @@ -24,7 +24,7 @@ public class AlarmTableDemo extends ApplicationWrapper @Override public void start(final Stage stage) throws Exception { - final AlarmClient client = new AlarmClient(AlarmSystem.server, AlarmSystem.config_name); + final AlarmClient client = new AlarmClient(AlarmSystem.server, AlarmSystem.config_name, AlarmSystem.kafka_properties); final AlarmTableUI table = new AlarmTableUI(client); final Scene scene = new Scene(table, 1200, 300); stage.setScene(scene); diff --git a/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTreeUIDemo.java b/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTreeUIDemo.java index 2faf07e74d..dfb0591dca 100644 --- a/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTreeUIDemo.java +++ b/app/alarm/ui/src/test/java/org/phoebus/applications/alarm/AlarmTreeUIDemo.java @@ -23,7 +23,7 @@ public class AlarmTreeUIDemo extends ApplicationWrapper @Override public void start(final Stage stage) throws Exception { - final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator"); + final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator",""); final AlarmTreeView tree_view = new AlarmTreeView(client); final Scene scene = new Scene(tree_view, 600, 800); diff --git a/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLogger.java b/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLogger.java index 96e4ab59f5..66c763f1b1 100644 --- a/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLogger.java +++ b/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLogger.java @@ -86,7 +86,7 @@ public AlarmConfigLogger(String topic, String location, String remoteLocation) { root = new File(location, this.topic); root.mkdirs(); - model = new AlarmClient(props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG), this.topic); + model = new AlarmClient(props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG), this.topic, props.getProperty("kafka_properties")); model.start(); initialize(); diff --git a/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLoggingService.java b/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLoggingService.java index 08d220dfd9..9650321c5f 100644 --- a/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLoggingService.java +++ b/services/alarm-config-logger/src/main/java/org/phoebus/alarm/logging/AlarmConfigLoggingService.java @@ -44,6 +44,7 @@ private static void help() { System.out.println("-help - This text"); System.out.println("-topics Accelerator - Alarm topics who's associated configuration is to be logged, they can be defined as a comma or colon separated list"); System.out.println("-bootstrap.servers localhost:9092 - Kafka server address"); + System.out.println("-kafka_properties /opt/client.properties - Properties file to load kafka client settings from"); System.out.println("-repo.location /tmp/alarm_repo - Location of the alarm configuration repository"); System.out.println("-remote.location https://remote.git/repo - Location of the remote git alarm configuration repository"); System.out.println("-username username - username for remote git repo"); @@ -105,6 +106,12 @@ public static void main(String[] original_args) throws InterruptedException { iter.remove(); properties.put("bootstrap.servers", iter.next()); iter.remove(); + } else if (cmd.equals("-kafka_properties")) { + if (!iter.hasNext()) + throw new Exception("Missing -kafka_properties file name"); + iter.remove(); + properties.put("kafka_properties",iter.next()); + iter.remove(); } else if (cmd.equals("-repo.location")) { if (!iter.hasNext()) throw new Exception("Missing -repo.location local checkout location for alarm confing repo"); diff --git a/services/alarm-config-logger/src/main/resources/alarm_config_logger.properties b/services/alarm-config-logger/src/main/resources/alarm_config_logger.properties index 85330f9d7c..93e44ec80f 100644 --- a/services/alarm-config-logger/src/main/resources/alarm_config_logger.properties +++ b/services/alarm-config-logger/src/main/resources/alarm_config_logger.properties @@ -1,6 +1,7 @@ # Kafka alarm topics bootstrap.servers=localhost:9092 alarm_topics=Accelerator +kafka_properties= # Location of the git repo local.location=/tmp/alarm_repo diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java index 034e45b472..404c3c4fd7 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmCmdLogger.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.phoebus.applications.alarm.client.KafkaHelper; import org.phoebus.applications.alarm.messages.AlarmCommandMessage; import org.phoebus.applications.alarm.messages.MessageParser; import org.phoebus.util.indexname.IndexNameHelper; @@ -63,7 +64,7 @@ public void run() { Properties props = new Properties(); props.putAll(PropertiesHelper.getProperties()); - Properties kafkaProps = new Properties(); + Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties","")); kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-" + topic + "-alarm-cmd"); if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){ diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java index 74b8ab21cc..df9de8993b 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmLoggingService.java @@ -51,6 +51,7 @@ private static void help() System.out.println("-es_port 9200 - elastic server port"); System.out.println("-es_sniff false - elastic server sniff feature"); System.out.println("-bootstrap.servers localhost:9092 - Kafka server address"); + System.out.println("-kafka_properties /opt/client.properties - Properties file to load kafka client settings from"); System.out.println("-properties /opt/alarm_logger.properties - Properties file to be used (instead of command line arguments)"); System.out.println("-date_span_units M - Date units for the time based index to span."); System.out.println("-date_span_value 1 - Date value for the time based index to span."); @@ -141,6 +142,12 @@ public static void main(final String[] original_args) throws Exception { iter.remove(); properties.put("bootstrap.servers",iter.next()); iter.remove(); + } else if (cmd.equals("-kafka_properties")) { + if (!iter.hasNext()) + throw new Exception("Missing -kafka_properties file name"); + iter.remove(); + properties.put("kafka_properties",iter.next()); + iter.remove(); } else if (cmd.equals("-date_span_units")) { diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java index 9866598529..5c06220815 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.phoebus.applications.alarm.client.KafkaHelper; import org.phoebus.applications.alarm.messages.AlarmConfigMessage; import org.phoebus.applications.alarm.messages.AlarmMessage; import org.phoebus.applications.alarm.messages.AlarmStateMessage; @@ -65,7 +66,7 @@ public void run() { Properties props = new Properties(); props.putAll(PropertiesHelper.getProperties()); - Properties kafkaProps = new Properties(); + Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties","")); kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-"+topic+"-alarm-messages"); if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){ diff --git a/services/alarm-logger/src/main/resources/alarm_logger.properties b/services/alarm-logger/src/main/resources/alarm_logger.properties index bd3f5ac84b..84c1a384f9 100644 --- a/services/alarm-logger/src/main/resources/alarm_logger.properties +++ b/services/alarm-logger/src/main/resources/alarm_logger.properties @@ -15,6 +15,9 @@ es_create_templates=true # Kafka server location bootstrap.servers=localhost:9092 +# Kafka client properties file +kafka_properties= + # The units of the indices date span: Days (D), Weeks(W), Months(M), Years(Y). date_span_units=M diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/aggregate/AggregateTopics.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/aggregate/AggregateTopics.java index f35d5288d2..9f8aa5237b 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/aggregate/AggregateTopics.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/aggregate/AggregateTopics.java @@ -30,6 +30,7 @@ public class AggregateTopics private Logger logger = Logger.getLogger(this.getClass().getPackageName()); private String kafka_servers = "localhost:9092"; private String config = "Accelerator"; + private String kafka_properties = ""; private final String longTerm; private boolean createTopic = false; private final List topics; @@ -43,7 +44,7 @@ private AggregateTopics(String[] args) if (createTopic) { logger.info("Discovering and creating topics in " + topics.toString()); - CreateTopics.discoverAndCreateTopics(kafka_servers, false, List.of(longTerm)); + CreateTopics.discoverAndCreateTopics(kafka_servers, false, List.of(longTerm), kafka_properties); } logger.info("server:\"" + kafka_servers + "\", config: \"" + config + "\""); @@ -84,6 +85,7 @@ private void help() System.out.println("\t-server server_name: Allows specification of server address.\n\t\tDefault is \"localhost:9092\"."); System.out.println("\t-confg config_name: Allows specification of config name.\n\t\tDefault is \"Accelerator\"."); System.out.println("\t-create : Discovers if the config + \"LongTerm\" topic already exists. If it does not, it creates it."); + System.out.println("\t-kafka_properties kafka_properties_file : File to load kafka-client properties from."); System.exit(0); } @@ -129,6 +131,18 @@ else if (arg.equals("-config")) throw new Exception("'-config' must be followed by a config name."); } } + else if (arg.equals("-kafka_properties")) + { + String next; + if (token.hasNext() && ! (next = token.next()).startsWith("-")) + { + kafka_properties = next; + } + else + { + throw new Exception("'-kafka_properties' must be followed by a file name."); + } + } else { throw new Exception("Unknown argument '" + arg + "'."); @@ -154,7 +168,8 @@ private KafkaStreams createStream() { KafkaStreams stream = KafkaHelper.aggregateTopics(kafka_servers, topics, - longTerm); + longTerm, + kafka_properties); // Log any uncaught exceptions. stream.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> { diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmConfigTool.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmConfigTool.java index c1a2a43b09..85fae11d5a 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmConfigTool.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmConfigTool.java @@ -32,7 +32,7 @@ public class AlarmConfigTool private static final long STABILIZATION_SECS = 4; // Export an alarm system model to an xml file. - public void exportModel(String filename, String server, String config) throws Exception + public void exportModel(String filename, String server, String config, String kafka_properties_file) throws Exception { final XmlModelWriter xmlWriter; @@ -51,7 +51,7 @@ public void exportModel(String filename, String server, String config) throws Ex xmlWriter = new XmlModelWriter(fos); } - final AlarmClient client = new AlarmClient(server, config); + final AlarmClient client = new AlarmClient(server, config, kafka_properties_file); client.start(); System.out.printf("Writing file after model is stable for %d seconds:\n", STABILIZATION_SECS); @@ -78,7 +78,7 @@ public void exportModel(String filename, String server, String config) throws Ex } // Import an alarm system model from an xml file. - public void importModel(final String filename, final String server, final String config) throws InterruptedException, Exception + public void importModel(final String filename, final String server, final String config, String kafka_properties_file) throws InterruptedException, Exception { System.out.println("Reading new configuration from " + filename); final long start = System.currentTimeMillis(); @@ -98,7 +98,7 @@ public void importModel(final String filename, final String server, final String final long got_xml = System.currentTimeMillis(); // Connect to the server. - final AlarmClient client = new AlarmClient(server, config); + final AlarmClient client = new AlarmClient(server, config, kafka_properties_file); client.start(); try { diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java index 01ac18c32e..b0fb8d8231 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmServerMain.java @@ -11,8 +11,11 @@ import java.io.FileInputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -69,10 +72,11 @@ public class AlarmServerMain implements ServerModelListener "\trestart - Re-load alarm configuration and restart.\n" + "\tshutdown - Shut alarm server down and exit.\n"; - private AlarmServerMain(final String server, final String config, final boolean use_shell) + private AlarmServerMain(final String server, final String config, final boolean use_shell, final String kafka_props_file) { logger.info("Server: " + server); logger.info("Config: " + config); + logger.info("Extra Kafka Properties: " + kafka_props_file); try { @@ -82,13 +86,13 @@ private AlarmServerMain(final String server, final String config, final boolean while (run) { logger.info("Fetching past alarm states..."); - final AlarmStateInitializer init = new AlarmStateInitializer(server, config); + final AlarmStateInitializer init = new AlarmStateInitializer(server, config, kafka_props_file); if (! init.awaitCompleteStates()) logger.log(Level.WARNING, "Keep receiving state updates, may have incomplete initial set of alarm states"); final ConcurrentHashMap initial_states = init.shutdown(); logger.info("Start handling alarms"); - model = new ServerModel(server, config, initial_states, this); + model = new ServerModel(server, config, initial_states, this, kafka_props_file); model.start(); if (use_shell) @@ -528,16 +532,17 @@ private static void help() System.out.println(); System.out.println("Command-line arguments:"); System.out.println(); - System.out.println("-help - This text"); - System.out.println("-server localhost:9092 - Kafka server with port number"); - System.out.println("-config Accelerator - Alarm configuration"); + System.out.println("-help - This text"); + System.out.println("-server localhost:9092 - Kafka server with port number"); + System.out.println("-config Accelerator - Alarm configuration"); // Don't mention this option, prefer examples/create_topics.sh // System.out.println("-create_topics - Create Kafka topics for alarm configuration?"); - System.out.println("-settings settings.{xml,ini} - Import preferences (PV connectivity) from property format file"); - System.out.println("-noshell - Disable the command shell for running without a terminal"); - System.out.println("-export config.xml - Export alarm configuration to file"); - System.out.println("-import config.xml - Import alarm configruation from file"); - System.out.println("-logging logging.properties - Load log settings"); + System.out.println("-settings settings.{xml,ini} - Import preferences (PV connectivity) from property format file"); + System.out.println("-noshell - Disable the command shell for running without a terminal"); + System.out.println("-export config.xml - Export alarm configuration to file"); + System.out.println("-import config.xml - Import alarm configruation from file"); + System.out.println("-logging logging.properties - Load log settings"); + System.out.println("-kafka_properties client.properties - Load kafka client settings from file"); System.out.println(); } @@ -548,137 +553,122 @@ public static void main(final String[] original_args) throws Exception String server = "localhost:9092"; String config = "Accelerator"; + String kafka_properties = ""; boolean use_shell = true; - String args_server = ""; - String args_config = ""; - boolean use_args_server = false; - boolean use_args_config = false; - boolean use_settings = false; - String import_filename = null; - String export_filename = null; // Handle arguments final List args = new ArrayList<>(List.of(original_args)); final Iterator iter = args.iterator(); + HashMap parsed_args = new HashMap(); try { + // define command line arguments + String help_arg = "-help"; + String help_alt_arg = "-h"; + String server_arg = "-server"; + String config_arg = "-config"; + String create_topics_arg = "-create_topics"; + String settings_arg = "-settings"; + String noshell_arg = "-noshell"; + String export_arg = "-export"; + String import_arg = "-import"; + String logging_arg = "-logging"; + String kafka_props_arg = "-kafka_properties"; + + Set options = Set.of( + server_arg, + config_arg, + settings_arg, + export_arg, + import_arg, + logging_arg, + kafka_props_arg); + + Set flags = Set.of( + help_arg, + help_alt_arg, + noshell_arg, + create_topics_arg + ); + + // to handle arguments that may be provided via a settings file + // as well as directly on the commandline, map their relationship + Map args_to_prefs = Map.ofEntries( + Map.entry(config_arg, "config_names"), + Map.entry(server_arg, "server"), + Map.entry(kafka_props_arg, "kafka_properties") + ); + while (iter.hasNext()) { final String cmd = iter.next(); - if ( cmd.equals("-h") || cmd.equals("-help")) - { - help(); - return; - } - else if (cmd.equals("-server")) - { - if (! iter.hasNext()) - throw new Exception("Missing -server name"); - iter.remove(); - args_server = iter.next(); - use_args_server = true; - iter.remove(); - } - else if (cmd.equals("-config")) - { - if (! iter.hasNext()) - throw new Exception("Missing -config name"); - iter.remove(); - args_config = iter.next(); - use_args_config = true; - iter.remove(); - } - else if (cmd.equals("-logging")) - { - if (! iter.hasNext()) - throw new Exception("Missing -logging file name"); - iter.remove(); - final String filename = iter.next(); - iter.remove(); - LogManager.getLogManager().readConfiguration(new FileInputStream(filename)); - } - else if (cmd.equals("-settings")) - { - if (! iter.hasNext()) - throw new Exception("Missing -settings file name"); - iter.remove(); - final String filename = iter.next(); - iter.remove(); - logger.info("Loading settings from " + filename); - PropertyPreferenceLoader.load(new FileInputStream(filename)); - - Preferences userPrefs = Preferences.userRoot().node("org/phoebus/applications/alarm"); - String pref_server = userPrefs.get("server", server); - String pref_conf_names = userPrefs.get("config_names", config); - server = pref_server; - config = pref_conf_names; - use_settings = true; - } - else if (cmd.equals("-noshell")) - { - use_shell = false; - } - else if (cmd.equals("-create_topics")) - { - iter.remove(); - logger.info("Discovering and creating any missing topics at " + server); - CreateTopics.discoverAndCreateTopics(server, true, List.of(config, - config + AlarmSystem.COMMAND_TOPIC_SUFFIX, - config + AlarmSystem.TALK_TOPIC_SUFFIX)); - } - else if (cmd.equals("-import")) - { + if (options.contains(cmd)) { if (! iter.hasNext()) - throw new Exception("Missing -import file name"); - iter.remove(); - import_filename = iter.next(); - iter.remove(); + throw new Exception("Missing argument for " + cmd); + final String arg = iter.next(); + parsed_args.put(cmd, arg); } - else if (cmd.equals("-export")) - { - if (! iter.hasNext()) - throw new Exception("Missing -export file name"); - iter.remove(); - export_filename = iter.next(); - iter.remove(); + else if (flags.contains(cmd)) { + parsed_args.put(cmd, ""); } - else + else { throw new Exception("Unknown option " + cmd); + } } - if ( use_args_server ) { - if ( use_settings ) { - logger.log(Level.WARNING,"Found the conflicted configurations : -settings/server:" + server + " and -server:" + args_server); - logger.log(Level.WARNING,"Force to use the argument -server instead of -settings"); - logger.log(Level.WARNING,"Server : " + args_server); - } - server = args_server; + if (parsed_args.containsKey(help_arg) || parsed_args.containsKey(help_alt_arg)){ + help(); + return; + } + if (parsed_args.containsKey(logging_arg)) { + LogManager.getLogManager().readConfiguration(new FileInputStream(parsed_args.get(logging_arg))); } - if ( use_args_config ) { - if ( use_settings ) { - logger.log(Level.WARNING,"Found the conflicted configurations : -settings/config:" + config + " and -config:" + args_config); - logger.log(Level.WARNING,"Force to use the argument -config instead of -settings"); - logger.log(Level.WARNING,"Config : " + args_config); + if (parsed_args.containsKey(settings_arg)){ + final String filename = parsed_args.get(settings_arg); + logger.info("Loading settings from " + filename); + PropertyPreferenceLoader.load(new FileInputStream(filename)); + Preferences userPrefs = Preferences.userRoot().node("org/phoebus/applications/alarm"); + + for (Map.Entry entry: args_to_prefs.entrySet()) { + final String prefKey = entry.getValue(); + final String arg = entry.getKey(); + + if (parsed_args.containsKey(arg)){ + logger.log(Level.WARNING,"Potentially conflicting setting: -settings/"+prefKey+": " + userPrefs.get(prefKey, "") + " and " + arg + ":" + parsed_args.get(arg)); + logger.log(Level.WARNING,"Using argument " + arg + " instead of -settings"); + logger.log(Level.WARNING,prefKey + ": " + parsed_args.get(arg)); + } + else if (Set.of(userPrefs.keys()).contains(prefKey)){ + parsed_args.put(arg, userPrefs.get(prefKey, "")); + } } - config = args_config; } - // Export or import requested? - // Actually allow _both_ export and import, first exporting - // so that existing config gets exported, then new one imported - if (export_filename != null) - { - logger.info("Exporting model to " + export_filename); - new AlarmConfigTool().exportModel(export_filename, server, use_args_config ? args_config : config); + config = parsed_args.getOrDefault(config_arg, config); + server = parsed_args.getOrDefault(server_arg, server); + kafka_properties = parsed_args.getOrDefault(kafka_props_arg, kafka_properties); + use_shell = !parsed_args.containsKey(noshell_arg); + + if (parsed_args.containsKey(create_topics_arg)){ + logger.info("Discovering and creating any missing topics at " + server); + CreateTopics.discoverAndCreateTopics(server, true, List.of(config, + config + AlarmSystem.COMMAND_TOPIC_SUFFIX, + config + AlarmSystem.TALK_TOPIC_SUFFIX), + kafka_properties); } - if (import_filename != null) - { - logger.info("Import model from " + import_filename); - new AlarmConfigTool().importModel(import_filename, server, use_args_config ? args_config : config); + if (parsed_args.containsKey(export_arg)){ + final String filename = parsed_args.get(export_arg); + logger.info("Exporting model to " + filename); + new AlarmConfigTool().exportModel(filename, server, config, kafka_properties); } - // Any export/import means we're done, not running alarm server - if (export_filename != null || import_filename != null) + if (parsed_args.containsKey(import_arg)){ + final String filename = parsed_args.get(import_arg); + logger.info("Import model from " + filename); + new AlarmConfigTool().importModel(filename, server, config, kafka_properties); + } + if (parsed_args.containsKey(export_arg) || parsed_args.containsKey(import_arg)){ return; + } } catch (final Exception ex) { @@ -690,6 +680,6 @@ else if (cmd.equals("-export")) logger.info("Alarm Server (PID " + ProcessHandle.current().pid() + ")"); - new AlarmServerMain(server, config, use_shell); + new AlarmServerMain(server, config, use_shell, kafka_properties); } } diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmStateInitializer.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmStateInitializer.java index 3e5a8a7dac..2b4b774733 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmStateInitializer.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/AlarmStateInitializer.java @@ -51,10 +51,11 @@ public class AlarmStateInitializer /** @param server Kafka Server host:port * @param config_name Name of alarm tree root + * @param kafka_props Additional properties to pass to the kafka client */ - public AlarmStateInitializer(final String server, final String config_name) + public AlarmStateInitializer(final String server, final String config_name, final String kafka_props_file) { - consumer = KafkaHelper.connectConsumer(server, List.of(config_name), List.of(config_name)); + consumer = KafkaHelper.connectConsumer(server, List.of(config_name), List.of(config_name), kafka_props_file); thread = new Thread(this::run, "AlarmStateInitializer"); thread.setDaemon(true); diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/CreateTopics.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/CreateTopics.java index 6dd826d03a..e591daa77b 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/CreateTopics.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/CreateTopics.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; +import org.phoebus.applications.alarm.client.KafkaHelper; /** Create alarm topics * @author Evan Smith @@ -46,10 +47,11 @@ public class CreateTopics * @param compact If the topics to be created should be compacted. * @param topics Topics to discover and create if missing. */ - public static void discoverAndCreateTopics (final String kafka_servers, final boolean compact, final List topics) + public static void discoverAndCreateTopics (final String kafka_servers, final boolean compact, + final List topics, final String kafka_properties_file) { // Connect to Kafka server. - final Properties props = new Properties(); + final Properties props = KafkaHelper.loadPropsFromFile(kafka_properties_file); props.put("bootstrap.servers", kafka_servers); final AdminClient client = AdminClient.create(props); diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java index 273f37c330..6021d88ab9 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java @@ -79,12 +79,15 @@ class ServerModel /** @param kafka_servers Servers * @param config_name Name of alarm tree root - * @param initial_states + * @param initial_states + * @param listener + * @param kafka_properties_file Additional properties to pass to the kafka client * @throws Exception on error */ public ServerModel(final String kafka_servers, final String config_name, final ConcurrentHashMap initial_states, - final ServerModelListener listener) throws Exception + final ServerModelListener listener, + final String kafka_properties_file) throws Exception { this.initial_states = initial_states; // initial_states.entrySet().forEach(state -> @@ -99,8 +102,9 @@ public ServerModel(final String kafka_servers, final String config_name, consumer = KafkaHelper.connectConsumer(Objects.requireNonNull(kafka_servers), List.of(config_state_topic, command_topic), - List.of(config_state_topic)); - producer = KafkaHelper.connectProducer(kafka_servers); + List.of(config_state_topic), + kafka_properties_file); + producer = KafkaHelper.connectProducer(kafka_servers, kafka_properties_file); thread = new Thread(this::run, "ServerModel"); thread.setDaemon(true);