@@ -13,6 +13,7 @@ use kyoto::{
1313 chain:: checkpoints:: HeaderCheckpoint ,
1414 core:: { client:: Client , node:: Node } ,
1515 BlockHash , Event , Log , NodeState , ServiceFlags , SqliteHeaderDb , SqlitePeerDb , TrustedPeer ,
16+ Warning ,
1617} ;
1718use tokio:: sync:: mpsc:: Receiver ;
1819use tokio:: sync:: mpsc:: UnboundedReceiver ;
@@ -39,7 +40,7 @@ fn start_bitcoind(with_v2_transport: bool) -> anyhow::Result<(corepc_node::Node,
3940 Ok ( ( bitcoind, socket_addr) )
4041}
4142
42- async fn new_node ( addrs : HashSet < ScriptBuf > , socket_addr : SocketAddrV4 ) -> ( Node < ( ) , ( ) > , Client ) {
43+ fn new_node ( addrs : HashSet < ScriptBuf > , socket_addr : SocketAddrV4 ) -> ( Node < ( ) , ( ) > , Client ) {
4344 let host = ( IpAddr :: V4 ( * socket_addr. ip ( ) ) , Some ( socket_addr. port ( ) ) ) ;
4445 let builder = kyoto:: core:: builder:: NodeBuilder :: new ( bitcoin:: Network :: Regtest ) ;
4546 let ( node, client) = builder
@@ -49,7 +50,7 @@ async fn new_node(addrs: HashSet<ScriptBuf>, socket_addr: SocketAddrV4) -> (Node
4950 ( node, client)
5051}
5152
52- async fn new_node_sql (
53+ fn new_node_sql (
5354 addrs : HashSet < ScriptBuf > ,
5455 socket_addr : SocketAddrV4 ,
5556 tempdir_path : PathBuf ,
@@ -67,7 +68,7 @@ async fn new_node_sql(
6768 ( node, client)
6869}
6970
70- async fn new_node_anchor_sql (
71+ fn new_node_anchor_sql (
7172 addrs : HashSet < ScriptBuf > ,
7273 checkpoint : HeaderCheckpoint ,
7374 socket_addr : SocketAddrV4 ,
@@ -111,11 +112,7 @@ async fn invalidate_block(rpc: &corepc_node::Client, hash: &bitcoin::BlockHash)
111112 tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
112113}
113114
114- async fn sync_assert (
115- best : & bitcoin:: BlockHash ,
116- channel : & mut UnboundedReceiver < Event > ,
117- log : & mut Receiver < Log > ,
118- ) {
115+ async fn sync_assert ( best : & bitcoin:: BlockHash , channel : & mut UnboundedReceiver < Event > ) {
119116 loop {
120117 tokio:: select! {
121118 event = channel. recv( ) => {
@@ -125,9 +122,21 @@ async fn sync_assert(
125122 break ;
126123 } ;
127124 }
128- log = log. recv( ) => {
125+ }
126+ }
127+ }
128+
129+ async fn print_logs ( mut log_rx : Receiver < Log > , mut warn_rx : UnboundedReceiver < Warning > ) {
130+ loop {
131+ tokio:: select! {
132+ log = log_rx. recv( ) => {
129133 if let Some ( log) = log {
130- println!( "{log}" ) ;
134+ println!( "{log}" )
135+ }
136+ }
137+ warn = warn_rx. recv( ) => {
138+ if let Some ( warn) = warn {
139+ println!( "{warn}" )
131140 }
132141 }
133142 }
@@ -152,15 +161,16 @@ async fn test_reorg() {
152161 let mut scripts = HashSet :: new ( ) ;
153162 let other = rpc. new_address ( ) . unwrap ( ) ;
154163 scripts. insert ( other. into ( ) ) ;
155- let ( node, client) = new_node ( scripts. clone ( ) , socket_addr) . await ;
164+ let ( node, client) = new_node ( scripts. clone ( ) , socket_addr) ;
156165 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
157166 let Client {
158167 requester,
159- log_rx : mut log ,
160- warn_rx : _ ,
168+ log_rx,
169+ warn_rx,
161170 event_rx : mut channel,
162171 } = client;
163- sync_assert ( & best, & mut channel, & mut log) . await ;
172+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
173+ sync_assert ( & best, & mut channel) . await ;
164174 // Reorganize the blocks
165175 let old_best = best;
166176 let old_height = num_blocks ( rpc) ;
@@ -205,15 +215,16 @@ async fn test_mine_after_reorg() {
205215 let mut scripts = HashSet :: new ( ) ;
206216 let other = rpc. new_address ( ) . unwrap ( ) ;
207217 scripts. insert ( other. into ( ) ) ;
208- let ( node, client) = new_node ( scripts. clone ( ) , socket_addr) . await ;
218+ let ( node, client) = new_node ( scripts. clone ( ) , socket_addr) ;
209219 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
210220 let Client {
211221 requester,
212- log_rx : mut log ,
213- warn_rx : _ ,
222+ log_rx,
223+ warn_rx,
214224 event_rx : mut channel,
215225 } = client;
216- sync_assert ( & best, & mut channel, & mut log) . await ;
226+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
227+ sync_assert ( & best, & mut channel) . await ;
217228 // Reorganize the blocks
218229 let old_best = best;
219230 let old_height = num_blocks ( rpc) ;
@@ -239,7 +250,7 @@ async fn test_mine_after_reorg() {
239250 }
240251 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
241252 let best = best_hash ( rpc) ;
242- sync_assert ( & best, & mut channel, & mut log ) . await ;
253+ sync_assert ( & best, & mut channel) . await ;
243254 requester. shutdown ( ) . await . unwrap ( ) ;
244255 rpc. stop ( ) . unwrap ( ) ;
245256}
@@ -261,15 +272,16 @@ async fn test_various_client_methods() {
261272 let mut scripts = HashSet :: new ( ) ;
262273 let other = rpc. new_address ( ) . unwrap ( ) ;
263274 scripts. insert ( other. into ( ) ) ;
264- let ( node, client) = new_node ( scripts. clone ( ) , socket_addr) . await ;
275+ let ( node, client) = new_node ( scripts. clone ( ) , socket_addr) ;
265276 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
266277 let Client {
267278 requester,
268- log_rx : mut log ,
269- warn_rx : _ ,
279+ log_rx,
280+ warn_rx,
270281 event_rx : mut channel,
271282 } = client;
272- sync_assert ( & best, & mut channel, & mut log) . await ;
283+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
284+ sync_assert ( & best, & mut channel) . await ;
273285 let batch = requester. get_header_range ( 10_000 ..10_002 ) . await . unwrap ( ) ;
274286 assert ! ( batch. is_empty( ) ) ;
275287 let _ = requester. broadcast_min_feerate ( ) . await . unwrap ( ) ;
@@ -299,15 +311,16 @@ async fn test_sql_reorg() {
299311 let mut scripts = HashSet :: new ( ) ;
300312 let other = rpc. new_address ( ) . unwrap ( ) ;
301313 scripts. insert ( other. into ( ) ) ;
302- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) . await ;
314+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) ;
303315 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
304316 let Client {
305317 requester,
306- log_rx : mut log ,
307- warn_rx : _ ,
318+ log_rx,
319+ warn_rx,
308320 event_rx : mut channel,
309321 } = client;
310- sync_assert ( & best, & mut channel, & mut log) . await ;
322+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
323+ sync_assert ( & best, & mut channel) . await ;
311324 let batch = requester. get_header_range ( 0 ..10 ) . await . unwrap ( ) ;
312325 assert ! ( !batch. is_empty( ) ) ;
313326 requester. shutdown ( ) . await . unwrap ( ) ;
@@ -318,14 +331,15 @@ async fn test_sql_reorg() {
318331 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
319332 let best = best_hash ( rpc) ;
320333 // Spin up the node on a cold start
321- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) . await ;
334+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) ;
322335 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
323336 let Client {
324337 requester,
325- log_rx : _ ,
326- warn_rx : _ ,
338+ log_rx,
339+ warn_rx,
327340 event_rx : mut channel,
328341 } = client;
342+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
329343 // Make sure the reorganization is caught after a cold start
330344 while let Some ( message) = channel. recv ( ) . await {
331345 match message {
@@ -343,20 +357,22 @@ async fn test_sql_reorg() {
343357 }
344358 }
345359 requester. shutdown ( ) . await . unwrap ( ) ;
360+ drop ( handle) ;
346361 // Mine more blocks
347362 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
348363 let best = best_hash ( rpc) ;
349364 // Make sure the node does not have any corrupted headers
350- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir) . await ;
365+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir) ;
351366 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
352367 let Client {
353368 requester,
354- log_rx : mut log ,
355- warn_rx : _ ,
369+ log_rx,
370+ warn_rx,
356371 event_rx : mut channel,
357372 } = client;
373+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
358374 // The node properly syncs after persisting a reorg
359- sync_assert ( & best, & mut channel, & mut log ) . await ;
375+ sync_assert ( & best, & mut channel) . await ;
360376 requester. shutdown ( ) . await . unwrap ( ) ;
361377 rpc. stop ( ) . unwrap ( ) ;
362378}
@@ -379,15 +395,16 @@ async fn test_two_deep_reorg() {
379395 let mut scripts = HashSet :: new ( ) ;
380396 let other = rpc. new_address ( ) . unwrap ( ) ;
381397 scripts. insert ( other. into ( ) ) ;
382- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) . await ;
398+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) ;
383399 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
384400 let Client {
385401 requester,
386- log_rx : mut log ,
387- warn_rx : _ ,
402+ log_rx,
403+ warn_rx,
388404 event_rx : mut channel,
389405 } = client;
390- sync_assert ( & best, & mut channel, & mut log) . await ;
406+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
407+ sync_assert ( & best, & mut channel) . await ;
391408 requester. shutdown ( ) . await . unwrap ( ) ;
392409 // Reorganize the blocks
393410 let old_height = num_blocks ( rpc) ;
@@ -397,15 +414,17 @@ async fn test_two_deep_reorg() {
397414 invalidate_block ( rpc, & best) . await ;
398415 mine_blocks ( rpc, & miner, 3 , 1 ) . await ;
399416 let best = best_hash ( rpc) ;
417+ drop ( handle) ;
400418 // Make sure the reorganization is caught after a cold start
401- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) . await ;
419+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) ;
402420 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
403421 let Client {
404422 requester,
405- log_rx : _ ,
406- warn_rx : _ ,
423+ log_rx,
424+ warn_rx,
407425 event_rx : mut channel,
408426 } = client;
427+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
409428 while let Some ( message) = channel. recv ( ) . await {
410429 match message {
411430 kyoto:: core:: messages:: Event :: BlocksDisconnected ( blocks) => {
@@ -421,21 +440,23 @@ async fn test_two_deep_reorg() {
421440 _ => { }
422441 }
423442 }
443+ drop ( handle) ;
424444 requester. shutdown ( ) . await . unwrap ( ) ;
425445 // Mine more blocks
426446 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
427447 let best = best_hash ( rpc) ;
428448 // Make sure the node does not have any corrupted headers
429- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir) . await ;
449+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir) ;
430450 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
431451 let Client {
432452 requester,
433- log_rx : mut log ,
434- warn_rx : _ ,
453+ log_rx,
454+ warn_rx,
435455 event_rx : mut channel,
436456 } = client;
457+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
437458 // The node properly syncs after persisting a reorg
438- sync_assert ( & best, & mut channel, & mut log ) . await ;
459+ sync_assert ( & best, & mut channel) . await ;
439460 requester. shutdown ( ) . await . unwrap ( ) ;
440461 rpc. stop ( ) . unwrap ( ) ;
441462}
@@ -457,15 +478,17 @@ async fn test_sql_stale_anchor() {
457478 let mut scripts = HashSet :: new ( ) ;
458479 let other = rpc. new_address ( ) . unwrap ( ) ;
459480 scripts. insert ( other. into ( ) ) ;
460- let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) . await ;
481+ let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) ;
461482 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
462483 let Client {
463484 requester,
464- log_rx : mut log ,
465- warn_rx : _ ,
485+ log_rx,
486+ warn_rx,
466487 event_rx : mut channel,
467488 } = client;
468- sync_assert ( & best, & mut channel, & mut log) . await ;
489+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
490+ sync_assert ( & best, & mut channel) . await ;
491+ drop ( handle) ;
469492 requester. shutdown ( ) . await . unwrap ( ) ;
470493 // Reorganize the blocks
471494 let old_best = best;
@@ -479,15 +502,15 @@ async fn test_sql_stale_anchor() {
479502 HeaderCheckpoint :: new ( old_height as u32 , old_best) ,
480503 socket_addr,
481504 tempdir. clone ( ) ,
482- )
483- . await ;
505+ ) ;
484506 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
485507 let Client {
486508 requester,
487- log_rx : _ ,
488- warn_rx : _ ,
509+ log_rx,
510+ warn_rx,
489511 event_rx : mut channel,
490512 } = client;
513+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
491514 // Ensure SQL is able to catch the fork by loading in headers from the database
492515 while let Some ( message) = channel. recv ( ) . await {
493516 match message {
@@ -504,6 +527,7 @@ async fn test_sql_stale_anchor() {
504527 _ => { }
505528 }
506529 }
530+ drop ( handle) ;
507531 requester. shutdown ( ) . await . unwrap ( ) ;
508532 // Don't do anything, but reload the node from the checkpoint
509533 let cp = best_hash ( rpc) ;
@@ -515,17 +539,18 @@ async fn test_sql_stale_anchor() {
515539 HeaderCheckpoint :: new ( old_height as u32 , cp) ,
516540 socket_addr,
517541 tempdir. clone ( ) ,
518- )
519- . await ;
542+ ) ;
520543 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
521544 let Client {
522545 requester,
523- log_rx : mut log ,
524- warn_rx : _ ,
546+ log_rx,
547+ warn_rx,
525548 event_rx : mut channel,
526549 } = client;
550+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
527551 // The node properly syncs after persisting a reorg
528- sync_assert ( & best, & mut channel, & mut log) . await ;
552+ sync_assert ( & best, & mut channel) . await ;
553+ drop ( handle) ;
529554 requester. shutdown ( ) . await . unwrap ( ) ;
530555 // Mine more blocks and reload from the checkpoint
531556 let cp = best_hash ( rpc) ;
@@ -538,17 +563,17 @@ async fn test_sql_stale_anchor() {
538563 HeaderCheckpoint :: new ( old_height as u32 , cp) ,
539564 socket_addr,
540565 tempdir,
541- )
542- . await ;
566+ ) ;
543567 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
544568 let Client {
545569 requester,
546- log_rx : mut log ,
547- warn_rx : _ ,
570+ log_rx,
571+ warn_rx,
548572 event_rx : mut channel,
549573 } = client;
574+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
550575 // The node properly syncs after persisting a reorg
551- sync_assert ( & best, & mut channel, & mut log ) . await ;
576+ sync_assert ( & best, & mut channel) . await ;
552577 requester. shutdown ( ) . await . unwrap ( ) ;
553578 rpc. stop ( ) . unwrap ( ) ;
554579}
0 commit comments