Skip to content

Commit 709c8fb

Browse files
authored
[O2B-1474] Add kafka producer for beam mode changes and use proto for published events (#15)
* adds a Kafka interface for the BKP-LHC-Client project to facilitate the re-use of producers * adds BeamMode producers which is to be used for sending events via Kafka as proto objects on each `BeamMode` change but also for unsafe beams during the stable beams mode. * adds proto files to be used for building the events that are to be sent * as junit plugin for testing and update GH actions to run mvn tests
1 parent 7e09c5a commit 709c8fb

File tree

12 files changed

+639
-57
lines changed

12 files changed

+639
-57
lines changed

.github/workflows/maven-multi-os.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ jobs:
3333
elif [[ "${{ runner.os }}" == "macOS" ]]; then
3434
mvn validate -Dos.version=osx-x86_64
3535
fi
36+
- name: Run Tests with Maven
37+
run: |
38+
if [[ "${{ runner.os }}" == "Linux" ]]; then
39+
mvn test -Dos.version=linux-x86_64
40+
elif [[ "${{ runner.os }}" == "macOS" ]]; then
41+
mvn test -Dos.version=osx-x86_64
42+
fi
3643
- name: Package with Maven (Fat JAR)
3744
run: |
3845
if [[ "${{ runner.os }}" == "Linux" ]]; then

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,28 @@
22

33
Repository based on work from @iclegrand in repository: https://github.com/iclegrand/AliDip2BK
44

5-
Projects consumes selected messages from the CERN DIP system (LHC & ALICE -DCS) and publishes them into the O2 systems. A detailed description for this project is provided by Roberto in this document:
5+
The BKP-LHC Client is a java based application which uses the CERN DIP `jar` dependency to consume events from desired tracks. These events are then either:
6+
- published on O2 Kafka Topics to be consumed further by O2 applications (e.g. ECS)
7+
- updates the O2 Bookkeeping application via their HTTP endpoints.
8+
9+
A detailed description for this project is provided by Roberto in this document:
610
https://codimd.web.cern.ch/G0TSXqA1R8iPqWw2w2wuew
711

12+
### Published Events
13+
Currently the BKP-LHC-Client publishes on Kafka (topic: "dip.lhc.beam_mode") events for the start and end of stable beams in the format of `Ev_BeamModeEvent`. The proto file's source of truth is within the [Control Repository](https://github.com/AliceO2Group/Control/blob/master/common/protos/events.proto)
14+
815
### Requirements
916
- This program requires java 11 on a 64 bit system (this is a constrain from the DIP library)
1017
- maven
18+
-
19+
### Configuration
20+
The run configuration is defined in the `AliDip2BK.properties` file.
1121

1222
### Maven Commands for dev,tst,deployments
1323
```bash
1424
mvn <clean> compile -Dos.version={os_version}
1525
mvn <clean> package -Dos.version={os_version}
26+
mvn tst -Dos.version={os_version}
1627
```
1728

1829
E.g. os_version `macosx-x86_64`

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<maven.compiler.target>16</maven.compiler.target>
1414
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1515
<os.version>linux-x86_64</os.version>
16-
<cern.dip.version>5.7.0</cern.dip.version>
16+
<cern.dip.version>2.7.0</cern.dip.version>
1717
<protobuf.version>4.29.3</protobuf.version>
1818
<kafka.version>3.1.0</kafka.version>
1919
<slf4j-api.version>1.7.30</slf4j-api.version>
@@ -47,6 +47,13 @@
4747
<groupId>org.slf4j</groupId>
4848
<artifactId>slf4j-simple</artifactId>
4949
<version>${slf4j-simple.version}</version>
50+
</dependency>
51+
<!-- Test Dependencies -->
52+
<dependency>
53+
<groupId>org.junit.jupiter</groupId>
54+
<artifactId>junit-jupiter</artifactId>
55+
<version>5.13.4</version>
56+
<scope>test</scope>
5057
</dependency>
5158
</dependencies>
5259

src/main/java/alice/dip/AliDip2BK.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
/*************
2-
* cil
3-
**************/
4-
5-
/*
6-
* Main Class
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
78
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
812
*/
9-
1013
package alice.dip;
1114

1215
import java.io.BufferedWriter;
@@ -18,8 +21,10 @@
1821
import java.util.Date;
1922
import java.util.Properties;
2023

24+
import alice.dip.beam.mode.BeamModeEventsKafkaProducer;
25+
2126
public class AliDip2BK implements Runnable {
22-
public static String Version = "2.1.2 22-Jul-2025";
27+
public static String Version = "3.0.0 13-Oct-2025";
2328
public static String DNSnode = "dipnsdev.cern.ch";
2429
public static String[] endFillCases = {"CUCU"};
2530
public static boolean LIST_PARAM = false;
@@ -52,6 +57,7 @@ public class AliDip2BK implements Runnable {
5257
BookkeepingClient bookkeepingClient;
5358
StartOfRunKafkaConsumer kcs;
5459
EndOfRunKafkaConsumer kce;
60+
BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
5561

5662
public AliDip2BK() {
5763
startDate = (new Date()).getTime();
@@ -82,6 +88,8 @@ public AliDip2BK() {
8288
kcs = new StartOfRunKafkaConsumer(dipMessagesProcessor);
8389

8490
kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);
91+
beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers);
92+
dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer);
8593

8694
shutdownProc();
8795

@@ -145,6 +153,8 @@ public void run() {
145153
}
146154
dipMessagesProcessor.saveState();
147155
writeStat("AliDip2BK.stat", true);
156+
beamModeEventsKafkaProducer.close();
157+
log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed");
148158
}
149159
});
150160
}

src/main/java/alice/dip/DipMessagesProcessor.java

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1-
/*************
2-
* cil
3-
**************/
4-
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
513
package alice.dip;
614

715
import java.io.BufferedWriter;
@@ -23,6 +31,8 @@
2331
import cern.dip.DipData;
2432
import cern.dip.TypeMismatch;
2533

34+
import alice.dip.beam.mode.BeamModeEventsKafkaProducer;
35+
2636
/*
2737
* Process dip messages received from the DipClient
2838
* Receives DipData messages in a blocking Queue and then process them asynchronously
@@ -45,11 +55,13 @@ public class DipMessagesProcessor implements Runnable {
4555
private BlockingQueue<MessageItem> outputQueue = new ArrayBlockingQueue<>(100);
4656

4757
private final LuminosityManager luminosityManager;
58+
private volatile BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
4859

4960
public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) {
5061

5162
this.bookkeepingClient = bookkeepingClient;
5263
this.luminosityManager = luminosityManager;
64+
this.beamModeEventsKafkaProducer = null;
5365

5466
Thread t = new Thread(this);
5567
t.start();
@@ -58,6 +70,14 @@ public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManag
5870
loadState();
5971
}
6072

73+
/**
74+
* Setter of events producer
75+
* @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events
76+
*/
77+
public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) {
78+
this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer;
79+
}
80+
6181
/*
6282
* This method is used for receiving DipData messages from the Dip Client
6383
*/
@@ -299,25 +319,25 @@ private void handleSafeBeamMessage(DipData dipData) throws BadParameter, TypeMis
299319
if (currentFill == null) return;
300320

301321
String bm = currentFill.getBeamMode();
302-
303-
if (bm.contentEquals("STABLE BEAMS")) {
304-
AliDip2BK.log(
305-
0,
306-
"ProcData.newSafeBeams",
307-
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
308-
);
309-
310-
if (!isBeam1 || !isBeam2) {
322+
AliDip2BK.log(
323+
1,
324+
"ProcData.newSafeBeams",
325+
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
326+
);
327+
if (bm != null) {
328+
if ((bm.contentEquals("STABLE BEAMS") && (!isBeam1 || !isBeam2))) {
311329
currentFill.setBeamMode(time, "LOST BEAMS");
330+
if (this.beamModeEventsKafkaProducer != null) {
331+
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
332+
}
312333
AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! ");
334+
} else if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
335+
currentFill.setBeamMode(time, "STABLE BEAMS");
336+
if (this.beamModeEventsKafkaProducer != null) {
337+
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
338+
}
339+
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
313340
}
314-
315-
return;
316-
}
317-
318-
if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
319-
currentFill.setBeamMode(time, "STABLE BEAMS");
320-
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
321341
}
322342
}
323343

@@ -569,35 +589,18 @@ public void newFillNo(long date, String strFno, String par1, String par2, String
569589
}
570590

571591
public void newBeamMode(long date, String BeamMode) {
572-
573592
if (currentFill != null) {
593+
AliDip2BK.log(
594+
2,
595+
"ProcData.newBeamMode",
596+
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
597+
);
574598
currentFill.setBeamMode(date, BeamMode);
599+
bookkeepingClient.updateLhcFill(currentFill);
600+
saveState();
575601

576-
int mc = -1;
577-
for (int i = 0; i < AliDip2BK.endFillCases.length; i++) {
578-
if (AliDip2BK.endFillCases[i].equalsIgnoreCase(BeamMode)) mc = i;
579-
}
580-
if (mc < 0) {
581-
582-
AliDip2BK.log(
583-
2,
584-
"ProcData.newBeamMode",
585-
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
586-
);
587-
bookkeepingClient.updateLhcFill(currentFill);
588-
saveState();
589-
} else {
590-
currentFill.endedTime = date;
591-
bookkeepingClient.updateLhcFill(currentFill);
592-
if (AliDip2BK.KEEP_FILLS_HISTORY_DIRECTORY != null) {
593-
writeFillHistFile(currentFill);
594-
}
595-
AliDip2BK.log(
596-
3,
597-
"ProcData.newBeamMode",
598-
"CLOSE Fill_NO=" + currentFill.fillNo + " Based on new beam mode=" + BeamMode
599-
);
600-
currentFill = null;
602+
if (this.beamModeEventsKafkaProducer != null) {
603+
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date);
601604
}
602605
} else {
603606
AliDip2BK.log(4, "ProcData.newBeamMode", " ERROR new beam mode=" + BeamMode + " NO FILL NO for it");
@@ -753,7 +756,7 @@ private void handleBookkeepingCtpClockMessage(DipData dipData) throws BadParamet
753756
var phaseShiftBeam2 = dipData.extractFloat("PhaseShift_Beam2");
754757

755758
AliDip2BK.log(
756-
2,
759+
0,
757760
"ProcData.dispatch",
758761
" Bookkeeping CTP Clock: PhaseShift_Beam1=" + phaseShiftBeam1 + " PhaseShift_Beam2=" + phaseShiftBeam2
759762
);
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
package alice.dip.adapters;
15+
16+
import alice.dip.enums.BeamModeEnum;
17+
18+
/**
19+
* Adapter class to convert between string representations of beam modes and the BeamModeEnum.
20+
*/
21+
public class BeamModeProtoAdapter {
22+
23+
/**
24+
* Returns the enum constant matching the given string, or UNKNOWN if not found.
25+
* Accepts both space and underscore separated names, case-insensitive.
26+
* @param beamMode The beam mode string to convert.
27+
* @return The corresponding BeamModeEnum constant, or UNKNOWN if not recognized.
28+
*/
29+
public static BeamModeEnum fromStringToEnum(String beamMode) {
30+
if (beamMode == null || beamMode.trim().isEmpty()) {
31+
return BeamModeEnum.UNKNOWN;
32+
}
33+
for (BeamModeEnum value : BeamModeEnum.values()) {
34+
if (value.label.equalsIgnoreCase(beamMode)) {
35+
return value;
36+
}
37+
}
38+
return BeamModeEnum.UNKNOWN;
39+
}
40+
}

0 commit comments

Comments
 (0)