Skip to content

Commit 766fa70

Browse files
committed
feat: proof processing via channels
1 parent 08d51b6 commit 766fa70

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

crates/batcher/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ mod eth;
6161
mod ffi;
6262
pub mod gnark;
6363
pub mod metrics;
64+
mod proof_processor;
6465
pub mod retry;
6566
pub mod risc_zero;
6667
pub mod s3;
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use aligned_sdk::common::types::SubmitProofResponseMessage;
4+
use ethers::abi::Address;
5+
use tokio::sync::{
6+
mpsc::{self, Receiver, Sender},
7+
oneshot, Mutex,
8+
};
9+
10+
use crate::types::batch_state::BatchState;
11+
12+
pub enum ProofProcessorMessage {
13+
ValidateProof {
14+
response_tx: oneshot::Sender<SubmitProofResponseMessage>,
15+
},
16+
StopProcessing {
17+
response_tx: oneshot::Sender<bool>,
18+
},
19+
RestartProcessing,
20+
}
21+
22+
pub struct ProofProcessorForwarder {
23+
tx: Sender<ProofProcessorMessage>,
24+
}
25+
26+
impl ProofProcessorForwarder {
27+
/// Validates a proof for the given proof
28+
pub fn validate_proof(&self) {}
29+
/// Stops the processing of proofs and returns true when all the ongoing messages have been processed
30+
pub fn stop_processing(&self) {}
31+
/// Restarts the processing of proofs
32+
pub fn restart_processing(&self) {}
33+
}
34+
35+
pub struct ProofProcessor {
36+
user_mutexes: HashMap<Address, Mutex<bool>>,
37+
batch_state: Arc<Mutex<BatchState>>,
38+
rx: Receiver<ProofProcessorMessage>,
39+
processing_stopped: bool,
40+
current_messages: Arc<Mutex<usize>>,
41+
}
42+
43+
impl ProofProcessor {
44+
pub fn new(batch_state: Arc<Mutex<BatchState>>) -> (Self, Sender<ProofProcessorMessage>) {
45+
let (tx, rx) = mpsc::channel::<ProofProcessorMessage>(100);
46+
let processor = Self {
47+
batch_state,
48+
user_mutexes: HashMap::new(),
49+
rx,
50+
processing_stopped: false,
51+
current_messages: Arc::new(Mutex::new(0)),
52+
};
53+
(processor, tx)
54+
}
55+
56+
async fn start(&mut self) {
57+
self.recv().await;
58+
}
59+
60+
async fn recv(&mut self) {
61+
while let Some(message) = self.rx.recv().await {
62+
match message {
63+
ProofProcessorMessage::ValidateProof { response_tx } => {
64+
// check if processing should stop because the batcher requested for it
65+
if self.processing_stopped {
66+
continue;
67+
}
68+
69+
*self.current_messages.lock().await += 1;
70+
let current_messages_clone = self.current_messages.clone();
71+
let batch_state_clone = self.batch_state.clone();
72+
tokio::task::spawn(async move {
73+
// TODO: lock user mutex check, if is being attended stop
74+
let result = Self::handle_validate_proof_message(batch_state_clone).await;
75+
let _ = response_tx.send(result);
76+
*current_messages_clone.lock().await -= 1;
77+
});
78+
}
79+
ProofProcessorMessage::StopProcessing { response_tx } => {
80+
self.processing_stopped = true;
81+
// wait until it can lock all the mutex of the users
82+
// we know that no new mutex would appear since we have set processing_stopped = true
83+
for mutex in self.user_mutexes.values() {
84+
let _ = mutex.lock().await;
85+
}
86+
87+
let _ = response_tx.send(true);
88+
}
89+
ProofProcessorMessage::RestartProcessing => self.processing_stopped = false,
90+
}
91+
}
92+
}
93+
94+
async fn handle_validate_proof_message(
95+
batch_state: Arc<Mutex<BatchState>>,
96+
) -> SubmitProofResponseMessage {
97+
SubmitProofResponseMessage::InvalidMaxFee
98+
}
99+
}

0 commit comments

Comments
 (0)