Skip to content

Commit 4a58554

Browse files
committed
python: Add WatchImplementation class
Problem: The KVSWatchFuture and JobEventWatchFuture classes are identical except for the get() and cancel() calls to get a value or cancel the stream. Solution: Create a new WatchImplementation class that can act as an abstract class for both watch futures. The futures both need only implement their respective "get" and "cancel" implementations. Update KVSWatchFuture and JobEventWatchFuture to be based on WatchImplementation.
1 parent 731f861 commit 4a58554

File tree

2 files changed

+66
-56
lines changed

2 files changed

+66
-56
lines changed

src/bindings/python/flux/job/event.py

Lines changed: 21 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import json
1212

1313
from _flux._core import ffi
14-
from flux.future import Future
1514
from flux.job._wrapper import _RAW as RAW
15+
from flux.kvs import WatchImplementation
1616

1717
# Names of events that may appear in the main eventlog (i.e. ``eventlog="eventlog"``)
1818
# See Flux RFC 21 for documentation on each event.
@@ -74,23 +74,32 @@ def context_string(self):
7474
)
7575

7676

77-
class JobEventWatchFuture(Future):
77+
class JobEventWatchFuture(WatchImplementation):
7878
"""
7979
A future returned from job.event_watch_async().
8080
Adds get_event() method to return an EventLogEntry event
8181
"""
8282

83-
def __del__(self):
84-
if self.needs_cancel is not False:
85-
self.cancel()
86-
try:
87-
super().__del__()
88-
except AttributeError:
89-
pass
90-
9183
def __init__(self, future_handle):
9284
super().__init__(future_handle)
93-
self.needs_cancel = True
85+
86+
def watch_get(self, future):
87+
"""
88+
Implementation of watch_get() for JobEventWatchFuture.
89+
90+
Will be called from WatchABC.get()
91+
"""
92+
result = ffi.new("char *[1]")
93+
RAW.event_watch_get(future, result)
94+
return EventLogEvent(ffi.string(result[0]).decode("utf-8"))
95+
96+
def watch_cancel(self, future):
97+
"""
98+
Implementation of watch_cancel() for JobEventWatchFuture.
99+
100+
Will be called from WatchABC.cancel()
101+
"""
102+
RAW.event_watch_cancel(future)
94103

95104
def get_event(self, autoreset=True):
96105
"""
@@ -101,41 +110,7 @@ def get_event(self, autoreset=True):
101110
call to get_event() will try to fetch the next event and thus
102111
may block.
103112
"""
104-
result = ffi.new("char *[1]")
105-
try:
106-
# Block until Future is ready:
107-
self.wait_for()
108-
RAW.event_watch_get(self.pimpl, result)
109-
except OSError as exc:
110-
if exc.errno == errno.ENODATA:
111-
self.needs_cancel = False
112-
return None
113-
# raise handle exception if there is one
114-
self.raise_if_handle_exception()
115-
# re-raise all other exceptions
116-
#
117-
# Note: overwrite generic OSError strerror string with the
118-
# EventWatch future error string to give the caller appropriate
119-
# detail (e.g. instead of "No such file or directory" use
120-
# "job <jobid> does not exist"
121-
#
122-
exc.strerror = self.error_string()
123-
raise
124-
event = EventLogEvent(ffi.string(result[0]).decode("utf-8"))
125-
if autoreset is True:
126-
self.reset()
127-
return event
128-
129-
def cancel(self, stop=False):
130-
"""Cancel a streaming job.event_watch_async() future
131-
132-
If stop=True, then deactivate the multi-response future so no
133-
further callbacks are called.
134-
"""
135-
RAW.event_watch_cancel(self.pimpl)
136-
self.needs_cancel = False
137-
if stop:
138-
self.stop()
113+
return self.get(autoreset=autoreset)
139114

140115

141116
def event_watch_async(flux_handle, jobid, eventlog="eventlog"):

src/bindings/python/flux/kvs.py

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import errno
1313
import json
1414
import os
15+
from abc import ABC, abstractmethod
1516
from typing import Any, Mapping
1617

1718
import flux.constants
@@ -781,9 +782,11 @@ def walk(directory, topdown=False, flux_handle=None, namespace=None):
781782
return _inner_walk(directory, "", topdown, namespace=namespace)
782783

783784

784-
class KVSWatchFuture(Future):
785+
class WatchImplementation(Future, ABC):
785786
"""
786-
A future returned from kvs_watch_async().
787+
Interface for KVS based watchers
788+
789+
Users to implement watch_get() and watch_cancel() functions.
787790
"""
788791

789792
def __del__(self):
@@ -798,20 +801,26 @@ def __init__(self, future_handle):
798801
super().__init__(future_handle)
799802
self.needs_cancel = True
800803

804+
@abstractmethod
805+
def watch_get(self, future):
806+
pass
807+
808+
@abstractmethod
809+
def watch_cancel(self, future):
810+
pass
811+
801812
def get(self, autoreset=True):
802813
"""
803-
Return the new value of the KVS key or None if the stream has
804-
terminated.
814+
Return the new value or None if the stream has terminated.
805815
806816
The future is auto-reset unless autoreset=False, so a subsequent
807817
call to get() will try to fetch the next value and thus
808818
may block.
809819
"""
810-
valp = ffi.new("char *[1]")
811820
try:
812821
# Block until Future is ready:
813822
self.wait_for()
814-
RAW.flux_kvs_lookup_get(self.pimpl, valp)
823+
ret = self.watch_get(self.pimpl)
815824
except OSError as exc:
816825
if exc.errno == errno.ENODATA:
817826
self.needs_cancel = False
@@ -827,23 +836,49 @@ def get(self, autoreset=True):
827836
#
828837
exc.strerror = self.error_string()
829838
raise
830-
val = _get_value(valp)
831839
if autoreset is True:
832840
self.reset()
833-
return val
841+
return ret
834842

835843
def cancel(self, stop=False):
836-
"""Cancel a streaming kvs_watch_async() future
844+
"""Cancel a streaming future
837845
838846
If stop=True, then deactivate the multi-response future so no
839847
further callbacks are called.
840848
"""
841-
RAW.flux_kvs_lookup_cancel(self.pimpl)
849+
self.watch_cancel(self.pimpl)
842850
self.needs_cancel = False
843851
if stop:
844852
self.stop()
845853

846854

855+
class KVSWatchFuture(WatchImplementation):
856+
"""
857+
A future returned from kvs_watch_async().
858+
"""
859+
860+
def __init__(self, future_handle):
861+
super().__init__(future_handle)
862+
863+
def watch_get(self, future):
864+
"""
865+
Implementation of watch_get() for KVSWatchFuture.
866+
867+
Will be called from WatchABC.get()
868+
"""
869+
valp = ffi.new("char *[1]")
870+
RAW.flux_kvs_lookup_get(future, valp)
871+
return _get_value(valp)
872+
873+
def watch_cancel(self, future):
874+
"""
875+
Implementation of watch_cancel() for KVSWatchFuture.
876+
877+
Will be called from WatchABC.cancel()
878+
"""
879+
RAW.flux_kvs_lookup_cancel(future)
880+
881+
847882
def kvs_watch_async(
848883
flux_handle, key, namespace=None, waitcreate=False, uniq=False, full=False
849884
):

0 commit comments

Comments
 (0)