Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion protocol-units/da-sequencer/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rust-version.workspace = true

[dependencies]
ed25519-dalek = { workspace = true }
movement-da-sequencer-proto = { workspace = true, features = ["client"] }
movement-da-sequencer-proto = { workspace = true, features = ["client", "server"] }
tonic = { workspace = true, features = ["tls", "tls-webpki-roots"]}
tonic-web = { workspace = true }
hyper-util = { workspace = true }
Expand All @@ -21,5 +21,9 @@ bytes = { workspace = true }
anyhow = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tokio = { version = "1.38", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"

[lints]
#workspace = true
149 changes: 149 additions & 0 deletions protocol-units/da-sequencer/client/tests/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use movement_da_sequencer_client::DaSequencerClient;
use movement_da_sequencer_proto::blob_response::BlobType;
use movement_da_sequencer_proto::da_sequencer_node_service_server::{
DaSequencerNodeService, DaSequencerNodeServiceServer,
};
use movement_da_sequencer_proto::{
BatchWriteRequest, BatchWriteResponse, BlobResponse, Blockv1, ReadAtHeightRequest,
ReadAtHeightResponse, StreamReadFromHeightRequest, StreamReadFromHeightResponse,
};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::transport::Server;
use tonic::{Request, Response, Status};

struct MockService;

#[tonic::async_trait]
impl DaSequencerNodeService for MockService {
type StreamReadFromHeightStream = ReceiverStream<Result<StreamReadFromHeightResponse, Status>>;

async fn stream_read_from_height(
&self,
_request: Request<StreamReadFromHeightRequest>,
) -> Result<Response<Self::StreamReadFromHeightStream>, Status> {
let (tx, rx) = mpsc::channel(1);

let blob = BlobResponse {
blob_type: Some(BlobType::Blockv1(Blockv1 {
blobckid: vec![],
data: vec![],
height: 0,
})),
};

let _ = tx.send(Ok(StreamReadFromHeightResponse { response: Some(blob) })).await;

Ok(Response::new(ReceiverStream::new(rx)))
}

async fn batch_write(
&self,
_request: Request<BatchWriteRequest>,
) -> Result<Response<BatchWriteResponse>, Status> {
Ok(Response::new(BatchWriteResponse { answer: true }))
}

async fn read_at_height(
&self,
_request: Request<ReadAtHeightRequest>,
) -> Result<Response<ReadAtHeightResponse>, Status> {
let blob = BlobResponse {
blob_type: Some(BlobType::Blockv1(Blockv1 {
blobckid: vec![],
data: vec![],
height: 0,
})),
};

Ok(Response::new(ReadAtHeightResponse { response: Some(blob) }))
}
}

async fn start_mock_server_with_control() -> (SocketAddr, oneshot::Sender<()>) {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
let bound_addr = listener.local_addr().unwrap();
let (shutdown_tx, shutdown_rx) = oneshot::channel();

let service = DaSequencerNodeServiceServer::new(MockService);
tokio::spawn(async move {
Server::builder()
.add_service(service)
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), async {
shutdown_rx.await.ok();
})
.await
.unwrap();
});

(bound_addr, shutdown_tx)
}

#[tokio::test]
async fn test_client_reconnect_if_connection_fails() {
let should_start_server = Arc::new(AtomicBool::new(false));
let signal_server = should_start_server.clone();

let (addr, shutdown_tx) = {
let (tx, rx) = oneshot::channel();
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
let bound_addr = listener.local_addr().unwrap();

let service = DaSequencerNodeServiceServer::new(MockService);

tokio::spawn(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the purpose of the oneshot channel. To shut down the server you've to abort the task or stop the server but here, the channel just waits before starting the server.

rx.await.ok();
Server::builder()
.add_service(service)
.serve_with_incoming(TcpListenerStream::new(listener))
.await
.unwrap();
});

(bound_addr, tx)
};

let url = format!("http://{}", addr);

let client_task = tokio::spawn(async move { DaSequencerClient::try_connect(&url).await });

// Wait before triggering the server to simulate retry
tokio::time::sleep(Duration::from_secs(3)).await;
signal_server.store(true, Ordering::Relaxed);
let _ = shutdown_tx.send(());

let result = client_task.await.unwrap();
assert!(result.is_ok(), "Expected client to reconnect after retries, but it failed");
}

#[tokio::test]
async fn test_reopen_block_stream_at_correct_height() {
let (addr, shutdown_tx) = start_mock_server_with_control().await;
let url = format!("http://{}", addr);

// Connect and open stream
let mut client1 = DaSequencerClient::try_connect(&url)
.await
.expect("Failed to connect with client1");

let request = StreamReadFromHeightRequest { height: 0 };
let stream_result1 = client1.stream_read_from_height(request).await;
assert!(stream_result1.is_ok());

// Simulate reconnection
let mut client2 = DaSequencerClient::try_connect(&url)
.await
.expect("Failed to reconnect with client2");

let stream_result2 =
client2.stream_read_from_height(StreamReadFromHeightRequest { height: 0 }).await;
assert!(stream_result2.is_ok(), "Failed to reopen block stream");

let _ = shutdown_tx.send(());
}
Loading