Skip to content

Commit 2982495

Browse files
committed
Add Kafka based producer for beam mode changes
1 parent 7e09c5a commit 2982495

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
package alice.dip.beam.mode;
15+
16+
17+
import org.apache.kafka.common.serialization.ByteArraySerializer;
18+
import org.apache.kafka.common.serialization.IntegerSerializer;
19+
20+
import alice.dip.adapters.BeamModeProtoAdapter;
21+
import alice.dip.AliDip2BK;
22+
import alice.dip.enums.BeamModeEnum;
23+
import alice.dip.LhcInfoObj;
24+
import alice.dip.kafka.KafkaProducerInterface;
25+
26+
import ch.cern.alice.o2.control.common.Common;
27+
import ch.cern.alice.o2.control.events.Events;
28+
29+
/**
30+
* Kafka producer for LHC Beam Mode events, serialized using Protocol Buffers.
31+
*/
32+
public class BeamModeEventsKafkaProducer extends KafkaProducerInterface<Integer, byte[]> {
33+
public static final String KAFKA_PRODUCER_TOPIC_DIP = "dip.lhc.beam_mode";
34+
35+
/**
36+
* Constructor to create a BeamModeEventsKafkaProducer
37+
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
38+
*/
39+
public BeamModeEventsKafkaProducer(String bootstrapServers) {
40+
super(bootstrapServers, KAFKA_PRODUCER_TOPIC_DIP, new IntegerSerializer(), new ByteArraySerializer());
41+
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Initialized producer for topic: " + KAFKA_PRODUCER_TOPIC_DIP);
42+
}
43+
44+
/**
45+
* Given a fill number for partitioning, a LhcInfoObj containing fill information,
46+
* and a timestamp, creates and sends a proto serialized Beam Mode Event to the Kafka topic.
47+
* @param fillNumber - fill number to be used for partition to ensure ordering
48+
* @param fill - LhcInfoObj containing fill information
49+
* @param timestamp - event timestamp at which the beam mode change event was received from DIP
50+
*/
51+
public void sendEvent(Integer fillNumber, LhcInfoObj fill, long timestamp) {
52+
String beamModeStr = fill.getBeamMode();
53+
BeamModeEnum beamMode = BeamModeProtoAdapter.fromStringToEnum(beamModeStr);
54+
55+
Common.BeamInfo beamInfo = Common.BeamInfo.newBuilder()
56+
.setStableBeamsStart(fill.getStableBeamStart())
57+
.setStableBeamsEnd(fill.getStableBeamStop())
58+
.setFillNumber(fill.fillNo)
59+
.setFillingSchemeName(fill.LHCFillingSchemeName)
60+
.setBeamMode(Common.BeamMode.valueOf(beamMode.name()))
61+
.setBeamType(fill.beamType)
62+
.build();
63+
64+
Events.Ev_BeamModeEvent beamModeEvent = Events.Ev_BeamModeEvent.newBuilder()
65+
.setTimestamp(timestamp)
66+
.setBeamInfo(beamInfo)
67+
.build();
68+
69+
Events.Event event = Events.Event.newBuilder()
70+
.setTimestamp(timestamp)
71+
.setTimestampNano((timestamp) * 1000000)
72+
.setBeamModeEvent(beamModeEvent)
73+
.build();
74+
byte[] value = event.toByteArray();
75+
76+
send(fillNumber, value);
77+
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Sent Beam Mode event for fill " + fill.fillNo + " with mode " + fill.getBeamMode() + " at timestamp " + timestamp);
78+
}
79+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
package alice.dip.kafka;
15+
16+
import org.apache.kafka.clients.producer.ProducerConfig;
17+
import org.apache.kafka.clients.producer.KafkaProducer;
18+
import org.apache.kafka.clients.producer.ProducerRecord;
19+
import org.apache.kafka.common.serialization.Serializer;
20+
21+
import java.util.Properties;
22+
23+
/**
24+
* Generic Kafka Producer interface to send messages to a specified topic.
25+
* @param <K> - Type of the message key (to be used for partitioning)
26+
* @param <V> - Type of the message value (payload)
27+
*/
28+
public class KafkaProducerInterface<K, V> implements AutoCloseable {
29+
private final KafkaProducer<K, V> producer;
30+
private final String topic;
31+
32+
/**
33+
* Constructor to create a KafkaProducerInterface
34+
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
35+
* @param topic - Kafka topic to which messages will be sent
36+
* @param keySerializer - Kafka supported serializer for the message key
37+
* @param valueSerializer - Kafka supported serializer for the message value
38+
*/
39+
public KafkaProducerInterface(String bootstrapServers, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
40+
this.topic = topic;
41+
Properties props = new Properties();
42+
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
43+
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
44+
this.producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
45+
}
46+
47+
/**
48+
* Send a message to the configured Kafka topic
49+
* @param key - message key for partitioning
50+
* @param value - message value (payload)
51+
*/
52+
public java.util.concurrent.Future<org.apache.kafka.clients.producer.RecordMetadata> send(K key, V value) {
53+
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
54+
return producer.send(record);
55+
}
56+
57+
/**
58+
* Method to close the Kafka producer instance
59+
*/
60+
@Override
61+
public void close() {
62+
producer.close();
63+
}
64+
}

0 commit comments

Comments
 (0)