Skip to content

Commit 7d04367

Browse files
entropidelicuri-99
andauthored
refactor(batcher): add user state & other code quality refactors (#1106)
Co-authored-by: Urix <[email protected]>
1 parent 34fb21e commit 7d04367

File tree

12 files changed

+917
-662
lines changed

12 files changed

+917
-662
lines changed

batcher/Cargo.lock

Lines changed: 135 additions & 49 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::sync::Arc;
2+
3+
use aligned_sdk::{
4+
communication::serialization::cbor_serialize,
5+
core::types::{BatchInclusionData, ResponseMessage, VerificationCommitmentBatch},
6+
};
7+
use futures_util::{stream::SplitSink, SinkExt};
8+
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
9+
use log::{error, info};
10+
use serde::Serialize;
11+
use tokio::{net::TcpStream, sync::RwLock};
12+
use tokio_tungstenite::{
13+
tungstenite::{Error, Message},
14+
WebSocketStream,
15+
};
16+
17+
use crate::types::{batch_queue::BatchQueueEntry, errors::BatcherError};
18+
19+
pub(crate) type WsMessageSink = Arc<RwLock<SplitSink<WebSocketStream<TcpStream>, Message>>>;
20+
21+
pub(crate) async fn send_batch_inclusion_data_responses(
22+
finalized_batch: Vec<BatchQueueEntry>,
23+
batch_merkle_tree: &MerkleTree<VerificationCommitmentBatch>,
24+
) -> Result<(), BatcherError> {
25+
for (vd_batch_idx, entry) in finalized_batch.iter().enumerate() {
26+
let batch_inclusion_data = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
27+
let response = ResponseMessage::BatchInclusionData(batch_inclusion_data);
28+
29+
let serialized_response = cbor_serialize(&response)
30+
.map_err(|e| BatcherError::SerializationError(e.to_string()))?;
31+
32+
let Some(ws_sink) = entry.messaging_sink.as_ref() else {
33+
return Err(BatcherError::WsSinkEmpty);
34+
};
35+
36+
let sending_result = ws_sink
37+
.write()
38+
.await
39+
.send(Message::binary(serialized_response))
40+
.await;
41+
42+
match sending_result {
43+
Err(Error::AlreadyClosed) => (),
44+
Err(e) => error!("Error while sending batch inclusion data response: {}", e),
45+
Ok(_) => (),
46+
}
47+
48+
info!("Response sent");
49+
}
50+
51+
Ok(())
52+
}
53+
54+
pub(crate) async fn send_message<T: Serialize>(ws_conn_sink: WsMessageSink, message: T) {
55+
match cbor_serialize(&message) {
56+
Ok(serialized_response) => {
57+
if let Err(err) = ws_conn_sink
58+
.write()
59+
.await
60+
.send(Message::binary(serialized_response))
61+
.await
62+
{
63+
error!("Error while sending message: {}", err)
64+
}
65+
}
66+
Err(e) => error!("Error while serializing message: {}", e),
67+
}
68+
}

0 commit comments

Comments
 (0)