@@ -1375,6 +1375,34 @@ async fn test_watcher_coexist_on_same_path() {
1375
1375
assert_that ! ( recursive_watcher. changed( ) . await ) . is_equal_to ( & expected) ;
1376
1376
}
1377
1377
1378
+ // Use "current_thread" explicitly.
1379
+ #[ test_log:: test( tokio:: test( flavor = "current_thread" ) ) ]
1380
+ async fn test_remove_no_watcher ( ) {
1381
+ let docker = DockerCli :: default ( ) ;
1382
+ let zookeeper = docker. run ( zookeeper_image ( ) ) ;
1383
+ let zk_port = zookeeper. get_host_port ( 2181 ) ;
1384
+ let cluster = format ! ( "127.0.0.1:{}" , zk_port) ;
1385
+
1386
+ let client = zk:: Client :: connect ( & cluster) . await . unwrap ( ) ;
1387
+
1388
+ let ( _, exist_watcher) = client. check_and_watch_stat ( "/a" ) . await . unwrap ( ) ;
1389
+ let create = client. create ( "/a" , & vec ! [ ] , PERSISTENT_OPEN ) ;
1390
+
1391
+ // Let session task issue `create` request first, oneshot watch will be removed by server.
1392
+ tokio:: task:: yield_now ( ) . await ;
1393
+
1394
+ // Issue `RemoveWatches` which likely happen before watch event notification as it involves
1395
+ // several IO paths.
1396
+ assert_that ! ( exist_watcher. remove( ) . await . unwrap_err( ) ) . is_equal_to ( zk:: Error :: NoWatcher ) ;
1397
+ create. await . unwrap ( ) ;
1398
+
1399
+ let ( _, _, data_watcher) = client. get_and_watch_data ( "/a" ) . await . unwrap ( ) ;
1400
+ let delete = client. delete ( "/a" , None ) ;
1401
+ tokio:: task:: yield_now ( ) . await ;
1402
+ assert_that ! ( data_watcher. remove( ) . await . unwrap_err( ) ) . is_equal_to ( zk:: Error :: NoWatcher ) ;
1403
+ delete. await . unwrap ( ) ;
1404
+ }
1405
+
1378
1406
#[ test_log:: test( tokio:: test) ]
1379
1407
async fn test_session_event ( ) {
1380
1408
let docker = DockerCli :: default ( ) ;
0 commit comments