-import os
-import uuid
-import logging
 import asyncio
 import json
+import logging
+import os
+import uuid
 
-from aioca import caget, camonitor
+from aioca import camonitor
 from aiokafka import AIOKafkaProducer
-from epicscorelibs.ca.dbr import DBR_CHAR_BYTES
-from ibex_non_ca_helpers.hex import dehex_decompress_and_dejson
-
+from epicscorelibs.ca.dbr import DBR_CHAR_BYTES, ca_bytes
+from ibex_non_ca_helpers.compress_hex import dehex_decompress_and_dejson
 from streaming_data_types.run_start_pl72 import serialise_pl72
 from streaming_data_types.run_stop_6s4t import serialise_6s4t
 
 class RunStarter:
     def __init__(
         self, prefix: str, instrument_name: str, producer: AIOKafkaProducer, topic: str
-    ):
+    ) -> None:
         self.producer = None
         self.prefix = prefix
         self.blocks = []
         self.current_job_id = ""
         self.producer = producer
         self.instrument_name = instrument_name
         self.topic = topic
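+        # Cache the latest run number, start time and stop time so the monitor callbacks can
+        # spot duplicate updates and build messages without extra channel access.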
+        self.current_run_number = None
+        self.current_start_time = None
+        self.current_stop_time = None
 
-    async def set_up_monitors(self):
+    async def set_up_monitors(self) -> None:
         logger.info("Setting up monitors")
         camonitor(
             f"{self.prefix}CS:BLOCKSERVER:BLOCKNAMES",
             callback=self._update_blocks,
+            all_updates=True,
             datatype=DBR_CHAR_BYTES,
         )
         camonitor(
-            f"{self.prefix}DAE:RUNSTATE",
-            callback=self._react_to_runstate_change,
+            f"{self.prefix}DAE:RUNNUMBER",
+            callback=self._update_run_number,
             all_updates=True,
             datatype=str,
         )
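+        # START_TIME and STOP_TIME updates drive the construction and sending of the
+        # run start and run stop messages.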
+        camonitor(
+            f"{self.prefix}DAE:START_TIME",
+            callback=self.construct_and_send_runstart,
+            all_updates=True,
+            datatype=float,
+        )
+        camonitor(
+            f"{self.prefix}DAE:STOP_TIME",
+            callback=self.construct_and_send_runstop,
+            all_updates=True,
+            datatype=float,
+        )
+
+    def _update_run_number(self, value: str) -> None:
+        # Cache this as we want the run start message construction and production to be as fast as
+        # possible so we don't miss events
+        logger.info(f"Run number updated to {value}")
+        self.current_run_number = value
 
-    def _update_blocks(self, value):
+    def _update_blocks(self, value: ca_bytes) -> None:
         logger.debug(f"blocks_hexed: {value}")
         blocks_unhexed = dehex_decompress_and_dejson(bytes(value))
         logger.debug(f"blocks_unhexed: {blocks_unhexed}")
         self.blocks = [f"{self.prefix}CS:SB:{x}" for x in blocks_unhexed]
 
-    async def _react_to_runstate_change(self, value):
-        logger.info(f"Runstate changed to {value}")
+    async def construct_and_send_runstart(self, value: float | None) -> None:
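+        # The first update from the monitor just delivers the current START_TIME value;
+        # cache it and only send a run start message for subsequent changes.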
+        if self.current_start_time is None:
+            logger.info("Initial update for start time - not sending run start")
+            self.current_start_time = value
+            return
 
-        if value == "BEGINNING":
-            self.current_job_id = str(uuid.uuid4())
-            await self.construct_and_send_runstart(self.current_job_id)
-        elif value == "ENDING":
-            await self.construct_and_send_runstop(self.current_job_id)
+        if value is None or value == self.current_start_time:
+            logger.error("Run start time is unchanged or invalid; ignoring update")
+            return
 
-    async def construct_and_send_runstart(self, job_id: str):
-        logger.info(f"Sending run start with job_id: {job_id}")
-        start_time_s = await caget(f"{self.prefix}DAE:START_TIME")
-        start_time_ms = int(start_time_s * 1000)
+        self.current_start_time = value
+        self.current_job_id = str(uuid.uuid4())
+        logger.info(f"Sending run start with job_id: {self.current_job_id}")
+        start_time_ms = int(self.current_start_time * 1000)
         logger.info(f"Start time: {start_time_ms}")
 
-        runnum = await caget(f"{self.prefix}DAE:RUNNUMBER")
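+        # Use the run number cached by _update_run_number rather than a caget so that
+        # message construction stays fast.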
+        runnum = self.current_run_number
 
         nexus_structure = {
             "children": [
@@ -109,31 +131,39 @@ async def construct_and_send_runstart(self, job_id: str):
         filename = f"{self.instrument_name}{runnum}.nxs"
 
         blob = serialise_pl72(
-            job_id,
+            self.current_job_id,
             filename=filename,
             start_time=start_time_ms,
             nexus_structure=json.dumps(nexus_structure),
+            run_name=runnum,
+            instrument_name=self.instrument_name,
         )
         await self.producer.send(self.topic, blob)
+        logger.info(f"Sent run start message: {blob}")
 
-    async def construct_and_send_runstop(self, job_id: str):
-        logger.info(f"Sending run stop with job_id: {job_id}")
-        # stop_time only gets set to a non-zero value when the runstate goes back to SETUP.
-        # This is dirty, but poll it every 0.5 seconds until it does.
-        while (
-            current_runstate := await caget(f"{self.prefix}DAE:RUNSTATE", datatype=str)
-            != "SETUP"
-        ):
-            logger.debug(
-                f"Waiting for run state to go back to SETUP. Currently {current_runstate}"
-            )
-            await asyncio.sleep(0.5)
-
-        stop_time_s = await caget(f"{self.prefix}DAE:STOP_TIME")
-        stop_time_ms = int(stop_time_s * 1000)
+    async def construct_and_send_runstop(self, value: float | None) -> None:
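+        # STOP_TIME updates: cache the first value, ignore duplicates and None, and treat 0
+        # as the reset that happens when a new run starts.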
+        if self.current_stop_time is None:
+            self.current_stop_time = value
+            logger.info("Initial update for stop time - not sending run stop")
+            return
+
+        if value is None or value == self.current_stop_time:
+            logger.error("Run stop time is unchanged or invalid; ignoring update")
+            return
+
+        if value == 0:
+            # This happens when a new run starts
+            logger.debug("stop time set to 0")
+            self.current_stop_time = value
+            return
+
+        self.current_stop_time = value
+        logger.info(f"Sending run stop with job_id: {self.current_job_id}")
+
+        stop_time_ms = int(value * 1000)
         logger.info(f"stop time: {stop_time_ms}")
         blob = serialise_6s4t(
-            job_id, stop_time=stop_time_ms, command_id=self.current_job_id
+            self.current_job_id, stop_time=stop_time_ms, command_id=self.current_job_id
         )
         await self.producer.send(self.topic, blob)
 
@@ -144,17 +174,15 @@ async def set_up_producer(broker: str) -> AIOKafkaProducer:
     return producer
 
 
-def main():
+def main() -> None:
     prefix = os.environ.get("MYPVPREFIX")
     instrument_name = os.environ.get("INSTRUMENT")
 
     if prefix is None or instrument_name is None:
-        raise ValueError(
-            "prefix or instrument name not set - have you run config_env.bat?"
-        )
+        raise ValueError("prefix or instrument name not set - have you run config_env.bat?")
 
-    broker = "livedata.isis.cclrc.ac.uk:31092"
-    topic = f"{instrument_name}_runInfo"
+    broker = os.environ.get("BORZOI_KAFKA_BROKER", "livedata.isis.cclrc.ac.uk:31092")
+    topic = os.environ.get("BORZOI_TOPIC", f"{instrument_name}_runInfo")
     logger.info("setting up producer")
     loop = asyncio.new_event_loop()
     producer = loop.run_until_complete(set_up_producer(broker))