Commit 6cebdf5

STY: cleanup and adding comments
1 parent 1f168d7 commit 6cebdf5

File tree

1 file changed: +90 -79 lines changed


process_b.py

Lines changed: 90 additions & 79 deletions
@@ -10,7 +10,10 @@
 import time
 
 class ProcessB(CustomProcessObject):
-
+    """
+    ProcessB consumes k2eg snapshots from queue_one and buffers 5 minutes of data per PV at 120hz.
+    It performs beam checks and candidate window searching, and forwards output to queue_two for ProcessC.
+    """
     def __init__(self,
                  queue_one: 'Manager.Queue',
                  queue_two: 'Manager.Queue',
@@ -19,41 +22,42 @@ def __init__(self,
                  ):
         self.queue_one = queue_one
         self.queue_two = queue_two
+
         self.pv_list = pv_list
+
         self.logging_kwargs = logging_kwargs
         self.logging_kwargs['logger_name'] = 'process_b'
         self.logger = None
-        self.buffer = Buffer(pv_list, 36000, logging_kwargs)
+
+        # holds up to 5 minutes of 120hz data (36000 points) per pv.
+        self.buffer = Buffer(pv_list, 36000, logging_kwargs)  # 36000 = 120hz * 60sec * 5mins
 
     def __call__(self):
         if self.logger is None:
             self.logger = create_worker_logger(**self.logging_kwargs)
 
-        self.logger.debug(f"number of pvs running on: {len(self.pv_list)}")
-        self.logger.debug("starting proecess_b data processing")
+        self.logger.debug(f"running process_b on {len(self.pv_list)} pvs")
+        self.logger.debug("starting data processing loop...")
 
         while True:
             try:
                 r = self.queue_one.get(timeout=0.05)  # wait 50ms
 
-                #self.logger.debug(f"ProcessB sees {r['iteration']}")
-                #self.logger.debug(f"ProcessB sees {r}")
-                #self.queue_two.put(r)
-
-                if r is None:  # enqueue a None to stop this process
+                if r is None:  # enqueuing a None should stop this process
                     break
 
+                # need to be sure each iteration of this processing loop is <= 1 second
+                # (new data comes each second from process_a, so data will pile up if our processing takes over 1 second)
                 start = time.perf_counter()
 
-                self.update_pv_values(r)
-                result = self.find_candidates()
+                # parse the k2eg snapshot and update the buffer
+                self.update_buffer(r)
 
-                # write to queue_2 for process_c.py to read
+                # process data in the buffer and get results to pass to process_c and CoAD
+                self.do_beam_checks()
+                result = self.find_candidates()
 
-                # from process_c:
-                # "r should have the structure (rf_input_tensor, bpm_input_tensor, rf_station)
-                # where rf_input_tensor and bpm_input_tensor are tensors of size (1, 1066)
-                # and (8, 1066) respectively, and rf_station is a string representing the PV name"
+                # placeholder: dummy data for ProcessC:
                 fake_rf_input_tensor = np.random.rand(1, 1066).astype(np.float32)
                 fake_bpm_input_tensor = np.random.rand(8, 1066).astype(np.float32)
                 fake_pv_name = "fake_pv_name"
@@ -64,118 +68,125 @@ def __call__(self):
                 end = time.perf_counter()
                 elapsed_ms = (end - start) * 1000
                 self.logger.debug(f"process_b iteration took : {elapsed_ms:.2f} ms")
-
                 if elapsed_ms > 1000:  # have to be <= 1 sec
                     self.logger.warning(f"process_b iteration is slow!! : {elapsed_ms:.2f} ms")
 
             except Empty:
                 continue
 
-        self.logger.debug("ending data processing")
-
-        #dir_name = f"buffer_txt_dump_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
-        #self.logger.debug(f"dump dir: {dir_name}")
-        #self.buffer.dump_to_human_readable(directory=dir_name)
+        self.logger.debug("shutting down process_b")
 
         for handler in self.logger.handlers:
             handler.close()
 
-    def update_pv_values(self, snapshot):
-
-        iteration = snapshot.get("iteration", None)
-        self.logger.debug(f"\niteration: {iteration}")
-
+    def update_buffer(self, snapshot):
+        """
+        Append the latest 120-sample PV snapshot into the buffer for each pv.
+        """
         for pv in self.pv_list:
             entries = snapshot.get(pv, [])
-            num_entries = len(entries)
-            #self.logger.debug(f"{pv}: {num_entries} entries in snapshot")
-
-            if num_entries == 0:
-                continue  # skip if no data
 
-            times = np.empty(120, dtype=np.float64)
             values = np.empty(120, dtype=np.float64)
-
             for i, e in enumerate(entries):
-                ts = e.get("timeStamp", {})
-                times[i] = ts.get("secondsPastEpoch", 0)
                 values[i] = e.get("value", np.nan)
 
-            self.buffer.append(pv, values)
+            self.buffer.append(pv, values)
+
+        # Update buffer index tracking
+        if self.buffer.index != self.buffer.buffer_len:
+            self.buffer.index += 120
+        else:
+            self.buffer.index = self.buffer.buffer_len
 
     def do_beam_checks(self):
+        # placeholder: beam condition logic here.
         return True
 
     def find_candidates(self):
-        # add beam checks here
+        # placeholder: candidate determination logic here.
         return []
 
 class Buffer:
+    """
+    Fixed-length buffer for storing a sliding window of 120hz float data per pv.
+    By default stores 5 mins (36000 values) of past data.
+    """
     def __init__(self, pv_list: list[str], buffer_len: int = 36000, logging_kwargs: Optional[dict] = default_logging_kwargs):
-        self.buffer_len = buffer_len
         self.pv_list = pv_list
-        self.last_snapshot_time = 0  # for sanity check of snapshot validity
-        self.starting_pv_time = 0  # store the earliest time in buffer, and assume all data is timed at 120hz
+
+        # max length of buffer
+        self.buffer_len = buffer_len
+
+        # default buffer length is 36000 to store 5 mins of data at 120hz.
+        # we allocate the buffer up front to avoid potential memory copies during array append operations.
         self.buffer_map = {
             pv: np.empty(buffer_len, dtype=np.float64) for pv in self.pv_list
         }
-        self.index = 0  # tracking next write index
+        self.index = 0  # tracks the next write index
+
+        self.passes_beam_check = np.empty(buffer_len, dtype=bool)  # whether all beam checks passed at each timestamp
+
         self.logging_kwargs = logging_kwargs
         self.logger = create_worker_logger(**self.logging_kwargs)
         self.logging_kwargs['logger_name'] = 'buffer'
 
-    def dump_to_human_readable(self, directory: str = "buffer_dump_txt"):
-        # individual human-readable .txt file per pv, each line has one float val
-        os.makedirs(directory, exist_ok=True)
-
-        for pv in self.pv_list:
-            valid_data = self.buffer_map[pv][:self.index]
-            pv = pv[5:]  # get rid of "ca://"
-            filepath = os.path.join(directory, f"{pv.replace(':', '_')}.txt")
-
-            self.logger.debug(f"writing dump file {filepath} for {pv}")
-            with open(filepath, "w") as f:
-                for v in valid_data:
-                    f.write(f"{v}\n")
-
     def append(self, key: str, values: np.ndarray):
-        # we only allocate array memory once, and then once memory is full
-        # we drop the lowest-index 120 values and shift the existing values over
-        # and append the new values to the end.
-        # this will soon be done in a time-based fashion (older than 5 min values are dropped)
-
+        """
+        Appends 120 new values for a given pv. If the buffer is full, old data is shifted to make room.
+        """
         if key not in self.buffer_map:
             raise KeyError(f"key '{key}' not found in buffer")
 
-        #if len(values) != 120:
-        #    raise ValueError(f"Expected array of length 120, got {len(values)}")
+        if len(values) != 120:
+            raise ValueError(f"Expected array of length 120, got {len(values)}")
 
+        curr_pv_arr = self.buffer_map[key]
         idx = self.index
-        buf = self.buffer_map[key]
-
-        #if idx == 0:
-            #self.starting_pv_time = time
 
         if idx + 120 <= self.buffer_len:
             # have enough room without shifting, just write to next open index (this only happens during initial buffer fill-up)
-            self.logger.debug(f"initial filling of buffer, curr index {idx}")
-            buf[idx:idx+120] = values
-            self.index += 120
+            self.logger.debug(f"initial filling of buffer, current index {idx}")
+            curr_pv_arr[idx:idx+120] = values
         else:
-            # shift left and append to the end
-            self.logger.debug(f"buffer is full, removing oldest second of data")
-            buf[:-120] = buf[120:]
-            buf[-120:] = values
-            self.index = self.buffer_len
-            #self.starting_pv_time = time
+            # shift left and append to the end, this should be quick on a np.array
+            self.logger.debug("buffer is full, removing oldest data")
+            curr_pv_arr[:-120] = curr_pv_arr[120:]
+            curr_pv_arr[-120:] = values
 
     def get(self, key: str):
+        """
+        Get data from the buffer map for a given pv.
+        """
         if key not in self.buffer_map:
-            raise KeyError(f"key '{key}' not found in buffer")
+            raise KeyError(f"key '{key}' not found in buffer map")
         return self.buffer_map[key][:self.index]
 
     def clear(self, key: str):
+        """
+        Clear buffer contents for a given pv.
+        """
         if key not in self.buffer_map:
-            raise KeyError(f"key '{key}' not found in buffer.")
+            raise KeyError(f"key '{key}' not found in buffer map.")
         self.buffer_map[key][:] = np.empty(self.buffer_len, dtype=np.float64)
-        self.index = 0
+        self.index = 0
+
+    def dump_to_human_readable(self, directory: str = "buffer_dump_txt"):
+        """
+        Debug util: dump each PV's data to a separate text file, 1 value per line.
+
+        Call like this:
+            dir_name = f"buffer_txt_dump_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+            self.logger.debug(f"dump dir: {dir_name}")
+            self.buffer.dump_to_human_readable(directory=dir_name)
+        """
+        os.makedirs(directory, exist_ok=True)
+
+        for pv in self.pv_list:
+            valid_data = self.buffer_map[pv][:self.index]
+            pv = pv[5:]  # get rid of "ca://"
+            filepath = os.path.join(directory, f"{pv.replace(':', '_')}.txt")
+
+            self.logger.debug(f"writing dump file {filepath} for {pv}")
+            with open(filepath, "w") as f:
+                for v in valid_data:
+                    f.write(f"{v}\n")
