Skip to content

Commit aa3f5ee

Browse files
committed
testsuite: add tests for Python ResourceJournalConsumer class
Problem: There are no test of the Python ResourceJournalConsumer class. Add a Python based test in t/python/t0032-resource-journal.py.
1 parent d359e7c commit aa3f5ee

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed

t/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ TESTSCRIPTS = \
292292
python/t0029-fileref.py \
293293
python/t0030-journal.py \
294294
python/t0031-conf-builtin.py \
295+
python/t0032-resource-journal.py \
295296
python/t1000-service-add-remove.py
296297

297298
if HAVE_FLUX_SECURITY

t/python/t0032-resource-journal.py

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
#!/usr/bin/env python3
2+
###############################################################
3+
# Copyright 2025 Lawrence Livermore National Security, LLC
4+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
5+
#
6+
# This file is part of the Flux resource manager framework.
7+
# For details, see https://github.com/flux-framework.
8+
#
9+
# SPDX-License-Identifier: LGPL-3.0
10+
###############################################################
11+
12+
import unittest
13+
14+
import flux
15+
from flux.resource import ResourceJournalConsumer, ResourceSet
16+
from subflux import rerun_under_flux
17+
18+
19+
def __flux_size():
20+
return 4
21+
22+
23+
class TestResourceJournalConsumer(unittest.TestCase):
24+
25+
@classmethod
26+
def setUpClass(self):
27+
self.fh = flux.Flux()
28+
29+
def test_history(self):
30+
consumer = ResourceJournalConsumer(self.fh).start()
31+
32+
events = []
33+
while True:
34+
# Read events until we see resource-define
35+
event = consumer.poll(timeout=5.0)
36+
events.append(event)
37+
if event.name == "resource-define":
38+
break
39+
consumer.stop()
40+
41+
# events are ordered
42+
self.assertTrue(
43+
all(
44+
events[index].timestamp < events[index + 1].timestamp
45+
for index in range(len(events) - 1)
46+
)
47+
)
48+
49+
# get timestamp of arbitrary event in history:
50+
seq = 1
51+
timestamp = events[seq].timestamp
52+
53+
# new consumer with since timestamp
54+
consumer = ResourceJournalConsumer(self.fh, since=timestamp).start()
55+
events2 = []
56+
while True:
57+
event = consumer.poll(timeout=5.0)
58+
events2.append(event)
59+
if event.timestamp == events[-1].timestamp:
60+
break
61+
62+
consumer.stop()
63+
64+
# events2 should be equal to the events[seq+1:]:
65+
self.assertListEqual(events2, events[seq + 1 :])
66+
67+
# ensure JournalEvent can be converted to a string:
68+
print(f"{events[-1]}")
69+
70+
def test_async(self):
71+
events = []
72+
test_arg2 = 42
73+
test_kw_arg = "foo"
74+
75+
count = [0]
76+
77+
def test_cb(event, arg2, kw_arg=None):
78+
events.append(event)
79+
self.assertEqual(arg2, test_arg2)
80+
self.assertEqual(kw_arg, test_kw_arg)
81+
count[0] += 1
82+
if count[0] == 4:
83+
# do not stop consumer here. Adding a new callback on
84+
# an active consumer is tested below
85+
self.fh.reactor_stop()
86+
87+
consumer = ResourceJournalConsumer(self.fh).start()
88+
consumer.set_callback(test_cb, test_arg2, kw_arg=test_kw_arg)
89+
90+
self.fh.reactor_run()
91+
92+
# historical events are ordered
93+
self.assertTrue(
94+
all(
95+
events[index].timestamp < events[index + 1].timestamp
96+
for index in range(len(events) - 1)
97+
)
98+
)
99+
100+
new_events = []
101+
102+
def new_cb(event):
103+
if event.name == "drain":
104+
consumer.stop()
105+
self.fh.reactor_stop()
106+
new_events.append(event)
107+
108+
# Reset callback to append to new_events
109+
consumer.set_callback(new_cb)
110+
111+
# drain a node to append a new event to resource eventlog
112+
self.fh.rpc("resource.drain", {"targets": "0", "reason": "test"}).get()
113+
self.fh.reactor_run()
114+
115+
for event in new_events:
116+
self.assertTrue(event.name == "drain")
117+
118+
# restore state
119+
self.fh.rpc("resource.undrain", {"targets": "0"}).get()
120+
121+
# stop consumer
122+
consumer.stop()
123+
124+
def test_sentinel(self):
125+
self.assertTrue(ResourceJournalConsumer.SENTINEL_EVENT.is_empty())
126+
127+
# Ensure empty event can be converted to a string:
128+
print(ResourceJournalConsumer.SENTINEL_EVENT)
129+
130+
consumer = ResourceJournalConsumer(self.fh, include_sentinel=True)
131+
consumer.start()
132+
while True:
133+
event = consumer.poll(0.1)
134+
if event == consumer.SENTINEL_EVENT:
135+
break
136+
137+
def test_sentinel_async(self):
138+
139+
def cb(event):
140+
if event is None:
141+
self.fh.reactor_stop()
142+
if event.is_empty():
143+
consumer.stop()
144+
145+
consumer = ResourceJournalConsumer(self.fh, include_sentinel=True)
146+
consumer.start()
147+
consumer.set_callback(cb)
148+
self.fh.reactor_run()
149+
150+
def test_poll_timeout(self):
151+
152+
consumer = ResourceJournalConsumer(self.fh).start()
153+
with self.assertRaises(TimeoutError):
154+
while True:
155+
consumer.poll(timeout=0.1)
156+
consumer.stop()
157+
158+
def test_poll_ENODATA(self):
159+
160+
consumer = ResourceJournalConsumer(self.fh, include_sentinel=True).start()
161+
while True:
162+
event = consumer.poll(0.1)
163+
if event.is_empty():
164+
consumer.stop()
165+
break
166+
self.assertIsNone(consumer.poll(timeout=5.0))
167+
168+
def test_poll_RuntimeError(self):
169+
170+
with self.assertRaises(RuntimeError):
171+
ResourceJournalConsumer(self.fh).poll(5.0)
172+
173+
def test_poll_cb_set_before_start(self):
174+
175+
def cb(event):
176+
if event.is_empty():
177+
consumer.stop()
178+
self.fh.reactor_stop()
179+
180+
consumer = ResourceJournalConsumer(self.fh, include_sentinel=True)
181+
consumer.set_callback(cb)
182+
consumer.start()
183+
self.fh.reactor_run()
184+
185+
def test_poll_cb_reset(self):
186+
"""test that the consumer callback can be reset"""
187+
188+
def cb(event):
189+
self.fail("incorrect callback called")
190+
191+
def cb2(event):
192+
if event is None:
193+
self.fh.reactor_stop()
194+
if event.is_empty():
195+
consumer.stop()
196+
197+
consumer = ResourceJournalConsumer(self.fh, include_sentinel=True)
198+
consumer.set_callback(cb)
199+
consumer.start()
200+
consumer.set_callback(cb2)
201+
self.fh.reactor_run()
202+
203+
def test_R_in_resource_define_event(self):
204+
"""test that R is available in resource-define event"""
205+
consumer = ResourceJournalConsumer(self.fh).start()
206+
while True:
207+
event = consumer.poll()
208+
if event is None:
209+
break
210+
if event.name == "resource-define":
211+
self.assertTrue(hasattr(event, "R"))
212+
R = ResourceSet(event.R)
213+
self.assertEqual(R.nnodes, 4)
214+
consumer.stop()
215+
216+
def test_event_after_history(self):
217+
"""Test that a consumer gets new events after history is processed"""
218+
219+
def cb(event, wait):
220+
if event.is_empty():
221+
# history processed, now drain a rank
222+
print("got sentinel event, draining rank 0")
223+
self.fh.rpc(
224+
"resource.drain", {"targets": "0", "reason": "history-test"}
225+
).get()
226+
elif (
227+
event.name == "drain"
228+
and "reason" in event.context
229+
and event.context["reason"] == "history-test"
230+
):
231+
print("got drain event, undraining rank 0")
232+
self.fh.rpc("resource.undrain", {"targets": "0"}).get()
233+
wait[0] = True
234+
elif wait[0] and event.name == "undrain":
235+
print("got undrain event, stopping consumer")
236+
consumer.stop()
237+
238+
consumer = ResourceJournalConsumer(self.fh, include_sentinel=True)
239+
consumer.set_callback(cb, wait=[False])
240+
consumer.start()
241+
self.fh.reactor_run()
242+
243+
244+
if __name__ == "__main__":
245+
if rerun_under_flux(__flux_size()):
246+
from pycotap import TAPTestRunner
247+
248+
unittest.main(testRunner=TAPTestRunner(), buffer=False)

0 commit comments

Comments
 (0)