Skip to content

Commit f2d7b5b

Browse files
feat: add version check to batcher and client (#503)
1 parent b80d098 commit f2d7b5b

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

batcher/aligned-batcher/src/lib.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ mod zk_utils;
4343

4444
const S3_BUCKET_NAME: &str = "storage.alignedlayer.com";
4545

46+
const PROTOCOL_VERSION: u16 = 0;
47+
4648
pub struct Batcher {
4749
s3_client: S3Client,
4850
eth_ws_provider: Provider<Ws>,
@@ -54,6 +56,7 @@ pub struct Batcher {
5456
max_batch_size: usize,
5557
last_uploaded_batch_block: Mutex<u64>,
5658
pre_verification_is_enabled: bool,
59+
protocol_version: u16,
5760
}
5861

5962
impl Batcher {
@@ -100,6 +103,7 @@ impl Batcher {
100103
max_batch_size: config.batcher.max_batch_size,
101104
last_uploaded_batch_block: Mutex::new(last_uploaded_batch_block),
102105
pre_verification_is_enabled: config.batcher.pre_verification_is_enabled,
106+
protocol_version: PROTOCOL_VERSION,
103107
}
104108
}
105109

@@ -146,8 +150,18 @@ impl Batcher {
146150

147151
debug!("WebSocket connection established: {}", addr);
148152
let (outgoing, incoming) = ws_stream.split();
149-
150153
let outgoing = Arc::new(RwLock::new(outgoing));
154+
155+
// Send the protocol version to the client
156+
let protocol_version_msg = Message::binary(self.protocol_version.to_be_bytes().to_vec());
157+
158+
outgoing
159+
.write()
160+
.await
161+
.send(protocol_version_msg)
162+
.await
163+
.expect("Failed to send protocol version");
164+
151165
match incoming
152166
.try_filter(|msg| future::ready(msg.is_text()))
153167
.try_for_each(|msg| self.clone().handle_message(msg, outgoing.clone()))

batcher/aligned/src/main.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ impl From<ProvingSystemArg> for ProvingSystemId {
168168
}
169169
}
170170

171+
const PROTOCOL_VERSION: u16 = 0;
172+
171173
#[tokio::main]
172174
async fn main() -> Result<(), errors::BatcherClientError> {
173175
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
@@ -182,7 +184,29 @@ async fn main() -> Result<(), errors::BatcherClientError> {
182184
let (ws_stream, _) = connect_async(url).await?;
183185

184186
info!("WebSocket handshake has been successfully completed");
185-
let (mut ws_write, ws_read) = ws_stream.split();
187+
let (mut ws_write, mut ws_read) = ws_stream.split();
188+
189+
// First message from the batcher is the protocol version
190+
if let Some(Ok(msg)) = ws_read.next().await {
191+
match msg.into_data().try_into() {
192+
Ok(data) => {
193+
let current_protocol_version = u16::from_be_bytes(data);
194+
if current_protocol_version > PROTOCOL_VERSION {
195+
info!(
196+
"You are running an old version of the client, update it running:\ncurl -L https://raw.githubusercontent.com/yetanotherco/aligned_layer/main/batcher/aligned/install_aligned.sh | bash\nClient version: {}, Expected version: {}",
197+
PROTOCOL_VERSION, current_protocol_version
198+
);
199+
}
200+
}
201+
Err(_) => {
202+
error!("Error while reading protocol version");
203+
return Ok(());
204+
}
205+
}
206+
} else {
207+
error!("Batcher did not respond with the protocol version");
208+
return Ok(());
209+
}
186210

187211
let batch_inclusion_data_directory_path =
188212
PathBuf::from(&submit_args.batch_inclusion_data_directory_path);
@@ -336,6 +360,7 @@ async fn receive(
336360
*num_responses_lock += 1;
337361

338362
let data = msg.into_data();
363+
339364
match serde_json::from_slice::<BatchInclusionData>(&data) {
340365
Ok(batch_inclusion_data) => {
341366
info!("Received response from batcher");

0 commit comments

Comments
 (0)