Skip to content

Commit 0431544

Browse files
committed
make borzoi use start/stop time instead of runstate
1 parent bae766e commit 0431544

File tree

2 files changed

+53
-40
lines changed

2 files changed

+53
-40
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
# Borzoi
22

3-
Borzoi like running. This listens to ISISDAE's run state and pushes start/stop Flatbuffer blobs to Kafka accordingly.
3+
Borzoi like running. This listens to ISISDAE's start/stop times and pushes start/stop Flatbuffer blobs to Kafka accordingly.

main.py

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
import uuid
66

7-
from aioca import caget, camonitor
7+
from aioca import camonitor
88
from aiokafka import AIOKafkaProducer
99
from epicscorelibs.ca.dbr import DBR_CHAR_BYTES, ca_bytes
1010
from ibex_non_ca_helpers.compress_hex import dehex_decompress_and_dejson
@@ -27,64 +27,68 @@ def __init__(
2727
self.instrument_name = instrument_name
2828
self.topic = topic
2929
self.current_run_number = None
30-
self.current_start_time_ms = None
30+
self.current_start_time = None
31+
self.current_stop_time = None
3132

3233
async def set_up_monitors(self) -> None:
3334
logger.info("Setting up monitors")
3435
camonitor(
3536
f"{self.prefix}CS:BLOCKSERVER:BLOCKNAMES",
3637
callback=self._update_blocks,
38+
all_updates=True,
3739
datatype=DBR_CHAR_BYTES,
3840
)
3941
camonitor(
40-
f"{self.prefix}DAE:RUNSTATE",
41-
callback=self._react_to_runstate_change,
42+
f"{self.prefix}DAE:RUNNUMBER",
43+
callback=self._update_run_number,
4244
all_updates=True,
4345
datatype=str,
4446
)
4547
camonitor(
46-
f"{self.prefix}DAE:RUNNUMBER",
47-
callback=self._update_run_number,
48+
f"{self.prefix}DAE:START_TIME",
49+
callback=self.construct_and_send_runstart,
4850
all_updates=True,
49-
datatype=int,
51+
datatype=float,
5052
)
5153
camonitor(
52-
f"{self.prefix}DAE:START_TIME",
53-
callback=self._update_start_time_ms,
54+
f"{self.prefix}DAE:STOP_TIME",
55+
self.construct_and_send_runstop,
5456
all_updates=True,
55-
datatype=int,
57+
datatype=float,
5658
)
5759

5860
def _update_run_number(self, value: int) -> None:
5961
# Cache this as we want the run start message construction and production to be as fast as
6062
# possible so we don't miss events
61-
logger.debug(f"Run number updated to {value}")
63+
logger.info(f"Run number updated to {value}")
6264
self.current_run_number = value
6365

6466
def _update_start_time_ms(self, value: int) -> None:
6567
# Cache this as we want the run start message construction and production to be as fast as
6668
# possible so we don't miss events
67-
logger.debug(f"Run start time updated to {value} so changing it to ms ({value * 1000})")
68-
self.current_start_time_ms = value * 1000
69+
logger.info(f"Run start time updated to {value} so changing it to ms ({value * 1000})")
70+
self.current_start_time_ms = int(value) * 1000
6971

7072
def _update_blocks(self, value: ca_bytes) -> None:
7173
logger.debug(f"blocks_hexed: {value}")
7274
blocks_unhexed = dehex_decompress_and_dejson(bytes(value))
7375
logger.debug(f"blocks_unhexed: {blocks_unhexed}")
7476
self.blocks = [f"{self.prefix}CS:SB:{x}" for x in blocks_unhexed]
7577

76-
async def _react_to_runstate_change(self, value: str) -> None:
77-
logger.info(f"Runstate changed to {value}")
78+
async def construct_and_send_runstart(self, value: float) -> None:
79+
if self.current_start_time is None:
80+
logger.info("Initial update for start time - not sending run start")
81+
self.current_start_time = value
82+
return
7883

79-
if value == "BEGINNING":
80-
self.current_job_id = str(uuid.uuid4())
81-
await self.construct_and_send_runstart(self.current_job_id)
82-
elif value == "ENDING":
83-
await self.construct_and_send_runstop(self.current_job_id)
84+
if value == self.current_start_time or value is None:
85+
logger.error("run start time is the same as cached or invalid. ignoring update")
86+
return
8487

85-
async def construct_and_send_runstart(self, job_id: str) -> None:
86-
logger.info(f"Sending run start with job_id: {job_id}")
87-
start_time_ms = self.current_start_time_ms
88+
self.current_start_time = value
89+
self.current_job_id = str(uuid.uuid4())
90+
logger.info(f"Sending run start with job_id: {self.current_job_id}")
91+
start_time_ms = int(self.current_start_time) * 1000
8892
logger.info(f"Start time: {start_time_ms}")
8993

9094
runnum = self.current_run_number
@@ -133,31 +137,40 @@ async def construct_and_send_runstart(self, job_id: str) -> None:
133137
filename = f"{self.instrument_name}{runnum}.nxs"
134138

135139
blob = serialise_pl72(
136-
job_id,
140+
self.current_job_id,
137141
filename=filename,
138142
start_time=start_time_ms,
139143
nexus_structure=json.dumps(nexus_structure),
140144
run_name=runnum,
145+
instrument_name=self.instrument_name,
141146
)
142147
await self.producer.send(self.topic, blob)
148+
logger.info(f"Sent {blob} blob")
149+
150+
async def construct_and_send_runstop(self, value: float) -> None:
151+
if self.current_stop_time is None:
152+
self.current_stop_time = value
153+
logger.info("Initial update for stop time - not sending run stop")
154+
return
155+
156+
if value == self.current_stop_time or value is None:
157+
logger.error("run stop time is the same as cached or invalid")
158+
return
143159

144-
async def construct_and_send_runstop(self, job_id: str) -> None:
145-
logger.info(f"Sending run stop with job_id: {job_id}")
146-
# stop_time only gets set to a non-zero value when the runstate goes back to SETUP.
147-
# This is dirty, but poll it every 0.5 seconds until it does.
148-
while (
149-
current_runstate := await caget(f"{self.prefix}DAE:RUNSTATE", datatype=str) != "SETUP"
150-
):
151-
logger.debug(f"Waiting for run state to go back to SETUP. Currently {current_runstate}")
152-
await asyncio.sleep(0.5)
153-
154-
stop_time_s = await caget(f"{self.prefix}DAE:STOP_TIME")
155-
if stop_time_s is None:
156-
logger.error(f"Failed to get stop time from {job_id}")
160+
if value == 0:
161+
# This happens when a new run starts
162+
logger.debug("stop time set to 0")
163+
self.current_stop_time = value
157164
return
158-
stop_time_ms = int(stop_time_s * 1000)
165+
166+
self.current_stop_time = value
167+
logger.info(f"Sending run stop with job_id: {self.current_job_id}")
168+
169+
stop_time_ms = int(value * 1000)
159170
logger.info(f"stop time: {stop_time_ms}")
160-
blob = serialise_6s4t(job_id, stop_time=stop_time_ms, command_id=self.current_job_id)
171+
blob = serialise_6s4t(
172+
self.current_job_id, stop_time=stop_time_ms, command_id=self.current_job_id
173+
)
161174
await self.producer.send(self.topic, blob)
162175

163176

0 commit comments

Comments
 (0)