22import uuid
33import logging
44import asyncio
5-
5+ import json
66
77from aioca import caget , camonitor
88from aiokafka import AIOKafkaProducer
1515logger = logging .getLogger ("borzoi" )
1616logging .basicConfig (level = logging .INFO )
1717
18+ # TODO pass this in at start
19+ INSTNAME = "NDW2932"
20+
1821
1922class RunStarter :
2023 def __init__ (self , prefix , producer , topic ):
@@ -61,15 +64,64 @@ async def construct_and_send_runstart(self, job_id: str):
6164
6265 runnum = await caget (f"{ self .prefix } DAE:RUNNUMBER" )
6366
64- # TODO construct filename here - wants to be hostname minus NDX prefix(?) + rbnum .nxs
65- filename = f"TEST_{ runnum } .nxs"
67+ nexus_structure = {
68+ "children" : [
69+ {
70+ "type" : "group" ,
71+ "name" : "raw_data_1" ,
72+ "children" : [
73+ {
74+ "type" : "group" ,
75+ "name" : "events" ,
76+ "children" : [
77+ # {
78+ # "type": "stream",
79+ # "stream": {
80+ # "topic": f"{INSTNAME}_events",
81+ # "source": "ISISICP",
82+ # "writer_module": "ev42",
83+ # },
84+ # },
85+ ],
86+ "attributes" : [
87+ {
88+ "name" : "NX_class" ,
89+ "values" : "NXentry"
90+ }
91+ ]
92+ },
93+ {
94+ "type" : "group" ,
95+ "name" : "selog" ,
96+ "children" : [
97+ {
98+ "type" : "stream" ,
99+ "stream" : {
100+ "topic" : f"{ INSTNAME } _sampleEnv" ,
101+ "source" : x ,
102+ "writer_module" : "f144" ,
103+ },
104+ }
105+ for x in self .blocks
106+ ],
107+ },
108+ ],
109+ "attributes" : [
110+ {
111+ "name" : "NX_class" ,
112+ "values" : "NXentry"
113+ }
114+ ]
115+ }
116+ ]
117+ }
118+ filename = f"{ INSTNAME } { runnum } .nxs"
66119
67120 blob = serialise_pl72 (
68121 job_id ,
69122 filename = filename ,
70123 start_time = start_time_ms ,
71- nexus_structure = "{}" ,
72- instrument_name = "TEST" ,
124+ nexus_structure = json .dumps (nexus_structure ),
73125 )
74126 await self .producer .send (self .topic , blob )
75127
@@ -83,7 +135,7 @@ async def construct_and_send_runstop(self, job_id: str):
83135 stop_time_s = await caget (f"{ self .prefix } DAE:STOP_TIME" )
84136 stop_time_ms = int (stop_time_s * 1000 )
85137 logger .info (f"stop time: { stop_time_ms } " )
86- blob = serialise_6s4t (job_id , stop_time = stop_time_ms )
138+ blob = serialise_6s4t (job_id , stop_time = stop_time_ms , command_id = self . current_job_id )
87139 await self .producer .send (self .topic , blob )
88140
89141
@@ -97,7 +149,7 @@ def main():
97149 prefix = os .environ .get ("MYPVPREFIX" )
98150
99151 broker = "livedata.isis.cclrc.ac.uk:31092"
100- topic = "jacktestconfig "
152+ topic = f" { INSTNAME } _runInfo "
101153
102154 loop = asyncio .new_event_loop ()
103155 producer = loop .run_until_complete (set_up_producer (broker ))
0 commit comments