Skip to content

Commit 1a31bbb

Browse files
committed
Make use of new producers
1 parent 460933c commit 1a31bbb

File tree

3 files changed

+124
-55
lines changed

3 files changed

+124
-55
lines changed

src/main/java/alice/dip/AliDip2BK.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
/*************
2-
* cil
3-
**************/
4-
5-
/*
6-
* Main Class
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
78
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
812
*/
9-
1013
package alice.dip;
1114

1215
import java.io.BufferedWriter;
@@ -18,8 +21,10 @@
1821
import java.util.Date;
1922
import java.util.Properties;
2023

24+
import alice.dip.beam.mode.BeamModeEventsKafkaProducer;
25+
2126
public class AliDip2BK implements Runnable {
22-
public static String Version = "2.1.2 22-Jul-2025";
27+
public static String Version = "3.0.0 13-Oct-2025";
2328
public static String DNSnode = "dipnsdev.cern.ch";
2429
public static String[] endFillCases = {"CUCU"};
2530
public static boolean LIST_PARAM = false;
@@ -52,6 +57,7 @@ public class AliDip2BK implements Runnable {
5257
BookkeepingClient bookkeepingClient;
5358
StartOfRunKafkaConsumer kcs;
5459
EndOfRunKafkaConsumer kce;
60+
BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
5561

5662
public AliDip2BK() {
5763
startDate = (new Date()).getTime();
@@ -82,6 +88,8 @@ public AliDip2BK() {
8288
kcs = new StartOfRunKafkaConsumer(dipMessagesProcessor);
8389

8490
kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);
91+
beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers);
92+
dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer);
8593

8694
shutdownProc();
8795

@@ -145,6 +153,8 @@ public void run() {
145153
}
146154
dipMessagesProcessor.saveState();
147155
writeStat("AliDip2BK.stat", true);
156+
beamModeEventsKafkaProducer.close();
157+
log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed");
148158
}
149159
});
150160
}

src/main/java/alice/dip/DipMessagesProcessor.java

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1-
/*************
2-
* cil
3-
**************/
4-
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
513
package alice.dip;
614

715
import java.io.BufferedWriter;
@@ -23,6 +31,8 @@
2331
import cern.dip.DipData;
2432
import cern.dip.TypeMismatch;
2533

34+
import alice.dip.beam.mode.BeamModeEventsKafkaProducer;
35+
2636
/*
2737
* Process dip messages received from the DipClient
2838
* Receives DipData messages in a blocking Queue and then process them asynchronously
@@ -45,11 +55,13 @@ public class DipMessagesProcessor implements Runnable {
4555
private BlockingQueue<MessageItem> outputQueue = new ArrayBlockingQueue<>(100);
4656

4757
private final LuminosityManager luminosityManager;
58+
private volatile BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
4859

4960
public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) {
5061

5162
this.bookkeepingClient = bookkeepingClient;
5263
this.luminosityManager = luminosityManager;
64+
this.beamModeEventsKafkaProducer = null;
5365

5466
Thread t = new Thread(this);
5567
t.start();
@@ -58,6 +70,14 @@ public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManag
5870
loadState();
5971
}
6072

73+
/**
74+
* Setter of events producer
75+
* @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events
76+
*/
77+
public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) {
78+
this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer;
79+
}
80+
6181
/*
6282
* This method is used for receiving DipData messages from the Dip Client
6383
*/
@@ -299,25 +319,25 @@ private void handleSafeBeamMessage(DipData dipData) throws BadParameter, TypeMis
299319
if (currentFill == null) return;
300320

301321
String bm = currentFill.getBeamMode();
302-
303-
if (bm.contentEquals("STABLE BEAMS")) {
304-
AliDip2BK.log(
305-
0,
306-
"ProcData.newSafeBeams",
307-
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
308-
);
309-
310-
if (!isBeam1 || !isBeam2) {
322+
AliDip2BK.log(
323+
1,
324+
"ProcData.newSafeBeams",
325+
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
326+
);
327+
if (bm != null) {
328+
if ((bm.contentEquals("STABLE BEAMS") && (!isBeam1 || !isBeam2))) {
311329
currentFill.setBeamMode(time, "LOST BEAMS");
330+
if (this.beamModeEventsKafkaProducer != null) {
331+
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
332+
}
312333
AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! ");
334+
} else if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
335+
currentFill.setBeamMode(time, "STABLE BEAMS");
336+
if (this.beamModeEventsKafkaProducer != null) {
337+
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
338+
}
339+
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
313340
}
314-
315-
return;
316-
}
317-
318-
if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
319-
currentFill.setBeamMode(time, "STABLE BEAMS");
320-
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
321341
}
322342
}
323343

@@ -569,35 +589,18 @@ public void newFillNo(long date, String strFno, String par1, String par2, String
569589
}
570590

571591
public void newBeamMode(long date, String BeamMode) {
572-
573592
if (currentFill != null) {
593+
AliDip2BK.log(
594+
2,
595+
"ProcData.newBeamMode",
596+
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
597+
);
574598
currentFill.setBeamMode(date, BeamMode);
599+
bookkeepingClient.updateLhcFill(currentFill);
600+
saveState();
575601

576-
int mc = -1;
577-
for (int i = 0; i < AliDip2BK.endFillCases.length; i++) {
578-
if (AliDip2BK.endFillCases[i].equalsIgnoreCase(BeamMode)) mc = i;
579-
}
580-
if (mc < 0) {
581-
582-
AliDip2BK.log(
583-
2,
584-
"ProcData.newBeamMode",
585-
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
586-
);
587-
bookkeepingClient.updateLhcFill(currentFill);
588-
saveState();
589-
} else {
590-
currentFill.endedTime = date;
591-
bookkeepingClient.updateLhcFill(currentFill);
592-
if (AliDip2BK.KEEP_FILLS_HISTORY_DIRECTORY != null) {
593-
writeFillHistFile(currentFill);
594-
}
595-
AliDip2BK.log(
596-
3,
597-
"ProcData.newBeamMode",
598-
"CLOSE Fill_NO=" + currentFill.fillNo + " Based on new beam mode=" + BeamMode
599-
);
600-
currentFill = null;
602+
if (this.beamModeEventsKafkaProducer != null) {
603+
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date);
601604
}
602605
} else {
603606
AliDip2BK.log(4, "ProcData.newBeamMode", " ERROR new beam mode=" + BeamMode + " NO FILL NO for it");
@@ -753,7 +756,7 @@ private void handleBookkeepingCtpClockMessage(DipData dipData) throws BadParamet
753756
var phaseShiftBeam2 = dipData.extractFloat("PhaseShift_Beam2");
754757

755758
AliDip2BK.log(
756-
2,
759+
0,
757760
"ProcData.dispatch",
758761
" Bookkeeping CTP Clock: PhaseShift_Beam1=" + phaseShiftBeam1 + " PhaseShift_Beam2=" + phaseShiftBeam2
759762
);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
13+
14+
package alice.dip.adapters;
15+
16+
import org.junit.jupiter.api.Test;
17+
import static org.junit.jupiter.api.Assertions.*;
18+
19+
import alice.dip.enums.BeamModeEnum;
20+
21+
/**
22+
* Unit tests for the BeamModeProtoAdapter class.
23+
*/
24+
class BeamModeProtoAdapterTest {
25+
26+
@Test
27+
void shouldReturnUnknownToEmptyStrings() {
28+
assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum(""));
29+
assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum(" "));
30+
}
31+
32+
@Test
33+
void shouldReturnBeamModeUnknownToNull() {
34+
assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum(null));
35+
}
36+
37+
@Test
38+
void shouldReturnBeamModeUnknownToInvalidStrings() {
39+
assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("INVALID"));
40+
assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("SETUP_BEAM"));
41+
assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("injection physics beam extra"));
42+
}
43+
44+
@Test
45+
void shouldReturnCorrectBeamModeEnumForValidStrings() {
46+
assertEquals(BeamModeEnum.NO_BEAM, BeamModeProtoAdapter.fromStringToEnum("NO BEAM"));
47+
assertEquals(BeamModeEnum.INJECTION_PHYSICS_BEAM, BeamModeProtoAdapter.fromStringToEnum("INJECTION PHYSICS BEAM"));
48+
assertEquals(BeamModeEnum.INJECTION_PHYSICS_BEAM, BeamModeProtoAdapter.fromStringToEnum("injection physics beam"));
49+
assertEquals(BeamModeEnum.LOST_BEAMS, BeamModeProtoAdapter.fromStringToEnum("LOST BEAMS"));
50+
51+
for (BeamModeEnum mode : BeamModeEnum.values()) {
52+
assertEquals(mode, BeamModeProtoAdapter.fromStringToEnum(mode.label));
53+
assertEquals(mode, BeamModeProtoAdapter.fromStringToEnum(mode.label.toLowerCase()));
54+
}
55+
}
56+
}

0 commit comments

Comments
 (0)