Skip to content

Commit 9016ab2

Browse files
committed
python: derive flux.job.JournalConsumer from JournalConsumerBase
Problem: The flux.job.JournalConsumer class duplicates code from the JournalConsumerBase class. Make flux.job.JournalConsumer a subclass of the JournalConsumerBase abstract base class.
1 parent e81cb8c commit 9016ab2

File tree

1 file changed

+28
-166
lines changed

1 file changed

+28
-166
lines changed

src/bindings/python/flux/job/journal.py

Lines changed: 28 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,11 @@
88
# SPDX-License-Identifier: LGPL-3.0
99
###############################################################
1010

11-
import errno
12-
from collections import deque
13-
14-
from flux.constants import FLUX_RPC_NORESPONSE, FLUX_RPC_STREAMING
11+
from flux.abc import JournalConsumerBase, JournalEventBase
1512
from flux.job import JobID
16-
from flux.job.event import EventLogEvent
1713

1814

19-
class JournalEvent(EventLogEvent):
15+
class JournalEvent(JournalEventBase):
2016
"""A container for an event from the job manager journal
2117
2218
Attributes:
@@ -66,7 +62,7 @@ def __str__(self):
6662
"""
6763

6864

69-
class JournalConsumer:
65+
class JournalConsumer(JournalConsumerBase):
7066
"""Class for consuming the job manager journal
7167
7268
This class is a wrapper around the ``job-manager.events-journal`` RPC,
@@ -108,171 +104,37 @@ class JournalConsumer:
108104
109105
"""
110106

111-
def __init__(self, flux_handle, full=True, since=0.0, include_sentinel=False):
112-
self.handle = flux_handle
113-
self.backlog = deque()
114-
self.full = full
115-
self.since = since
116-
self.rpc = None
117-
self.cb = None
118-
self.cb_args = []
119-
self.cb_kwargs = {}
120-
self.processing_inactive = self.full
121-
self.include_sentinel = include_sentinel
122-
123-
def __sort_backlog(self):
124-
self.processing_inactive = False
125-
self.backlog = deque(sorted(self.backlog, key=lambda x: x.timestamp))
126-
if self.include_sentinel:
127-
self.backlog.append(SENTINEL_EVENT)
128-
129-
def __enqueue_response(self, resp):
130-
if resp is None:
131-
# End of data, enqueue None:
132-
self.backlog.append(None)
133-
return
134-
135-
jobid = resp["id"]
136-
jobspec = resp.get("jobspec")
137-
R = resp.get("R")
138-
for entry in resp["events"]:
139-
event = JournalEvent(jobid, entry)
140-
if event.timestamp > self.since:
141-
if event.name == "submit":
142-
event.jobspec = jobspec or None
143-
elif event.name == "alloc":
144-
event.R = R or None
145-
self.backlog.append(event)
146-
147-
def __next_event(self):
148-
return self.backlog.popleft()
149-
150-
def __set_then_cb(self):
151-
if self.rpc is not None and self.cb is not None:
152-
try:
153-
self.rpc.then(self.__cb)
154-
except OSError as exc:
155-
if exc.errno == errno.EINVAL:
156-
# then callback is already set
157-
pass
158-
159-
def start(self):
160-
"""Start the stream of events by sending a request to the job manager
161-
162-
This function sends the job-manager.events-journal RPC to the
163-
job manager. It must be called to start the stream of events.
164-
165-
.. note::
166-
If :func:`start` is called more than once the stream of events
167-
will be restarted using the original options passed to the
168-
constructor. This may cause duplicate events, or missed events
169-
if *full* is False since no history will be included.
170-
"""
171-
self.rpc = self.handle.rpc(
172-
"job-manager.events-journal", {"full": self.full}, 0, FLUX_RPC_STREAMING
173-
)
174-
# Need to call self.rpc.then() if a user cb is registered:
175-
self.__set_then_cb()
176-
177-
return self
178-
179-
def stop(self):
180-
"""Cancel the job-manager.events-journal RPC
107+
SENTINEL_EVENT = SENTINEL_EVENT
181108

182-
Cancel the RPC. This will eventually stop the stream of events to
183-
either :func:`poll` or the defined callback. After all events have
184-
been processed an *event* of None will be returned by :func:`poll`
185-
or the defined callback.
186-
"""
187-
self.handle.rpc(
188-
"job-manager.events-journal-cancel",
189-
{"matchtag": self.rpc.pimpl.get_matchtag()},
190-
0,
191-
FLUX_RPC_NORESPONSE,
109+
def __init__(self, flux_handle, full=True, since=0.0, include_sentinel=False):
110+
super().__init__(
111+
flux_handle,
112+
"job-manager.events-journal",
113+
full=full,
114+
since=since,
115+
include_sentinel=include_sentinel,
192116
)
193117

194-
def poll(self, timeout=-1.0):
195-
"""Synchronously get the next job event
196-
197-
if *full* is True, then this call will not return until all
198-
historical events have been processed. Historical events will sorted
199-
in time order and returned once per :func:`poll` call.
200-
201-
:func:`start` must be called before this function.
202-
203-
Args:
204-
timeout (float): Only wait *timeout* seconds for the next event.
205-
If the timeout expires then a :exc:`TimeoutError` is raised.
206-
A *timeout* of -1.0 disables any timeout.
207-
Raises:
208-
RuntimeError: :func:`poll` was called before :func:`start`.
209-
"""
210-
if self.rpc is None:
211-
raise RuntimeError("poll() called before start()")
212-
213-
if self.processing_inactive:
214-
# process backlog. Time order events once done:
215-
while self.processing_inactive:
216-
resp = self.rpc.wait_for(timeout).get()
217-
if resp["id"] == -1:
218-
self.__sort_backlog()
219-
else:
220-
self.__enqueue_response(resp)
221-
self.rpc.reset()
222-
223-
while not self.backlog:
224-
try:
225-
resp = self.rpc.wait_for(timeout).get()
226-
except OSError as exc:
227-
if exc.errno != errno.ENODATA:
228-
raise
229-
resp = None
230-
self.__enqueue_response(resp)
231-
self.rpc.reset()
232-
233-
return self.__next_event()
234-
235-
def __user_cb_flush(self):
236-
if self.processing_inactive:
237-
# all events are accumulated in the backlog while we're still
238-
# processing inactive job events so that those events can be
239-
# sorted in timestamp order:
240-
return
241-
while self.backlog:
242-
self.cb(self.__next_event(), *self.cb_args, **self.cb_kwargs)
243-
244-
def __cb(self, future):
245-
try:
246-
resp = future.get()
247-
if self.processing_inactive and resp["id"] == -1:
248-
self.__sort_backlog()
249-
else:
250-
self.__enqueue_response(resp)
251-
self.__user_cb_flush()
252-
except OSError as exc:
253-
if exc.errno == errno.ENODATA:
254-
self.__enqueue_response(None)
255-
finally:
256-
future.reset()
257-
258-
def set_callback(self, event_cb, *args, **kwargs):
259-
"""Register callback *event_cb* to be called for each job event
260-
261-
If provided, ``*args``, and ``**kwargs`` are passed along to
262-
*event_cb*, whose only required argument is an *event*, e.g.
118+
@property
119+
def request_payload(self):
120+
return {"full": self.full}
263121

264-
>>> def event_cb(event)
265-
>>> # do something with event
122+
def process_response(self, resp):
123+
"""Process a single job manager journal response
266124
267-
After a :obj:`JournalConsumer` is stopped and the final event is
268-
received, *event_cb* will be called with an *event* of None, which
269-
signals the end of the event stream.
125+
The response will contain the jobid and possibly jobspec and R,
126+
depending on the specific events in the payload. Return these in
127+
a dict so they are passed as keyword arguments to create_event.
270128
"""
271-
self.cb = event_cb
272-
self.cb_args = args
273-
self.cb_kwargs = kwargs
274-
self.__set_then_cb()
275-
return self
129+
return {
130+
"jobid": resp.get("id"),
131+
"jobspec": resp.get("jobspec"),
132+
"R": resp.get("R"),
133+
}
134+
135+
def create_event(self, entry, jobid=-1, jobspec=None, R=None):
136+
"""Create a single JournalEvent from one event entry"""
137+
return JournalEvent(jobid, entry, jobspec=jobspec, R=R)
276138

277139

278140
# vi: ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)