Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private static synchronized void initializeAlarmClient(String config)
if (!alarmModels.containsKey(config))
{
logger.log(Level.CONFIG, "Creating a alarm client for config : + " + config + " in the alarm datasource");
AlarmClient model = new AlarmClient(AlarmSystem.server, config);
AlarmClient model = new AlarmClient(AlarmSystem.server, config, AlarmSystem.kafka_properties);
model.addListener(new AlarmClientDatasourceListener(config));
model.start();
alarmModels.put(config, model);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class AlarmSystem
/** Kafka Server host:port */
@Preference public static String server;

/** Kafka settings file */
@Preference public static String kafka_properties;

/** Name of alarm tree root
*
* <p>Default name from preferences.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ public class AlarmClient

/** @param server Kafka Server host:port
* @param config_name Name of alarm tree root
* @param kafka_properties_file File to load additional kafka properties from
*/
public AlarmClient(final String server, final String config_name)
public AlarmClient(final String server, final String config_name, final String kafka_properties_file)
{
Objects.requireNonNull(server);
Objects.requireNonNull(config_name);
Expand All @@ -105,8 +106,8 @@ public AlarmClient(final String server, final String config_name)

root = new AlarmClientNode(null, config_name);
final List<String> topics = List.of(config_topic);
consumer = KafkaHelper.connectConsumer(server, topics, topics);
producer = KafkaHelper.connectProducer(server);
consumer = KafkaHelper.connectConsumer(server, topics, topics, kafka_properties_file);
producer = KafkaHelper.connectProducer(server, kafka_properties_file);

thread = new Thread(this::run, "AlarmClientModel " + config_name);
thread.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@

import static org.phoebus.applications.alarm.AlarmSystem.logger;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Level;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand Down Expand Up @@ -51,24 +54,28 @@ public class KafkaHelper
* @param kafka_servers Servers to read
* @param topics Topics to which to subscribe
* @param from_beginning Topics to read from the beginning
* @param properties_file File name to load additional settings for the kafka consumer
* @return {@link Consumer}
*/
public static Consumer<String, String> connectConsumer(final String kafka_servers, final List<String> topics, final List<String> from_beginning)
public static Consumer<String, String> connectConsumer(final String kafka_servers, final List<String> topics, final List<String> from_beginning, final String properties_file)
{
final Properties props = new Properties();
props.put("bootstrap.servers", kafka_servers);
// API requires for Consumer to be in a group.
// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Use unique group for each client
final String group_id = "Alarm-" + UUID.randomUUID();
props.put("group.id", group_id);
Properties kafka_props = loadPropsFromFile(properties_file);
kafka_props.put("bootstrap.servers", kafka_servers);

logger.fine(group_id + " subscribes to " + kafka_servers + " for " + topics);
if (!kafka_props.containsKey("group.id")){
// API requires for Consumer to be in a group.
// Each alarm client must receive all updates,
// cannot balance updates across a group
// --> Use unique group for each client
final String group_id = "Alarm-" + UUID.randomUUID();
kafka_props.put("group.id", group_id);
}

logger.fine(kafka_props.getProperty("group.id") + " subscribes to " + kafka_servers + " for " + topics);

// Read key, value as string
final Deserializer<String> deserializer = new StringDeserializer();
final Consumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer);
final Consumer<String, String> consumer = new KafkaConsumer<>(kafka_props, deserializer, deserializer);

// Rewind whenever assigned to partition
final ConsumerRebalanceListener crl = new ConsumerRebalanceListener()
Expand Down Expand Up @@ -101,19 +108,20 @@ public void onPartitionsRevoked(final Collection<TopicPartition> parts)

/** Create producer for alarm information
* @param kafka_servers Kafka servers
* @param properties_file File name to load additional settings for the kafka producer
* @return {@link Producer}
*/
public static Producer<String, String> connectProducer(final String kafka_servers)
public static Producer<String, String> connectProducer(final String kafka_servers, final String properties_file)
{
final Properties props = new Properties();
props.put("bootstrap.servers", kafka_servers);
Properties kafka_props = loadPropsFromFile(properties_file);
kafka_props.put("bootstrap.servers", kafka_servers);
// Collect messages for 20ms until sending them out as a batch
props.put("linger.ms", 20);
kafka_props.put("linger.ms", 20);

// Write String key, value
final Serializer<String> serializer = new StringSerializer();

final Producer<String, String> producer = new KafkaProducer<>(props, serializer, serializer);
final Producer<String, String> producer = new KafkaProducer<>(kafka_props, serializer, serializer);

return producer;
}
Expand All @@ -123,22 +131,43 @@ public static Producer<String, String> connectProducer(final String kafka_server
* @param kafka_servers - Sever to connect to.
* @param topics List of topics to aggregate.
* @param aggregate_topic - Name of topic to aggregate to.
* @param kafka_props File name to load additional settings for the kafka stream
* @return aggregate_stream - KafkaStreams
* @author Evan Smith
*/
public static KafkaStreams aggregateTopics(String kafka_servers, List<String> topics, String aggregate_topic)
public static KafkaStreams aggregateTopics(String kafka_servers, List<String> topics, String aggregate_topic, final String properties_file)
{
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Properties kafka_props = loadPropsFromFile(properties_file);
kafka_props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term");
kafka_props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers);
kafka_props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
kafka_props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

// Aggregate the topics by mapping the topic key value pairs one to one into the aggregate topic.
builder.<String, String>stream(topics).mapValues(pair -> pair).to(aggregate_topic);

return new KafkaStreams(builder.build(), props);
return new KafkaStreams(builder.build(), kafka_props);
}


/**
* Load properties from the given file path. Path may be blank or null
* resulting in a properties object without entries.
* @param filePath Full path to properties file
* @return properties - the properties loaded from file
*/
static public Properties loadPropsFromFile(String filePath) {
logger.fine("loading file from path: " + filePath);
Properties properties = new Properties();
if(filePath != null && !filePath.isBlank()){
try(FileInputStream file = new FileInputStream(filePath);){
properties.load(file);
} catch(IOException e) {
logger.log(Level.SEVERE, "failed to load kafka properties", e);
}
}
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public TalkClient(final String server, final String config_name)
Objects.requireNonNull(config_name);

final List<String> topics = List.of(config_name + AlarmSystem.TALK_TOPIC_SUFFIX);
consumer = KafkaHelper.connectConsumer(server, topics, Collections.emptyList());
consumer = KafkaHelper.connectConsumer(server, topics, Collections.emptyList(), AlarmSystem.kafka_properties);

thread = new Thread(this::run, "TalkClient");
thread.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
# Kafka Server host:port
server=localhost:9092

# A file to configure the properites of kafka clients
kafka_properties=

# Name of alarm tree root
config_name=Accelerator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class AlarmClientModelDemo
@Test
public void testClientModel() throws Exception
{
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT);
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE);
client.start();
TimeUnit.SECONDS.sleep(4);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ public class AlarmDemoSettings
{
public static final String SERVERS = "localhost:9092";
public static final String ROOT = "Accelerator";
public static final String KAFKA_PROPERTIES_FILE = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class AlarmModelSnapshotDemo
public void testAlarmModelWriter() throws Exception
{
// Get alarm configuration
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT);
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE);

System.out.println("Wait for stable configuration, i.e. no changes for 4 seconds...");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package src.test.java.org.phoebus.applications.alarm;
package org.phoebus.applications.alarm;

import org.junit.Test;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private Node create(final URI input) throws Exception

try
{
client = new AlarmClient(server, config_name);
client = new AlarmClient(server, config_name, AlarmSystem.kafka_properties);
final AlarmAreaView area_view = new AlarmAreaView(client);
client.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private Node create(final URI input) throws Exception

try
{
client = new AlarmClient(server, config_name);
client = new AlarmClient(server, config_name, AlarmSystem.kafka_properties);
table = new AlarmTableUI(client);
mediator = new AlarmTableMediator(client, table);
client.addListener(mediator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private Node create(final URI input) throws Exception

try
{
client = new AlarmClient(server, config_name);
client = new AlarmClient(server, config_name, AlarmSystem.kafka_properties);
final AlarmTreeView tree_view = new AlarmTreeView(client);
client.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class AlarmAreaUIDemo extends ApplicationWrapper
@Override
public void start(final Stage stage) throws Exception
{
final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator");
final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator", "");

final AlarmAreaView area_view = new AlarmAreaView(client);
final Scene scene = new Scene(area_view, 600, 800);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class AlarmTableDemo extends ApplicationWrapper
@Override
public void start(final Stage stage) throws Exception
{
final AlarmClient client = new AlarmClient(AlarmSystem.server, AlarmSystem.config_name);
final AlarmClient client = new AlarmClient(AlarmSystem.server, AlarmSystem.config_name, AlarmSystem.kafka_properties);
final AlarmTableUI table = new AlarmTableUI(client);
final Scene scene = new Scene(table, 1200, 300);
stage.setScene(scene);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class AlarmTreeUIDemo extends ApplicationWrapper
@Override
public void start(final Stage stage) throws Exception
{
final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator");
final AlarmClient client = new AlarmClient("localhost:9092", "Accelerator","");

final AlarmTreeView tree_view = new AlarmTreeView(client);
final Scene scene = new Scene(tree_view, 600, 800);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public AlarmConfigLogger(String topic, String location, String remoteLocation) {
root = new File(location, this.topic);
root.mkdirs();

model = new AlarmClient(props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG), this.topic);
model = new AlarmClient(props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG), this.topic, props.getProperty("kafka_properties"));
model.start();

initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private static void help() {
System.out.println("-help - This text");
System.out.println("-topics Accelerator - Alarm topics who's associated configuration is to be logged, they can be defined as a comma or colon separated list");
System.out.println("-bootstrap.servers localhost:9092 - Kafka server address");
System.out.println("-kafka_properties /opt/client.properties - Properties file to load kafka client settings from");
System.out.println("-repo.location /tmp/alarm_repo - Location of the alarm configuration repository");
System.out.println("-remote.location https://remote.git/repo - Location of the remote git alarm configuration repository");
System.out.println("-username username - username for remote git repo");
Expand Down Expand Up @@ -105,6 +106,12 @@ public static void main(String[] original_args) throws InterruptedException {
iter.remove();
properties.put("bootstrap.servers", iter.next());
iter.remove();
} else if (cmd.equals("-kafka_properties")) {
if (!iter.hasNext())
throw new Exception("Missing -kafka_properties file name");
iter.remove();
properties.put("kafka_properties",iter.next());
iter.remove();
} else if (cmd.equals("-repo.location")) {
if (!iter.hasNext())
throw new Exception("Missing -repo.location local checkout location for alarm confing repo");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Kafka alarm topics
bootstrap.servers=localhost:9092
alarm_topics=Accelerator
kafka_properties=

# Location of the git repo
local.location=/tmp/alarm_repo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.phoebus.applications.alarm.client.KafkaHelper;
import org.phoebus.applications.alarm.messages.AlarmCommandMessage;
import org.phoebus.applications.alarm.messages.MessageParser;
import org.phoebus.util.indexname.IndexNameHelper;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void run() {
Properties props = new Properties();
props.putAll(PropertiesHelper.getProperties());

Properties kafkaProps = new Properties();
Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties",""));
kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-" + topic + "-alarm-cmd");

if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private static void help()
System.out.println("-es_port 9200 - elastic server port");
System.out.println("-es_sniff false - elastic server sniff feature");
System.out.println("-bootstrap.servers localhost:9092 - Kafka server address");
System.out.println("-kafka_properties /opt/client.properties - Properties file to load kafka client settings from");
System.out.println("-properties /opt/alarm_logger.properties - Properties file to be used (instead of command line arguments)");
System.out.println("-date_span_units M - Date units for the time based index to span.");
System.out.println("-date_span_value 1 - Date value for the time based index to span.");
Expand Down Expand Up @@ -141,6 +142,12 @@ public static void main(final String[] original_args) throws Exception {
iter.remove();
properties.put("bootstrap.servers",iter.next());
iter.remove();
} else if (cmd.equals("-kafka_properties")) {
if (!iter.hasNext())
throw new Exception("Missing -kafka_properties file name");
iter.remove();
properties.put("kafka_properties",iter.next());
iter.remove();
}
else if (cmd.equals("-date_span_units"))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.phoebus.applications.alarm.client.KafkaHelper;
import org.phoebus.applications.alarm.messages.AlarmConfigMessage;
import org.phoebus.applications.alarm.messages.AlarmMessage;
import org.phoebus.applications.alarm.messages.AlarmStateMessage;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void run() {
Properties props = new Properties();
props.putAll(PropertiesHelper.getProperties());

Properties kafkaProps = new Properties();
Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties",""));
kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-"+topic+"-alarm-messages");

if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ es_create_templates=true
# Kafka server location
bootstrap.servers=localhost:9092

# Kafka client properties file
kafka_properties=

# The units of the indices date span: Days (D), Weeks(W), Months(M), Years(Y).
date_span_units=M

Expand Down
Loading