@@ -1238,7 +1238,71 @@ fn replicated_return() {
1238
1238
. simulation_duration ( Duration :: from_secs ( 1000 ) )
1239
1239
. build ( ) ;
1240
1240
1241
- make_primary ( & mut sim, tmp_host_path. clone ( ) ) ;
1241
+ // make_primary(&mut sim, tmp_host_path.clone());
1242
+
1243
+ let mut sim = Builder :: new ( )
1244
+ . simulation_duration ( Duration :: from_secs ( 1000 ) )
1245
+ . build ( ) ;
1246
+ let tmp = tempdir ( ) . unwrap ( ) ;
1247
+
1248
+ let notify = Arc :: new ( Notify :: new ( ) ) ;
1249
+ let notify_clone = notify. clone ( ) ;
1250
+
1251
+ init_tracing ( ) ;
1252
+ sim. host ( "primary" , move || {
1253
+ let notify = notify_clone. clone ( ) ;
1254
+ let path = tmp. path ( ) . to_path_buf ( ) ;
1255
+ async move {
1256
+ let make_server = || async {
1257
+ TestServer {
1258
+ path : path. clone ( ) . into ( ) ,
1259
+ user_api_config : UserApiConfig {
1260
+ ..Default :: default ( )
1261
+ } ,
1262
+ admin_api_config : Some ( AdminApiConfig {
1263
+ acceptor : TurmoilAcceptor :: bind ( ( [ 0 , 0 , 0 , 0 ] , 9090 ) ) . await . unwrap ( ) ,
1264
+ connector : TurmoilConnector ,
1265
+ disable_metrics : true ,
1266
+ } ) ,
1267
+ rpc_server_config : Some ( RpcServerConfig {
1268
+ acceptor : TurmoilAcceptor :: bind ( ( [ 0 , 0 , 0 , 0 ] , 4567 ) ) . await . unwrap ( ) ,
1269
+ tls_config : None ,
1270
+ } ) ,
1271
+ ..Default :: default ( )
1272
+ }
1273
+ } ;
1274
+ let server = make_server ( ) . await ;
1275
+ let shutdown = server. shutdown . clone ( ) ;
1276
+
1277
+ let fut = async move { server. start_sim ( 8080 ) . await } ;
1278
+
1279
+ tokio:: pin!( fut) ;
1280
+
1281
+ loop {
1282
+ tokio:: select! {
1283
+ res = & mut fut => {
1284
+ res. unwrap( ) ;
1285
+ break
1286
+ }
1287
+ _ = notify. notified( ) => {
1288
+ shutdown. notify_waiters( ) ;
1289
+ } ,
1290
+ }
1291
+ }
1292
+
1293
+ drop ( fut) ;
1294
+
1295
+ tokio:: fs:: File :: create ( path. join ( ".sentinel" ) )
1296
+ . await
1297
+ . unwrap ( ) ;
1298
+
1299
+ notify. notify_waiters ( ) ;
1300
+ let server = make_server ( ) . await ;
1301
+ server. start_sim ( 8080 ) . await . unwrap ( ) ;
1302
+
1303
+ Ok ( ( ) )
1304
+ }
1305
+ } ) ;
1242
1306
1243
1307
sim. client ( "client" , async move {
1244
1308
let client = Client :: new ( ) ;
@@ -1279,24 +1343,14 @@ fn replicated_return() {
1279
1343
assert_eq ! ( rep. frame_no( ) , Some ( 4 ) ) ;
1280
1344
assert_eq ! ( rep. start_frame_no( ) , Some ( 1 ) ) ;
1281
1345
1282
- let wal_index_file = format ! ( "{}-client_wal_index" , path. to_str( ) . unwrap( ) ) ;
1346
+ notify. notify_waiters ( ) ;
1347
+ notify. notified ( ) . await ;
1283
1348
1284
- std:: fs:: remove_file ( wal_index_file) . unwrap ( ) ;
1285
-
1286
- let db = Database :: open_with_remote_sync_connector (
1287
- path. to_str ( ) . unwrap ( ) ,
1288
- "http://foo.primary:8080" ,
1289
- "" ,
1290
- TurmoilConnector ,
1291
- false ,
1292
- None ,
1293
- )
1294
- . await ?;
1349
+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) . await ;
1295
1350
1296
1351
let rep = db. sync ( ) . await . unwrap ( ) ;
1297
1352
assert_eq ! ( rep. frame_no( ) , Some ( 4 ) ) ;
1298
- assert_eq ! ( rep. start_frame_no( ) , Some ( 1 ) ) ;
1299
-
1353
+ assert_eq ! ( rep. start_frame_no( ) , Some ( 4 ) ) ;
1300
1354
1301
1355
Ok ( ( ) )
1302
1356
} ) ;
0 commit comments