Skip to content

Commit e81cb8c

Browse files
committed
python: add JournalConsumerBase abstract base class
Problem: The JournalConsumer class in flux.job is specific to the job manager journal, but there may be other services that implement a journal-like interface. Add an abstract base class for journal consumer interfaces in flux.abc.JournalConsumerBase. Include a JournalEventBase class that derives from EventLogEvent. Most of the journal consumer interface is implemented in this class, with the expectation that derived classes only need to implement the few abstract methods left open for implementations.
1 parent 5550ee1 commit e81cb8c

File tree

3 files changed

+374
-0
lines changed

3 files changed

+374
-0
lines changed

src/bindings/python/flux/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ nobase_fluxpy_PYTHON = \
7272
constraint/parser.py \
7373
constraint/parsetab.py \
7474
constraint/__init__.py \
75+
abc/journal.py \
76+
abc/__init__.py \
7577
utils/parsedatetime/__init__.py \
7678
utils/parsedatetime/parsedatetime.py \
7779
utils/parsedatetime/warns.py \
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
###############################################################
2+
# Copyright 2025 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
11+
from flux.abc.journal import JournalConsumerBase, JournalEventBase
12+
13+
__all__ = ["JournalConsumerBase", "JournalEventBase"]
Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
###############################################################
2+
# Copyright 2025 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
###############################################################
10+
11+
"""Abstract classes for journal consumer interfaces"""
12+
13+
import errno
14+
import time
15+
from abc import ABC, abstractmethod
16+
from collections import deque
17+
18+
from flux.constants import FLUX_RPC_NORESPONSE, FLUX_RPC_STREAMING
19+
from flux.eventlog import EventLogEvent
20+
21+
22+
class JournalEventBase(EventLogEvent):
23+
"""A container for an event from a journal RPC interface
24+
25+
Attributes:
26+
name (str): event name (Possible event names will depend on the
27+
journal service being consumed)
28+
timestamp (float): event timestamp in seconds since the epoch
29+
with sub-millisecond precision.
30+
context (dict): context dictionary
31+
context_string (str): context dict converted to comma separated
32+
key=value string.
33+
"""
34+
35+
def __init__(self, event):
36+
# If no timestamp, then capture the time now
37+
if "timestamp" not in event:
38+
event["timestamp"] = time.time()
39+
super().__init__(event)
40+
41+
def is_empty(self):
42+
"""Return True if this event is an empty journal event"""
43+
return "name" not in self
44+
45+
def __str__(self):
46+
if self.is_empty():
47+
return f"{self.timestamp:<0.5f} End of historical data stream"
48+
return " ".join(
49+
[
50+
f"{self.timestamp:<0.5f}",
51+
self.name,
52+
self.context_string,
53+
]
54+
)
55+
56+
57+
class JournalConsumerBase(ABC):
58+
"""Base class for consuming events from a journal-style RPC
59+
60+
This base class implements a wrapper around journal style RPCs
61+
which allow clients to obtain a stream of events, possibly including
62+
historical data.
63+
64+
Subclasses of :obj`JournalConsumerBase` should be implemented to more
65+
completely handle the specifics of a given interface, but all such
66+
subclasses will follow the interface documented here.
67+
68+
A journal consumer is created by passing the constructor an open
69+
:obj:`~flux.Flux` handle, the topic string of the journal service along
70+
with any other optional parameters described below. The :func:`start`
71+
method should then be called, which sends the initial RPC. To stop the
72+
stream of events, call the :func:`stop` method. After all events have
73+
been processed :func:`poll` or the registered callback will return None.
74+
75+
When the consumer is first started, historical data (events in the past)
76+
will be sent from the journal service unless *full* is set to False.
77+
These events are stored until all historical events are processed,
78+
then are time ordered before returning them via :func:`poll` or to
79+
the callback registered by :func:`set_callback`.
80+
81+
To avoid processing previously seen events with a new instance of this
82+
class, the timestamp of the newest processed event can be passed via the
83+
*since* parameter. Timestamps should be unique so :func:`poll` or the
84+
callback will start with the newest event after the *since* timestamp.
85+
86+
When *full* is True, a compliant journal service sends a sentinel
87+
event in the journal stream to demarcate the transition between
88+
history and current events. If *include_sentinel* is set to True,
89+
then an empty event will be returned by :func:`poll` or the
90+
registered callback to represent the sentinel. An empty event
91+
can be compared to :data:`self.SENTINEL_EVENT` or by using the
92+
:func:`JournalEventBase.is_empty` method. The default behavior is to
93+
suppress the sentinel.
94+
95+
Subclasses of :obj:`JournalConsumerBase` must implement the following
96+
abstract methods, see method documentation for details:
97+
98+
* :meth:`flux.abc.JournalConsumerBase.process_response`, in which a
99+
single response from the journal service is processed and a dictionary
100+
of kwargs is returned.
101+
* :meth:`flux.abc.JournalConsumerBase.create_event`, which is passed
102+
one event entry from the same response and the `**kwargs` obtained
103+
from above, and should return an object of a class derived from
104+
:obj:`JournalEventBase`
105+
106+
Subclasses may optionally implement the following methods:
107+
108+
* :meth:`flux.abc.JournalConsumerBase.is_empty_response`, which is
109+
used to determine if a journal response message is to be considered
110+
the "empty" sentinel response.
111+
112+
Finally, subclasses may optionally override the following properties:
113+
114+
* :data:`request_payload`, which is the payload of a journal request
115+
(the default is an empty payload)
116+
117+
* :data:`SENTINEL_EVENT`, which is a class constant representing the
118+
sentinel event for this class.
119+
120+
"""
121+
122+
SENTINEL_EVENT = JournalEventBase({})
123+
124+
def __init__(
125+
self,
126+
flux_handle,
127+
topic,
128+
cancel_topic=None,
129+
full=True,
130+
since=0.0,
131+
include_sentinel=False,
132+
):
133+
"""Initialize a :obj:`JournalConsumerBase` instance
134+
135+
Args:
136+
flux_handle (:obj:`~flux.Flux`): A Flux handle
137+
topic (str): The journal RPC topic string
138+
cancel_topic (:obj:`str`, optional): The RPC topic string to cancel
139+
an active journal request. If not set, it will defuault to
140+
topic + "-cancel".
141+
full (bool, optional): Whether to return full history, if the
142+
journal interface supports it. Defaults to True.
143+
since (float, optional): Return only events that have a timestamp
144+
greater than ``since``.
145+
include_sentinel (bool, optional): Return an empty event upon
146+
receipt of the sentinel from the journal service. The default
147+
is to suppress this event.
148+
"""
149+
self.handle = flux_handle
150+
self.topic = topic
151+
self.cancel_topic = cancel_topic
152+
153+
if self.cancel_topic is None:
154+
self.cancel_topic = f"{self.topic}-cancel"
155+
156+
self.backlog = deque()
157+
self.full = full
158+
self.since = since
159+
self.rpc = None
160+
self.cb = None
161+
self.cb_args = []
162+
self.cb_kwargs = {}
163+
self.processing_inactive = self.full
164+
self.include_sentinel = include_sentinel
165+
166+
def __sort_backlog(self):
167+
self.processing_inactive = False
168+
self.backlog = deque(sorted(self.backlog, key=lambda x: x.timestamp))
169+
if self.include_sentinel:
170+
self.backlog.append(self.SENTINEL_EVENT)
171+
172+
@abstractmethod
173+
def process_response(self, resp):
174+
"""Process a journal response.
175+
176+
This method may return a dictionary which is then passed as
177+
``*kwargs`` to :meth:`create_event` below.
178+
179+
Args:
180+
resp (dict): The payload of a single journal response (converted
181+
from json to a dict)
182+
Returns:
183+
dict
184+
"""
185+
return {}
186+
187+
@abstractmethod
188+
def create_event(self, entry, **kwargs):
189+
"""Return a journal event object from a response entry
190+
191+
This method should return an event object given an individual eventlog
192+
entry and ``**kwargs`` returned from :meth:`process_response`.
193+
194+
Args:
195+
entry (dict): An individual event dictionary from the ``events``
196+
array of a journal response.
197+
**kwargs : Additional keyword arguments returned from the
198+
:meth:`process_response` method.
199+
200+
Returns:
201+
:obj:`JournalEventBase` or a subclass
202+
203+
"""
204+
return JournalEventBase(entry)
205+
206+
def __enqueue_response(self, resp, *args):
207+
if resp is None:
208+
# End of data, enqueue None:
209+
self.backlog.append(None)
210+
return
211+
212+
kwargs = self.process_response(resp)
213+
for entry in resp["events"]:
214+
event = self.create_event(entry, **kwargs)
215+
if event.timestamp > self.since:
216+
self.backlog.append(event)
217+
218+
def __next_event(self):
219+
return self.backlog.popleft()
220+
221+
def __set_then_cb(self):
222+
if self.rpc is not None and self.cb is not None:
223+
try:
224+
self.rpc.then(self.__cb)
225+
except OSError as exc:
226+
if exc.errno == errno.EINVAL:
227+
# then callback is already set
228+
pass
229+
230+
def is_empty_response(self, resp):
231+
"""Return True if resp is "empty" """
232+
return len(resp["events"]) == 0
233+
234+
@property
235+
def request_payload(self):
236+
"""Approprioate request payload for this journal RPC"""
237+
return {}
238+
239+
def start(self):
240+
"""Start the stream of events by sending a request
241+
242+
This function initiates the stream of events for a journal
243+
consumer by sending the initial request to the configured journal
244+
service endpoint.
245+
246+
.. note::
247+
If :func:`start` is called more than once the stream of events
248+
will be restarted using the original options passed to the
249+
constructor. This may cause duplicate events, or missed events
250+
if *full* is False since no history will be included.
251+
"""
252+
self.rpc = self.handle.rpc(
253+
self.topic, self.request_payload, 0, FLUX_RPC_STREAMING
254+
)
255+
# Need to call self.rpc.then() if a user cb is registered:
256+
self.__set_then_cb()
257+
258+
return self
259+
260+
def stop(self):
261+
"""Cancel the journal RPC
262+
263+
Cancel the RPC. This will eventually stop the stream of events to
264+
either :func:`poll` or the defined callback. After all events have
265+
been processed an *event* of None will be returned by :func:`poll`
266+
or the defined callback.
267+
"""
268+
self.handle.rpc(
269+
self.cancel_topic,
270+
{"matchtag": self.rpc.pimpl.get_matchtag()},
271+
0,
272+
FLUX_RPC_NORESPONSE,
273+
)
274+
275+
def poll(self, timeout=-1.0):
276+
"""Synchronously get the next journal event
277+
278+
if *full* is True, then this call will not return until all
279+
historical events have been processed. Historical events will sorted
280+
in time order and returned once per :func:`poll` call.
281+
282+
:func:`start` must be called before this function.
283+
284+
Args:
285+
timeout (float): Only wait *timeout* seconds for the next event.
286+
If the timeout expires then a :exc:`TimeoutError` is raised.
287+
A *timeout* of -1.0 disables any timeout.
288+
Raises:
289+
RuntimeError: :func:`poll` was called before :func:`start`.
290+
"""
291+
if self.rpc is None:
292+
raise RuntimeError("poll() called before start()")
293+
294+
if self.processing_inactive:
295+
# process backlog. Time order events once done:
296+
while self.processing_inactive:
297+
resp = self.rpc.wait_for(timeout).get()
298+
if self.is_empty_response(resp):
299+
self.__sort_backlog()
300+
else:
301+
self.__enqueue_response(resp)
302+
self.rpc.reset()
303+
304+
while not self.backlog:
305+
try:
306+
resp = self.rpc.wait_for(timeout).get()
307+
except OSError as exc:
308+
if exc.errno != errno.ENODATA:
309+
raise
310+
resp = None
311+
self.__enqueue_response(resp)
312+
self.rpc.reset()
313+
314+
return self.__next_event()
315+
316+
def __user_cb_flush(self):
317+
if self.processing_inactive:
318+
# all events are accumulated in the backlog while we're still
319+
# processing history so that those events can be sorted in
320+
# timestamp order:
321+
return
322+
while self.backlog:
323+
self.cb(self.__next_event(), *self.cb_args, **self.cb_kwargs)
324+
325+
def __cb(self, future):
326+
try:
327+
resp = future.get()
328+
if self.processing_inactive and self.is_empty_response(resp):
329+
self.__sort_backlog()
330+
else:
331+
self.__enqueue_response(resp)
332+
self.__user_cb_flush()
333+
except OSError as exc:
334+
if exc.errno == errno.ENODATA:
335+
self.__enqueue_response(None)
336+
finally:
337+
future.reset()
338+
339+
def set_callback(self, event_cb, *args, **kwargs):
340+
"""Register callback *event_cb* to be called for each journal event
341+
342+
If provided, ``*args``, and ``**kwargs`` are passed along to
343+
*event_cb*, whose only required argument is an *event*, e.g.
344+
345+
>>> def event_cb(event)
346+
>>> # do something with event
347+
348+
After a :obj:`JournalConsumer` is stopped and the final event is
349+
received, *event_cb* will be called with an *event* of None, which
350+
signals the end of the event stream.
351+
"""
352+
self.cb = event_cb
353+
self.cb_args = args
354+
self.cb_kwargs = kwargs
355+
self.__set_then_cb()
356+
return self
357+
358+
359+
# vi: ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)