Skip to content

Commit 69b2e6c

Browse files
musitdevnicholasflintwillow
authored andcommitted
fix: Update light client protocol and heartbeat. (#1064)
1 parent ffb4633 commit 69b2e6c

File tree

17 files changed

+232
-52
lines changed

17 files changed

+232
-52
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

networks/movement/movement-full-node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ movement-signer = { workspace = true }
4747
movement-signer-loader = { workspace = true }
4848
syncador = { workspace = true }
4949
syncup = { workspace = true }
50+
chrono = { workspace = true }
5051

5152
[features]
5253
default = []

networks/movement/movement-full-node/src/da/stream_blocks/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,30 @@ impl StreamBlocks {
3838
.ok_or(anyhow::anyhow!("No blob type in response"))?
3939
{
4040
blob_response::BlobType::SequencedBlobBlock(blob) => {
41+
tracing::info!("Receive SequencedBlobBlock blob");
4142
(blob.data, blob.timestamp, blob.blob_id, blob.height)
4243
}
44+
blob_response::BlobType::PassedThroughBlob(blob) => {
45+
tracing::info!("Receive PassedThroughBlob blob");
46+
(blob.data, blob.timestamp, blob.blob_id, blob.height)
47+
}
48+
blob_response::BlobType::Heartbeat(_) => {
49+
tracing::info!("Receive heartbeat blob");
50+
continue;
51+
}
4352
_ => {
4453
anyhow::bail!("Invalid blob type in response")
4554
}
4655
};
47-
info!("{} {} {}", hex::encode(block_id), block_timestamp, da_height);
56+
// pretty print (with labels) the block_id, block_timestamp, and da_height
57+
tracing::info!(
58+
"Block ID: {}, Block Timestamp: {:?}, DA Height: {}",
59+
hex::encode(block_id),
60+
// unix date string from the block timestamp which is in microseconds
61+
chrono::DateTime::from_timestamp_micros(block_timestamp as i64)
62+
.context("Failed to convert timestamp to date")?,
63+
da_height
64+
);
4865
}
4966

5067
info!("Finished streaming blocks from DA");

networks/movement/movement-full-node/src/node/tasks/execute_settle.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ where
7373
let mut blocks_from_da = self
7474
.da_light_node_client
7575
.stream_read_from_height(StreamReadFromHeightRequest { height: synced_height })
76-
.await?;
76+
.await
77+
.map_err(|e| {
78+
error!("Failed to stream blocks from DA: {:?}", e);
79+
e
80+
})?;
7781

7882
loop {
7983
select! {
@@ -112,6 +116,11 @@ where
112116
blob_response::BlobType::PassedThroughBlob(blob) => {
113117
(blob.data, blob.timestamp, blob.blob_id, blob.height)
114118
}
119+
blob_response::BlobType::Heartbeat(_) => {
120+
tracing::info!("Receive DA heartbeat");
121+
// Do nothing.
122+
return Ok(());
123+
}
115124
_ => anyhow::bail!("Invalid blob type"),
116125
};
117126

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
syntax = "proto3";
2+
package movementlabs.protocol_units.da.light_node.v1beta2;
3+
4+
5+
// Request and response messages
6+
message Blob {
7+
bytes blob_id = 1;
8+
bytes data = 2;
9+
uint64 height = 3;
10+
bytes signature = 4;
11+
uint64 timestamp = 5;
12+
bytes signer = 6;
13+
}
14+
15+
message BlobResponse {
16+
oneof blob_type {
17+
Blob passed_through_blob = 1;
18+
Blob sequenced_blob_intent = 2;
19+
Blob sequenced_blob_block = 3;
20+
bool heartbeat = 4;
21+
}
22+
}
23+
24+
message BlobWrite {
25+
bytes data = 1;
26+
}
27+
28+
// StreamReadAtHeight
29+
message StreamReadFromHeightRequest {
30+
uint64 height = 1;
31+
}
32+
33+
message StreamReadFromHeightResponse {
34+
BlobResponse blob = 1;
35+
}
36+
37+
// StreamReadLatest
38+
message StreamReadLatestRequest {
39+
40+
}
41+
42+
message StreamReadLatestResponse {
43+
BlobResponse blob = 1;
44+
}
45+
46+
// StreamWriteBlob
47+
message StreamWriteBlobRequest {
48+
BlobWrite blob = 1;
49+
}
50+
51+
message StreamWriteBlobResponse {
52+
BlobResponse blob = 1;
53+
}
54+
55+
// ReadAtHeight
56+
message ReadAtHeightRequest {
57+
uint64 height = 1;
58+
}
59+
60+
message ReadAtHeightResponse {
61+
repeated BlobResponse blobs = 1;
62+
}
63+
64+
// BatchRead
65+
message BatchReadRequest {
66+
repeated uint64 heights = 1;
67+
}
68+
69+
message BatchReadResponse {
70+
repeated ReadAtHeightResponse responses = 1;
71+
}
72+
73+
message BatchWriteRequest {
74+
repeated BlobWrite blobs = 1;
75+
}
76+
77+
message BatchWriteResponse {
78+
repeated BlobResponse blobs = 1;
79+
}
80+
81+
82+
83+
// LightNode service definition
84+
service LightNodeService {
85+
// Stream blobs from a specified height or from the latest height.
86+
rpc StreamReadFromHeight (StreamReadFromHeightRequest) returns (stream StreamReadFromHeightResponse);
87+
rpc StreamReadLatest (StreamReadLatestRequest) returns (stream StreamReadLatestResponse);
88+
89+
// Stream blobs out, either individually or in batches.
90+
rpc StreamWriteBlob (stream StreamWriteBlobRequest) returns (stream StreamWriteBlobResponse);
91+
92+
// Read blobs at a specified height.
93+
rpc ReadAtHeight (ReadAtHeightRequest) returns (ReadAtHeightResponse);
94+
95+
// Batch read and write operations for efficiency.
96+
rpc BatchRead (BatchReadRequest) returns (BatchReadResponse);
97+
rpc BatchWrite (BatchWriteRequest) returns (BatchWriteResponse);
98+
99+
}

protocol-units/da/movement/celestia/light-node/src/v1/passthrough.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ where
184184
verified_blobs.push(blob);
185185
}
186186
Err(e) => {
187+
tracing::error!(error = %e, "ICI failed to verify blob");
187188
error!(error = %e, "failed to verify blob");
188189
}
189190
}
@@ -265,7 +266,7 @@ where
265266

266267
while let Some(blob) = blob_stream.next().await {
267268

268-
debug!("Stream got blob: {:?}", blob);
269+
debug!("Back fetch Stream got blob: {:?}", blob);
269270

270271
yield blob?;
271272
}

protocol-units/da/movement/protocol/da/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ async-stream = { workspace = true }
2323
tokio = { workspace = true }
2424
tracing = { workspace = true }
2525
anyhow = { workspace = true }
26+
hex = { workspace = true }
2627

2728
[lints]
2829
workspace = true

protocol-units/da/movement/protocol/da/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub enum DaError {
5454
/// Trait for DA operations.
5555
pub trait DaOperations<C>: Send + Sync
5656
where
57-
C: Curve + Send + Sync + 'static,
57+
C: Curve + Send + Sync + 'static + std::fmt::Debug,
5858
{
5959
fn submit_blob(
6060
&self,

protocol-units/da/movement/protocol/da/src/mock/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ where
6464

6565
impl<C> DaOperations<C> for Mock<C>
6666
where
67-
C: Curve + Send + Sync + 'static,
67+
C: Curve + Send + Sync + 'static + std::fmt::Debug,
6868
{
6969
fn submit_blob(
7070
&self,

protocol-units/da/movement/protocol/light-node/src/passthrough.rs

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use movement_da_light_node_digest_store::da::Da as DigestStoreDa;
1010
use movement_da_light_node_proto::light_node_service_server::LightNodeService;
1111
use movement_da_light_node_proto::*;
1212
use movement_da_light_node_verifier::signed::InKnownSignersVerifier;
13-
use movement_da_light_node_verifier::{Error as VerifierError, VerifierOperations};
13+
use movement_da_light_node_verifier::VerifierOperations;
1414
use movement_da_util::{
1515
blob::ir::blob::DaBlob, blob::ir::data::InnerSignedBlobV1Data, config::Config,
1616
};
@@ -35,7 +35,8 @@ where
3535
+ Serialize
3636
+ for<'de> Deserialize<'de>
3737
+ Clone
38-
+ 'static,
38+
+ 'static
39+
+ std::fmt::Debug,
3940
Da: DaOperations<C>,
4041
V: VerifierOperations<DaBlob<C>, DaBlob<C>>,
4142
{
@@ -56,7 +57,8 @@ where
5657
+ Serialize
5758
+ for<'de> Deserialize<'de>
5859
+ Clone
59-
+ 'static,
60+
+ 'static
61+
+ std::fmt::Debug,
6062
Da: DaOperations<C>,
6163
V: VerifierOperations<DaBlob<C>, DaBlob<C>>,
6264
{
@@ -113,7 +115,8 @@ where
113115
+ Serialize
114116
+ for<'de> Deserialize<'de>
115117
+ Clone
116-
+ 'static,
118+
+ 'static
119+
+ std::fmt::Debug,
117120
Da: DaOperations<C> + 'static,
118121
V: VerifierOperations<DaBlob<C>, DaBlob<C>> + Send + Sync + 'static,
119122
{
@@ -135,31 +138,51 @@ where
135138
let verifier = self.verifier.clone();
136139
let height = request.into_inner().height;
137140

141+
// Tick interval for generating HeartBeat.
142+
let mut tick_interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
143+
138144
let output = async_stream::try_stream! {
139145

140146
let mut blob_stream = da.stream_da_blobs_from_height(height).await.map_err(|e| tonic::Status::internal(e.to_string()))?;
141147

142-
while let Some(blob) = blob_stream.next().await {
143-
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
144-
match verifier.verify(da_blob, height.as_u64()).await {
145-
Ok(verifed_blob) => {
146-
let blob = verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?;
147-
let response = StreamReadFromHeightResponse {
148-
blob: Some(blob)
149-
};
150-
yield response;
151-
},
152-
Err(VerifierError::Validation(e)) => {
153-
info!("Failed to verify blob: {}", e);
154-
},
155-
Err(VerifierError::Internal(e)) => {
156-
Err(tonic::Status::internal(e.to_string()))?;
148+
loop {
149+
let response_content = tokio::select! {
150+
// Yield from the data stream
151+
block_opt = blob_stream.next() => {
152+
match block_opt {
153+
Some(Ok((height, da_blob))) => {
154+
match verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string())).and_then(|verifed_blob| {
155+
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))
156+
}) {
157+
Ok(blob) => blob,
158+
Err(err) => {
159+
// Not verified block, skip to next one.
160+
tracing::warn!("Stream blob of height: {} fail to verify error:{err}", height.as_u64());
161+
continue;
162+
}
163+
}
164+
}
165+
Some(Err(err)) => {
166+
tracing::warn!("Stream blob return an error, exit stream :{err}");
167+
return;
168+
},
169+
None => {
170+
tracing::warn!("Stream blob closed , exit stream.");
171+
return;
172+
}
173+
}
174+
}
175+
// Yield the periodic tick
176+
_ = tick_interval.tick() => {
177+
//Heart beat. The value can be use to indicate some status.
178+
BlobResponse { blob_type: Some(movement_da_light_node_proto::blob_response::BlobType::Heartbeat(true)) }
157179
}
158-
}
180+
};
181+
let response = StreamReadFromHeightResponse {
182+
blob: Some(response_content)
183+
};
184+
yield response;
159185
}
160-
161-
info!("Stream read from height closed for height: {}", height);
162-
163186
};
164187

165188
Ok(tonic::Response::new(Box::pin(output) as Self::StreamReadFromHeightStream))

0 commit comments

Comments
 (0)