Skip to content

Commit 5b9fc41

Browse files
authored
Merge pull request #2281 from mgotz/kafka-ssl
Additional options for kafka clients
2 parents 2a9a07a + 096bebc commit 5b9fc41

File tree

29 files changed

+243
-174
lines changed

29 files changed

+243
-174
lines changed

app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private static synchronized void initializeAlarmClient(String config)
3434
if (!alarmModels.containsKey(config))
3535
{
3636
logger.log(Level.CONFIG, "Creating a alarm client for config : + " + config + " in the alarm datasource");
37-
AlarmClient model = new AlarmClient(AlarmSystem.server, config);
37+
AlarmClient model = new AlarmClient(AlarmSystem.server, config, AlarmSystem.kafka_properties);
3838
model.addListener(new AlarmClientDatasourceListener(config));
3939
model.start();
4040
alarmModels.put(config, model);

app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public class AlarmSystem
6363
/** Kafka Server host:port */
6464
@Preference public static String server;
6565

66+
/** Kafka settings file */
67+
@Preference public static String kafka_properties;
68+
6669
/** Name of alarm tree root
6770
*
6871
* <p>Default name from preferences.

app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,9 @@ public class AlarmClient
9494

9595
/** @param server Kafka Server host:port
9696
* @param config_name Name of alarm tree root
97+
* @param kafka_properties_file File to load additional kafka properties from
9798
*/
98-
public AlarmClient(final String server, final String config_name)
99+
public AlarmClient(final String server, final String config_name, final String kafka_properties_file)
99100
{
100101
Objects.requireNonNull(server);
101102
Objects.requireNonNull(config_name);
@@ -105,8 +106,8 @@ public AlarmClient(final String server, final String config_name)
105106

106107
root = new AlarmClientNode(null, config_name);
107108
final List<String> topics = List.of(config_topic);
108-
consumer = KafkaHelper.connectConsumer(server, topics, topics);
109-
producer = KafkaHelper.connectProducer(server);
109+
consumer = KafkaHelper.connectConsumer(server, topics, topics, kafka_properties_file);
110+
producer = KafkaHelper.connectProducer(server, kafka_properties_file);
110111

111112
thread = new Thread(this::run, "AlarmClientModel " + config_name);
112113
thread.setDaemon(true);

app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99

1010
import static org.phoebus.applications.alarm.AlarmSystem.logger;
1111

12+
import java.io.FileInputStream;
13+
import java.io.IOException;
1214
import java.util.Collection;
1315
import java.util.List;
1416
import java.util.Properties;
1517
import java.util.UUID;
18+
import java.util.logging.Level;
1619

1720
import org.apache.kafka.clients.consumer.Consumer;
1821
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -51,24 +54,28 @@ public class KafkaHelper
5154
* @param kafka_servers Servers to read
5255
* @param topics Topics to which to subscribe
5356
* @param from_beginning Topics to read from the beginning
57+
* @param properties_file File name to load additional settings for the kafka consumer
5458
* @return {@link Consumer}
5559
*/
56-
public static Consumer<String, String> connectConsumer(final String kafka_servers, final List<String> topics, final List<String> from_beginning)
60+
public static Consumer<String, String> connectConsumer(final String kafka_servers, final List<String> topics, final List<String> from_beginning, final String properties_file)
5761
{
58-
final Properties props = new Properties();
59-
props.put("bootstrap.servers", kafka_servers);
60-
// API requires for Consumer to be in a group.
61-
// Each alarm client must receive all updates,
62-
// cannot balance updates across a group
63-
// --> Use unique group for each client
64-
final String group_id = "Alarm-" + UUID.randomUUID();
65-
props.put("group.id", group_id);
62+
Properties kafka_props = loadPropsFromFile(properties_file);
63+
kafka_props.put("bootstrap.servers", kafka_servers);
6664

67-
logger.fine(group_id + " subscribes to " + kafka_servers + " for " + topics);
65+
if (!kafka_props.containsKey("group.id")){
66+
// API requires for Consumer to be in a group.
67+
// Each alarm client must receive all updates,
68+
// cannot balance updates across a group
69+
// --> Use unique group for each client
70+
final String group_id = "Alarm-" + UUID.randomUUID();
71+
kafka_props.put("group.id", group_id);
72+
}
73+
74+
logger.fine(kafka_props.getProperty("group.id") + " subscribes to " + kafka_servers + " for " + topics);
6875

6976
// Read key, value as string
7077
final Deserializer<String> deserializer = new StringDeserializer();
71-
final Consumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer);
78+
final Consumer<String, String> consumer = new KafkaConsumer<>(kafka_props, deserializer, deserializer);
7279

7380
// Rewind whenever assigned to partition
7481
final ConsumerRebalanceListener crl = new ConsumerRebalanceListener()
@@ -101,19 +108,20 @@ public void onPartitionsRevoked(final Collection<TopicPartition> parts)
101108

102109
/** Create producer for alarm information
103110
* @param kafka_servers Kafka servers
111+
* @param properties_file File name to load additional settings for the kafka producer
104112
* @return {@link Producer}
105113
*/
106-
public static Producer<String, String> connectProducer(final String kafka_servers)
114+
public static Producer<String, String> connectProducer(final String kafka_servers, final String properties_file)
107115
{
108-
final Properties props = new Properties();
109-
props.put("bootstrap.servers", kafka_servers);
116+
Properties kafka_props = loadPropsFromFile(properties_file);
117+
kafka_props.put("bootstrap.servers", kafka_servers);
110118
// Collect messages for 20ms until sending them out as a batch
111-
props.put("linger.ms", 20);
119+
kafka_props.put("linger.ms", 20);
112120

113121
// Write String key, value
114122
final Serializer<String> serializer = new StringSerializer();
115123

116-
final Producer<String, String> producer = new KafkaProducer<>(props, serializer, serializer);
124+
final Producer<String, String> producer = new KafkaProducer<>(kafka_props, serializer, serializer);
117125

118126
return producer;
119127
}
@@ -123,22 +131,43 @@ public static Producer<String, String> connectProducer(final String kafka_server
123131
* @param kafka_servers - Sever to connect to.
124132
* @param topics List of topics to aggregate.
125133
* @param aggregate_topic - Name of topic to aggregate to.
134+
* @param kafka_props File name to load additional settings for the kafka stream
126135
* @return aggregate_stream - KafkaStreams
127136
* @author Evan Smith
128137
*/
129-
public static KafkaStreams aggregateTopics(String kafka_servers, List<String> topics, String aggregate_topic)
138+
public static KafkaStreams aggregateTopics(String kafka_servers, List<String> topics, String aggregate_topic, final String properties_file)
130139
{
131-
final Properties props = new Properties();
132-
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term");
133-
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers);
134-
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
135-
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
140+
Properties kafka_props = loadPropsFromFile(properties_file);
141+
kafka_props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term");
142+
kafka_props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers);
143+
kafka_props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
144+
kafka_props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
136145

137146
final StreamsBuilder builder = new StreamsBuilder();
138147

139148
// Aggregate the topics by mapping the topic key value pairs one to one into the aggregate topic.
140149
builder.<String, String>stream(topics).mapValues(pair -> pair).to(aggregate_topic);
141150

142-
return new KafkaStreams(builder.build(), props);
151+
return new KafkaStreams(builder.build(), kafka_props);
152+
}
153+
154+
155+
/**
156+
* Load properties from the given file path. Path may be blank or null
157+
* resulting in a properties object without entries.
158+
* @param filePath Full path to properties file
159+
* @return properties - the properties loaded from file
160+
*/
161+
static public Properties loadPropsFromFile(String filePath) {
162+
logger.fine("loading file from path: " + filePath);
163+
Properties properties = new Properties();
164+
if(filePath != null && !filePath.isBlank()){
165+
try(FileInputStream file = new FileInputStream(filePath);){
166+
properties.load(file);
167+
} catch(IOException e) {
168+
logger.log(Level.SEVERE, "failed to load kafka properties", e);
169+
}
170+
}
171+
return properties;
143172
}
144173
}

app/alarm/model/src/main/java/org/phoebus/applications/alarm/talk/TalkClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public TalkClient(final String server, final String config_name)
5252
Objects.requireNonNull(config_name);
5353

5454
final List<String> topics = List.of(config_name + AlarmSystem.TALK_TOPIC_SUFFIX);
55-
consumer = KafkaHelper.connectConsumer(server, topics, Collections.emptyList());
55+
consumer = KafkaHelper.connectConsumer(server, topics, Collections.emptyList(), AlarmSystem.kafka_properties);
5656

5757
thread = new Thread(this::run, "TalkClient");
5858
thread.setDaemon(true);

app/alarm/model/src/main/resources/alarm_preferences.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
# Kafka Server host:port
66
server=localhost:9092
77

8+
# A file to configure the properites of kafka clients
9+
kafka_properties=
10+
811
# Name of alarm tree root
912
config_name=Accelerator
1013

app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmClientModelDemo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class AlarmClientModelDemo
2424
@Test
2525
public void testClientModel() throws Exception
2626
{
27-
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT);
27+
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE);
2828
client.start();
2929
TimeUnit.SECONDS.sleep(4);
3030

app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmDemoSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ public class AlarmDemoSettings
1515
{
1616
public static final String SERVERS = "localhost:9092";
1717
public static final String ROOT = "Accelerator";
18+
public static final String KAFKA_PROPERTIES_FILE = "";
1819
}

app/alarm/model/src/test/java/org/phoebus/applications/alarm/AlarmModelSnapshotDemo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class AlarmModelSnapshotDemo
2424
public void testAlarmModelWriter() throws Exception
2525
{
2626
// Get alarm configuration
27-
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT);
27+
final AlarmClient client = new AlarmClient(AlarmDemoSettings.SERVERS, AlarmDemoSettings.ROOT, AlarmDemoSettings.KAFKA_PROPERTIES_FILE);
2828

2929
System.out.println("Wait for stable configuration, i.e. no changes for 4 seconds...");
3030

app/alarm/model/src/test/java/org/phoebus/applications/alarm/EnabledDeserializerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package src.test.java.org.phoebus.applications.alarm;
1+
package org.phoebus.applications.alarm;
22

33
import org.junit.Test;
44
import java.io.IOException;

0 commit comments

Comments
 (0)