@@ -8,7 +8,7 @@ use pallas_network::{
8
8
miniprotocols:: {
9
9
localstate:: {
10
10
queries_v16:: {
11
- self , Addr , Addrs , PostAlonsoTransactionOutput , StakeSnapshot , Stakes ,
11
+ self , Addr , Addrs , Genesis , PostAlonsoTransactionOutput , StakeSnapshot , Stakes ,
12
12
TransactionOutput , UTxOByAddress ,
13
13
} ,
14
14
Client ,
@@ -271,6 +271,40 @@ impl PallasChainObserver {
271
271
. with_context ( || "PallasChainObserver failed to convert kes period" ) ?)
272
272
}
273
273
274
+ /// Fetches the current chain point using the provided `statequery` client.
275
+ async fn do_get_chain_point_state_query ( & self , statequery : & mut Client ) -> StdResult < Point > {
276
+ let chain_point = queries_v16:: get_chain_point ( statequery)
277
+ . await
278
+ . map_err ( |err| anyhow ! ( err) )
279
+ . with_context ( || "PallasChainObserver failed to get chain point" ) ?;
280
+
281
+ Ok ( chain_point)
282
+ }
283
+
284
+ /// Fetches the current era using the provided `statequery` client.
285
+ async fn do_get_current_era_state_query ( & self , statequery : & mut Client ) -> StdResult < u16 > {
286
+ let era = queries_v16:: get_current_era ( statequery)
287
+ . await
288
+ . map_err ( |err| anyhow ! ( err) )
289
+ . with_context ( || "PallasChainObserver failed to get current era" ) ?;
290
+
291
+ Ok ( era)
292
+ }
293
+
294
+ /// Fetches the current genesis config using the provided `statequery` client.
295
+ async fn do_get_genesis_config_state_query (
296
+ & self ,
297
+ statequery : & mut Client ,
298
+ ) -> StdResult < Vec < Genesis > > {
299
+ let era = self . do_get_current_era_state_query ( statequery) . await ?;
300
+ let genesis_config = queries_v16:: get_genesis_config ( statequery, era)
301
+ . await
302
+ . map_err ( |err| anyhow ! ( err) )
303
+ . with_context ( || "PallasChainObserver failed to get genesis config" ) ?;
304
+
305
+ Ok ( genesis_config)
306
+ }
307
+
274
308
/// Fetches chain point and genesis config through the local statequery.
275
309
/// The KES period is calculated afterwards.
276
310
async fn get_kes_period (
@@ -285,20 +319,9 @@ impl PallasChainObserver {
285
319
. map_err ( |err| anyhow ! ( err) )
286
320
. with_context ( || "PallasChainObserver failed to acquire statequery" ) ?;
287
321
288
- let chain_point = queries_v16:: get_chain_point ( statequery)
289
- . await
290
- . map_err ( |err| anyhow ! ( err) )
291
- . with_context ( || "PallasChainObserver failed to get chain point" ) ?;
292
-
293
- let era = queries_v16:: get_current_era ( statequery)
294
- . await
295
- . map_err ( |err| anyhow ! ( err) )
296
- . with_context ( || "PallasChainObserver failed to get current era" ) ?;
322
+ let chain_point = self . do_get_chain_point_state_query ( statequery) . await ?;
297
323
298
- let genesis_config = queries_v16:: get_genesis_config ( statequery, era)
299
- . await
300
- . map_err ( |err| anyhow ! ( err) )
301
- . with_context ( || "PallasChainObserver failed to get genesis config" ) ?;
324
+ let genesis_config = self . do_get_genesis_config_state_query ( statequery) . await ?;
302
325
303
326
let config = genesis_config
304
327
. first ( )
@@ -579,8 +602,12 @@ mod tests {
579
602
TempDir :: create_with_short_path ( "pallas_chain_observer_test" , folder_name)
580
603
}
581
604
582
- /// Sets up a mock server.
583
- async fn setup_server ( socket_path : PathBuf ) -> tokio:: task:: JoinHandle < ( ) > {
605
+ /// Sets up a mock server for related tests.
606
+ ///
607
+ /// Use the `intersections` parameter to define exactly how many
608
+ /// local state queries should be intersepted by the `mock_server`
609
+ /// and avoid any panic errors.
610
+ async fn setup_server ( socket_path : PathBuf , intersections : u32 ) -> tokio:: task:: JoinHandle < ( ) > {
584
611
tokio:: spawn ( {
585
612
async move {
586
613
if socket_path. exists ( ) {
@@ -595,22 +622,18 @@ mod tests {
595
622
server. statequery ( ) . recv_while_idle ( ) . await . unwrap ( ) ;
596
623
server. statequery ( ) . send_acquired ( ) . await . unwrap ( ) ;
597
624
598
- let result = mock_server ( & mut server) . await ;
599
- server. statequery ( ) . send_result ( result) . await . unwrap ( ) ;
600
-
601
- let result = mock_server ( & mut server) . await ;
602
- server. statequery ( ) . send_result ( result) . await . unwrap ( ) ;
603
-
604
- let result = mock_server ( & mut server) . await ;
605
- server. statequery ( ) . send_result ( result) . await . unwrap ( ) ;
625
+ for _ in 0 ..intersections {
626
+ let result = mock_server ( & mut server) . await ;
627
+ server. statequery ( ) . send_result ( result) . await . unwrap ( ) ;
628
+ }
606
629
}
607
630
} )
608
631
}
609
632
610
633
#[ tokio:: test]
611
634
async fn get_current_epoch ( ) {
612
635
let socket_path = create_temp_dir ( "get_current_epoch" ) . join ( "node.socket" ) ;
613
- let server = setup_server ( socket_path. clone ( ) ) . await ;
636
+ let server = setup_server ( socket_path. clone ( ) , 2 ) . await ;
614
637
let client = tokio:: spawn ( async move {
615
638
let observer =
616
639
PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
@@ -625,7 +648,7 @@ mod tests {
625
648
#[ tokio:: test]
626
649
async fn get_current_datums ( ) {
627
650
let socket_path = create_temp_dir ( "get_current_datums" ) . join ( "node.socket" ) ;
628
- let server = setup_server ( socket_path. clone ( ) ) . await ;
651
+ let server = setup_server ( socket_path. clone ( ) , 2 ) . await ;
629
652
let client = tokio:: spawn ( async move {
630
653
let observer =
631
654
PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
@@ -642,7 +665,7 @@ mod tests {
642
665
#[ tokio:: test]
643
666
async fn get_current_stake_distribution ( ) {
644
667
let socket_path = create_temp_dir ( "get_current_stake_distribution" ) . join ( "node.socket" ) ;
645
- let server = setup_server ( socket_path. clone ( ) ) . await ;
668
+ let server = setup_server ( socket_path. clone ( ) , 2 ) . await ;
646
669
let client = tokio:: spawn ( async move {
647
670
let observer =
648
671
super :: PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
@@ -672,7 +695,7 @@ mod tests {
672
695
#[ tokio:: test]
673
696
async fn get_current_kes_period ( ) {
674
697
let socket_path = create_temp_dir ( "get_current_kes_period" ) . join ( "node.socket" ) ;
675
- let server = setup_server ( socket_path. clone ( ) ) . await ;
698
+ let server = setup_server ( socket_path. clone ( ) , 3 ) . await ;
676
699
let client = tokio:: spawn ( async move {
677
700
let observer =
678
701
PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
@@ -718,4 +741,76 @@ mod tests {
718
741
719
742
assert_eq ! ( 413 , current_kes_period) ;
720
743
}
744
+
745
+ #[ tokio:: test]
746
+ async fn get_chain_point ( ) {
747
+ let socket_path = create_temp_dir ( "get_chain_point" ) . join ( "node.socket" ) ;
748
+ let server = setup_server ( socket_path. clone ( ) , 1 ) . await ;
749
+ let client = tokio:: spawn ( async move {
750
+ let observer =
751
+ PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
752
+ let mut client = observer. get_client ( ) . await . unwrap ( ) ;
753
+ let statequery = client. statequery ( ) ;
754
+ statequery. acquire ( None ) . await . unwrap ( ) ;
755
+ let chain_point = observer
756
+ . do_get_chain_point_state_query ( statequery)
757
+ . await
758
+ . unwrap ( ) ;
759
+ observer. post_process_statequery ( & mut client) . await . unwrap ( ) ;
760
+ client. abort ( ) . await ;
761
+ chain_point
762
+ } ) ;
763
+
764
+ let ( _, client_res) = tokio:: join!( server, client) ;
765
+ let chain_point = client_res. expect ( "Client failed" ) ;
766
+ assert_eq ! ( chain_point, Point :: Specific ( 52851885 , vec![ 1 , 2 , 3 ] ) ) ;
767
+ }
768
+
769
+ #[ tokio:: test]
770
+ async fn get_genesis_config ( ) {
771
+ let socket_path = create_temp_dir ( "get_genesis_config" ) . join ( "node.socket" ) ;
772
+ let server = setup_server ( socket_path. clone ( ) , 2 ) . await ;
773
+ let client = tokio:: spawn ( async move {
774
+ let observer =
775
+ PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
776
+ let mut client = observer. get_client ( ) . await . unwrap ( ) ;
777
+ let statequery = client. statequery ( ) ;
778
+ statequery. acquire ( None ) . await . unwrap ( ) ;
779
+ let genesis_config = observer
780
+ . do_get_genesis_config_state_query ( statequery)
781
+ . await
782
+ . unwrap ( ) ;
783
+ observer. post_process_statequery ( & mut client) . await . unwrap ( ) ;
784
+ client. abort ( ) . await ;
785
+ genesis_config
786
+ } ) ;
787
+
788
+ let ( _, client_res) = tokio:: join!( server, client) ;
789
+ let genesis_config = client_res. expect ( "Client failed" ) ;
790
+ assert_eq ! ( genesis_config, get_fake_genesis_config( ) ) ;
791
+ }
792
+
793
+ #[ tokio:: test]
794
+ async fn get_current_era ( ) {
795
+ let socket_path = create_temp_dir ( "get_current_era" ) . join ( "node.socket" ) ;
796
+ let server = setup_server ( socket_path. clone ( ) , 1 ) . await ;
797
+ let client = tokio:: spawn ( async move {
798
+ let observer =
799
+ PallasChainObserver :: new ( socket_path. as_path ( ) , CardanoNetwork :: TestNet ( 10 ) ) ;
800
+ let mut client = observer. get_client ( ) . await . unwrap ( ) ;
801
+ let statequery = client. statequery ( ) ;
802
+ statequery. acquire ( None ) . await . unwrap ( ) ;
803
+ let era = observer
804
+ . do_get_current_era_state_query ( statequery)
805
+ . await
806
+ . unwrap ( ) ;
807
+ observer. post_process_statequery ( & mut client) . await . unwrap ( ) ;
808
+ client. abort ( ) . await ;
809
+ era
810
+ } ) ;
811
+
812
+ let ( _, client_res) = tokio:: join!( server, client) ;
813
+ let era = client_res. expect ( "Client failed" ) ;
814
+ assert_eq ! ( era, 4 ) ;
815
+ }
721
816
}
0 commit comments