diff --git a/.gitignore b/.gitignore index 50217ea984..3d835da330 100644 --- a/.gitignore +++ b/.gitignore @@ -131,3 +131,4 @@ docs/source/_ext/**/*.pyc # Ignore pip installation metadata docs/phoebus_docs.egg-info/ +*.pyc diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java index 17fd53d4a3..6aeaf32cd0 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java @@ -63,14 +63,12 @@ public static Consumer connectConsumer(final String kafka_server Properties kafka_props = loadPropsFromFile(properties_file); kafka_props.put("bootstrap.servers", kafka_servers); - if (!kafka_props.containsKey("group.id")){ - // API requires for Consumer to be in a group. - // Each alarm client must receive all updates, - // cannot balance updates across a group - // --> Use unique group for each client - final String group_id = "Alarm-" + UUID.randomUUID(); - kafka_props.put("group.id", group_id); - } + // API requires for Consumer to be in a group.
+ // Each alarm client must receive all updates, + // cannot balance updates across a group + // --> Use unique group for each client + final String group_id = "Alarm-" + UUID.randomUUID(); + kafka_props.put("group.id", group_id); logger.fine(kafka_props.getProperty("group.id") + " subscribes to " + kafka_servers + " for " + topics); diff --git a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java index 73ab6a7cdb..1fb346636b 100644 --- a/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java +++ b/services/alarm-logger/src/main/java/org/phoebus/alarm/logging/AlarmMessageLogger.java @@ -6,8 +6,8 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -19,12 +19,15 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Branched; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.phoebus.applications.alarm.AlarmSystemConstants; import org.phoebus.applications.alarm.client.KafkaHelper; import org.phoebus.applications.alarm.messages.AlarmConfigMessage; import org.phoebus.applications.alarm.messages.AlarmMessage; @@ -71,14 +74,19 @@ public void run() { Properties kafkaProps = 
KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties","")); kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-"+topic+"-alarm-messages"); - if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){ - kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, - props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); - } else { - kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - } - - + kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, + props.getOrDefault(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); + + // API requires for Consumer to be in a group. + // Each alarm client must receive all updates, + // cannot balance updates across a group + // --> Use unique group for each client + final String group_id = "Alarm-" + UUID.randomUUID(); + kafkaProps.put("group.id", group_id); + + AlarmSystemConstants.logger.fine(kafkaProps.getProperty("group.id") + " subscribes to " + + kafkaProps.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) + " for " + topic); + final String indexDateSpanUnits = props.getProperty("date_span_units"); final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names")); @@ -112,14 +120,15 @@ public long extract(ConsumerRecord record, long previousTimestam return new KeyValue(key, value); }); - @SuppressWarnings("unchecked") - KStream[] alarmBranches = alarms.branch((k,v) -> k.startsWith("state"), - (k,v) -> k.startsWith("config"), - (k,v) -> false - ); - - processAlarmStateStream(alarmBranches[0], props); - processAlarmConfigurationStream(alarmBranches[1], props); + alarms.split(Named.as("alarm-")) + .branch((k, v) -> k.startsWith("state"), + Branched.withConsumer(alarmStateStream -> processAlarmStateStream(alarmStateStream))) + .branch((k, v) -> k.startsWith("config"), + Branched.withConsumer(alarmConfigStream -> processAlarmConfigurationStream(alarmConfigStream))) + .defaultBranch(Branched.withConsumer(stream -> { + // Log each unmatched key in the 
default branch + stream.foreach((k, v) -> logger.warning("Unknown alarm message type for key: " + k)); + })); final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProps); final CountDownLatch latch = new CountDownLatch(1); @@ -143,7 +152,7 @@ public void run() { System.exit(0); } - private void processAlarmStateStream(KStream alarmStateBranch, Properties props) { + private void processAlarmStateStream(KStream alarmStateBranch) { KStream transformedAlarms = alarmStateBranch .transform(new TransformerSupplier>() { @@ -193,7 +202,7 @@ public void close() { } - private void processAlarmConfigurationStream(KStream alarmConfigBranch, Properties props) { + private void processAlarmConfigurationStream(KStream alarmConfigBranch) { KStream alarmConfigMessages = alarmConfigBranch.transform(new TransformerSupplier>() { @Override diff --git a/services/alarm-server/src/test/resources/docker/docker-compose-kafka-cluster.yml b/services/alarm-server/src/test/resources/docker/docker-compose-kafka-cluster.yml new file mode 100644 index 0000000000..0ceb8f070d --- /dev/null +++ b/services/alarm-server/src/test/resources/docker/docker-compose-kafka-cluster.yml @@ -0,0 +1,70 @@ +version: '2.2' + +services: + kafka1: + image: confluentinc/cp-kafka:8.1.0 + container_name: kafka1 + ports: + - "9192:9192" + environment: + CLUSTER_ID: 'b1a2c3d4e5f6g7h8i9j0klmn' + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193 + KAFKA_LISTENERS: PLAINTEXT://:9192,CONTROLLER://:9193 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9192 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /var/lib/kafka/data + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 + volumes: + - kafka1_data:/var/lib/kafka/data + + kafka2: + image: confluentinc/cp-kafka:8.1.0 + container_name: 
kafka2 + ports: + - "9292:9292" + environment: + CLUSTER_ID: 'b1a2c3d4e5f6g7h8i9j0klmn' + KAFKA_NODE_ID: 2 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193 + KAFKA_LISTENERS: PLAINTEXT://:9292,CONTROLLER://:9193 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9292 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /var/lib/kafka/data + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 + volumes: + - kafka2_data:/var/lib/kafka/data + + kafka3: + image: confluentinc/cp-kafka:8.1.0 + container_name: kafka3 + ports: + - "9392:9392" + environment: + CLUSTER_ID: 'b1a2c3d4e5f6g7h8i9j0klmn' + KAFKA_NODE_ID: 3 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9193,2@kafka2:9193,3@kafka3:9193 + KAFKA_LISTENERS: PLAINTEXT://:9392,CONTROLLER://:9193 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9392 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /var/lib/kafka/data + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2 + volumes: + - kafka3_data:/var/lib/kafka/data + +volumes: + kafka1_data: + kafka2_data: + kafka3_data: