Skip to content

Commit a0282ee

Browse files
committed
python: add kvs watch interface
Problem: There is no interface to watch a KVS entry. Solution: Add a function kvs_watch_async() and a new class KVSWatchFuture to watch a KVS key. Fixes #5037
1 parent cb6bc7b commit a0282ee

File tree

1 file changed

+113
-6
lines changed
  • src/bindings/python/flux

1 file changed

+113
-6
lines changed

src/bindings/python/flux/kvs.py

Lines changed: 113 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import os
1515
from typing import Any, Mapping
1616

17+
import flux.constants
1718
from _flux._core import ffi, lib
1819
from flux.future import Future
1920
from flux.rpc import RPC
@@ -31,19 +32,24 @@ class KVSWrapper(Wrapper):
3132
RAW.flux_kvsitr_next.set_error_check(lambda x: False)
3233

3334

35+
def _get_value(valp):
36+
try:
37+
ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
38+
except json.decoder.JSONDecodeError:
39+
ret = ffi.string(valp[0]).decode("utf-8")
40+
except UnicodeDecodeError:
41+
ret = ffi.string(valp[0])
42+
return ret
43+
44+
3445
def get_key_direct(flux_handle, key, namespace=None):
3546
valp = ffi.new("char *[1]")
3647
future = RAW.flux_kvs_lookup(flux_handle, namespace, 0, key)
3748
RAW.flux_kvs_lookup_get(future, valp)
3849
if valp[0] == ffi.NULL:
3950
return None
4051

41-
try:
42-
ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
43-
except json.decoder.JSONDecodeError:
44-
ret = ffi.string(valp[0]).decode("utf-8")
45-
except UnicodeDecodeError:
46-
ret = ffi.string(valp[0])
52+
ret = _get_value(valp)
4753
RAW.flux_future_destroy(future)
4854
return ret
4955

@@ -773,3 +779,104 @@ def walk(directory, topdown=False, flux_handle=None, namespace=None):
773779
raise ValueError("If directory is a key, flux_handle must be specified")
774780
directory = KVSDir(flux_handle, directory, namespace=namespace)
775781
return _inner_walk(directory, "", topdown, namespace=namespace)
782+
783+
784+
class KVSWatchFuture(Future):
785+
"""
786+
A future returned from kvs_watch_async().
787+
"""
788+
789+
def __del__(self):
790+
if self.needs_cancel is not False:
791+
self.cancel()
792+
try:
793+
super().__del__()
794+
except AttributeError:
795+
pass
796+
797+
def __init__(self, future_handle):
798+
super().__init__(future_handle)
799+
self.needs_cancel = True
800+
801+
def get(self, autoreset=True):
802+
"""
803+
Return the new value of the KVS key or None if the stream has
804+
terminated.
805+
806+
The future is auto-reset unless autoreset=False, so a subsequent
807+
call to get() will try to fetch the next value and thus
808+
may block.
809+
"""
810+
valp = ffi.new("char *[1]")
811+
try:
812+
# Block until Future is ready:
813+
self.wait_for()
814+
RAW.flux_kvs_lookup_get(self.pimpl, valp)
815+
except OSError as exc:
816+
if exc.errno == errno.ENODATA:
817+
self.needs_cancel = False
818+
return None
819+
# raise handle exception if there is one
820+
self.raise_if_handle_exception()
821+
# re-raise all other exceptions
822+
#
823+
# Note: overwrite generic OSError strerror string with the
824+
# EventWatch future error string to give the caller appropriate
825+
# detail (e.g. instead of "No such file or directory" use
826+
# "job <jobid> does not exist"
827+
#
828+
exc.strerror = self.error_string()
829+
raise
830+
val = _get_value(valp)
831+
if autoreset is True:
832+
self.reset()
833+
return val
834+
835+
def cancel(self, stop=False):
836+
"""Cancel a streaming kvs_watch_async() future
837+
838+
If stop=True, then deactivate the multi-response future so no
839+
further callbacks are called.
840+
"""
841+
RAW.flux_kvs_lookup_cancel(self.pimpl)
842+
self.needs_cancel = False
843+
if stop:
844+
self.stop()
845+
846+
847+
def kvs_watch_async(
848+
flux_handle, key, namespace=None, waitcreate=False, uniq=False, full=False
849+
):
850+
"""Asynchronously get KVS updates for a key
851+
852+
Args:
853+
flux_handle: A Flux handle obtained from flux.Flux()
854+
key: the key on which to watch
855+
namespace: namespace to read from, defaults to None. If namespace
856+
is None, the namespace specified in the FLUX_KVS_NAMESPACE
857+
environment variable will be used. If FLUX_KVS_NAMESPACE is not
858+
set, the primary namespace will be used.
859+
waitcreate: If True and a key does not yet exist, will wait
860+
for it to exit. Defaults to False.
861+
uniq: If True, only different values will be returned by
862+
watch. Defaults to False.
863+
full: If True, any change that can affect the key is
864+
monitored. Typically, this is to capture when a parent directory
865+
is removed or altered in some way. Typically kvs watch will not
866+
detect this as the exact key has not been changed. Defaults to
867+
False.
868+
869+
Returns:
870+
KVSWatchFuture: a KVSWatchFuture object. Call .get() from the then
871+
callback to get the currently returned value from the Future object.
872+
"""
873+
874+
flags = flux.constants.FLUX_KVS_WATCH
875+
if waitcreate:
876+
flags |= flux.constants.FLUX_KVS_WAITCREATE
877+
if uniq:
878+
flags |= flux.constants.FLUX_KVS_WATCH_UNIQ
879+
if full:
880+
flags |= flux.constants.FLUX_KVS_WATCH_FULL
881+
future = RAW.flux_kvs_lookup(flux_handle, namespace, flags, key)
882+
return KVSWatchFuture(future)

0 commit comments

Comments
 (0)