Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions crates/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ pub enum Error {
/// Server has already been stopped.
#[error("gRPC server has already been stopped")]
AlreadyStopped,

/// IO error from binding the TCP listener.
#[error(transparent)]
Io(#[from] std::io::Error),

/// Error from creating the TCP incoming stream.
#[error("Failed to create TCP incoming stream: {0}")]
Incoming(String),
}

/// Handle to a running gRPC server.
Expand Down Expand Up @@ -102,8 +110,9 @@ impl GrpcServer {

/// Starts the gRPC server.
///
/// This method spawns the server on a new Tokio task and returns a handle
/// that can be used to manage the server.
/// This method binds to the given address, spawns the server on a new Tokio task,
/// and returns a handle that can be used to manage the server. The server is
/// guaranteed to be listening for connections when this method returns.
pub async fn start(&self, addr: SocketAddr) -> Result<GrpcServerHandle, Error> {
// Build reflection service for tooling support (grpcurl, Postman, etc.)
let reflection_service = tonic_reflection::server::Builder::configure()
Expand All @@ -114,23 +123,33 @@ impl GrpcServer {
// Create shutdown channel
let (shutdown_tx, mut shutdown_rx) = watch::channel(());

// Bind the TCP listener BEFORE spawning the server task. This ensures:
// 1. The port is resolved (important when addr uses port 0 for auto-assignment)
// 2. The server is accepting connections when this method returns
let listener = tokio::net::TcpListener::bind(addr).await?;
let actual_addr = listener.local_addr()?;

let incoming = tonic::transport::server::TcpIncoming::from_listener(listener, true, None)
.map_err(|e| Error::Incoming(e.to_string()))?;

let mut builder = Server::builder().timeout(self.timeout);
let server = builder.add_routes(self.routes.clone()).add_service(reflection_service);

// Start the server with graceful shutdown
let server_future = server.serve_with_shutdown(addr, async move {
let _ = shutdown_rx.changed().await;
});

// Start the server with the already-bound listener
tokio::spawn(async move {
if let Err(error) = server_future.await {
if let Err(error) = server
.serve_with_incoming_shutdown(incoming, async move {
let _ = shutdown_rx.changed().await;
})
.await
{
error!(target: "grpc", %error, "gRPC server error");
}
});

info!(target: "grpc", %addr, "gRPC server started.");
info!(target: "grpc", addr = %actual_addr, "gRPC server started.");

Ok(GrpcServerHandle { addr, shutdown_tx: Arc::new(shutdown_tx) })
Ok(GrpcServerHandle { addr: actual_addr, shutdown_tx: Arc::new(shutdown_tx) })
}
}

Expand Down
23 changes: 19 additions & 4 deletions crates/utils/src/bin/generate_migration_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ fn copy_dir_all(src: &Path, dst: &Path) -> std::io::Result<()> {
let dst_path = dst.join(entry.file_name());
if entry.file_type()?.is_dir() {
copy_dir_all(&entry.path(), &dst_path)?;
} else {
} else if entry.file_name() != "mdbx.lck" {
// Skip mdbx.lck as it contains platform-specific data (pthread mutexes,
// process IDs) that is not portable. MDBX creates a fresh lock file on open.
std::fs::copy(entry.path(), dst_path)?;
}
}
Expand Down Expand Up @@ -82,8 +84,16 @@ fn create_tar_gz(db_dir: &Path, output: &Path) -> std::io::Result<()> {
async fn main() {
let args = Args::parse();

println!("Starting node with test_config()...");
let node = TestNode::new_with_config(test_config()).await;
// Use a persistent database directory with SyncMode::Durable instead of the default
// in-memory database (which uses SyncMode::UtterlyNoSync). UtterlyNoSync doesn't
// guarantee that committed data is flushed to disk, which can produce corrupted
// snapshots when the database files are archived.
let db_dir = tempfile::tempdir().expect("failed to create temp dir for database");
let mut config = test_config();
config.db.dir = Some(db_dir.path().to_path_buf());

println!("Starting node with persistent database at {}...", db_dir.path().display());
let node = TestNode::new_with_config(config).await;

println!("Migrating example '{}'...", args.example);
match args.example.as_str() {
Expand All @@ -99,9 +109,14 @@ async fn main() {
}
}

// Get the database path from the running node
// Get the database path before stopping the node.
let db_path = node.handle().node().db().path().to_path_buf();

// Stop the node to properly close the MDBX environment and ensure all writes are
// flushed to disk before archiving.
println!("Stopping node...");
node.stop().await.expect("failed to stop node");

println!("Creating archive at {}...", args.output.display());
let output_abs = std::env::current_dir().unwrap().join(&args.output);
create_tar_gz(&db_path, &output_abs).expect("failed to create tar.gz archive");
Expand Down
56 changes: 56 additions & 0 deletions crates/utils/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ impl TestNode {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/db/simple");
Self::new_from_db(&db_path).await
}

/// Stops the node and releases all resources including the database.
///
/// This ensures the MDBX environment is properly closed and all pending writes are
/// flushed. Must be called before archiving or copying database files.
///
/// Takes `self` by value, so the `TestNode` cannot be used again once this has been
/// called.
///
/// # Errors
///
/// Returns whatever error the wrapped node's own `stop` call reports; this method adds
/// no error handling of its own.
pub async fn stop(self) -> anyhow::Result<()> {
// Pure delegation: the shutdown work is done by the inner node.
self.node.stop().await
}
}

impl ForkTestNode {
Expand Down Expand Up @@ -367,3 +375,51 @@ pub fn test_config() -> Config {

Config { sequencing, rpc, dev, chain: ChainSpec::Dev(chain).into(), grpc, ..Default::default() }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::copy_db_dir;

    /// Asserts that the database fixture called `fixture` can be opened.
    ///
    /// The fixture is looked up under `tests/fixtures/db/<fixture>` relative to the
    /// workspace root. Its files are copied into a fresh temporary directory and the copy
    /// is opened with `katana_db::Db::open_no_sync`, so the fixture directory itself is
    /// never the directory handed to the database.
    ///
    /// If the fixture directory does not exist, a note is printed to stderr and the check
    /// is skipped (the calling test then passes).
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created, if the copy fails, or if the
    /// database fails to open.
    fn assert_fixture_db_opens(fixture: &str) {
        let fixture_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/db")
            .join(fixture);

        if !fixture_path.exists() {
            // Skip if fixtures haven't been extracted (e.g. local dev without `make fixtures`)
            eprintln!("Skipping: fixture not found at {}", fixture_path.display());
            return;
        }

        let temp_dir = tempfile::tempdir().expect("failed to create temp dir");
        copy_db_dir(&fixture_path, temp_dir.path()).expect("failed to copy db files");

        // This is the exact call that fails with MDBX_CORRUPTED when the fixture is bad.
        if let Err(error) = katana_db::Db::open_no_sync(temp_dir.path()) {
            panic!("fixture database is corrupted: {}", error);
        }
    }

    /// Verifies that the spawn_and_move database fixture can be opened without corruption.
    ///
    /// This test catches the bug where `generate_migration_db` produced corrupted snapshots
    /// by archiving the MDBX database files while the environment was still open under
    /// `SyncMode::UtterlyNoSync`.
    #[test]
    fn open_spawn_and_move_db_fixture() {
        assert_fixture_db_opens("spawn_and_move");
    }

    /// Verifies that the simple database fixture can be opened without corruption.
    #[test]
    fn open_simple_db_fixture() {
        assert_fixture_db_opens("simple");
    }
}
Binary file modified tests/fixtures/db/simple.tar.gz
Binary file not shown.
Binary file modified tests/fixtures/db/spawn_and_move.tar.gz
Binary file not shown.
Loading