Skip to content

Commit 8242631

Browse files
committed
Use the newly added beam mode event producer
1 parent 9db930c commit 8242631

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

src/alice/dip/AliDip2BK.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
/*************
2-
* cil
3-
**************/
4-
5-
/*
6-
* Main Class
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
78
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
812
*/
913

1014
package alice.dip;
@@ -18,6 +22,8 @@
1822
import java.util.Date;
1923
import java.util.Properties;
2024

25+
import alice.dip.kafka.BeamModeEventsKafkaProducer;
26+
2127
public class AliDip2BK implements Runnable {
2228
public static String Version = "2.1.2 22-Jul-2025";
2329
public static String DNSnode = "dipnsdev.cern.ch";
@@ -52,6 +58,7 @@ public class AliDip2BK implements Runnable {
5258
BookkeepingClient bookkeepingClient;
5359
StartOfRunKafkaConsumer kcs;
5460
EndOfRunKafkaConsumer kce;
61+
BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
5562

5663
public AliDip2BK() {
5764
startDate = (new Date()).getTime();
@@ -83,6 +90,9 @@ public AliDip2BK() {
8390

8491
kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);
8592

93+
beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers);
94+
dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer);
95+
8696
shutdownProc();
8797

8898
Thread t = new Thread(this);
@@ -145,6 +155,8 @@ public void run() {
145155
}
146156
dipMessagesProcessor.saveState();
147157
writeStat("AliDip2BK.stat", true);
158+
beamModeEventsKafkaProducer.close();
159+
log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed");
148160
}
149161
});
150162
}

src/alice/dip/DipMessagesProcessor.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
1-
/*************
2-
* cil
3-
**************/
1+
/**
2+
* @license
3+
* Copyright CERN and copyright holders of ALICE O2. This software is
4+
* distributed under the terms of the GNU General Public License v3 (GPL
5+
* Version 3), copied verbatim in the file "COPYING".
6+
*
7+
* See http://alice-o2.web.cern.ch/license for full licensing information.
8+
*
9+
* In applying this license CERN does not waive the privileges and immunities
10+
* granted to it by virtue of its status as an Intergovernmental Organization
11+
* or submit itself to any jurisdiction.
12+
*/
413

514
package alice.dip;
615

@@ -19,6 +28,7 @@
1928
import java.util.concurrent.ArrayBlockingQueue;
2029
import java.util.concurrent.BlockingQueue;
2130

31+
import alice.dip.kafka.BeamModeEventsKafkaProducer;
2232
import cern.dip.BadParameter;
2333
import cern.dip.DipData;
2434
import cern.dip.DipTimestamp;
@@ -46,19 +56,28 @@ public class DipMessagesProcessor implements Runnable {
4656
private BlockingQueue<MessageItem> outputQueue = new ArrayBlockingQueue<MessageItem>(100);
4757

4858
private final LuminosityManager luminosityManager;
59+
private BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
4960

5061
public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) {
5162

5263
this.bookkeepingClient = bookkeepingClient;
5364
this.luminosityManager = luminosityManager;
54-
65+
this.beamModeEventsKafkaProducer = null;
5566
Thread t = new Thread(this);
5667
t.start();
5768

5869
currentAlice = new AliceInfoObj();
5970
loadState();
6071
}
6172

73+
/**
74+
* Setter of events producer
75+
* @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events
76+
*/
77+
public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) {
78+
this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer;
79+
}
80+
6281
/*
6382
* This method is used for receiving DipData messages from the Dip Client
6483
*/
@@ -399,6 +418,9 @@ public void newSafeMode(long time, int val) {
399418
} else {
400419

401420
currentFill.setBeamMode(time, "LOST BEAMS");
421+
if (beamModeEventsKafkaProducer != null) {
422+
beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
423+
}
402424
AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! ");
403425
}
404426

@@ -580,6 +602,9 @@ public void newBeamMode(long date, String BeamMode) {
580602
);
581603
bookkeepingClient.updateLhcFill(currentFill);
582604
saveState();
605+
if (beamModeEventsKafkaProducer != null) {
606+
beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date);
607+
}
583608
} else {
584609
currentFill.endedTime = date;
585610
bookkeepingClient.updateLhcFill(currentFill);

0 commit comments

Comments
 (0)