Skip to content

Commit 5f33f50

Browse files
committed
feat(heartbeats): Don't store full blocks, just headers
1 parent 7e58931 commit 5f33f50

File tree

5 files changed

+82
-59
lines changed

5 files changed

+82
-59
lines changed

node/src/rpc/heartbeat.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,16 @@ pub struct NodeHeartbeat {
9797
pub status: NodeStatus,
9898
pub node_timestamp: Timestamp,
9999
pub peer_id: PeerId,
100-
// binprot+base64 encoded block
101-
pub last_produced_block: Option<String>,
100+
// binprot+base64 encoded block header
101+
pub last_produced_block_info: Option<ProducedBlockInfo>,
102+
}
103+
104+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
105+
pub struct ProducedBlockInfo {
106+
pub height: u32,
107+
pub global_slot: u32,
108+
pub hash: String,
109+
pub base64_encoded_header: String,
102110
}
103111

104112
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -283,7 +291,7 @@ pub(crate) mod tests {
283291
peer_id: "2bEgBrPTzL8wov2D4Kz34WVLCxR4uCarsBmHYXWKQA5wvBQzd9H"
284292
.parse()
285293
.unwrap(),
286-
last_produced_block: None,
294+
last_produced_block_info: None,
287295
}
288296
}
289297
}

node/src/rpc/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub use rpc_reducer::collect_rpc_peers_info;
2929
mod rpc_impls;
3030

3131
mod heartbeat;
32-
pub use heartbeat::{NodeHeartbeat, SignedNodeHeartbeat};
32+
pub use heartbeat::{NodeHeartbeat, ProducedBlockInfo, SignedNodeHeartbeat};
3333

3434
pub use openmina_core::requests::{RpcId, RpcIdType};
3535

node/src/rpc_effectful/rpc_effectful_effects.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ use crate::{
1212
p2p_ready,
1313
rpc::{
1414
AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
15-
MessagesStats, NodeHeartbeat, RootLedgerSyncProgress, RootStagedLedgerSyncProgress,
16-
RpcAction, RpcBlockProducerStats, RpcMessageProgressResponse, RpcNodeStatus,
17-
RpcNodeStatusLedger, RpcNodeStatusResources, RpcNodeStatusTransactionPool,
15+
MessagesStats, NodeHeartbeat, ProducedBlockInfo, RootLedgerSyncProgress,
16+
RootStagedLedgerSyncProgress, RpcAction, RpcBlockProducerStats, RpcMessageProgressResponse,
17+
RpcNodeStatus, RpcNodeStatusLedger, RpcNodeStatusResources, RpcNodeStatusTransactionPool,
1818
RpcNodeStatusTransitionFrontier, RpcNodeStatusTransitionFrontierBlockSummary,
1919
RpcNodeStatusTransitionFrontierSync, RpcRequestExtraData, RpcScanStateSummary,
2020
RpcScanStateSummaryBlock, RpcScanStateSummaryBlockTransaction,
@@ -70,10 +70,12 @@ pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcE
7070
.stats()
7171
.and_then(|stats| stats.block_producer().last_produced_block.take());
7272

73-
let last_produced_block = match base64_encode_block(last_produced_block) {
74-
Ok(block) => block,
73+
let last_produced_block_info = match make_produced_block_info(last_produced_block) {
74+
Ok(data) => data,
7575
Err(error) => {
76-
bug_condition!("HeartbeatGet: Failed to encode block, returning None: {error}");
76+
bug_condition!(
77+
"HeartbeatGet: Failed to encode block header, returning None: {error}"
78+
);
7779
None
7880
}
7981
};
@@ -82,7 +84,7 @@ pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcE
8284
status: status.into(),
8385
node_timestamp: meta.time(),
8486
peer_id: store.state().p2p.my_id(),
85-
last_produced_block,
87+
last_produced_block_info,
8688
};
8789
let response = store
8890
.service()
@@ -841,16 +843,26 @@ fn compute_node_status<S: Service>(store: &mut Store<S>) -> RpcNodeStatus {
841843
status
842844
}
843845

844-
fn base64_encode_block(block: Option<ArcBlockWithHash>) -> std::io::Result<Option<String>> {
846+
fn make_produced_block_info(
847+
block: Option<ArcBlockWithHash>,
848+
) -> std::io::Result<Option<ProducedBlockInfo>> {
845849
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
846850
use mina_p2p_messages::binprot::BinProtWrite;
847851

848852
let Some(block) = block else { return Ok(None) };
849853

850-
let mut buf = Vec::with_capacity(10 * 1024 * 1024);
851-
v2::MinaBlockBlockStableV2::binprot_write(&block.block, &mut buf)?;
854+
let height = block.height();
855+
let global_slot = block.global_slot();
856+
let hash = block.hash().to_string();
857+
let mut buf = Vec::with_capacity(5 * 1024 * 1024);
858+
v2::MinaBlockHeaderStableV2::binprot_write(block.header(), &mut buf)?;
852859

853-
let base64_encoded = URL_SAFE.encode(&buf);
860+
let base64_encoded_header = URL_SAFE.encode(&buf);
854861

855-
Ok(Some(base64_encoded))
862+
Ok(Some(ProducedBlockInfo {
863+
height,
864+
global_slot,
865+
hash,
866+
base64_encoded_header,
867+
}))
856868
}

tools/heartbeats-processor/src/local_db.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -338,17 +338,20 @@ pub async fn process_heartbeats(
338338
presence_count += 1;
339339

340340
// Add produced block if it exists
341-
match entry.last_produced_block_decoded() {
342-
Ok(Some(block)) => {
343-
let block_data = entry.last_produced_block_raw().unwrap(); // Cannot fail, we have the block
344-
let key = (public_key_id, block.hash().to_string());
341+
match entry
342+
.last_produced_block_info()
343+
.map(|bi| (bi.clone(), bi.block_header_decoded()))
344+
{
345+
None => (), // No block to process
346+
Some((block_info, Ok(_block_header))) => {
347+
let key = (public_key_id, block_info.hash.clone());
345348

346349
if let Some(first_seen) = seen_blocks.get(&key) {
347350
blocks_duplicate += 1;
348351
println!(
349352
"Duplicate block detected: {} (height: {}, producer: {}, peer_id: {}) [first seen at {}, now at {}]",
350353
key.1,
351-
block.height(),
354+
block_info.height,
352355
entry.submitter,
353356
entry.peer_id().unwrap_or_else(|| "unknown".to_string()),
354357
first_seen,
@@ -361,14 +364,13 @@ pub async fn process_heartbeats(
361364
produced_blocks_batch.push(ProducedBlock {
362365
window_id: window.id.unwrap(),
363366
public_key_id,
364-
block_hash: block.hash().to_string(),
365-
block_height: block.height(),
366-
block_global_slot: block.global_slot(),
367-
block_data,
367+
block_hash: block_info.hash,
368+
block_height: block_info.height,
369+
block_global_slot: block_info.global_slot,
370+
block_data: block_info.base64_encoded_header,
368371
});
369372
}
370-
Ok(None) => (), // No block to process
371-
Err(e) => {
373+
Some((_block_info, Err(e))) => {
372374
println!(
373375
"WARNING: Failed to decode block from {}: {}",
374376
entry.submitter, e
@@ -377,11 +379,11 @@ pub async fn process_heartbeats(
377379
}
378380
}
379381
} else {
380-
if let Ok(Some(block)) = entry.last_produced_block_decoded() {
382+
if let Some(block_info) = entry.last_produced_block_info() {
381383
println!(
382384
"Skipping unsynced block: {} (height: {}, producer: {}, peer_id: {})",
383-
block.hash(),
384-
block.height(),
385+
block_info.hash,
386+
block_info.height,
385387
entry.submitter,
386388
entry.peer_id().unwrap_or_else(|| "unknown".to_string())
387389
);

tools/heartbeats-processor/src/remote_db.rs

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
use std::sync::Arc;
2-
31
use anyhow::Result;
42
use base64::{engine::general_purpose, Engine as _};
53
use chrono::{DateTime, Duration, Utc};
64
use firestore::*;
75
use mina_p2p_messages::v2;
8-
use openmina_core::block::{ArcBlockWithHash, BlockWithHash};
96
use serde::{Deserialize, Serialize};
107
use serde_json::Value;
118

@@ -32,26 +29,34 @@ pub struct HeartbeatEntry {
3229
pub decoded_payload: Option<Value>,
3330
}
3431

32+
#[derive(Clone, Debug, Deserialize)]
33+
pub struct ProducedBlockInfo {
34+
pub height: u32,
35+
pub global_slot: u32,
36+
pub hash: String,
37+
pub base64_encoded_header: String,
38+
}
39+
3540
#[derive(Debug)]
3641
pub struct BlockInfo {
3742
pub hash: String,
3843
pub height: u64,
3944
pub global_slot: u64,
4045
}
4146

42-
fn base64_decode_block(
43-
encoded: &str,
44-
) -> Result<mina_p2p_messages::v2::MinaBlockBlockStableV2, String> {
45-
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
46-
use mina_p2p_messages::binprot::BinProtRead;
47+
impl ProducedBlockInfo {
48+
pub fn block_header_decoded(&self) -> Result<v2::MinaBlockHeaderStableV2, String> {
49+
use base64::{engine::general_purpose::URL_SAFE, Engine as _};
50+
use mina_p2p_messages::binprot::BinProtRead;
4751

48-
let decoded = URL_SAFE
49-
.decode(encoded)
50-
.map_err(|_| "Could not decode base64".to_string())?;
51-
let block = v2::MinaBlockBlockStableV2::binprot_read(&mut &decoded[..])
52-
.map_err(|e| format!("Could not decode block: {:?}", e))?;
52+
let decoded = URL_SAFE
53+
.decode(&self.base64_encoded_header)
54+
.map_err(|_| "Could not decode base64".to_string())?;
55+
let block_header = v2::MinaBlockHeaderStableV2::binprot_read(&mut &decoded[..])
56+
.map_err(|e| format!("Could not decode block header: {:?}", e))?;
5357

54-
Ok(block)
58+
Ok(block_header)
59+
}
5560
}
5661

5762
impl HeartbeatEntry {
@@ -70,22 +75,18 @@ impl HeartbeatEntry {
7075
.map(|s| s.to_string())
7176
}
7277

73-
pub fn last_produced_block_raw(&self) -> Option<String> {
74-
self.decoded_payload
78+
pub fn last_produced_block_info(&self) -> Option<ProducedBlockInfo> {
79+
let result = self
80+
.decoded_payload
7581
.as_ref()
76-
.and_then(|status| status.get("last_produced_block"))
77-
.and_then(|block| block.as_str())
78-
.map(|s| s.to_string())
79-
}
80-
81-
pub fn last_produced_block_decoded(&self) -> Result<Option<ArcBlockWithHash>, String> {
82-
match self.last_produced_block_raw() {
83-
None => Ok(None),
84-
Some(encoded) => {
85-
let block = base64_decode_block(&encoded)?;
86-
let block = BlockWithHash::try_new(Arc::new(block))
87-
.map_err(|e| format!("Invalid block: {}", e))?;
88-
Ok(Some(block))
82+
.and_then(|status| status.get("last_produced_block_info"))
83+
.map(|block_info| serde_json::from_value(block_info.clone()))?;
84+
85+
match result {
86+
Ok(info) => Some(info),
87+
Err(e) => {
88+
eprintln!("Invalid block header: {:?}", e);
89+
None
8990
}
9091
}
9192
}

0 commit comments

Comments
 (0)