1515logger = logging .getLogger ("borzoi" )
1616logging .basicConfig (level = logging .INFO )
1717
18- # TODO pass this in at start
19- INSTNAME = "NDW2932"
20-
2118
2219class RunStarter :
23- def __init__ (self , prefix , producer , topic ):
20+ def __init__ (
21+ self , prefix : str , instrument_name : str , producer : AIOKafkaProducer , topic : str
22+ ):
2423 self .producer = None
2524 self .prefix = prefix
2625 self .blocks = []
2726 self .current_job_id = ""
2827 self .producer = producer
28+ self .instrument_name = instrument_name
2929 self .topic = topic
3030
3131 async def set_up_monitors (self ):
32+ logger .info ("Setting up monitors" )
3233 camonitor (
3334 f"{ self .prefix } CS:BLOCKSERVER:BLOCKNAMES" ,
3435 callback = self ._update_blocks ,
@@ -74,21 +75,16 @@ async def construct_and_send_runstart(self, job_id: str):
7475 "type" : "group" ,
7576 "name" : "events" ,
7677 "children" : [
77- # {
78- # "type": "stream",
79- # "stream": {
80- # "topic": f"{INSTNAME}_events",
81- # "source": "ISISICP",
82- # "writer_module": "ev42",
83- # },
84- # },
85- ],
86- "attributes" : [
8778 {
88- "name" : "NX_class" ,
89- "values" : "NXentry"
90- }
91- ]
79+ "type" : "stream" ,
80+ "stream" : {
81+ "topic" : f"{ self .instrument_name } _events" ,
82+ "source" : "ISISICP" ,
83+ "writer_module" : "ev42" ,
84+ },
85+ },
86+ ],
87+ "attributes" : [{"name" : "NX_class" , "values" : "NXentry" }],
9288 },
9389 {
9490 "type" : "group" ,
@@ -97,7 +93,7 @@ async def construct_and_send_runstart(self, job_id: str):
9793 {
9894 "type" : "stream" ,
9995 "stream" : {
100- "topic" : f"{ INSTNAME } _sampleEnv" ,
96+ "topic" : f"{ self . instrument_name } _sampleEnv" ,
10197 "source" : x ,
10298 "writer_module" : "f144" ,
10399 },
@@ -106,16 +102,11 @@ async def construct_and_send_runstart(self, job_id: str):
106102 ],
107103 },
108104 ],
109- "attributes" : [
110- {
111- "name" : "NX_class" ,
112- "values" : "NXentry"
113- }
114- ]
105+ "attributes" : [{"name" : "NX_class" , "values" : "NXentry" }],
115106 }
116107 ]
117108 }
118- filename = f"{ INSTNAME } { runnum } .nxs"
109+ filename = f"{ self . instrument_name } { runnum } .nxs"
119110
120111 blob = serialise_pl72 (
121112 job_id ,
@@ -129,32 +120,48 @@ async def construct_and_send_runstop(self, job_id: str):
129120 logger .info (f"Sending run stop with job_id: { job_id } " )
130121 # stop_time only gets set to a non-zero value when the runstate goes back to SETUP.
131122 # This is dirty, but poll it every 0.5 seconds until it does.
132- while await caget (f"{ self .prefix } DAE:RUNSTATE" , datatype = str ) != "SETUP" :
123+ while (
124+ current_runstate := await caget (f"{ self .prefix } DAE:RUNSTATE" , datatype = str )
125+ != "SETUP"
126+ ):
127+ logger .debug (
128+ f"Waiting for run state to go back to SETUP. Currently { current_runstate } "
129+ )
133130 await asyncio .sleep (0.5 )
134131
135132 stop_time_s = await caget (f"{ self .prefix } DAE:STOP_TIME" )
136133 stop_time_ms = int (stop_time_s * 1000 )
137134 logger .info (f"stop time: { stop_time_ms } " )
138- blob = serialise_6s4t (job_id , stop_time = stop_time_ms , command_id = self .current_job_id )
135+ blob = serialise_6s4t (
136+ job_id , stop_time = stop_time_ms , command_id = self .current_job_id
137+ )
139138 await self .producer .send (self .topic , blob )
140139
141140
142- async def set_up_producer (broker : str ):
141+ async def set_up_producer (broker : str ) -> AIOKafkaProducer :
143142 producer = AIOKafkaProducer (bootstrap_servers = broker )
144143 await producer .start ()
145144 return producer
146145
147146
148147def main ():
149148 prefix = os .environ .get ("MYPVPREFIX" )
149+ instrument_name = os .environ .get ("INSTRUMENT" )
150150
151- broker = "livedata.isis.cclrc.ac.uk:31092"
152- topic = f"{ INSTNAME } _runInfo"
151+ if prefix is None or instrument_name is None :
152+ raise ValueError (
153+ "prefix or instrument name not set - have you run config_env.bat?"
154+ )
153155
156+ broker = "livedata.isis.cclrc.ac.uk:31092"
157+ topic = f"{ instrument_name } _runInfo"
158+ logger .info ("setting up producer" )
154159 loop = asyncio .new_event_loop ()
155160 producer = loop .run_until_complete (set_up_producer (broker ))
161+ logger .info ("set up producer" )
156162
157- run_starter = RunStarter (prefix , producer , topic )
163+ logger .info ("starting run starter" )
164+ run_starter = RunStarter (prefix , instrument_name , producer , topic )
158165 loop .create_task (run_starter .set_up_monitors ())
159166 loop .run_forever ()
160167
0 commit comments