Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion protocol-units/da-sequencer/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ rust-version.workspace = true

[dependencies]
ed25519-dalek = { workspace = true }
movement-da-sequencer-proto = { workspace = true, features = ["client"] }
futures-util = "0.3"
futures-core = "0.3"
futures = { workspace = true }
movement-da-sequencer-proto = { workspace = true, features = ["client", "server"] }
tonic = { workspace = true, features = ["tls", "tls-webpki-roots"]}
tonic-web = { workspace = true }
hyper-util = { workspace = true }
Expand All @@ -20,6 +23,8 @@ http-body-util = { workspace = true }
bytes = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }

[lints]
#workspace = true
169 changes: 130 additions & 39 deletions protocol-units/da-sequencer/client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,59 @@
use ed25519_dalek::Signer;
use ed25519_dalek::{Signature, SigningKey};
use anyhow::Result;
use ed25519_dalek::{Signature, Signer, SigningKey};
use futures_core::Stream;
use futures_util::stream::unfold;
use movement_da_sequencer_proto::da_sequencer_node_service_client::DaSequencerNodeServiceClient;
use movement_da_sequencer_proto::{
BatchWriteRequest, BatchWriteResponse, StreamReadFromHeightRequest,
StreamReadFromHeightResponse,
};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::{Status, Streaming};

/// A wrapping MovementDaLightNodeClients over complex types.
///
/// The usage of hyper by tonic and related libraries makes it very difficult to maintain generic types for the clients.
/// This simplifies client construction and usage.
#[derive(Debug, Clone)]
pub struct DaSequencerClient {
client: DaSequencerNodeServiceClient<tonic::transport::Channel>,
client: DaSequencerNodeServiceClient<Channel>,
connection_string: String,
last_received_height: Arc<Mutex<Option<u64>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last_received_height is a u64, so you can use an AtomicU64, which is more efficient than a mutex.

Copy link
Contributor Author

@andygolay andygolay Apr 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should still use Arc though? In that case, in try_connect, what value should be used in the returned client?

We had

last_received_height: Arc::new(Mutex::new(None)),

So I'm thinking either

last_received_height: Arc::new(AtomicU64::new(0)),

or

last_received_height: Arc::new(AtomicU64::new(u64::MAX))

would make sense?

I'm thinking MAX would be better, to represent that no value is set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, MAX is better for representing the absence of a value. You still need the Arc to share the atomic reference.

}

impl DaSequencerClient {
/// Creates an http2 connection to the Da Sequencer node service.
pub async fn try_connect(connection_string: &str) -> Result<Self, anyhow::Error> {
pub async fn try_connect(connection_string: &str) -> Result<Self> {
for _ in 0..5 {
match DaSequencerClient::connect(connection_string).await {
Ok(client) => return Ok(DaSequencerClient { client }),
match Self::connect(connection_string).await {
Ok(client) => {
return Ok(Self {
client,
connection_string: connection_string.to_string(),
last_received_height: Arc::new(Mutex::new(None)),
});
}
Err(err) => {
tracing::warn!(
"DA sequencer Http2 connection failed: {}. Retrying in 10s...",
"DA sequencer HTTP/2 connection failed: {}. Retrying in 2s...",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set to 10 sec because it takes more than 2s to restart the DA.

err
);
std::thread::sleep(std::time::Duration::from_secs(10));
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
return Err(anyhow::anyhow!(
"Error DA Sequencer Http2 connection failed more than 5 time aborting.",
));
}

/// Stream reads from a given height.
pub async fn stream_read_from_height(
&mut self,
request: movement_da_sequencer_proto::StreamReadFromHeightRequest,
) -> Result<
tonic::Streaming<movement_da_sequencer_proto::StreamReadFromHeightResponse>,
tonic::Status,
> {
let response = self.client.stream_read_from_height(request).await?;
Ok(response.into_inner())
}

/// Writes a batch of transactions to the light node
pub async fn batch_write(
&mut self,
request: movement_da_sequencer_proto::BatchWriteRequest,
) -> Result<movement_da_sequencer_proto::BatchWriteResponse, tonic::Status> {
let response = self.client.batch_write(request).await?;
Ok(response.into_inner())
Err(anyhow::anyhow!("Connection failed more than 5 times"))
}

/// Connects to a da sequencer node service using the given connection string.
async fn connect(
connection_string: &str,
) -> Result<DaSequencerNodeServiceClient<tonic::transport::Channel>, anyhow::Error> {
tracing::info!("Grpc client connect using :{connection_string}");
/// Opens a raw tonic connection to the DA service.
async fn connect(connection_string: &str) -> Result<DaSequencerNodeServiceClient<Channel>> {
tracing::info!("Grpc client connect using: {}", connection_string);
let endpoint = Channel::from_shared(connection_string.to_string())?;

// Dynamically configure TLS based on the scheme (http or https)
let endpoint = if connection_string.starts_with("https://") {
endpoint
.tls_config(ClientTlsConfig::new().with_enabled_roots())?
Expand All @@ -71,9 +63,108 @@ impl DaSequencerClient {
};

let channel = endpoint.connect().await?;
let client = DaSequencerNodeServiceClient::new(channel);
Ok(DaSequencerNodeServiceClient::new(channel))
}

/// Re-establishes the gRPC connection using the stored connection string.
///
/// A new client is built with `Self::connect` and swapped in for the current one.
/// If connecting fails, the error is propagated and the existing client is left untouched.
async fn reconnect(&mut self) -> Result<()> {
    tracing::info!("Reconnecting to {}", self.connection_string);
    // Assign only after the connection attempt has succeeded.
    self.client = Self::connect(&self.connection_string).await?;
    Ok(())
}

/// Streams blocks starting from a height, with reconnect and resume.
pub async fn stream_read_from_height(
&mut self,
start_request: StreamReadFromHeightRequest,
) -> Result<
Pin<Box<dyn Stream<Item = Result<StreamReadFromHeightResponse, Status>> + Send>>,
Status,
> {
let height = {
let last = self.last_received_height.lock().unwrap();
if let Some(last_h) = *last {
tracing::info!("Resuming stream from height: {}", last_h + 1);
last_h + 1
} else {
tracing::info!("Starting stream from requested height: {}", start_request.height);
start_request.height
}
};

match self
.client
.stream_read_from_height(StreamReadFromHeightRequest { height })
.await
{
Ok(response) => Ok(Self::wrap_stream_with_height_tracking(
response.into_inner(),
Arc::clone(&self.last_received_height),
)),
Err(e) => {
tracing::warn!("stream_read_from_height failed, trying reconnect: {e}");
self.reconnect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only retry once, and immediately. You should retry several times and wait a little before each retry.

.await
.map_err(|e| Status::unavailable(format!("Reconnect failed: {e}")))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No error should be returned (i.e., no `?`) until the end of the reconnect loop.


let response = self
.client
.stream_read_from_height(StreamReadFromHeightRequest { height })
.await?;

Ok(Self::wrap_stream_with_height_tracking(
response.into_inner(),
Arc::clone(&self.last_received_height),
))
}
}
}

/// Wraps a stream to track and store the last received height.
fn wrap_stream_with_height_tracking(
stream: Streaming<StreamReadFromHeightResponse>,
last_received_height: Arc<Mutex<Option<u64>>>,
) -> Pin<Box<dyn Stream<Item = Result<StreamReadFromHeightResponse, Status>> + Send>> {
let wrapped = unfold((stream, last_received_height), |(mut s, tracker)| async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should name `tracker` `last_received_height` instead.

match s.message().await {
Ok(Some(msg)) => {
if let Some(ref blob) = msg.response {
if let Some(height) = blob.blob_type.as_ref().and_then(|b| match b {
movement_da_sequencer_proto::blob_response::BlobType::Blockv1(
inner,
) => Some(inner.height),
_ => None,
}) {
*tracker.lock().unwrap() = Some(height);
}
}
Some((Ok(msg), (s, tracker)))
}
Ok(None) => None,
Err(e) => Some((Err(e), (s, tracker))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the stream breaks or returns an error, we should try to reconnect, but that starts to get very complicated.

}
});

Ok(client)
Box::pin(wrapped)
}

/// Sends a batch write request with reconnect on failure.
pub async fn batch_write(
&mut self,
request: BatchWriteRequest,
) -> Result<BatchWriteResponse, Status> {
match self.client.batch_write(request.clone()).await {
Ok(response) => Ok(response.into_inner()),
Err(_) => {
self.reconnect()
.await
.map_err(|e| Status::unavailable(format!("Reconnect failed: {}", e)))?;

let response = self.client.batch_write(request).await?;
Ok(response.into_inner())
}
}
}
}

Expand Down
Loading
Loading