Skip to content

Commit e2be4db

Browse files
authored
Murko: publish and look for stop messages (#1686)
* Filter out small x Murko results * Add test * Rename confusing variable * More fixes * Publish complete message to redis * Wait for results complete message to stop results loop * Move forwarding complete message to MurkoCallback * Fix MurkoResults device re stop messages * Remove unused signals * Relax typing * Fix tests * Add test * Remove unused code * Add test and tidy tests * Small changes * Improve test typing * PR comments
1 parent 5a20e78 commit e2be4db

File tree

2 files changed

+191
-214
lines changed

2 files changed

+191
-214
lines changed

src/dodal/devices/i04/murko_results.py

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import pickle
3+
import time
34
from dataclasses import dataclass
45
from enum import Enum
56
from typing import TypedDict
@@ -21,6 +22,7 @@
2122
from dodal.log import LOGGER
2223

2324
NO_MURKO_RESULT = (-1, -1)
25+
RESULTS_COMPLETE_MESSAGE = "murko_results_complete"
2426

2527

2628
class MurkoMetadata(TypedDict):
@@ -71,7 +73,8 @@ class MurkoResultsDevice(StandardReadable, Triggerable, Stageable):
7173
solutions for y and z can be calculated using numpy's linear algebra library.
7274
"""
7375

74-
TIMEOUT_S = 2
76+
GET_MESSAGE_TIMEOUT_S = 2
77+
RESULTS_COMPLETE_TIMEOUT_S = 5
7578
PERCENTAGE_TO_USE = 25
7679
LEFTMOST_PIXEL_TO_USE = 10
7780
NUMBER_OF_WRONG_RESULTS_TO_LOG = 5
@@ -82,7 +85,6 @@ def __init__(
8285
redis_password=RedisConstants.REDIS_PASSWORD,
8386
redis_db=RedisConstants.MURKO_REDIS_DB,
8487
name="",
85-
stop_angle=350,
8688
):
8789
self.redis_client = StrictRedis(
8890
host=redis_host,
@@ -91,8 +93,6 @@ def __init__(
9193
)
9294
self.pubsub = self.redis_client.pubsub()
9395
self.sample_id = soft_signal_rw(str) # Should get from redis
94-
self.stop_angle = soft_signal_rw(int, initial_value=stop_angle)
95-
self.invert_stop_angle = soft_signal_rw(bool, initial_value=False)
9696

9797
self._reset()
9898

@@ -121,15 +121,27 @@ async def unstage(self):
121121

122122
@AsyncStatus.wrap
123123
async def trigger(self):
124-
# Wait for results
125124
sample_id = await self.sample_id.get_value()
126-
127-
while not await self.check_if_reached_stop_angle():
125+
t_last_result = time.time()
126+
while True:
127+
if time.time() - t_last_result > self.RESULTS_COMPLETE_TIMEOUT_S:
128+
LOGGER.warning(
129+
f"Time since last result > {self.RESULTS_COMPLETE_TIMEOUT_S}, expected to receive {RESULTS_COMPLETE_MESSAGE}"
130+
)
131+
break
128132
# waits here for next batch to be received
129-
message = await self.pubsub.get_message(timeout=self.TIMEOUT_S)
130-
if message is None:
131-
continue
132-
await self.process_batch(message, sample_id)
133+
message = await self.pubsub.get_message(timeout=self.GET_MESSAGE_TIMEOUT_S)
134+
if message and message["type"] == "message":
135+
t_last_result = time.time()
136+
data = pickle.loads(message["data"])
137+
138+
if data == RESULTS_COMPLETE_MESSAGE:
139+
LOGGER.info(
140+
f"Received results complete message: {RESULTS_COMPLETE_MESSAGE}"
141+
)
142+
break
143+
144+
await self.process_batch(data, sample_id)
133145

134146
if not self._results:
135147
raise NoResultsFoundError("No results retrieved from Murko")
@@ -157,22 +169,18 @@ async def trigger(self):
157169
f"murko:{sample_id}:metadata", result.uuid, json.dumps(result.metadata)
158170
)
159171

160-
async def process_batch(self, message: dict | None, sample_id: str):
161-
if message and message["type"] == "message":
162-
batch_results: list[dict] = pickle.loads(message["data"])
163-
for results in batch_results:
164-
for uuid, result in results.items():
165-
if metadata_str := await self.redis_client.hget( # type: ignore
166-
f"murko:{sample_id}:metadata", uuid
167-
):
168-
LOGGER.info(
169-
f"Found metadata for uuid {uuid}, processing result"
170-
)
171-
self.process_result(
172-
result, MurkoMetadata(json.loads(metadata_str))
173-
)
174-
else:
175-
LOGGER.info(f"Found no metadata for uuid {uuid}")
172+
async def process_batch(
173+
self, batch_results: list[tuple[str, dict]], sample_id: str
174+
):
175+
for result_with_uuid in batch_results:
176+
uuid, result = result_with_uuid
177+
if metadata_str := await self.redis_client.hget( # type: ignore
178+
f"murko:{sample_id}:metadata", uuid
179+
):
180+
LOGGER.info(f"Found metadata for uuid {uuid}, processing result")
181+
self.process_result(result, MurkoMetadata(json.loads(metadata_str)))
182+
else:
183+
LOGGER.info(f"Found no metadata for uuid {uuid}")
176184

177185
def process_result(self, result: dict, metadata: MurkoMetadata):
178186
"""Uses the 'most_likely_click' coordinates from Murko to calculate the
@@ -253,16 +261,6 @@ def filter_outliers(self):
253261
LOGGER.info(f"Number of results after filtering: {len(best_x)}")
254262
return best_x
255263

256-
async def check_if_reached_stop_angle(self):
257-
inverted = await self.invert_stop_angle.get_value()
258-
stop_angle = await self.stop_angle.get_value()
259-
if self._last_omega is None:
260-
return False
261-
if inverted:
262-
return self._last_omega <= stop_angle
263-
else:
264-
return self._last_omega >= stop_angle
265-
266264

267265
def get_yz_least_squares(vertical_dists: list, omegas: list) -> tuple[float, float]:
268266
"""Get the least squares solution for y and z from the vertical distances and omega angles.

0 commit comments

Comments
 (0)