@@ -779,6 +779,207 @@ def test_commic_async_04_kvsdir(self):
779779 f .get ()
780780 self .assertEqual (flux .kvs .get (self .f , "commitasync4" ), "qux" )
781781
782+ def test_kvswatch_01_key_ENOENT (self ):
783+ with self .assertRaises (OSError ) as cm :
784+ future = flux .kvs .kvs_watch_async (self .f , "grog" )
785+ future .get ()
786+ self .assertEqual (cm .exception .errno , errno .ENOENT )
787+
788+ def _change_value_kvs_nowait (self , key , val ):
789+ kvstxn = flux .kvs .KVSTxn (self .f )
790+ kvstxn .put (key , val )
791+ flux .kvs .commit_async (self .f , _kvstxn = kvstxn )
792+
793+ def test_kvswatch_02_kvs_watch_async_basic (self ):
794+ myarg = dict (a = 1 , b = 2 )
795+ vals = []
796+
797+ def cb (future , arg ):
798+ self .assertEqual (arg , myarg )
799+ val = future .get ()
800+ if val is None :
801+ return
802+ elif val == 1 :
803+ self ._change_value_kvs_nowait ("kvswatch1.val" , 2 )
804+ elif val == 2 :
805+ future .cancel ()
806+ vals .append (val )
807+
808+ with flux .kvs .get_dir (self .f ) as kd :
809+ kd .mkdir ("kvswatch1" )
810+ kd ["kvswatch1.val" ] = 1
811+
812+ future = flux .kvs .kvs_watch_async (self .f , "kvswatch1.val" )
813+ self .assertIsInstance (future , flux .kvs .KVSWatchFuture )
814+ future .then (cb , myarg )
815+ rc = self .f .reactor_run ()
816+ self .assertGreaterEqual (rc , 0 )
817+ self .assertEqual (len (vals ), 2 )
818+ self .assertEqual (vals [0 ], 1 )
819+ self .assertEqual (vals [1 ], 2 )
820+
821+ def test_kvswatch_03_kvs_watch_async_no_autoreset (self ):
822+ vals = []
823+
824+ def cb (future , arg ):
825+ val = future .get (autoreset = False )
826+ if val is None :
827+ return
828+ elif val == 1 :
829+ valtmp = future .get ()
830+ # Value hasn't changed
831+ self .assertEqual (valtmp , 1 )
832+ self ._change_value_kvs_nowait ("kvswatch2.val" , 2 )
833+ elif val == 2 :
834+ valtmp = future .get ()
835+ # Value hasn't changed
836+ self .assertEqual (valtmp , 2 )
837+ future .cancel ()
838+ vals .append (val )
839+
840+ with flux .kvs .get_dir (self .f ) as kd :
841+ kd .mkdir ("kvswatch2" )
842+ kd ["kvswatch2.val" ] = 1
843+
844+ future = flux .kvs .kvs_watch_async (self .f , "kvswatch2.val" )
845+ self .assertIsInstance (future , flux .kvs .KVSWatchFuture )
846+ future .then (cb , None )
847+ rc = self .f .reactor_run ()
848+ self .assertGreaterEqual (rc , 0 )
849+ self .assertEqual (len (vals ), 2 )
850+ self .assertEqual (vals [0 ], 1 )
851+ self .assertEqual (vals [1 ], 2 )
852+
853+ def test_kvswatch_04_kvs_watch_async_waitcreate (self ):
854+ # To test waitcreate, we create two KVS watchers, one with
855+ # waitcreate and one without. The one without waitcreate will
856+ # create the field we are waiting to be created.
857+ results = []
858+
859+ def cb_ENOENT (future , arg ):
860+ try :
861+ future .get ()
862+ except OSError as e :
863+ self .assertEqual (e .errno , errno .ENOENT )
864+ self ._change_value_kvs_nowait ("kvswatch3.val" , 1 )
865+ results .append ("ENOENT" )
866+
867+ def cb_waitcreate (future , arg ):
868+ val = future .get ()
869+ if val is None :
870+ return
871+ elif val == 1 :
872+ future .cancel ()
873+ results .append (val )
874+
875+ with flux .kvs .get_dir (self .f ) as kd :
876+ kd .mkdir ("kvswatch3" )
877+
878+ future1 = flux .kvs .kvs_watch_async (self .f , "kvswatch3.val" )
879+ future2 = flux .kvs .kvs_watch_async (self .f , "kvswatch3.val" , waitcreate = True )
880+ future1 .then (cb_ENOENT , None )
881+ future2 .then (cb_waitcreate , None )
882+ rc = self .f .reactor_run ()
883+ self .assertGreaterEqual (rc , 0 )
884+ self .assertEqual (len (results ), 2 )
885+ self .assertEqual (results [0 ], "ENOENT" )
886+ self .assertEqual (results [1 ], 1 )
887+
888+ def test_kvswatch_05_kvs_watch_async_uniq (self ):
889+ # To test uniq, we create two KVS watchers, one with uniq and
890+ # one without. The one with uniq should see fewer changes
891+ # than the one without.
892+ vals = []
893+ uniq_vals = []
894+
895+ def cb (future , arg ):
896+ otherfuture = arg
897+ val = future .get ()
898+ if val is None :
899+ return
900+ elif val == 1 :
901+ if len (vals ) == 0 :
902+ self ._change_value_kvs_nowait ("kvswatch4.val" , 1 )
903+ elif len (vals ) == 1 :
904+ self ._change_value_kvs_nowait ("kvswatch4.val" , 2 )
905+ vals .append (val )
906+ if len (vals ) == 3 and len (uniq_vals ) == 2 :
907+ future .cancel ()
908+ otherfuture .cancel ()
909+
910+ def cb_uniq (future , arg ):
911+ otherfuture = arg
912+ val = future .get ()
913+ if val is None :
914+ return
915+ uniq_vals .append (val )
916+ if len (vals ) == 3 and len (uniq_vals ) == 2 :
917+ future .cancel ()
918+ otherfuture .cancel ()
919+
920+ with flux .kvs .get_dir (self .f ) as kd :
921+ kd .mkdir ("kvswatch4" )
922+ kd ["kvswatch4.val" ] = 1
923+
924+ future1 = flux .kvs .kvs_watch_async (self .f , "kvswatch4.val" )
925+ future2 = flux .kvs .kvs_watch_async (self .f , "kvswatch4.val" , uniq = True )
926+ future1 .then (cb , future2 )
927+ future2 .then (cb_uniq , future1 )
928+ rc = self .f .reactor_run ()
929+ self .assertGreaterEqual (rc , 0 )
930+ self .assertEqual (len (vals ), 3 )
931+ self .assertEqual (vals [0 ], 1 )
932+ self .assertEqual (vals [1 ], 1 )
933+ self .assertEqual (vals [2 ], 2 )
934+ self .assertEqual (len (uniq_vals ), 2 )
935+ self .assertEqual (uniq_vals [0 ], 1 )
936+ self .assertEqual (uniq_vals [1 ], 2 )
937+
938+ def test_kvswatch_06_kvs_watch_async_full (self ):
939+ # To test full, we create two KVS watchers, one with full and
940+ # one without. The one with full should see a change if we
941+ # delete an upstream directory. The other watcher should not.
942+ vals = []
943+ full_vals = []
944+
945+ def cb (future , arg ):
946+ val = future .get ()
947+ if val is None :
948+ return
949+ vals .append (val )
950+
951+ def cb_full (future , arg ):
952+ otherfuture = arg
953+ try :
954+ val = future .get ()
955+ if val is None :
956+ return
957+ elif val == 1 :
958+ # Set to None == unlink/delete/remove
959+ self ._change_value_kvs_nowait ("kvswatch5" , None )
960+ except OSError :
961+ future .cancel ()
962+ otherfuture .cancel ()
963+ full_vals .append ("ENOENT" )
964+ return
965+ full_vals .append (val )
966+
967+ with flux .kvs .get_dir (self .f ) as kd :
968+ kd .mkdir ("kvswatch5" )
969+ kd ["kvswatch5.val" ] = 1
970+
971+ future1 = flux .kvs .kvs_watch_async (self .f , "kvswatch5.val" )
972+ future2 = flux .kvs .kvs_watch_async (self .f , "kvswatch5.val" , full = True )
973+ future1 .then (cb , None )
974+ future2 .then (cb_full , future1 )
975+ rc = self .f .reactor_run ()
976+ self .assertGreaterEqual (rc , 0 )
977+ self .assertEqual (len (vals ), 1 )
978+ self .assertEqual (vals [0 ], 1 )
979+ self .assertEqual (len (full_vals ), 2 )
980+ self .assertEqual (full_vals [0 ], 1 )
981+ self .assertEqual (full_vals [1 ], "ENOENT" )
982+
782983
783984if __name__ == "__main__" :
784985 if rerun_under_flux (__flux_size ()):
0 commit comments