@@ -13,6 +13,7 @@ use kyoto::{
1313 chain:: checkpoints:: HeaderCheckpoint ,
1414 core:: { client:: Client , node:: Node } ,
1515 BlockHash , Event , Log , NodeState , ServiceFlags , SqliteHeaderDb , SqlitePeerDb , TrustedPeer ,
16+ Warning ,
1617} ;
1718use tokio:: sync:: mpsc:: Receiver ;
1819use tokio:: sync:: mpsc:: UnboundedReceiver ;
@@ -111,11 +112,7 @@ async fn invalidate_block(rpc: &corepc_node::Client, hash: &bitcoin::BlockHash)
111112 tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
112113}
113114
114- async fn sync_assert (
115- best : & bitcoin:: BlockHash ,
116- channel : & mut UnboundedReceiver < Event > ,
117- log : & mut Receiver < Log > ,
118- ) {
115+ async fn sync_assert ( best : & bitcoin:: BlockHash , channel : & mut UnboundedReceiver < Event > ) {
119116 loop {
120117 tokio:: select! {
121118 event = channel. recv( ) => {
@@ -125,9 +122,21 @@ async fn sync_assert(
125122 break ;
126123 } ;
127124 }
128- log = log. recv( ) => {
125+ }
126+ }
127+ }
128+
129+ async fn print_logs ( mut log_rx : Receiver < Log > , mut warn_rx : UnboundedReceiver < Warning > ) {
130+ loop {
131+ tokio:: select! {
132+ log = log_rx. recv( ) => {
129133 if let Some ( log) = log {
130- println!( "{log}" ) ;
134+ println!( "{log}" )
135+ }
136+ }
137+ warn = warn_rx. recv( ) => {
138+ if let Some ( warn) = warn {
139+ println!( "{warn}" )
131140 }
132141 }
133142 }
@@ -156,11 +165,12 @@ async fn test_reorg() {
156165 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
157166 let Client {
158167 requester,
159- log_rx : mut log ,
160- warn_rx : _ ,
168+ log_rx,
169+ warn_rx,
161170 event_rx : mut channel,
162171 } = client;
163- sync_assert ( & best, & mut channel, & mut log) . await ;
172+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
173+ sync_assert ( & best, & mut channel) . await ;
164174 // Reorganize the blocks
165175 let old_best = best;
166176 let old_height = num_blocks ( rpc) ;
@@ -209,11 +219,12 @@ async fn test_mine_after_reorg() {
209219 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
210220 let Client {
211221 requester,
212- log_rx : mut log ,
213- warn_rx : _ ,
222+ log_rx,
223+ warn_rx,
214224 event_rx : mut channel,
215225 } = client;
216- sync_assert ( & best, & mut channel, & mut log) . await ;
226+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
227+ sync_assert ( & best, & mut channel) . await ;
217228 // Reorganize the blocks
218229 let old_best = best;
219230 let old_height = num_blocks ( rpc) ;
@@ -239,7 +250,7 @@ async fn test_mine_after_reorg() {
239250 }
240251 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
241252 let best = best_hash ( rpc) ;
242- sync_assert ( & best, & mut channel, & mut log ) . await ;
253+ sync_assert ( & best, & mut channel) . await ;
243254 requester. shutdown ( ) . await . unwrap ( ) ;
244255 rpc. stop ( ) . unwrap ( ) ;
245256}
@@ -265,11 +276,12 @@ async fn test_various_client_methods() {
265276 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
266277 let Client {
267278 requester,
268- log_rx : mut log ,
269- warn_rx : _ ,
279+ log_rx,
280+ warn_rx,
270281 event_rx : mut channel,
271282 } = client;
272- sync_assert ( & best, & mut channel, & mut log) . await ;
283+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
284+ sync_assert ( & best, & mut channel) . await ;
273285 let batch = requester. get_header_range ( 10_000 ..10_002 ) . await . unwrap ( ) ;
274286 assert ! ( batch. is_empty( ) ) ;
275287 let _ = requester. broadcast_min_feerate ( ) . await . unwrap ( ) ;
@@ -303,11 +315,12 @@ async fn test_sql_reorg() {
303315 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
304316 let Client {
305317 requester,
306- log_rx : mut log ,
307- warn_rx : _ ,
318+ log_rx,
319+ warn_rx,
308320 event_rx : mut channel,
309321 } = client;
310- sync_assert ( & best, & mut channel, & mut log) . await ;
322+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
323+ sync_assert ( & best, & mut channel) . await ;
311324 let batch = requester. get_header_range ( 0 ..10 ) . await . unwrap ( ) ;
312325 assert ! ( !batch. is_empty( ) ) ;
313326 requester. shutdown ( ) . await . unwrap ( ) ;
@@ -322,10 +335,11 @@ async fn test_sql_reorg() {
322335 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
323336 let Client {
324337 requester,
325- log_rx : _ ,
326- warn_rx : _ ,
338+ log_rx,
339+ warn_rx,
327340 event_rx : mut channel,
328341 } = client;
342+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
329343 // Make sure the reorganization is caught after a cold start
330344 while let Some ( message) = channel. recv ( ) . await {
331345 match message {
@@ -343,6 +357,7 @@ async fn test_sql_reorg() {
343357 }
344358 }
345359 requester. shutdown ( ) . await . unwrap ( ) ;
360+ drop ( handle) ;
346361 // Mine more blocks
347362 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
348363 let best = best_hash ( rpc) ;
@@ -351,12 +366,13 @@ async fn test_sql_reorg() {
351366 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
352367 let Client {
353368 requester,
354- log_rx : mut log ,
355- warn_rx : _ ,
369+ log_rx,
370+ warn_rx,
356371 event_rx : mut channel,
357372 } = client;
373+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
358374 // The node properly syncs after persisting a reorg
359- sync_assert ( & best, & mut channel, & mut log ) . await ;
375+ sync_assert ( & best, & mut channel) . await ;
360376 requester. shutdown ( ) . await . unwrap ( ) ;
361377 rpc. stop ( ) . unwrap ( ) ;
362378}
@@ -383,11 +399,12 @@ async fn test_two_deep_reorg() {
383399 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
384400 let Client {
385401 requester,
386- log_rx : mut log ,
387- warn_rx : _ ,
402+ log_rx,
403+ warn_rx,
388404 event_rx : mut channel,
389405 } = client;
390- sync_assert ( & best, & mut channel, & mut log) . await ;
406+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
407+ sync_assert ( & best, & mut channel) . await ;
391408 requester. shutdown ( ) . await . unwrap ( ) ;
392409 // Reorganize the blocks
393410 let old_height = num_blocks ( rpc) ;
@@ -397,15 +414,17 @@ async fn test_two_deep_reorg() {
397414 invalidate_block ( rpc, & best) . await ;
398415 mine_blocks ( rpc, & miner, 3 , 1 ) . await ;
399416 let best = best_hash ( rpc) ;
417+ drop ( handle) ;
400418 // Make sure the reorganization is caught after a cold start
401419 let ( node, client) = new_node_sql ( scripts. clone ( ) , socket_addr, tempdir. clone ( ) ) ;
402420 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
403421 let Client {
404422 requester,
405- log_rx : _ ,
406- warn_rx : _ ,
423+ log_rx,
424+ warn_rx,
407425 event_rx : mut channel,
408426 } = client;
427+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
409428 while let Some ( message) = channel. recv ( ) . await {
410429 match message {
411430 kyoto:: core:: messages:: Event :: BlocksDisconnected ( blocks) => {
@@ -421,6 +440,7 @@ async fn test_two_deep_reorg() {
421440 _ => { }
422441 }
423442 }
443+ drop ( handle) ;
424444 requester. shutdown ( ) . await . unwrap ( ) ;
425445 // Mine more blocks
426446 mine_blocks ( rpc, & miner, 2 , 1 ) . await ;
@@ -430,12 +450,13 @@ async fn test_two_deep_reorg() {
430450 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
431451 let Client {
432452 requester,
433- log_rx : mut log ,
434- warn_rx : _ ,
453+ log_rx,
454+ warn_rx,
435455 event_rx : mut channel,
436456 } = client;
457+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
437458 // The node properly syncs after persisting a reorg
438- sync_assert ( & best, & mut channel, & mut log ) . await ;
459+ sync_assert ( & best, & mut channel) . await ;
439460 requester. shutdown ( ) . await . unwrap ( ) ;
440461 rpc. stop ( ) . unwrap ( ) ;
441462}
@@ -461,11 +482,13 @@ async fn test_sql_stale_anchor() {
461482 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
462483 let Client {
463484 requester,
464- log_rx : mut log ,
465- warn_rx : _ ,
485+ log_rx,
486+ warn_rx,
466487 event_rx : mut channel,
467488 } = client;
468- sync_assert ( & best, & mut channel, & mut log) . await ;
489+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
490+ sync_assert ( & best, & mut channel) . await ;
491+ drop ( handle) ;
469492 requester. shutdown ( ) . await . unwrap ( ) ;
470493 // Reorganize the blocks
471494 let old_best = best;
@@ -483,10 +506,11 @@ async fn test_sql_stale_anchor() {
483506 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
484507 let Client {
485508 requester,
486- log_rx : _ ,
487- warn_rx : _ ,
509+ log_rx,
510+ warn_rx,
488511 event_rx : mut channel,
489512 } = client;
513+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
490514 // Ensure SQL is able to catch the fork by loading in headers from the database
491515 while let Some ( message) = channel. recv ( ) . await {
492516 match message {
@@ -503,6 +527,7 @@ async fn test_sql_stale_anchor() {
503527 _ => { }
504528 }
505529 }
530+ drop ( handle) ;
506531 requester. shutdown ( ) . await . unwrap ( ) ;
507532 // Don't do anything, but reload the node from the checkpoint
508533 let cp = best_hash ( rpc) ;
@@ -518,12 +543,14 @@ async fn test_sql_stale_anchor() {
518543 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
519544 let Client {
520545 requester,
521- log_rx : mut log ,
522- warn_rx : _ ,
546+ log_rx,
547+ warn_rx,
523548 event_rx : mut channel,
524549 } = client;
550+ let handle = tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
525551 // The node properly syncs after persisting a reorg
526- sync_assert ( & best, & mut channel, & mut log) . await ;
552+ sync_assert ( & best, & mut channel) . await ;
553+ drop ( handle) ;
527554 requester. shutdown ( ) . await . unwrap ( ) ;
528555 // Mine more blocks and reload from the checkpoint
529556 let cp = best_hash ( rpc) ;
@@ -540,12 +567,13 @@ async fn test_sql_stale_anchor() {
540567 tokio:: task:: spawn ( async move { node. run ( ) . await } ) ;
541568 let Client {
542569 requester,
543- log_rx : mut log ,
544- warn_rx : _ ,
570+ log_rx,
571+ warn_rx,
545572 event_rx : mut channel,
546573 } = client;
574+ tokio:: task:: spawn ( async move { print_logs ( log_rx, warn_rx) . await } ) ;
547575 // The node properly syncs after persisting a reorg
548- sync_assert ( & best, & mut channel, & mut log ) . await ;
576+ sync_assert ( & best, & mut channel) . await ;
549577 requester. shutdown ( ) . await . unwrap ( ) ;
550578 rpc. stop ( ) . unwrap ( ) ;
551579}
0 commit comments