Skip to content

Commit 556d42c

Browse files
authored
Merge pull request #5389 from chu11/issue5327_python_kvs_modernize_kvswatch
python: support interface to perform KVS watch
2 parents cb6bc7b + 4a58554 commit 556d42c

File tree

3 files changed

+370
-52
lines changed

3 files changed

+370
-52
lines changed

src/bindings/python/flux/job/event.py

Lines changed: 21 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import json
1212

1313
from _flux._core import ffi
14-
from flux.future import Future
1514
from flux.job._wrapper import _RAW as RAW
15+
from flux.kvs import WatchImplementation
1616

1717
# Names of events that may appear in the main eventlog (i.e. ``eventlog="eventlog"``)
1818
# See Flux RFC 21 for documentation on each event.
@@ -74,23 +74,32 @@ def context_string(self):
7474
)
7575

7676

77-
class JobEventWatchFuture(Future):
77+
class JobEventWatchFuture(WatchImplementation):
7878
"""
7979
A future returned from job.event_watch_async().
8080
Adds get_event() method to return an EventLogEntry event
8181
"""
8282

83-
def __del__(self):
84-
if self.needs_cancel is not False:
85-
self.cancel()
86-
try:
87-
super().__del__()
88-
except AttributeError:
89-
pass
90-
9183
def __init__(self, future_handle):
9284
super().__init__(future_handle)
93-
self.needs_cancel = True
85+
86+
def watch_get(self, future):
87+
"""
88+
Implementation of watch_get() for JobEventWatchFuture.
89+
90+
Will be called from WatchABC.get()
91+
"""
92+
result = ffi.new("char *[1]")
93+
RAW.event_watch_get(future, result)
94+
return EventLogEvent(ffi.string(result[0]).decode("utf-8"))
95+
96+
def watch_cancel(self, future):
97+
"""
98+
Implementation of watch_cancel() for JobEventWatchFuture.
99+
100+
Will be called from WatchABC.cancel()
101+
"""
102+
RAW.event_watch_cancel(future)
94103

95104
def get_event(self, autoreset=True):
96105
"""
@@ -101,41 +110,7 @@ def get_event(self, autoreset=True):
101110
call to get_event() will try to fetch the next event and thus
102111
may block.
103112
"""
104-
result = ffi.new("char *[1]")
105-
try:
106-
# Block until Future is ready:
107-
self.wait_for()
108-
RAW.event_watch_get(self.pimpl, result)
109-
except OSError as exc:
110-
if exc.errno == errno.ENODATA:
111-
self.needs_cancel = False
112-
return None
113-
# raise handle exception if there is one
114-
self.raise_if_handle_exception()
115-
# re-raise all other exceptions
116-
#
117-
# Note: overwrite generic OSError strerror string with the
118-
# EventWatch future error string to give the caller appropriate
119-
# detail (e.g. instead of "No such file or directory" use
120-
# "job <jobid> does not exist"
121-
#
122-
exc.strerror = self.error_string()
123-
raise
124-
event = EventLogEvent(ffi.string(result[0]).decode("utf-8"))
125-
if autoreset is True:
126-
self.reset()
127-
return event
128-
129-
def cancel(self, stop=False):
130-
"""Cancel a streaming job.event_watch_async() future
131-
132-
If stop=True, then deactivate the multi-response future so no
133-
further callbacks are called.
134-
"""
135-
RAW.event_watch_cancel(self.pimpl)
136-
self.needs_cancel = False
137-
if stop:
138-
self.stop()
113+
return self.get(autoreset=autoreset)
139114

140115

141116
def event_watch_async(flux_handle, jobid, eventlog="eventlog"):

src/bindings/python/flux/kvs.py

Lines changed: 148 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import errno
1313
import json
1414
import os
15+
from abc import ABC, abstractmethod
1516
from typing import Any, Mapping
1617

18+
import flux.constants
1719
from _flux._core import ffi, lib
1820
from flux.future import Future
1921
from flux.rpc import RPC
@@ -31,19 +33,24 @@ class KVSWrapper(Wrapper):
3133
RAW.flux_kvsitr_next.set_error_check(lambda x: False)
3234

3335

36+
def _get_value(valp):
37+
try:
38+
ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
39+
except json.decoder.JSONDecodeError:
40+
ret = ffi.string(valp[0]).decode("utf-8")
41+
except UnicodeDecodeError:
42+
ret = ffi.string(valp[0])
43+
return ret
44+
45+
3446
def get_key_direct(flux_handle, key, namespace=None):
3547
valp = ffi.new("char *[1]")
3648
future = RAW.flux_kvs_lookup(flux_handle, namespace, 0, key)
3749
RAW.flux_kvs_lookup_get(future, valp)
3850
if valp[0] == ffi.NULL:
3951
return None
4052

41-
try:
42-
ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
43-
except json.decoder.JSONDecodeError:
44-
ret = ffi.string(valp[0]).decode("utf-8")
45-
except UnicodeDecodeError:
46-
ret = ffi.string(valp[0])
53+
ret = _get_value(valp)
4754
RAW.flux_future_destroy(future)
4855
return ret
4956

@@ -773,3 +780,138 @@ def walk(directory, topdown=False, flux_handle=None, namespace=None):
773780
raise ValueError("If directory is a key, flux_handle must be specified")
774781
directory = KVSDir(flux_handle, directory, namespace=namespace)
775782
return _inner_walk(directory, "", topdown, namespace=namespace)
783+
784+
785+
class WatchImplementation(Future, ABC):
786+
"""
787+
Interface for KVS based watchers
788+
789+
Users to implement watch_get() and watch_cancel() functions.
790+
"""
791+
792+
def __del__(self):
793+
if self.needs_cancel is not False:
794+
self.cancel()
795+
try:
796+
super().__del__()
797+
except AttributeError:
798+
pass
799+
800+
def __init__(self, future_handle):
801+
super().__init__(future_handle)
802+
self.needs_cancel = True
803+
804+
@abstractmethod
805+
def watch_get(self, future):
806+
pass
807+
808+
@abstractmethod
809+
def watch_cancel(self, future):
810+
pass
811+
812+
def get(self, autoreset=True):
813+
"""
814+
Return the new value or None if the stream has terminated.
815+
816+
The future is auto-reset unless autoreset=False, so a subsequent
817+
call to get() will try to fetch the next value and thus
818+
may block.
819+
"""
820+
try:
821+
# Block until Future is ready:
822+
self.wait_for()
823+
ret = self.watch_get(self.pimpl)
824+
except OSError as exc:
825+
if exc.errno == errno.ENODATA:
826+
self.needs_cancel = False
827+
return None
828+
# raise handle exception if there is one
829+
self.raise_if_handle_exception()
830+
# re-raise all other exceptions
831+
#
832+
# Note: overwrite generic OSError strerror string with the
833+
# EventWatch future error string to give the caller appropriate
834+
# detail (e.g. instead of "No such file or directory" use
835+
# "job <jobid> does not exist"
836+
#
837+
exc.strerror = self.error_string()
838+
raise
839+
if autoreset is True:
840+
self.reset()
841+
return ret
842+
843+
def cancel(self, stop=False):
844+
"""Cancel a streaming future
845+
846+
If stop=True, then deactivate the multi-response future so no
847+
further callbacks are called.
848+
"""
849+
self.watch_cancel(self.pimpl)
850+
self.needs_cancel = False
851+
if stop:
852+
self.stop()
853+
854+
855+
class KVSWatchFuture(WatchImplementation):
856+
"""
857+
A future returned from kvs_watch_async().
858+
"""
859+
860+
def __init__(self, future_handle):
861+
super().__init__(future_handle)
862+
863+
def watch_get(self, future):
864+
"""
865+
Implementation of watch_get() for KVSWatchFuture.
866+
867+
Will be called from WatchABC.get()
868+
"""
869+
valp = ffi.new("char *[1]")
870+
RAW.flux_kvs_lookup_get(future, valp)
871+
return _get_value(valp)
872+
873+
def watch_cancel(self, future):
874+
"""
875+
Implementation of watch_cancel() for KVSWatchFuture.
876+
877+
Will be called from WatchABC.cancel()
878+
"""
879+
RAW.flux_kvs_lookup_cancel(future)
880+
881+
882+
def kvs_watch_async(
883+
flux_handle, key, namespace=None, waitcreate=False, uniq=False, full=False
884+
):
885+
"""Asynchronously get KVS updates for a key
886+
887+
Args:
888+
flux_handle: A Flux handle obtained from flux.Flux()
889+
key: the key on which to watch
890+
namespace: namespace to read from, defaults to None. If namespace
891+
is None, the namespace specified in the FLUX_KVS_NAMESPACE
892+
environment variable will be used. If FLUX_KVS_NAMESPACE is not
893+
set, the primary namespace will be used.
894+
waitcreate: If True and a key does not yet exist, will wait
895+
for it to exit. Defaults to False.
896+
uniq: If True, only different values will be returned by
897+
watch. Defaults to False.
898+
full: If True, any change that can affect the key is
899+
monitored. Typically, this is to capture when a parent directory
900+
is removed or altered in some way. Typically kvs watch will not
901+
detect this as the exact key has not been changed. Defaults to
902+
False.
903+
904+
Returns:
905+
KVSWatchFuture: a KVSWatchFuture object. Call .get() from the then
906+
callback to get the currently returned value from the Future object.
907+
"""
908+
909+
flags = flux.constants.FLUX_KVS_WATCH
910+
if waitcreate:
911+
flags |= flux.constants.FLUX_KVS_WAITCREATE
912+
if uniq:
913+
flags |= flux.constants.FLUX_KVS_WATCH_UNIQ
914+
if full:
915+
flags |= flux.constants.FLUX_KVS_WATCH_FULL
916+
future = RAW.flux_kvs_lookup(flux_handle, namespace, flags, key)
917+
return KVSWatchFuture(future)

0 commit comments

Comments
 (0)