Skip to content

Commit e67b9c9

Browse files
author
Adrian Nagy
committed
feat: AWS support
1 parent 6089375 commit e67b9c9

File tree

7 files changed

+160
-55
lines changed

7 files changed

+160
-55
lines changed

cli/src/commands/node/mod.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,9 @@ pub struct Node {
138138
#[arg(short = 'c', long, env)]
139139
pub config: Option<PathBuf>,
140140

141-
/// Enable archive mode (seding blocks to the archive process).
142-
#[arg(long, env)]
143-
pub archive_address: Option<Url>,
144-
141+
// /// Enable archive mode (seding blocks to the archive process).
142+
// #[arg(long, env)]
143+
// pub archive_address: Option<Url>,
145144
/// Enable local precomputed storage.
146145
#[arg(long, env)]
147146
pub archive_local_storage: bool,
@@ -157,6 +156,13 @@ pub struct Node {
157156
pub archive_gcp_storage: bool,
158157

159158
/// Enable AWS precomputed storage.
159+
///
160+
/// This requires the following environment variables to be set:
161+
/// - AWS_ACCESS_KEY_ID
162+
/// - AWS_SECRET_ACCESS_KEY
163+
/// - AWS_SESSION_TOKEN
164+
/// - AWS_DEFAULT_REGION
165+
/// - OPENMINA_AWS_BUCKET_NAME
160166
#[arg(long, env)]
161167
pub archive_aws_storage: bool,
162168
}
@@ -314,17 +320,21 @@ impl Node {
314320
.map(|(_, option)| option.clone()),
315321
);
316322

317-
if let Some(address) = self.archive_address {
323+
if archive_storage_options.is_enabled() {
318324
node::core::info!(
319325
summary = "Archive mode enabled",
320-
address = address.to_string()
326+
local_storage = archive_storage_options.uses_local_precomputed_storage(),
327+
archiver_process = archive_storage_options.uses_archiver_process(),
328+
gcp_storage = archive_storage_options.uses_gcp_precomputed_storage(),
329+
aws_storage = archive_storage_options.uses_aws_precomputed_storage(),
321330
);
322-
// Convert URL to SocketAddr
323-
let socket_addrs = address.socket_addrs(|| None).expect("Invalid URL");
324331

325-
let socket_addr = socket_addrs.first().expect("No socket address found");
332+
archive_storage_options
333+
.validate_env_vars()
334+
.map_err(|e| anyhow::anyhow!(e))?;
335+
326336
// TODO(adonagy): add options
327-
node_builder.archive(*socket_addr, archive_storage_options);
337+
node_builder.archive(archive_storage_options, work_dir.clone());
328338
}
329339

330340
if let Some(sec_key) = self.run_snarker {

node/common/src/service/archive.rs

Lines changed: 123 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use bitflags::bitflags;
2-
use mina_p2p_messages::v2::{self, ArchiveTransitionFronntierDiff};
2+
use mina_p2p_messages::v2::{self};
33
use node::core::{channels::mpsc, thread};
44
use node::ledger::write::BlockApplyResult;
55
use std::env;
6-
use std::net::SocketAddr;
6+
use std::io::Write;
7+
use std::path::PathBuf;
78

89
use super::NodeService;
910

@@ -23,7 +24,7 @@ bitflags! {
2324

2425
impl ArchiveStorageOptions {
2526
pub fn is_enabled(&self) -> bool {
26-
self.is_empty()
27+
!self.is_empty()
2728
}
2829

2930
pub fn requires_precomputed_block(&self) -> bool {
@@ -60,6 +61,27 @@ impl ArchiveStorageOptions {
6061
.to_string(),
6162
);
6263
}
64+
65+
if env::var("AWS_DEFAULT_REGION").is_err() {
66+
return Err(
67+
"AWS_DEFAULT_REGION is required when AWS_PRECOMPUTED_STORAGE is enabled"
68+
.to_string(),
69+
);
70+
}
71+
72+
if env::var("OPENMINA_AWS_BUCKET_NAME").is_err() {
73+
return Err(
74+
"OPENMINA_AWS_BUCKET_NAME is required when AWS_PRECOMPUTED_STORAGE is enabled"
75+
.to_string(),
76+
);
77+
}
78+
79+
// if env::var("OPENMINA_AWS_BUCKET_PATH").is_err() {
80+
// return Err(
81+
// "OPENMINA_AWS_BUCKET_PATH is required when AWS_PRECOMPUTED_STORAGE is enabled"
82+
// .to_string(),
83+
// );
84+
// }
6385
}
6486

6587
// TODO(adonagy): Add GCP precomputed storage validation
@@ -89,31 +111,58 @@ pub struct ArchiveService {
89111
}
90112

91113
struct ArchiveServiceClients {
92-
aws_client: Option<aws_sdk_s3::Client>,
114+
aws_client: Option<ArchiveAWSClient>,
93115
gcp_client: Option<()>,
116+
local_path: Option<String>,
117+
}
118+
119+
struct ArchiveAWSClient {
120+
client: aws_sdk_s3::Client,
121+
bucket_name: String,
122+
bucket_path: String,
94123
}
95124

96125
impl ArchiveServiceClients {
97-
async fn new(options: &ArchiveStorageOptions) -> Self {
126+
async fn new(options: &ArchiveStorageOptions, work_dir: String) -> Self {
98127
let aws_client = if options.uses_aws_precomputed_storage() {
99128
let config = aws_config::load_from_env().await;
100-
Some(aws_sdk_s3::Client::new(&config))
129+
let bucket_name = env::var("OPENMINA_AWS_BUCKET_NAME").unwrap_or_default();
130+
let bucket_path = env::var("OPENMINA_AWS_BUCKET_PATH").unwrap_or_default();
131+
Some(ArchiveAWSClient {
132+
client: aws_sdk_s3::Client::new(&config),
133+
bucket_name,
134+
bucket_path,
135+
})
101136
} else {
102137
None
103138
};
139+
140+
let local_path = if options.uses_local_precomputed_storage() {
141+
let env_path = env::var("OPENMINA_LOCAL_PRECOMPUTED_STORAGE_PATH");
142+
let default = format!("{}/archive-precomputed", work_dir);
143+
Some(env_path.unwrap_or(default))
144+
} else {
145+
None
146+
};
147+
104148
Self {
105149
aws_client,
106150
gcp_client: None,
151+
local_path,
107152
}
108153
}
109154

110-
pub fn aws_client(&self) -> Option<&aws_sdk_s3::Client> {
155+
pub fn aws_client(&self) -> Option<&ArchiveAWSClient> {
111156
self.aws_client.as_ref()
112157
}
113158

114159
pub fn gcp_client(&self) -> Option<&()> {
115160
self.gcp_client.as_ref()
116161
}
162+
163+
pub fn local_path(&self) -> Option<&str> {
164+
self.local_path.as_deref()
165+
}
117166
}
118167

119168
impl ArchiveService {
@@ -124,23 +173,35 @@ impl ArchiveService {
124173
#[cfg(not(target_arch = "wasm32"))]
125174
async fn run(
126175
mut archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
127-
address: SocketAddr,
128176
options: ArchiveStorageOptions,
177+
work_dir: String,
129178
) {
179+
use std::{fs::File, path::Path};
180+
130181
use mina_p2p_messages::v2::PrecomputedBlock;
182+
use openmina_core::NetworkConfig;
183+
use reqwest::Url;
131184

132-
let clients = ArchiveServiceClients::new(&options).await;
185+
let clients = ArchiveServiceClients::new(&options, work_dir).await;
133186

134-
while let Some(breadcrumb) = archive_receiver.blocking_recv() {
187+
while let Some(breadcrumb) = archive_receiver.recv().await {
135188
if options.uses_archiver_process() {
189+
let address = std::env::var("OPENMINA_ARCHIVE_ADDRESS")
190+
.expect("OPENMINA_ARCHIVE_ADDRESS is not set");
191+
let address = Url::parse(&address).expect("Invalid URL");
192+
193+
// Convert URL to SocketAddr
194+
let socket_addrs = address.socket_addrs(|| None).expect("Invalid URL");
195+
196+
let socket_addr = socket_addrs.first().expect("No socket address found");
136197
let mut retries = ARCHIVE_SEND_RETRIES;
137198

138199
let archive_transition_frontier_diff: v2::ArchiveTransitionFronntierDiff =
139200
breadcrumb.clone().try_into().unwrap();
140201

141202
while retries > 0 {
142203
match rpc::send_diff(
143-
address,
204+
*socket_addr,
144205
v2::ArchiveRpc::SendDiff(archive_transition_frontier_diff.clone()),
145206
) {
146207
Ok(result) => {
@@ -149,9 +210,10 @@ impl ArchiveService {
149210
summary = "Archive suddenly closed connection, retrying..."
150211
);
151212
retries -= 1;
152-
std::thread::sleep(std::time::Duration::from_millis(
213+
tokio::time::sleep(std::time::Duration::from_millis(
153214
RETRY_INTERVAL_MS,
154-
));
215+
))
216+
.await;
155217
} else {
156218
node::core::warn!(summary = "Successfully sent diff to archive");
157219
break;
@@ -164,15 +226,25 @@ impl ArchiveService {
164226
retries = retries
165227
);
166228
retries -= 1;
167-
std::thread::sleep(std::time::Duration::from_millis(RETRY_INTERVAL_MS));
229+
tokio::time::sleep(std::time::Duration::from_millis(RETRY_INTERVAL_MS))
230+
.await;
168231
}
169232
}
170233
}
171234
}
172235

173236
if options.requires_precomputed_block() {
174-
// let key =
175-
// TODO(adonagy)
237+
let network_name = NetworkConfig::global().name;
238+
let height = breadcrumb.block.height();
239+
let state_hash = breadcrumb.block.hash();
240+
241+
let key = format!("{network_name}-{height}-{state_hash}.json");
242+
243+
node::core::info!(
244+
summary = "Uploading precomputed block to archive",
245+
key = key.clone()
246+
);
247+
176248
let precomputed_block: PrecomputedBlock =
177249
if let Ok(precomputed_block) = breadcrumb.try_into() {
178250
precomputed_block
@@ -184,16 +256,46 @@ impl ArchiveService {
184256
};
185257

186258
if options.uses_local_precomputed_storage() {
187-
// TODO(adonagy): Implement local precomputed storage
259+
// TODO(adonagy): Cleanup the unwraps (fn that returns a Result + log the error)
260+
if let Some(path) = clients.local_path() {
261+
let file_path = Path::new(path).join(key.clone());
262+
let mut file = File::create(file_path).unwrap();
263+
let json = serde_json::to_vec(&precomputed_block).unwrap();
264+
file.write_all(&json).unwrap();
265+
} else {
266+
node::core::warn!(summary = "Local precomputed storage path not set");
267+
}
188268
}
189269

190270
if options.uses_gcp_precomputed_storage() {
191271
// TODO(adonagy): Implement GCP precomputed storage
192272
}
193273

194274
if options.uses_aws_precomputed_storage() {
195-
let client = clients.aws_client().unwrap();
196-
// put
275+
if let Some(client) = clients.aws_client() {
276+
let json = serde_json::to_string(&precomputed_block).unwrap();
277+
let res = client
278+
.client
279+
.put_object()
280+
.bucket(client.bucket_name.clone())
281+
.key(format!("{}/{}", client.bucket_path, key))
282+
.body(json.as_bytes().to_vec().into())
283+
.send()
284+
.await;
285+
286+
if let Err(e) = res {
287+
node::core::warn!(
288+
summary = "Failed to upload precomputed block to AWS",
289+
error = e.to_string()
290+
);
291+
} else {
292+
node::core::warn!(
293+
summary = "Successfully uploaded precomputed block to AWS"
294+
);
295+
}
296+
} else {
297+
node::core::warn!(summary = "AWS client not initialized");
298+
}
197299
}
198300
}
199301
}
@@ -209,7 +311,7 @@ impl ArchiveService {
209311
unimplemented!()
210312
}
211313

212-
pub fn start(address: SocketAddr, options: ArchiveStorageOptions) -> Self {
314+
pub fn start(options: ArchiveStorageOptions, work_dir: String) -> Self {
213315
let (archive_sender, archive_receiver) = mpsc::unbounded_channel::<BlockApplyResult>();
214316

215317
let runtime = tokio::runtime::Builder::new_current_thread()
@@ -220,7 +322,7 @@ impl ArchiveService {
220322
thread::Builder::new()
221323
.name("openmina_archive".to_owned())
222324
.spawn(move || {
223-
runtime.block_on(Self::run(archive_receiver, address, options));
325+
runtime.block_on(Self::run(archive_receiver, options, work_dir));
224326
})
225327
.unwrap();
226328

node/common/src/service/builder.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::net::SocketAddr;
1+
use std::{net::SocketAddr, path::PathBuf};
22

33
use ledger::proofs::provers::BlockProver;
44
use node::{
@@ -101,12 +101,8 @@ impl NodeServiceCommonBuilder {
101101
self
102102
}
103103

104-
pub fn archive_init(
105-
&mut self,
106-
address: SocketAddr,
107-
options: ArchiveStorageOptions,
108-
) -> &mut Self {
109-
self.archive = Some(ArchiveService::start(address, options));
104+
pub fn archive_init(&mut self, options: ArchiveStorageOptions, work_dir: String) -> &mut Self {
105+
self.archive = Some(ArchiveService::start(options, work_dir));
110106
self
111107
}
112108

node/native/src/node/builder.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{
22
fs::File,
33
io::{BufRead, BufReader, Read},
4-
net::{IpAddr, SocketAddr},
4+
net::IpAddr,
55
path::Path,
66
sync::Arc,
77
time::Duration,
@@ -218,9 +218,9 @@ impl NodeBuilder {
218218
Ok(self.block_producer(key, provers))
219219
}
220220

221-
pub fn archive(&mut self, address: SocketAddr, options: ArchiveStorageOptions) -> &mut Self {
222-
self.archive = Some(ArchiveConfig::new(&address.to_string()));
223-
self.service.archive_init(address, options);
221+
pub fn archive(&mut self, options: ArchiveStorageOptions, work_dir: String) -> &mut Self {
222+
self.archive = Some(ArchiveConfig::new(work_dir.clone()));
223+
self.service.archive_init(options, work_dir.clone());
224224
self
225225
}
226226

node/native/src/service/builder.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::net::SocketAddr;
1+
use std::{net::SocketAddr, path::PathBuf};
22

33
use ledger::proofs::provers::BlockProver;
44
use node::{
@@ -56,12 +56,8 @@ impl NodeServiceBuilder {
5656
self
5757
}
5858

59-
pub fn archive_init(
60-
&mut self,
61-
address: SocketAddr,
62-
options: ArchiveStorageOptions,
63-
) -> &mut Self {
64-
self.common.archive_init(address, options);
59+
pub fn archive_init(&mut self, options: ArchiveStorageOptions, work_dir: String) -> &mut Self {
60+
self.common.archive_init(options, work_dir);
6561
self
6662
}
6763

0 commit comments

Comments
 (0)