Skip to content

Commit 9db930c

Browse files
committed
Add producer dedicated to beam mode change tracks
1 parent 04d4db5 commit 9db930c

File tree

1 file changed

+64
-0
lines changed

1 file changed

+64
-0
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
package alice.dip.kafka;
15+
16+
import org.apache.kafka.common.serialization.ByteArraySerializer;
17+
import org.apache.kafka.common.serialization.IntegerSerializer;
18+
19+
import alice.dip.AliDip2BK;
20+
import alice.dip.LhcInfoObj;
21+
import alice.dip.kafka.events.Events;
22+
23+
/**
24+
* Kafka producer for LHC Beam Mode events, serialized using Protocol Buffers.
25+
*/
26+
public class BeamModeEventsKafkaProducer extends KafkaProducerInterface<Integer, byte[]> {
27+
public static String KAFKA_PRODUCER_TOPIC_DIP = "dip.lhc.beam_mode";
28+
29+
/**
30+
* Constructor to create a BeamModeEventsKafkaProducer
31+
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
32+
*/
33+
public BeamModeEventsKafkaProducer(String bootstrapServers) {
34+
super(bootstrapServers, KAFKA_PRODUCER_TOPIC_DIP, new IntegerSerializer(), new ByteArraySerializer());
35+
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Initialized producer for topic: " + KAFKA_PRODUCER_TOPIC_DIP);
36+
}
37+
38+
/**
39+
* Given a fill number for partitioning, a LhcInfoObj containing fill information,
40+
* and a timestamp, creates and sends a proto serialized Beam Mode Event to the Kafka topic.
41+
* @param fillNumber - fill number to be used for partition to ensure ordering
42+
* @param fill - LhcInfoObj containing fill information
43+
* @param timestamp - event timestamp at which the beam mode change event was received from DIP
44+
*/
45+
public void sendEvent(Integer fillNumber, LhcInfoObj fill, long timestamp) {
46+
Events.BeamInfo beamInfo = Events.BeamInfo.newBuilder()
47+
.setStableBeamsStart(fill.getStableBeamStart())
48+
.setStableBeamsEnd(fill.getStableBeamStop())
49+
.setFillNumber(fill.fillNo)
50+
.setFillingSchemeName(fill.LHCFillingSchemeName)
51+
.setBeamType(Events.BeamType.valueOf(fill.beamType))
52+
.build();
53+
54+
Events.Ev_BeamModeEvent event = Events.Ev_BeamModeEvent.newBuilder()
55+
.setBeamMode(fill.getBeamMode())
56+
.setTimestamp(timestamp)
57+
.setBeamInfo(beamInfo)
58+
.build();
59+
byte[] value = event.toByteArray();
60+
61+
send(fillNumber, value);
62+
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Sent Beam Mode event for fill " + fill.fillNo + " with mode " + fill.getBeamMode() + " at timestamp " + timestamp);
63+
}
64+
}

0 commit comments

Comments
 (0)