@@ -4,6 +4,12 @@ use node::ledger::write::BlockApplyResult;
44use std:: env;
55use std:: io:: Write ;
66
7+ use mina_p2p_messages:: v2:: PrecomputedBlock ;
8+ use openmina_core:: NetworkConfig ;
9+ use reqwest:: Url ;
10+ use std:: net:: SocketAddr ;
11+ use std:: { fs:: File , path:: Path } ;
12+
713use super :: NodeService ;
814
915pub mod aws;
@@ -31,6 +37,7 @@ pub struct ArchiveService {
3137}
3238
3339struct ArchiveServiceClients {
40+ archiver_address : Option < SocketAddr > ,
3441 aws_client : Option < aws:: ArchiveAWSClient > ,
3542 gcp_client : Option < gcp:: ArchiveGCPClient > ,
3643 local_path : Option < String > ,
@@ -60,23 +67,134 @@ impl ArchiveServiceClients {
6067 None
6168 } ;
6269
70+ let archiver_address = if options. uses_archiver_process ( ) {
71+ let address = std:: env:: var ( "OPENMINA_ARCHIVE_ADDRESS" )
72+ . expect ( "OPENMINA_ARCHIVE_ADDRESS is not set" ) ;
73+ let address = Url :: parse ( & address) . expect ( "Invalid URL" ) ;
74+
75+ // Convert URL to SocketAddr
76+ let socket_addrs = address. socket_addrs ( || None ) . expect ( "Invalid URL" ) ;
77+
78+ let socket_addr = socket_addrs. first ( ) . expect ( "No socket address found" ) ;
79+
80+ Some ( * socket_addr)
81+ } else {
82+ None
83+ } ;
84+
6385 Ok ( Self {
86+ archiver_address,
6487 aws_client,
6588 gcp_client,
6689 local_path,
6790 } )
6891 }
6992
70- pub fn aws_client ( & self ) -> Option < & aws:: ArchiveAWSClient > {
71- self . aws_client . as_ref ( )
72- }
93+ pub async fn send_block ( & self , breadcrumb : BlockApplyResult , options : & ArchiveStorageOptions ) {
94+ if options. uses_archiver_process ( ) {
95+ if let Some ( socket_addr) = self . archiver_address {
96+ Self :: handle_archiver_process ( & breadcrumb, & socket_addr) . await ;
97+ } else {
98+ node:: core:: warn!( summary = "Archiver address not set" ) ;
99+ }
100+ }
101+
102+ if options. requires_precomputed_block ( ) {
103+ let network_name = NetworkConfig :: global ( ) . name ;
104+ let height = breadcrumb. block . height ( ) ;
105+ let state_hash = breadcrumb. block . hash ( ) ;
106+
107+ let key = format ! ( "{network_name}-{height}-{state_hash}.json" ) ;
108+
109+ node:: core:: info!(
110+ summary = "Uploading precomputed block to archive" ,
111+ key = key. clone( )
112+ ) ;
113+
114+ let precomputed_block: PrecomputedBlock = if let Ok ( precomputed_block) =
115+ breadcrumb. try_into ( )
116+ {
117+ precomputed_block
118+ } else {
119+ node:: core:: warn!( summary = "Failed to convert breadcrumb to precomputed block" ) ;
120+ return ;
121+ } ;
122+
123+ let data = serde_json:: to_vec ( & precomputed_block) . unwrap ( ) ;
124+
125+ if options. uses_local_precomputed_storage ( ) {
126+ // TODO(adonagy): Cleanup the unwraps (fn that returns a Result + log the error)
127+ if let Some ( path) = & self . local_path {
128+ let file_path = Path :: new ( path) . join ( key. clone ( ) ) ;
129+ let mut file = File :: create ( file_path) . unwrap ( ) ;
130+ file. write_all ( & data) . unwrap ( ) ;
131+ } else {
132+ node:: core:: warn!( summary = "Local precomputed storage path not set" ) ;
133+ }
134+ }
73135
74- pub fn gcp_client ( & self ) -> Option < & gcp:: ArchiveGCPClient > {
75- self . gcp_client . as_ref ( )
136+ if options. uses_gcp_precomputed_storage ( ) {
137+ if let Some ( client) = & self . gcp_client {
138+ if let Err ( e) = client. upload_block ( & key, & data) . await {
139+ node:: core:: warn!(
140+ summary = "Failed to upload precomputed block to GCP" ,
141+ error = e. to_string( )
142+ ) ;
143+ }
144+ } else {
145+ node:: core:: warn!( summary = "GCP client not initialized" ) ;
146+ }
147+ }
148+ if options. uses_aws_precomputed_storage ( ) {
149+ if let Some ( client) = & self . aws_client {
150+ if let Err ( e) = client. upload_block ( & key, & data) . await {
151+ node:: core:: warn!(
152+ summary = "Failed to upload precomputed block to AWS" ,
153+ error = e. to_string( )
154+ ) ;
155+ }
156+ } else {
157+ node:: core:: warn!( summary = "AWS client not initialized" ) ;
158+ }
159+ }
160+ }
76161 }
77162
78- pub fn local_path ( & self ) -> Option < & str > {
79- self . local_path . as_deref ( )
163+ async fn handle_archiver_process ( breadcrumb : & BlockApplyResult , socket_addr : & SocketAddr ) {
164+ let mut retries = ARCHIVE_SEND_RETRIES ;
165+
166+ let archive_transition_frontier_diff: v2:: ArchiveTransitionFronntierDiff =
167+ breadcrumb. clone ( ) . try_into ( ) . unwrap ( ) ;
168+
169+ while retries > 0 {
170+ match rpc:: send_diff (
171+ * socket_addr,
172+ v2:: ArchiveRpc :: SendDiff ( archive_transition_frontier_diff. clone ( ) ) ,
173+ ) {
174+ Ok ( result) => {
175+ if result. should_retry ( ) {
176+ node:: core:: warn!(
177+ summary = "Archive suddenly closed connection, retrying..."
178+ ) ;
179+ retries -= 1 ;
180+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( RETRY_INTERVAL_MS ) )
181+ . await ;
182+ } else {
183+ node:: core:: warn!( summary = "Successfully sent diff to archive" ) ;
184+ break ;
185+ }
186+ }
187+ Err ( e) => {
188+ node:: core:: warn!(
189+ summary = "Failed sending diff to archive" ,
190+ error = e. to_string( ) ,
191+ retries = retries
192+ ) ;
193+ retries -= 1 ;
194+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( RETRY_INTERVAL_MS ) ) . await ;
195+ }
196+ }
197+ }
80198 }
81199}
82200
@@ -91,12 +209,6 @@ impl ArchiveService {
91209 options : ArchiveStorageOptions ,
92210 work_dir : String ,
93211 ) {
94- use std:: { fs:: File , path:: Path } ;
95-
96- use mina_p2p_messages:: v2:: PrecomputedBlock ;
97- use openmina_core:: NetworkConfig ;
98- use reqwest:: Url ;
99-
100212 let clients = if let Ok ( clients) = ArchiveServiceClients :: new ( & options, work_dir) . await {
101213 clients
102214 } else {
@@ -105,114 +217,7 @@ impl ArchiveService {
105217 } ;
106218
107219 while let Some ( breadcrumb) = archive_receiver. recv ( ) . await {
108- if options. uses_archiver_process ( ) {
109- let address = std:: env:: var ( "OPENMINA_ARCHIVE_ADDRESS" )
110- . expect ( "OPENMINA_ARCHIVE_ADDRESS is not set" ) ;
111- let address = Url :: parse ( & address) . expect ( "Invalid URL" ) ;
112-
113- // Convert URL to SocketAddr
114- let socket_addrs = address. socket_addrs ( || None ) . expect ( "Invalid URL" ) ;
115-
116- let socket_addr = socket_addrs. first ( ) . expect ( "No socket address found" ) ;
117- let mut retries = ARCHIVE_SEND_RETRIES ;
118-
119- let archive_transition_frontier_diff: v2:: ArchiveTransitionFronntierDiff =
120- breadcrumb. clone ( ) . try_into ( ) . unwrap ( ) ;
121-
122- while retries > 0 {
123- match rpc:: send_diff (
124- * socket_addr,
125- v2:: ArchiveRpc :: SendDiff ( archive_transition_frontier_diff. clone ( ) ) ,
126- ) {
127- Ok ( result) => {
128- if result. should_retry ( ) {
129- node:: core:: warn!(
130- summary = "Archive suddenly closed connection, retrying..."
131- ) ;
132- retries -= 1 ;
133- tokio:: time:: sleep ( std:: time:: Duration :: from_millis (
134- RETRY_INTERVAL_MS ,
135- ) )
136- . await ;
137- } else {
138- node:: core:: warn!( summary = "Successfully sent diff to archive" ) ;
139- break ;
140- }
141- }
142- Err ( e) => {
143- node:: core:: warn!(
144- summary = "Failed sending diff to archive" ,
145- error = e. to_string( ) ,
146- retries = retries
147- ) ;
148- retries -= 1 ;
149- tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( RETRY_INTERVAL_MS ) )
150- . await ;
151- }
152- }
153- }
154- }
155-
156- if options. requires_precomputed_block ( ) {
157- let network_name = NetworkConfig :: global ( ) . name ;
158- let height = breadcrumb. block . height ( ) ;
159- let state_hash = breadcrumb. block . hash ( ) ;
160-
161- let key = format ! ( "{network_name}-{height}-{state_hash}.json" ) ;
162-
163- node:: core:: info!(
164- summary = "Uploading precomputed block to archive" ,
165- key = key. clone( )
166- ) ;
167-
168- let precomputed_block: PrecomputedBlock =
169- if let Ok ( precomputed_block) = breadcrumb. try_into ( ) {
170- precomputed_block
171- } else {
172- node:: core:: warn!(
173- summary = "Failed to convert breadcrumb to precomputed block"
174- ) ;
175- continue ;
176- } ;
177-
178- let data = serde_json:: to_vec ( & precomputed_block) . unwrap ( ) ;
179-
180- if options. uses_local_precomputed_storage ( ) {
181- // TODO(adonagy): Cleanup the unwraps (fn that returns a Result + log the error)
182- if let Some ( path) = clients. local_path ( ) {
183- let file_path = Path :: new ( path) . join ( key. clone ( ) ) ;
184- let mut file = File :: create ( file_path) . unwrap ( ) ;
185- file. write_all ( & data) . unwrap ( ) ;
186- } else {
187- node:: core:: warn!( summary = "Local precomputed storage path not set" ) ;
188- }
189- }
190-
191- if options. uses_gcp_precomputed_storage ( ) {
192- if let Some ( client) = clients. gcp_client ( ) {
193- if let Err ( e) = client. upload_block ( & key, & data) . await {
194- node:: core:: warn!(
195- summary = "Failed to upload precomputed block to GCP" ,
196- error = e. to_string( )
197- ) ;
198- }
199- } else {
200- node:: core:: warn!( summary = "GCP client not initialized" ) ;
201- }
202- }
203- if options. uses_aws_precomputed_storage ( ) {
204- if let Some ( client) = clients. aws_client ( ) {
205- if let Err ( e) = client. upload_block ( & key, & data) . await {
206- node:: core:: warn!(
207- summary = "Failed to upload precomputed block to AWS" ,
208- error = e. to_string( )
209- ) ;
210- }
211- } else {
212- node:: core:: warn!( summary = "AWS client not initialized" ) ;
213- }
214- }
215- }
220+ clients. send_block ( breadcrumb, & options) . await ;
216221 }
217222 }
218223
0 commit comments