Skip to content

feat: Error sink service #1097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 314 additions & 11 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ members = [
"tools/fuzzing",
"tools/archive-breadcrumb-compare",
"tools/heartbeats-processor",
"tools/error-sink-service",
"tools/webrtc-sniffer",

"producer-dashboard",
Expand Down
140 changes: 120 additions & 20 deletions node/common/src/service/block_producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod vrf_evaluator;

use std::sync::Arc;
use std::{io::Write, sync::Arc};

use ledger::proofs::{
block::BlockParams, generate_block_proof, provers::BlockProver,
Expand Down Expand Up @@ -107,13 +107,30 @@ fn prover_loop(
let res = prove(provers, &mut input, &keypair, false);
if let Err(error) = &res {
openmina_core::error!(message = "Block proof failed", error = format!("{error:?}"));
if let Err(error) = dump_failed_block_proof_input(block_hash.clone(), input, error) {
openmina_core::error!(
message = "Failure when dumping failed block proof inputs",
error = format!("{error}")
);
let submission_url = std::env::var("OPENMINA_ERROR_SINK_SERVICE_URL").ok();
if let Some(submission_url) = submission_url {
if let Err(error) = submit_failed_block_proof_input(
block_hash.clone(),
input,
error,
&submission_url,
) {
openmina_core::error!(
message = "Failed to submit failed block proof",
error = format!("{error}")
);
}
} else {
if let Err(error) = dump_failed_block_proof_input(block_hash.clone(), input, error)
{
openmina_core::error!(
message = "Failure when dumping failed block proof inputs",
error = format!("{error}")
);
}
}
}
// TODO: error must include the input
let res = res.map_err(|err| err.to_string());
let _ = event_sender.send(BlockProducerEvent::BlockProve(block_hash, res).into());
}
Expand Down Expand Up @@ -181,11 +198,20 @@ impl node::service::BlockProducerService for crate::NodeService {
}
}

fn dump_failed_block_proof_input(
/// Represents the destination for failed block proof data
pub enum BlockProofOutputDestination {
/// Save the proof data to a file in the debug directory
FilesystemDump,
/// Submit the proof data to an external service via HTTP
ErrorService(String),
}

fn handle_failed_block_proof_input(
block_hash: StateHash,
mut input: Box<ProverExtendBlockchainInputStableV2>,
error: &anyhow::Error,
) -> std::io::Result<()> {
destination: BlockProofOutputDestination,
) -> anyhow::Result<()> {
use ledger::proofs::transaction::ProofError;
use rsa::Pkcs1v15Encrypt;

Expand Down Expand Up @@ -229,7 +255,7 @@ kGqG7QLzSPjAtP/YbUponwaD+t+A0kBg0hV4hhcJOkPeA2NOi04K93bz3HuYCVRe

let error_str = error.to_string();

let input = DumpBlockProof {
let input_data = DumpBlockProof {
input,
key: encrypted_producer_private_key,
error: error_str.as_bytes().to_vec(),
Expand All @@ -239,15 +265,89 @@ kGqG7QLzSPjAtP/YbUponwaD+t+A0kBg0hV4hhcJOkPeA2NOi04K93bz3HuYCVRe
},
};

let debug_dir = openmina_core::get_debug_dir();
let filename = debug_dir
.join(format!("failed_block_proof_input_{block_hash}.binprot"))
.to_string_lossy()
.to_string();
openmina_core::warn!(message = "Dumping failed block proof.", filename = filename);
std::fs::create_dir_all(&debug_dir)?;
let mut file = std::fs::File::create(&filename)?;
input.binprot_write(&mut file)?;
file.sync_all()?;
Ok(())
// Serialize the data
let mut buffer = Vec::new();
input_data.binprot_write(&mut buffer)?;

// Handle the data according to the destination
match destination {
BlockProofOutputDestination::FilesystemDump => {
let debug_dir = openmina_core::get_debug_dir();
let filename = debug_dir
.join(format!("failed_block_proof_input_{block_hash}.binprot"))
.to_string_lossy()
.to_string();
openmina_core::warn!(message = "Dumping failed block proof.", filename = filename);
std::fs::create_dir_all(&debug_dir)?;
let mut file = std::fs::File::create(&filename)?;
file.write_all(&buffer)?;
file.sync_all()?;
Ok(())
}
BlockProofOutputDestination::ErrorService(url) => {
use reqwest::blocking::Client;

openmina_core::warn!(
message = "Submitting failed block proof to external service.",
block_hash = format!("{block_hash}"),
url = url
);

let client = Client::new();
let response = client
.post(&url)
.header("Content-Type", "application/octet-stream")
.body(buffer)
.send()?;

// Check if the request was successful
if response.status().is_success() {
openmina_core::info!(
message = "Successfully submitted failed block proof.",
block_hash = format!("{block_hash}"),
status = response.status().as_u16()
);
Ok(())
} else {
let error_message = format!(
"Failed to submit block proof: HTTP error {}",
response.status()
);
openmina_core::error!(
message = "Failed to submit block proof",
block_hash = format!("{block_hash}"),
status = response.status().as_u16()
);
Err(anyhow::anyhow!(error_message))
}
}
}
}

fn dump_failed_block_proof_input(
block_hash: StateHash,
input: Box<ProverExtendBlockchainInputStableV2>,
error: &anyhow::Error,
) -> std::io::Result<()> {
handle_failed_block_proof_input(
block_hash,
input,
error,
BlockProofOutputDestination::FilesystemDump,
)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

pub fn submit_failed_block_proof_input(
block_hash: StateHash,
input: Box<ProverExtendBlockchainInputStableV2>,
error: &anyhow::Error,
submission_url: &str,
) -> anyhow::Result<()> {
handle_failed_block_proof_input(
block_hash,
input,
error,
BlockProofOutputDestination::ErrorService(submission_url.to_string()),
)
}
10 changes: 10 additions & 0 deletions node/common/src/service/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
use super::{
archive::{config::ArchiveStorageOptions, ArchiveService},
block_producer::BlockProducerService,
error_sink::ErrorSinkService,
};

pub struct NodeServiceCommonBuilder {
Expand All @@ -41,6 +42,7 @@ pub struct NodeServiceCommonBuilder {
p2p: Option<P2pServiceCtx>,
gather_stats: bool,
rpc: RpcService,
error_sink: Option<ErrorSinkService>,
}

#[derive(thiserror::Error, Debug, Clone)]
Expand All @@ -59,6 +61,7 @@ impl NodeServiceCommonBuilder {
rng: StdRng::from_seed(rng_seed),
event_sender,
event_receiver: event_receiver.into(),
error_sink: None,
ledger_manager: None,
block_producer: None,
archive: None,
Expand Down Expand Up @@ -86,6 +89,12 @@ impl NodeServiceCommonBuilder {
self
}

pub fn error_sink_init(&mut self, error_sink_url: String) -> &mut Self {
let error_sink = ErrorSinkService::start(error_sink_url);
self.error_sink = Some(error_sink);
self
}

pub fn block_producer_init(
&mut self,
keypair: AccountSecretKey,
Expand Down Expand Up @@ -144,6 +153,7 @@ impl NodeServiceCommonBuilder {
snark_block_proof_verify: NodeService::snark_block_proof_verifier_spawn(
self.event_sender,
),
error_sink: self.error_sink,
ledger_manager,
block_producer: self.block_producer,
// initialized in state machine.
Expand Down
94 changes: 94 additions & 0 deletions node/common/src/service/error_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use node::core::{channels::mpsc, thread};

pub struct ErrorSinkService {
error_report_sender: mpsc::TrackedUnboundedSender<(String, Vec<u8>)>,
}

impl ErrorSinkService {
pub fn new(error_report_sender: mpsc::TrackedUnboundedSender<(String, Vec<u8>)>) -> Self {
Self {
error_report_sender,
}
}

pub fn start(_url: String) -> Self {
let (error_report_sender, error_report_receiver) = mpsc::unbounded_channel();

thread::Builder::new()
.name("openmina_error_sink".to_owned())
.spawn(move || {
error_sink_loop(error_report_receiver);
})
.unwrap();

ErrorSinkService::new(error_report_sender)
}

pub fn pending_reports(&self) -> usize {
self.error_report_sender.len()
}

pub fn submit_error_report(
&self,
category: &str,
payload: Vec<u8>,
) -> Result<(), mpsc::SendError<(String, Vec<u8>)>> {
self.error_report_sender
.tracked_send((category.to_string(), payload))
}
}

fn error_sink_loop(mut rx: mpsc::TrackedUnboundedReceiver<(String, Vec<u8>)>) {
while let Some(msg) = rx.blocking_recv() {
let (category, payload) = msg.0;
openmina_core::debug!(
message = "Processing error report",
category = category,
data_size = payload.len()
);

let submission_url = std::env::var("OPENMINA_ERROR_SINK_SERVICE_URL").ok();

if let Some(url) = submission_url {
if let Err(err) = submit_error_report(&category, &payload, &url) {
openmina_core::error!(
message = "Failed to submit error report",
category = category,
error = format!("{}", err)
);
} else {
openmina_core::debug!(
message = "Successfully submitted error report",
category = category
);
}
} else {
openmina_core::warn!(
message = "No error sink URL configured, skipping report submission",
category = category
);
}
}
}

fn submit_error_report(category: &str, payload: &[u8], url: &str) -> anyhow::Result<()> {
// TODO: Implement the actual submission logic to the external service
// This would likely use reqwest or a similar HTTP client to send the data

todo!("Implement error report submission to external service");

// Example implementation might look like:
// let client = reqwest::blocking::Client::new();
// let response = client
// .post(url)
// .header("Content-Type", "application/octet-stream")
// .header("X-Error-Category", category)
// .body(data.to_vec())
// .send()?;
//
// if response.status().is_success() {
// Ok(())
// } else {
// Err(anyhow::anyhow!("Failed with status: {}", response.status()))
// }
}
1 change: 1 addition & 0 deletions node/common/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub use event_receiver::*;

pub mod archive;
pub mod block_producer;
pub mod error_sink;
pub mod p2p;
pub mod record;
pub mod replay;
Expand Down
23 changes: 23 additions & 0 deletions node/common/src/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::rpc::RpcReceiver;
use super::{
archive::ArchiveService,
block_producer::BlockProducerService,
error_sink::ErrorSinkService,
p2p::webrtc_with_libp2p::P2pServiceCtx,
replay::ReplayerState,
rpc::{RpcSender, RpcService},
Expand All @@ -41,6 +42,8 @@ pub struct NodeService {

pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,

pub error_sink: Option<ErrorSinkService>,

pub ledger_manager: LedgerManager,
pub snark_worker: Option<SnarkWorker>,
pub block_producer: Option<BlockProducerService>,
Expand Down Expand Up @@ -118,6 +121,7 @@ impl NodeService {
event_receiver: mpsc::unbounded_channel().1.into(),
snark_block_proof_verify: mpsc::unbounded_channel().0,
ledger_manager: LedgerManager::spawn(Default::default()),
error_sink: None,
snark_worker: None,
block_producer: None,
archive: None,
Expand Down Expand Up @@ -187,6 +191,25 @@ impl redux::TimeService for NodeService {
}
}

impl node::service::ErrorSinkService for NodeService {
fn submit_error_report_payload(&mut self, category: &str, payload: Vec<u8>) {
if let Some(error_sink) = &self.error_sink {
if let Err(err) = error_sink.submit_error_report(category, payload) {
openmina_core::error!(
message = "Failed to queue error report",
category = category,
error = format!("{}", err)
);
}
} else {
openmina_core::warn!(
message = "Error sink not initialized, dropping error report",
category = category
);
}
}
}

impl node::service::EventSourceService for NodeService {
fn next_event(&mut self) -> Option<Event> {
self.event_receiver.try_next()
Expand Down
6 changes: 6 additions & 0 deletions node/native/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ impl NodeBuilder {
Ok(self)
}

/// Set up error sink.
pub fn error_sink(&mut self, error_sink_url: String) -> &mut Self {
self.service.error_sink_init(error_sink_url);
self
}

/// Set up block producer.
pub fn block_producer(
&mut self,
Expand Down
Loading
Loading