@@ -18,7 +18,7 @@ use async_graphql::InputType;
1818use futures:: {
1919 channel:: mpsc,
2020 future:: { self , Either } ,
21- SinkExt , StreamExt ,
21+ StreamExt ,
2222} ;
2323use guard:: INTEGRATION_TEST_GUARD ;
2424use linera_base:: {
@@ -4467,12 +4467,33 @@ async fn test_end_to_end_fungible_client_benchmark(config: impl LineraNetConfig)
44674467#[ cfg_attr( feature = "remote-net" , test_case( RemoteNetTestingConfig :: new( CloseChains ) ; "remote_net_grpc" ) ) ]
44684468#[ test_log:: test( tokio:: test) ]
44694469async fn test_end_to_end_listen_for_new_rounds ( config : impl LineraNetConfig ) -> Result < ( ) > {
4470- use std:: {
4471- sync:: { Arc , Barrier } ,
4472- thread,
4473- } ;
4474-
4475- use tokio:: task:: JoinHandle ;
4470+ /// Runs the `client` in a task, so that it can race to produce blocks transferring tokens.
4471+ ///
4472+ /// Stops when transferring fails or the `notifier` channel is closed.
4473+ ///
4474+ /// Drops the client wrapper in a separate thread: Only one of the clients can close the chain,
4475+ /// and the `Drop` implementation blocks the thread until the command returns.
4476+ async fn run_client (
4477+ client : ClientWrapper ,
4478+ mut notifier : mpsc:: Sender < ( ) > ,
4479+ source : ChainId ,
4480+ target : ChainId ,
4481+ ) {
4482+ let duration = Duration :: from_secs ( 60 ) ;
4483+ while tokio:: time:: timeout (
4484+ duration,
4485+ client. transfer_with_silent_logs ( Amount :: ONE , source, target) ,
4486+ )
4487+ . await
4488+ . expect ( "Transfer timed out" )
4489+ . is_ok ( )
4490+ {
4491+ notifier. try_send ( ( ) ) . unwrap ( ) ;
4492+ }
4493+ tokio:: task:: spawn_blocking ( move || drop ( client) )
4494+ . await
4495+ . unwrap ( ) ;
4496+ }
44764497
44774498 let _guard = INTEGRATION_TEST_GUARD . lock ( ) . await ;
44784499 tracing:: info!( "Starting test {}" , test_name!( ) ) ;
@@ -4483,78 +4504,34 @@ async fn test_end_to_end_listen_for_new_rounds(config: impl LineraNetConfig) ->
44834504 client2. wallet_init ( None ) . await ?;
44844505 let chain1 = * client1. load_wallet ( ) ?. owned_chain_ids ( ) . first ( ) . unwrap ( ) ;
44854506
4486- // Open a chain owned by both clients, with only single-leader rounds.
4507+ // Open a chain owned by both clients, with only single-leader rounds that don't time out:
4508+ // The first round should always succeed, because both leaders are active.
44874509 let owner1 = client1. keygen ( ) . await ?;
44884510 let owner2 = client2. keygen ( ) . await ?;
44894511 let chain2 = client1
44904512 . open_multi_owner_chain (
44914513 chain1,
44924514 vec ! [ owner1, owner2] ,
4493- vec ! [ 100 , 100 ] ,
4494- 0 ,
4495- Amount :: from_tokens ( 11 ) ,
4496- u64:: MAX ,
4515+ vec ! [ 100 , 100 ] , // Both owners have equal weight.
4516+ 0 , // No multi-leader rounds.
4517+ Amount :: from_millis ( 8900 ) , // Only 8 transfers can be made.
4518+ u64:: MAX , // 585 million years
44974519 )
44984520 . await ?;
44994521 client1. assign ( owner1, chain2) . await ?;
45004522 client2. assign ( owner2, chain2) . await ?;
45014523 client2. sync ( chain2) . await ?;
45024524
4503- let ( tx, mut rx) = mpsc:: channel ( 8 ) ;
4504- let drop_barrier = Arc :: new ( Barrier :: new ( 3 ) ) ;
4505- let handle1 = tokio:: spawn ( run_client (
4506- drop_barrier. clone ( ) ,
4507- client1,
4508- tx. clone ( ) ,
4509- chain2,
4510- chain1,
4511- ) ) ;
4512- let handle2 = tokio:: spawn ( run_client (
4513- drop_barrier. clone ( ) ,
4514- client2,
4515- tx,
4516- chain2,
4517- chain1,
4518- ) ) ;
4519- /// Runs the `client` in a task, so that it can race to produce blocks transferring tokens.
4520- ///
4521- /// Stops when transferring fails or the `notifier` channel is closed. When exiting, it will
4522- /// drop the client in a separate thread so that the synchronous `Drop` implementation
4523- /// can close the chains without blocking the asynchronous worker thread, which might be
4524- /// shared with the other client's task. If the asynchronous thread is blocked, the
4525- /// other client might have the round but not be able to execute and propose a block,
4526- /// deadlocking the test.
4527- async fn run_client (
4528- drop_barrier : Arc < Barrier > ,
4529- client : ClientWrapper ,
4530- mut notifier : mpsc:: Sender < ( ) > ,
4531- source : ChainId ,
4532- target : ChainId ,
4533- ) -> Result < JoinHandle < Result < ( ) > > > {
4534- let result = async {
4535- loop {
4536- client. transfer ( Amount :: ONE , source, target) . await ?;
4537- notifier. send ( ( ) ) . await ?;
4538- }
4539- }
4540- . await ;
4541- thread:: spawn ( move || {
4542- drop ( client) ;
4543- drop_barrier. wait ( ) ;
4544- } ) ;
4545- result
4546- }
4547-
4548- for _ in 0 ..8 {
4549- let ( ) = rx. next ( ) . await . unwrap ( ) ;
4550- }
4551- drop ( rx) ;
4552-
4553- let ( result1, result2) = futures:: join!( handle1, handle2) ;
4554- assert ! ( result1?. is_err( ) ) ;
4555- assert ! ( result2?. is_err( ) ) ;
4525+ // Both clients make transfers from chain 2 to chain 1 in a loop. We use a channel with
4526+ // capacity 8, the number of expected transfers.
4527+ let ( tx, rx) = mpsc:: channel ( 8 ) ;
4528+ futures:: join!(
4529+ run_client( client1, tx. clone( ) , chain2, chain1) ,
4530+ run_client( client2, tx, chain2, chain1)
4531+ ) ;
4532+ // 8 transfers succeeded, the ninth failed because the chain ran out of tokens.
4533+ assert_eq ! ( rx. count( ) . await , 8 ) ;
45564534
4557- drop_barrier. wait ( ) ;
45584535 net. ensure_is_running ( ) . await ?;
45594536 net. terminate ( ) . await ?;
45604537
0 commit comments