Skip to content

Commit e6b6276

Browse files
Fix to show child test reports on test completion for all subscribed clients #17
2 parents c2337c1 + 6c7a1fb commit e6b6276

13 files changed

+264
-52
lines changed

openhtf/output/servers/pub_sub.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,13 @@ def publish(cls, message, client_filter=None):
5151
with cls._lock: # pylint: disable=not-context-manager
5252
for client in cls.subscribers: # pylint: disable=not-an-iterable
5353
if (not client_filter) or client_filter(client):
54-
client.send(message)
54+
try:
55+
client.send(message)
56+
except Exception as e: # pylint: disable=broad-except
57+
# Log the error but continue sending to other clients.
58+
# This can happen when publishing from threads without an event loop
59+
# (e.g., child test threads calling publish_test_record).
60+
_LOG.debug('Failed to send message to client: %s', e)
5561

5662
def on_open(self, info):
5763
_LOG.debug('New subscriber from %s.', info.ip)

openhtf/output/servers/station_server.py

Lines changed: 200 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from typing import Optional, Union
3434

3535
import openhtf
36+
from openhtf.core.test_descriptor import UnrecognizedTestUidError
3637
from openhtf.output.servers import pub_sub
3738
from openhtf.output.servers import web_gui_server
3839
from openhtf.util import configuration
@@ -73,6 +74,14 @@
7374
CONF.declare('station_discovery_port')
7475
CONF.declare('station_discovery_ttl')
7576

77+
# Cache for phase descriptors - persists after tests complete so frontend
78+
# can still fetch them. Maps test_uid -> list of phase descriptor dicts.
79+
# This is necessary because TEST_INSTANCES is a WeakValueDictionary and
80+
# tests get garbage collected after completion.
81+
_PHASE_DESCRIPTOR_CACHE = {}
82+
_PHASE_DESCRIPTOR_CACHE_LOCK = threading.Lock()
83+
_MAX_CACHED_TESTS = 100 # Limit cache size to prevent memory leaks
84+
7685

7786
def _get_executing_test():
7887
"""Get the currently executing test and its state.
@@ -111,6 +120,96 @@ def _get_executing_test():
111120
return test, test_state
112121

113122

123+
def _get_test_by_uid(test_uid: str):
124+
"""Get a specific test by UID (parent or child).
125+
126+
Returns:
127+
test: The test with the given UID, or None.
128+
test_state: The state of the test, or None.
129+
"""
130+
try:
131+
test = openhtf.Test.from_uid(test_uid)
132+
test_state = test.state
133+
if test_state is None:
134+
return None, None
135+
return test, test_state
136+
except UnrecognizedTestUidError:
137+
return None, None
138+
139+
140+
def _get_parent_and_children():
141+
"""Get the parent test and all executing child tests.
142+
143+
Returns:
144+
parent: The parent test, or None.
145+
parent_state: The parent test state, or None.
146+
children: List of (child_test, child_state) tuples.
147+
"""
148+
tests = list(openhtf.Test.TEST_INSTANCES.values())
149+
150+
parent_tests = [t for t in tests if not t.is_child_test]
151+
if not parent_tests:
152+
return None, None, []
153+
154+
if len(parent_tests) > 1:
155+
_LOG.warning('Multiple parent tests detected, using first.')
156+
157+
parent = parent_tests[0]
158+
parent_state = parent.state
159+
if parent_state is None:
160+
return None, None, []
161+
162+
# Get executing children from TEST_INSTANCES
163+
child_tests = [t for t in tests if t.is_child_test]
164+
children = []
165+
for child in child_tests:
166+
child_state = child.state
167+
if child_state is not None:
168+
children.append((child, child_state))
169+
170+
return parent, parent_state, children
171+
172+
173+
def _cache_phase_descriptors(test_uid, test):
174+
"""Cache phase descriptors for a test so they persist after completion.
175+
176+
Args:
177+
test_uid: The unique identifier for the test.
178+
test: The Test object to cache phase descriptors from.
179+
"""
180+
with _PHASE_DESCRIPTOR_CACHE_LOCK:
181+
if test_uid in _PHASE_DESCRIPTOR_CACHE:
182+
return # Already cached
183+
184+
# Evict old entries if cache is too large
185+
if len(_PHASE_DESCRIPTOR_CACHE) >= _MAX_CACHED_TESTS:
186+
# Remove the oldest entry (first key in dict - Python 3.7+ preserves order)
187+
oldest_key = next(iter(_PHASE_DESCRIPTOR_CACHE))
188+
del _PHASE_DESCRIPTOR_CACHE[oldest_key]
189+
190+
try:
191+
phase_descriptors = [
192+
dict(id=id(phase), **data.convert_to_base_types(phase))
193+
for phase in test.descriptor.phase_sequence.all_phases()
194+
]
195+
_PHASE_DESCRIPTOR_CACHE[test_uid] = phase_descriptors
196+
except Exception as e:
197+
_LOG.warning('Failed to cache phase descriptors for %s: %s', test_uid, e)
198+
199+
200+
def _get_cached_phase_descriptors(test_uid):
201+
"""Get cached phase descriptors for a test.
202+
203+
Args:
204+
test_uid: The unique identifier for the test.
205+
206+
Returns:
207+
List of phase descriptor dicts, or None if not cached.
208+
"""
209+
with _PHASE_DESCRIPTOR_CACHE_LOCK:
210+
return _PHASE_DESCRIPTOR_CACHE.get(test_uid)
211+
212+
114213
def _test_state_from_record(test_record_dict, execution_uid=None):
115214
"""Convert a test record dict to a test state dict.
116215
@@ -194,28 +293,55 @@ def run(self):
194293

195294
@functions.call_at_most_every(float(CONF.frontend_throttle_s))
196295
def _poll_for_update(self):
197-
"""Call the callback with the current test state, then wait for a change."""
198-
test, test_state = _get_executing_test()
296+
"""Call the callback with current test states, then wait for changes."""
297+
parent, parent_state, children = _get_parent_and_children()
199298

200-
if test is None:
299+
if parent is None:
201300
time.sleep(_WAIT_FOR_EXECUTING_TEST_POLL_S)
202301
return
203302

204-
state_dict, event = self._to_dict_with_event(test_state)
205-
self._update_callback(state_dict)
303+
# Cache phase descriptors for parent and children so they persist
304+
# after tests complete and can still be fetched by the frontend
305+
_cache_phase_descriptors(parent_state.execution_uid, parent)
306+
for child, child_state in children:
307+
_cache_phase_descriptors(child_state.execution_uid, child)
308+
309+
# Convert parent state
310+
parent_dict, parent_event = self._to_dict_with_event(parent_state)
311+
312+
# Convert child states
313+
child_dicts = []
314+
child_events = []
315+
for child, child_state in children:
316+
child_dict, child_event = self._to_dict_with_event(child_state)
317+
child_dicts.append(child_dict)
318+
child_events.append(child_event)
206319

207-
plug_manager = test_state.plug_manager
320+
# Publish with children
321+
self._update_callback(parent_dict, child_dicts)
322+
323+
# Gather plug events from parent
324+
plug_manager = parent_state.plug_manager
208325
plug_events = [
209326
plug_manager.get_plug_by_class_path(plug_name).asdict_with_event()[1]
210327
for plug_name in plug_manager.get_frontend_aware_plug_names()
211328
]
212-
events = [event] + plug_events
329+
330+
# Wait for any event (parent, children, or plugs)
331+
events = [parent_event] + child_events + plug_events
332+
333+
# Track how many children we know about
334+
known_child_count = len(children)
213335

214336
# Wait for the test state or a plug state to change, or for the previously
215-
# executing test to finish.
337+
# executing test to finish, or for new child tests to appear.
216338
while not _wait_for_any_event(events, _CHECK_FOR_FINISHED_TEST_POLL_S):
217-
new_test, _ = _get_executing_test()
218-
if test != new_test:
339+
new_parent, _, new_children = _get_parent_and_children()
340+
if parent != new_parent:
341+
break
342+
# Also break if new child tests have appeared - we need to subscribe
343+
# to their events and include them in updates
344+
if len(new_children) != known_child_count:
219345
break
220346

221347
@classmethod
@@ -277,28 +403,57 @@ class StationPubSub(pub_sub.PubSub):
277403
_lock = threading.Lock() # Required by pub_sub.PubSub.
278404
subscribers = set() # Required by pub_sub.PubSub.
279405
_last_message = None
406+
# Track last 'update' message separately - used for new subscribers.
407+
# This prevents 'record' messages (which have empty child_tests) from
408+
# overwriting the current state that new clients should see.
409+
_last_update_message = None
280410

281411
@classmethod
282412
def publish_test_record(cls, test_record):
413+
# Cache phase descriptors before the test is removed from TEST_INSTANCES.
414+
# This handles fast-completing tests that StationWatcher might not see.
415+
test_uid = test_record.test_uid
416+
if test_uid:
417+
test, _ = _get_test_by_uid(test_uid)
418+
if test is not None:
419+
_cache_phase_descriptors(test_uid, test)
420+
283421
test_record_dict = data.convert_to_base_types(test_record)
284422
test_state_dict = _test_state_from_record(test_record_dict,
285423
test_record.test_uid)
286-
cls._publish_test_state(test_state_dict, 'record')
424+
cls._publish_test_state(test_state_dict, [], 'record')
287425

288426
@classmethod
289-
def publish_update(cls, test_state_dict):
290-
"""Publish the state of the currently executing test."""
291-
cls._publish_test_state(test_state_dict, 'update')
427+
def publish_update(cls, parent_state_dict, child_state_dicts=None):
428+
"""Publish the state of the currently executing tests."""
429+
cls._publish_test_state(parent_state_dict, child_state_dicts or [], 'update')
292430

293431
@classmethod
294-
def _publish_test_state(cls, test_state_dict, message_type):
432+
def _publish_test_state(cls, parent_state_dict, child_state_dicts, message_type):
295433
message = {
296-
'state': test_state_dict,
297-
'test_uid': test_state_dict['execution_uid'],
434+
'state': parent_state_dict,
435+
'test_uid': parent_state_dict['execution_uid'],
298436
'type': message_type,
437+
'child_tests': [
438+
{
439+
'state': child_dict,
440+
'test_uid': child_dict['execution_uid'],
441+
}
442+
for child_dict in child_state_dicts
443+
],
299444
}
300-
super(StationPubSub, cls).publish(message)
445+
# IMPORTANT: Update _last_update_message BEFORE publish() to avoid race
446+
# condition where a new subscriber connects after publish() but before
447+
# the update, causing them to receive a stale message in on_subscribe().
301448
cls._last_message = message
449+
if message_type == 'update':
450+
cls._last_update_message = message
451+
super(StationPubSub, cls).publish(message)
452+
453+
@classmethod
454+
def clear_last_update(cls):
455+
"""Clear the last update message when tests complete."""
456+
cls._last_update_message = None
302457

303458
def on_subscribe(self, info):
304459
"""Send the more recent test state to new subscribers when they connect.
@@ -310,18 +465,20 @@ def on_subscribe(self, info):
310465
"""
311466
test, _ = _get_executing_test()
312467

313-
if self._last_message is not None and test is not None:
314-
self.send(self._last_message)
468+
# Use _last_update_message for new subscribers - this preserves child_tests
469+
# even if 'record' messages have been published for completed children.
470+
if self._last_update_message is not None and test is not None:
471+
self.send(self._last_update_message)
315472

316473

317474
class BaseTestHandler(web_gui_server.CorsRequestHandler):
318475
"""Base class for HTTP endpoints that get test data."""
319476

320477
def get_test(self, test_uid):
321478
"""Get the specified test. Write 404 and return None if it is not found."""
322-
test, test_state = _get_executing_test()
479+
test, test_state = _get_test_by_uid(test_uid)
323480

324-
if test is None or str(test.uid) != test_uid:
481+
if test is None:
325482
self.write('Unknown test UID %s' % test_uid)
326483
self.set_status(404)
327484
return None, None
@@ -384,15 +541,27 @@ class PhasesHandler(BaseTestHandler):
384541
"""GET endpoint for phase descriptors for a test, i.e. the full phase list."""
385542

386543
def get(self, test_uid):
387-
test, _ = self.get_test(test_uid)
388-
389-
if test is None:
390-
return
391-
392-
phase_descriptors = [
393-
dict(id=id(phase), **data.convert_to_base_types(phase))
394-
for phase in test.descriptor.phase_sequence.all_phases()
395-
]
544+
# First try to get the test from TEST_INSTANCES (still running)
545+
test, _ = _get_test_by_uid(test_uid)
546+
547+
if test is not None:
548+
# Test is still running, get live phase descriptors and cache them
549+
phase_descriptors = [
550+
dict(id=id(phase), **data.convert_to_base_types(phase))
551+
for phase in test.descriptor.phase_sequence.all_phases()
552+
]
553+
# Cache for future requests after test completes
554+
with _PHASE_DESCRIPTOR_CACHE_LOCK:
555+
_PHASE_DESCRIPTOR_CACHE[test_uid] = phase_descriptors
556+
else:
557+
# Test not found in TEST_INSTANCES - try the cache
558+
# This handles child tests that have already completed
559+
phase_descriptors = _get_cached_phase_descriptors(test_uid)
560+
if phase_descriptors is None:
561+
# Not in cache either - return 404
562+
self.write('Unknown test UID %s' % test_uid)
563+
self.set_status(404)
564+
return
396565

397566
# Wrap value in a dict because writing a list directly is prohibited.
398567
self.write({'data': phase_descriptors})

openhtf/output/web_gui/dist/css/app.25f3f3a128d326614a9c.css renamed to openhtf/output/web_gui/dist/css/app.575e9bb61286fde76cb9.css

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openhtf/output/web_gui/dist/css/app.25f3f3a128d326614a9c.css.map renamed to openhtf/output/web_gui/dist/css/app.575e9bb61286fde76cb9.css.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openhtf/output/web_gui/dist/index.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<head><link href="/css/app.25f3f3a128d326614a9c.css" rel="stylesheet"></head><!doctype html>
1+
<head><link href="/css/app.575e9bb61286fde76cb9.css" rel="stylesheet"></head><!doctype html>
22
<!--
33
Copyright 2022 Google LLC
44
@@ -22,4 +22,4 @@
2222

2323
<base href="/">
2424
<htf-app config="{{ json_encode(config) }}">Loading...</htf-app>
25-
<script type="text/javascript" src="/js/polyfills.25f3f3a128d326614a9c.js"></script><script type="text/javascript" src="/js/vendor.25f3f3a128d326614a9c.js"></script><script type="text/javascript" src="/js/app.25f3f3a128d326614a9c.js"></script>
25+
<script type="text/javascript" src="/js/polyfills.575e9bb61286fde76cb9.js"></script><script type="text/javascript" src="/js/vendor.575e9bb61286fde76cb9.js"></script><script type="text/javascript" src="/js/app.575e9bb61286fde76cb9.js"></script>

openhtf/output/web_gui/dist/js/app.25f3f3a128d326614a9c.js.map

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)