1- import os
2- import uuid
3- import logging
41import asyncio
52import json
3+ import logging
4+ import os
5+ import uuid
66
77from aioca import caget , camonitor
88from aiokafka import AIOKafkaProducer
9- from epicscorelibs .ca .dbr import DBR_CHAR_BYTES
10- from ibex_non_ca_helpers .hex import dehex_decompress_and_dejson
11-
9+ from epicscorelibs .ca .dbr import DBR_CHAR_BYTES , ca_bytes
10+ from ibex_non_ca_helpers .compress_hex import dehex_decompress_and_dejson
1211from streaming_data_types .run_start_pl72 import serialise_pl72
1312from streaming_data_types .run_stop_6s4t import serialise_6s4t
1413
1918class RunStarter :
2019 def __init__ (
2120 self , prefix : str , instrument_name : str , producer : AIOKafkaProducer , topic : str
22- ):
21+ ) -> None :
2322 self .producer = None
2423 self .prefix = prefix
2524 self .blocks = []
@@ -28,7 +27,7 @@ def __init__(
2827 self .instrument_name = instrument_name
2928 self .topic = topic
3029
31- async def set_up_monitors (self ):
30+ async def set_up_monitors (self ) -> None :
3231 logger .info ("Setting up monitors" )
3332 camonitor (
3433 f"{ self .prefix } CS:BLOCKSERVER:BLOCKNAMES" ,
@@ -42,13 +41,13 @@ async def set_up_monitors(self):
4241 datatype = str ,
4342 )
4443
45- def _update_blocks (self , value ) :
44+ def _update_blocks (self , value : ca_bytes ) -> None :
4645 logger .debug (f"blocks_hexed: { value } " )
4746 blocks_unhexed = dehex_decompress_and_dejson (bytes (value ))
4847 logger .debug (f"blocks_unhexed: { blocks_unhexed } " )
4948 self .blocks = [f"{ self .prefix } CS:SB:{ x } " for x in blocks_unhexed ]
5049
51- async def _react_to_runstate_change (self , value ) :
50+ async def _react_to_runstate_change (self , value : str ) -> None :
5251 logger .info (f"Runstate changed to { value } " )
5352
5453 if value == "BEGINNING" :
@@ -57,7 +56,7 @@ async def _react_to_runstate_change(self, value):
5756 elif value == "ENDING" :
5857 await self .construct_and_send_runstop (self .current_job_id )
5958
60- async def construct_and_send_runstart (self , job_id : str ):
59+ async def construct_and_send_runstart (self , job_id : str ) -> None :
6160 logger .info (f"Sending run start with job_id: { job_id } " )
6261 start_time_s = await caget (f"{ self .prefix } DAE:START_TIME" )
6362 start_time_ms = int (start_time_s * 1000 )
@@ -116,25 +115,20 @@ async def construct_and_send_runstart(self, job_id: str):
116115 )
117116 await self .producer .send (self .topic , blob )
118117
119- async def construct_and_send_runstop (self , job_id : str ):
118+ async def construct_and_send_runstop (self , job_id : str ) -> None :
120119 logger .info (f"Sending run stop with job_id: { job_id } " )
121120 # stop_time only gets set to a non-zero value when the runstate goes back to SETUP.
122121 # This is dirty, but poll it every 0.5 seconds until it does.
123122 while (
124- current_runstate := await caget (f"{ self .prefix } DAE:RUNSTATE" , datatype = str )
125- != "SETUP"
123+ current_runstate := await caget (f"{ self .prefix } DAE:RUNSTATE" , datatype = str ) != "SETUP"
126124 ):
127- logger .debug (
128- f"Waiting for run state to go back to SETUP. Currently { current_runstate } "
129- )
125+ logger .debug (f"Waiting for run state to go back to SETUP. Currently { current_runstate } " )
130126 await asyncio .sleep (0.5 )
131127
132128 stop_time_s = await caget (f"{ self .prefix } DAE:STOP_TIME" )
133129 stop_time_ms = int (stop_time_s * 1000 )
134130 logger .info (f"stop time: { stop_time_ms } " )
135- blob = serialise_6s4t (
136- job_id , stop_time = stop_time_ms , command_id = self .current_job_id
137- )
131+ blob = serialise_6s4t (job_id , stop_time = stop_time_ms , command_id = self .current_job_id )
138132 await self .producer .send (self .topic , blob )
139133
140134
@@ -144,14 +138,12 @@ async def set_up_producer(broker: str) -> AIOKafkaProducer:
144138 return producer
145139
146140
147- def main ():
141+ def main () -> None :
148142 prefix = os .environ .get ("MYPVPREFIX" )
149143 instrument_name = os .environ .get ("INSTRUMENT" )
150144
151145 if prefix is None or instrument_name is None :
152- raise ValueError (
153- "prefix or instrument name not set - have you run config_env.bat?"
154- )
146+ raise ValueError ("prefix or instrument name not set - have you run config_env.bat?" )
155147
156148 broker = os .environ .get ("BORZOI_KAFKA_BROKER" , "livedata.isis.cclrc.ac.uk:31092" )
157149 topic = os .environ .get ("BORZOI_TOPIC" , f"{ instrument_name } _runInfo" )
0 commit comments