@@ -43,15 +43,14 @@ impl PeerNetworkInterface {
4343 SyncPoint :: Snapshot => Some ( context. subscribe ( & cfg. snapshot_completion_topic ) . await ?) ,
4444 _ => None ,
4545 } ;
46- let ( events_sender, events) = mpsc:: channel ( 1024 ) ;
4746
4847 context. clone ( ) . run ( async move {
4948 let genesis_values = if let Some ( mut sub) = genesis_complete {
5049 Self :: wait_genesis_completion ( & mut sub)
5150 . await
5251 . expect ( "could not fetch genesis values" )
5352 } else {
54- cfg. genesis_values . expect ( "genesis values not found" )
53+ cfg. genesis_values . clone ( ) . expect ( "genesis values not found" )
5554 } ;
5655
5756 let mut upstream_cache = None ;
@@ -73,40 +72,53 @@ impl PeerNetworkInterface {
7372 }
7473 }
7574
76- let sink = BlockSink {
75+ let mut sink = BlockSink {
7776 context,
78- topic : cfg. block_topic ,
77+ topic : cfg. block_topic . clone ( ) ,
7978 genesis_values,
8079 upstream_cache,
8180 last_epoch,
8281 } ;
8382
84- let mut manager = NetworkManager :: new ( cfg. magic_number , events, events_sender, sink) ;
85- for address in cfg. node_addresses {
86- manager. handle_new_connection ( address, Duration :: ZERO ) ;
87- }
88-
89- match cfg. sync_point {
90- SyncPoint :: Origin => manager. sync_to_point ( Point :: Origin ) ,
83+ let manager = match cfg. sync_point {
84+ SyncPoint :: Origin => {
85+ let mut manager = Self :: init_manager ( cfg, sink) ;
86+ manager. sync_to_point ( Point :: Origin ) ;
87+ manager
88+ }
9189 SyncPoint :: Tip => {
90+ let mut manager = Self :: init_manager ( cfg, sink) ;
9291 if let Err ( error) = manager. sync_to_tip ( ) . await {
9392 warn ! ( "could not sync to tip: {error:#}" ) ;
9493 return ;
9594 }
95+ manager
96+ }
97+ SyncPoint :: Cache => {
98+ let mut manager = Self :: init_manager ( cfg, sink) ;
99+ manager. sync_to_point ( cache_sync_point) ;
100+ manager
96101 }
97- SyncPoint :: Cache => manager. sync_to_point ( cache_sync_point) ,
98102 SyncPoint :: Snapshot => {
99103 let mut subscription =
100104 snapshot_complete. expect ( "Snapshot topic subscription missing" ) ;
101105 match Self :: wait_snapshot_completion ( & mut subscription) . await {
102- Ok ( point) => manager. sync_to_point ( point) ,
106+ Ok ( point) => {
107+ if let Point :: Specific ( slot, _) = point {
108+ let ( epoch, _) = sink. genesis_values . slot_to_epoch ( slot) ;
109+ sink. last_epoch = Some ( epoch) ;
110+ }
111+ let mut manager = Self :: init_manager ( cfg, sink) ;
112+ manager. sync_to_point ( point) ;
113+ manager
114+ }
103115 Err ( error) => {
104116 warn ! ( "snapshot restoration never completed: {error:#}" ) ;
105117 return ;
106118 }
107119 }
108120 }
109- }
121+ } ;
110122
111123 if let Err ( err) = manager. run ( ) . await {
112124 error ! ( "chain sync failed: {err:#}" ) ;
@@ -116,6 +128,15 @@ impl PeerNetworkInterface {
116128 Ok ( ( ) )
117129 }
118130
131+ fn init_manager ( cfg : InterfaceConfig , sink : BlockSink ) -> NetworkManager {
132+ let ( events_sender, events) = mpsc:: channel ( 1024 ) ;
133+ let mut manager = NetworkManager :: new ( cfg. magic_number , events, events_sender, sink) ;
134+ for address in cfg. node_addresses {
135+ manager. handle_new_connection ( address, Duration :: ZERO ) ;
136+ }
137+ manager
138+ }
139+
119140 async fn init_cache (
120141 cache_dir : & Path ,
121142 block_topic : & str ,
0 commit comments