Skip to content

Commit 0cadf49

Browse files
authored
Merge pull request #3647 from ControlSystemStudio/kafka_update
Kafka update
2 parents 7101978 + 7e1f486 commit 0cadf49

File tree

4 files changed

+105
-27
lines changed

4 files changed

+105
-27
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,4 @@ docs/source/_ext/**/*.pyc
131131

132132
# Ignore pip installation metadata
133133
docs/phoebus_docs.egg-info/
134+
*.pyc

app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,12 @@ public static Consumer<String, String> connectConsumer(final String kafka_server
6363
Properties kafka_props = loadPropsFromFile(properties_file);
6464
kafka_props.put("bootstrap.servers", kafka_servers);
6565

66-
if (!kafka_props.containsKey("group.id")){
67-
// API requires for Consumer to be in a group.
68-
// Each alarm client must receive all updates,
69-
// cannot balance updates across a group
70-
// --> Use unique group for each client
71-
final String group_id = "Alarm-" + UUID.randomUUID();
72-
kafka_props.put("group.id", group_id);
73-
}
66+
// API requires for Consumer to be in a group.
67+
// Each alarm client must receive all updates,
68+
// cannot balance updates across a group
69+
// --> Use unique group for each client
70+
final String group_id = "Alarm-" + UUID.randomUUID();
71+
kafka_props.put("group.id", group_id);
7472

7573
logger.fine(kafka_props.getProperty("group.id") + " subscribes to " + kafka_servers + " for " + topics);
7674

services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
import java.time.Instant;
77
import java.time.temporal.ChronoUnit;
88
import java.util.Properties;
9+
import java.util.UUID;
910
import java.util.concurrent.CountDownLatch;
10-
import java.util.concurrent.TimeUnit;
1111
import java.util.logging.Level;
1212
import java.util.regex.Matcher;
1313
import java.util.regex.Pattern;
@@ -19,12 +19,15 @@
1919
import org.apache.kafka.streams.KeyValue;
2020
import org.apache.kafka.streams.StreamsBuilder;
2121
import org.apache.kafka.streams.StreamsConfig;
22+
import org.apache.kafka.streams.kstream.Branched;
2223
import org.apache.kafka.streams.kstream.Consumed;
2324
import org.apache.kafka.streams.kstream.KStream;
25+
import org.apache.kafka.streams.kstream.Named;
2426
import org.apache.kafka.streams.kstream.Transformer;
2527
import org.apache.kafka.streams.kstream.TransformerSupplier;
2628
import org.apache.kafka.streams.processor.ProcessorContext;
2729
import org.apache.kafka.streams.processor.TimestampExtractor;
30+
import org.phoebus.applications.alarm.AlarmSystemConstants;
2831
import org.phoebus.applications.alarm.client.KafkaHelper;
2932
import org.phoebus.applications.alarm.messages.AlarmConfigMessage;
3033
import org.phoebus.applications.alarm.messages.AlarmMessage;
@@ -71,14 +74,19 @@ public void run() {
7174
Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties",""));
7275
kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-"+topic+"-alarm-messages");
7376

74-
if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){
75-
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
76-
props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
77-
} else {
78-
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
79-
}
80-
81-
77+
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
78+
props.getOrDefault(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
79+
80+
// API requires for Consumer to be in a group.
81+
// Each alarm client must receive all updates,
82+
// cannot balance updates across a group
83+
// --> Use unique group for each client
84+
final String group_id = "Alarm-" + UUID.randomUUID();
85+
kafkaProps.put("group.id", group_id);
86+
87+
AlarmSystemConstants.logger.fine(kafkaProps.getProperty("group.id") + " subscribes to "
88+
+ kafkaProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) + " for " + topic);
89+
8290
final String indexDateSpanUnits = props.getProperty("date_span_units");
8391
final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names"));
8492

@@ -112,14 +120,15 @@ public long extract(ConsumerRecord<Object, Object> record, long previousTimestam
112120
return new KeyValue<String, AlarmMessage>(key, value);
113121
});
114122

115-
@SuppressWarnings("unchecked")
116-
KStream<String, AlarmMessage>[] alarmBranches = alarms.branch((k,v) -> k.startsWith("state"),
117-
(k,v) -> k.startsWith("config"),
118-
(k,v) -> false
119-
);
120-
121-
processAlarmStateStream(alarmBranches[0], props);
122-
processAlarmConfigurationStream(alarmBranches[1], props);
123+
alarms.split(Named.as("alarm-"))
124+
.branch((k, v) -> k.startsWith("state"),
125+
Branched.withConsumer(alarmStateStream -> processAlarmStateStream(alarmStateStream)))
126+
.branch((k, v) -> k.startsWith("config"),
127+
Branched.withConsumer(alarmConfigStream -> processAlarmConfigurationStream(alarmConfigStream)))
128+
.defaultBranch(Branched.withConsumer(stream -> {
129+
// Log each unmatched key in the default branch
130+
stream.foreach((k, v) -> logger.warning("Unknown alarm message type for key: " + k));
131+
}));
123132

124133
final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProps);
125134
final CountDownLatch latch = new CountDownLatch(1);
@@ -143,7 +152,7 @@ public void run() {
143152
System.exit(0);
144153
}
145154

146-
private void processAlarmStateStream(KStream<String, AlarmMessage> alarmStateBranch, Properties props) {
155+
private void processAlarmStateStream(KStream<String, AlarmMessage> alarmStateBranch) {
147156

148157
KStream<String, AlarmStateMessage> transformedAlarms = alarmStateBranch
149158
.transform(new TransformerSupplier<String, AlarmMessage, KeyValue<String, AlarmStateMessage>>() {
@@ -193,7 +202,7 @@ public void close() {
193202

194203
}
195204

196-
private void processAlarmConfigurationStream(KStream<String, AlarmMessage> alarmConfigBranch, Properties props) {
205+
private void processAlarmConfigurationStream(KStream<String, AlarmMessage> alarmConfigBranch) {
197206
KStream<String, AlarmConfigMessage> alarmConfigMessages = alarmConfigBranch.transform(new TransformerSupplier<String, AlarmMessage, KeyValue<String,AlarmConfigMessage>>() {
198207

199208
@Override
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
version: '2.2'
2+
3+
services:
4+
kafka1:
5+
image: confluentinc/cp-kafka:8.1.0
6+
container_name: kafka1
7+
ports:
8+
- "9192:9192"
9+
environment:
10+
CLUSTER_ID: 'b1a2c3d4e5f6g7h8i9j0klmn'
11+
KAFKA_NODE_ID: 1
12+
KAFKA_PROCESS_ROLES: broker,controller
13+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193
14+
KAFKA_LISTENERS: PLAINTEXT://:9192,CONTROLLER://:9193
15+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9192
16+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
17+
KAFKA_LOG_DIRS: /var/lib/kafka/data
18+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
19+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
20+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
21+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
22+
volumes:
23+
- kafka1_data:/var/lib/kafka/data
24+
25+
kafka2:
26+
image: confluentinc/cp-kafka:8.1.0
27+
container_name: kafka2
28+
ports:
29+
- "9292:9292"
30+
environment:
31+
CLUSTER_ID: 'b1a2c3d4e5f6g7h8i9j0klmn'
32+
KAFKA_NODE_ID: 2
33+
KAFKA_PROCESS_ROLES: broker,controller
34+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193
35+
KAFKA_LISTENERS: PLAINTEXT://:9292,CONTROLLER://:9193
36+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9292
37+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
38+
KAFKA_LOG_DIRS: /var/lib/kafka/data
39+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
40+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
41+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
42+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
43+
volumes:
44+
- kafka2_data:/var/lib/kafka/data
45+
46+
kafka3:
47+
image: confluentinc/cp-kafka:8.1.0
48+
container_name: kafka3
49+
ports:
50+
- "9392:9392"
51+
environment:
52+
CLUSTER_ID: 'b1a2c3d4e5f6g7h8i9j0klmn'
53+
KAFKA_NODE_ID: 3
54+
KAFKA_PROCESS_ROLES: broker,controller
55+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193
56+
KAFKA_LISTENERS: PLAINTEXT://:9392,CONTROLLER://:9193
57+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9392
58+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
59+
KAFKA_LOG_DIRS: /var/lib/kafka/data
60+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
61+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
62+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
63+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
64+
volumes:
65+
- kafka3_data:/var/lib/kafka/data
66+
67+
volumes:
68+
kafka1_data:
69+
kafka2_data:
70+
kafka3_data:

0 commit comments

Comments
 (0)