diff --git a/crates/grpc/src/server.rs b/crates/grpc/src/server.rs index 0c61a90e6..362267150 100644 --- a/crates/grpc/src/server.rs +++ b/crates/grpc/src/server.rs @@ -28,6 +28,14 @@ pub enum Error { /// Server has already been stopped. #[error("gRPC server has already been stopped")] AlreadyStopped, + + /// IO error from binding the TCP listener. + #[error(transparent)] + Io(#[from] std::io::Error), + + /// Error from creating the TCP incoming stream. + #[error("Failed to create TCP incoming stream: {0}")] + Incoming(String), } /// Handle to a running gRPC server. @@ -102,8 +110,9 @@ impl GrpcServer { /// Starts the gRPC server. /// - /// This method spawns the server on a new Tokio task and returns a handle - /// that can be used to manage the server. + /// This method binds to the given address, spawns the server on a new Tokio task, + /// and returns a handle that can be used to manage the server. The server is + /// guaranteed to be listening for connections when this method returns. pub async fn start(&self, addr: SocketAddr) -> Result { // Build reflection service for tooling support (grpcurl, Postman, etc.) let reflection_service = tonic_reflection::server::Builder::configure() @@ -114,23 +123,33 @@ impl GrpcServer { // Create shutdown channel let (shutdown_tx, mut shutdown_rx) = watch::channel(()); + // Bind the TCP listener BEFORE spawning the server task. This ensures: + // 1. The port is resolved (important when addr uses port 0 for auto-assignment) + // 2. The server is accepting connections when this method returns + let listener = tokio::net::TcpListener::bind(addr).await?; + let actual_addr = listener.local_addr()?; + + let incoming = tonic::transport::server::TcpIncoming::from_listener(listener, true, None) + .map_err(|e| Error::Incoming(e.to_string()))?; + let mut builder = Server::builder().timeout(self.timeout); let server = builder.add_routes(self.routes.clone()).add_service(reflection_service); - // Start the server with graceful shutdown - let server_future = server.serve_with_shutdown(addr, async move { - let _ = shutdown_rx.changed().await; - }); - + // Start the server with the already-bound listener tokio::spawn(async move { - if let Err(error) = server_future.await { + if let Err(error) = server + .serve_with_incoming_shutdown(incoming, async move { + let _ = shutdown_rx.changed().await; + }) + .await + { error!(target: "grpc", %error, "gRPC server error"); } }); - info!(target: "grpc", %addr, "gRPC server started."); + info!(target: "grpc", addr = %actual_addr, "gRPC server started."); - Ok(GrpcServerHandle { addr, shutdown_tx: Arc::new(shutdown_tx) }) + Ok(GrpcServerHandle { addr: actual_addr, shutdown_tx: Arc::new(shutdown_tx) }) } } diff --git a/crates/utils/src/bin/generate_migration_db.rs b/crates/utils/src/bin/generate_migration_db.rs index 3d557f15e..ee4517604 100644 --- a/crates/utils/src/bin/generate_migration_db.rs +++ b/crates/utils/src/bin/generate_migration_db.rs @@ -33,7 +33,9 @@ fn copy_dir_all(src: &Path, dst: &Path) -> std::io::Result<()> { let dst_path = dst.join(entry.file_name()); if entry.file_type()?.is_dir() { copy_dir_all(&entry.path(), &dst_path)?; - } else { + } else if entry.file_name() != "mdbx.lck" { + // Skip mdbx.lck as it contains platform-specific data (pthread mutexes, + // process IDs) that is not portable. MDBX creates a fresh lock file on open. std::fs::copy(entry.path(), dst_path)?; } } @@ -82,8 +84,16 @@ fn create_tar_gz(db_dir: &Path, output: &Path) -> std::io::Result<()> { async fn main() { let args = Args::parse(); - println!("Starting node with test_config()..."); - let node = TestNode::new_with_config(test_config()).await; + // Use a persistent database directory with SyncMode::Durable instead of the default + // in-memory database (which uses SyncMode::UtterlyNoSync). UtterlyNoSync doesn't + // guarantee that committed data is flushed to disk, which can produce corrupted + // snapshots when the database files are archived. + let db_dir = tempfile::tempdir().expect("failed to create temp dir for database"); + let mut config = test_config(); + config.db.dir = Some(db_dir.path().to_path_buf()); + + println!("Starting node with persistent database at {}...", db_dir.path().display()); + let node = TestNode::new_with_config(config).await; println!("Migrating example '{}'...", args.example); match args.example.as_str() { @@ -99,9 +109,14 @@ async fn main() { } } - // Get the database path from the running node + // Get the database path before stopping the node. let db_path = node.handle().node().db().path().to_path_buf(); + // Stop the node to properly close the MDBX environment and ensure all writes are + // flushed to disk before archiving. + println!("Stopping node..."); + node.stop().await.expect("failed to stop node"); + println!("Creating archive at {}...", args.output.display()); let output_abs = std::env::current_dir().unwrap().join(&args.output); create_tar_gz(&db_path, &output_abs).expect("failed to create tar.gz archive"); diff --git a/crates/utils/src/node.rs b/crates/utils/src/node.rs index e2d2f12b6..3900a55be 100644 --- a/crates/utils/src/node.rs +++ b/crates/utils/src/node.rs @@ -122,6 +122,14 @@ impl TestNode { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/db/simple"); Self::new_from_db(&db_path).await } + + /// Stops the node and releases all resources including the database. + /// + /// This ensures the MDBX environment is properly closed and all pending writes are + /// flushed. Must be called before archiving or copying database files. + pub async fn stop(self) -> anyhow::Result<()> { + self.node.stop().await + } } impl ForkTestNode { @@ -367,3 +375,51 @@ pub fn test_config() -> Config { Config { sequencing, rpc, dev, chain: ChainSpec::Dev(chain).into(), grpc, ..Default::default() } } + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::copy_db_dir; + + /// Verifies that the spawn_and_move database fixture can be opened without corruption. + /// + /// This test catches the bug where `generate_migration_db` produced corrupted snapshots + /// by archiving the MDBX database files while the environment was still open under + /// `SyncMode::UtterlyNoSync`. + #[test] + fn open_spawn_and_move_db_fixture() { + let fixture_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../tests/fixtures/db/spawn_and_move"); + + if !fixture_path.exists() { + // Skip if fixtures haven't been extracted (e.g. local dev without `make fixtures`) + eprintln!("Skipping: fixture not found at {}", fixture_path.display()); + return; + } + + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + copy_db_dir(&fixture_path, temp_dir.path()).expect("failed to copy db files"); + + // This is the exact call that fails with MDBX_CORRUPTED when the fixture is bad. + let db = katana_db::Db::open_no_sync(temp_dir.path()); + assert!(db.is_ok(), "fixture database is corrupted: {}", db.unwrap_err()); + } + + #[test] + fn open_simple_db_fixture() { + let fixture_path = + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/db/simple"); + + if !fixture_path.exists() { + eprintln!("Skipping: fixture not found at {}", fixture_path.display()); + return; + } + + let temp_dir = tempfile::tempdir().expect("failed to create temp dir"); + copy_db_dir(&fixture_path, temp_dir.path()).expect("failed to copy db files"); + + let db = katana_db::Db::open_no_sync(temp_dir.path()); + assert!(db.is_ok(), "fixture database is corrupted: {}", db.unwrap_err()); + } +} diff --git a/tests/fixtures/db/simple.tar.gz b/tests/fixtures/db/simple.tar.gz index 12111d9e6..a289128ed 100644 Binary files a/tests/fixtures/db/simple.tar.gz and b/tests/fixtures/db/simple.tar.gz differ diff --git a/tests/fixtures/db/spawn_and_move.tar.gz b/tests/fixtures/db/spawn_and_move.tar.gz index 32981b059..0b513b6d1 100644 Binary files a/tests/fixtures/db/spawn_and_move.tar.gz and b/tests/fixtures/db/spawn_and_move.tar.gz differ