diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 01dffa5f10b..f84af04d5b2 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -71,18 +71,26 @@ jobs:
       - name: Run the storage-service instance
         run: |
           cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
+      - name: Wait for storage service to be ready
+        run: |
+          until nc -z 127.0.0.1 1235; do sleep 1; done
+      - name: Build binaries
+        run: |
+          cargo build --features storage-service --bin linera-server --bin linera-proxy --bin linera
       - name: Run the validators
         run: |
-          cargo build --features storage-service
           mkdir /tmp/local-linera-net
-          cargo run --features storage-service --bin linera -- net up --storage service:tcp:$LINERA_STORAGE_SERVICE:table --policy-config testnet --path /tmp/local-linera-net --validators 4 --shards 4 &
+          cargo run --features storage-service --bin linera -- net up --storage service:tcp:$LINERA_STORAGE_SERVICE:table --policy-config testnet --path /tmp/local-linera-net --validators 2 --shards 2 &
       - name: Create two epochs and run the faucet
         # See https://github.com/linera-io/linera-protocol/pull/2835 for details.
         run: |
-          cargo build --bin linera
+          mkdir /tmp/linera-faucet
           cargo run --bin linera -- resource-control-policy --http-request-timeout-ms 1000
           cargo run --bin linera -- resource-control-policy --http-request-timeout-ms 500
-          cargo run --bin linera -- faucet --amount 1000 --port 8079 &
+          cargo run --bin linera -- faucet --storage-path /tmp/linera-faucet/faucet_storage.sqlite --amount 1000 --port 8079 &
+      - name: Wait for faucet to be ready
+        run: |
+          until curl -s http://localhost:8079 >/dev/null; do sleep 1; done
       - name: Run the remote-net tests
         run: |
           cargo test -p linera-service remote_net_grpc --features remote-net
@@ -193,6 +201,9 @@ jobs:
       - name: Run the storage-service instance
         run: |
           cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
+      - name: Wait for storage service to be ready
+        run: |
+          until nc -z 127.0.0.1 1235; do sleep 1; done
       - name: Run the benchmark test
         run: |
           cargo build --locked -p linera-service --bin linera-benchmark --features storage-service
@@ -250,6 +261,9 @@ jobs:
       - name: Run the storage-service instance
         run: |
           cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
+      - name: Wait for storage service to be ready
+        run: |
+          until nc -z 127.0.0.1 1235; do sleep 1; done
      - name: Run Ethereum tests
         run: |
           cargo test -p linera-ethereum --features ethereum
@@ -274,9 +288,14 @@ jobs:
         run: |
           cd examples
           cargo build --locked --release --target wasm32-unknown-unknown
-      - name: Run the storage-service instance and the storage-service tests
+      - name: Run the storage-service instance
         run: |
           cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
+      - name: Wait for storage service to be ready
+        run: |
+          until nc -z 127.0.0.1 1235; do sleep 1; done
+      - name: Run the storage-service tests
+        run: |
           cargo test --features storage-service -- storage_service --nocapture
 
   web:
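The readiness gates above replace implicit start-up races: `nc -z` polls the storage-service port (1235, presumably the port inside `$LINERA_STORAGE_SERVICE`) and `curl` polls the faucet's HTTP port before dependent steps run. The same probe can be written for Rust integration tests; a minimal sketch using tokio (the helper name and retry policy are illustrative, not part of this PR):

```rust
use std::time::Duration;

use tokio::{net::TcpStream, time::sleep};

/// Polls `addr` until a TCP connection succeeds, mirroring
/// `until nc -z 127.0.0.1 1235; do sleep 1; done` in the workflow above.
async fn wait_for_tcp(addr: &str, max_attempts: u32) -> Result<(), String> {
    for _ in 0..max_attempts {
        if TcpStream::connect(addr).await.is_ok() {
            return Ok(());
        }
        sleep(Duration::from_secs(1)).await;
    }
    Err(format!("service at {addr} never became ready"))
}
```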
diff --git a/CLI.md b/CLI.md
index 4275c363093..3a38d78640a 100644
--- a/CLI.md
+++ b/CLI.md
@@ -727,7 +727,7 @@ Run a GraphQL service to explore and extend the chains of the wallet
 Run a GraphQL service that exposes a faucet where users can claim tokens.
 This gives away the chain's tokens, and is mainly intended for testing
 
-**Usage:** `linera faucet [OPTIONS] --amount <AMOUNT> [CHAIN_ID]`
+**Usage:** `linera faucet [OPTIONS] --amount <AMOUNT> --storage-path <STORAGE_PATH> [CHAIN_ID]`
 
 ###### **Arguments:**
diff --git a/Cargo.lock b/Cargo.lock
index b7d8d9bb3f2..f22e213893a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5505,6 +5505,7 @@ dependencies = [
  "prometheus",
  "serde",
  "serde_json",
+ "sqlx",
  "tempfile",
  "tokio",
  "tokio-util",
diff --git a/Cargo.toml b/Cargo.toml
index 1df68522ad9..7d7c1a4130d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -230,6 +230,7 @@ serde_with = { version = "3", default-features = false, features = [
 serde_yaml = "0.9.34"
 sha3 = "0.10.8"
 similar-asserts = "1.5.0"
+sqlx = "0.8"
 static_assertions = "1.1.0"
 stdext = "0.3.3"
 syn = "2.0.52"
diff --git a/linera-faucet/server/Cargo.toml b/linera-faucet/server/Cargo.toml
index 6ca1b71c8d0..17291ffd4fa 100644
--- a/linera-faucet/server/Cargo.toml
+++ b/linera-faucet/server/Cargo.toml
@@ -40,6 +40,11 @@ linera-version.workspace = true
 prometheus = { workspace = true, optional = true }
 serde.workspace = true
 serde_json.workspace = true
+sqlx = { workspace = true, features = [
+    "runtime-tokio-rustls",
+    "sqlite",
+    "migrate",
+] }
 tempfile.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
diff --git a/linera-faucet/server/src/database.rs b/linera-faucet/server/src/database.rs
new file mode 100644
index 00000000000..7a693f87136
--- /dev/null
+++ b/linera-faucet/server/src/database.rs
@@ -0,0 +1,283 @@
+// Copyright (c) Zefchain Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//! SQLite database module for storing chain assignments.
+
+use std::{collections::BTreeMap, path::PathBuf};
+
+use anyhow::Context as _;
+use linera_base::{
+    bcs,
+    crypto::CryptoHash,
+    data_types::{BlockHeight, ChainDescription},
+    identifiers::{AccountOwner, ChainId},
+};
+use linera_chain::{data_types::Transaction, types::ConfirmedBlockCertificate};
+use linera_core::client::ChainClient;
+use linera_execution::{system::SystemOperation, Operation};
+use linera_storage::Storage;
+use sqlx::{
+    sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions},
+    Row,
+};
+use tracing::{debug, info};
+
+/// SQLite database for persistent storage of chain assignments.
+pub struct FaucetDatabase {
+    pool: SqlitePool,
+}
+
+/// Schema for creating the chains table.
+const CREATE_CHAINS_TABLE: &str = r#"
+CREATE TABLE IF NOT EXISTS chains (
+    owner TEXT PRIMARY KEY NOT NULL,
+    chain_id TEXT NOT NULL,
+    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE INDEX IF NOT EXISTS idx_chains_chain_id ON chains(chain_id);
+"#;
+
+impl FaucetDatabase {
+    /// Creates a new SQLite database connection.
+    pub async fn new(database_path: &PathBuf) -> anyhow::Result<Self> {
+        // Create parent directory if it doesn't exist.
+        if let Some(parent) = database_path.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("Failed to create database directory")?;
+        }
+
+        let database_url = format!("sqlite:{}", database_path.display());
+        info!(?database_url, "Connecting to SQLite database");
+
+        let options = SqliteConnectOptions::new()
+            .filename(database_path)
+            .create_if_missing(true);
+
+        let pool = SqlitePoolOptions::new()
+            .max_connections(5)
+            .connect_with(options)
+            .await
+            .context("Failed to connect to SQLite database")?;
+
+        let db = Self { pool };
+        db.initialize_schema().await?;
+        Ok(db)
+    }
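From the caller's side, opening the store is a single call: `new` creates the parent directory and the file if missing, then applies `CREATE_CHAINS_TABLE`, so no separate migration step is needed. A hypothetical call site (the path is illustrative; assumes `use crate::database::FaucetDatabase`):

```rust
use std::path::PathBuf;

// Hypothetical call site: open (or create) the faucet database; the
// `chains` table is ready as soon as `new` returns.
async fn open_faucet_db() -> anyhow::Result<FaucetDatabase> {
    let path = PathBuf::from("/tmp/linera-faucet/faucet_storage.sqlite");
    FaucetDatabase::new(&path).await
}
```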
+
+    /// Initializes the database schema.
+    async fn initialize_schema(&self) -> anyhow::Result<()> {
+        sqlx::query(CREATE_CHAINS_TABLE)
+            .execute(&self.pool)
+            .await
+            .context("Failed to create chains table")?;
+        info!("Database schema initialized");
+        Ok(())
+    }
+
+    /// Synchronizes the database with the blockchain by traversing block history.
+    pub async fn sync_with_blockchain<E>(&self, client: &ChainClient<E>) -> anyhow::Result<()>
+    where
+        E: linera_core::Environment,
+        E::Storage: Storage,
+    {
+        info!("Starting database synchronization with blockchain");
+
+        // Build the height->hash map and find the sync point in a single traversal.
+        let height_to_hash = self.build_sync_map(client).await?;
+
+        info!(
+            "Found sync point at height {}, processing {} blocks",
+            height_to_hash.keys().next().unwrap_or(&BlockHeight::ZERO),
+            height_to_hash.len()
+        );
+
+        // Sync forward from the sync point using the pre-built map.
+        self.sync_forward_with_map(client, height_to_hash).await?;
+
+        info!("Database synchronization completed");
+        Ok(())
+    }
+
+    /// Builds a height->hash map of the blocks that still need processing, walking
+    /// backwards until it reaches a block whose chains are all already stored in
+    /// the database (the sync point).
+    async fn build_sync_map<E>(
+        &self,
+        client: &ChainClient<E>,
+    ) -> anyhow::Result<BTreeMap<BlockHeight, CryptoHash>>
+    where
+        E: linera_core::Environment,
+        E::Storage: Storage,
+    {
+        let info = client.chain_info().await?;
+        let end_height = info.next_block_height;
+
+        if end_height == BlockHeight::ZERO {
+            info!("Chain is empty, no synchronization needed");
+            return Ok(BTreeMap::new());
+        }
+
+        let mut height_to_hash = BTreeMap::new();
+        let mut current_hash = info.block_hash;
+
+        // Traverse backwards to build the height -> hash mapping and find the sync point.
+        while let Some(hash) = current_hash {
+            let certificate = client
+                .storage_client()
+                .read_certificate(hash)
+                .await?
+                .ok_or_else(|| anyhow::anyhow!("Certificate not found for hash {}", hash))?;
+            let current_height = certificate.block().header.height;
+
+            // Check if this block's chains are already in our database.
+            let chains_in_block = self.extract_opened_chains(&certificate).await?;
+
+            if !chains_in_block.is_empty() {
+                let mut all_chains_exist = true;
+                for (owner, _chain_id) in &chains_in_block {
+                    if self.get_chain_id(owner).await?.is_none() {
+                        all_chains_exist = false;
+                        break;
+                    }
+                }
+
+                if all_chains_exist {
+                    // All chains from this block are already in the database.
+                    break;
+                }
+            }
+
+            // Add to our height->hash map.
+            height_to_hash.insert(current_height, hash);
+
+            // Move to the previous block.
+            current_hash = certificate.block().header.previous_block_hash;
+        }
+
+        Ok(height_to_hash)
+    }
+
+    /// Syncs the database using a pre-built height->hash map.
+    async fn sync_forward_with_map<E>(
+        &self,
+        client: &ChainClient<E>,
+        height_to_hash: BTreeMap<BlockHeight, CryptoHash>,
+    ) -> anyhow::Result<()>
+    where
+        E: linera_core::Environment,
+        E::Storage: Storage,
+    {
+        if height_to_hash.is_empty() {
+            return Ok(());
+        }
+
+        // Process blocks in chronological order (forward).
+        for (height, hash) in height_to_hash {
+            let certificate = client
+                .storage_client()
+                .read_certificate(hash)
+                .await?
+                .ok_or_else(|| anyhow::anyhow!("Certificate not found for hash {}", hash))?;
+
+            let chains_to_store = self.extract_opened_chains(&certificate).await?;
+
+            if !chains_to_store.is_empty() {
+                info!(
+                    "Processing block at height {height} with {} new chains",
+                    chains_to_store.len()
+                );
+                self.store_chains_batch(chains_to_store).await?;
+            }
+        }
+
+        Ok(())
+    }
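The two-pass sync leans on `BTreeMap`'s ordering guarantee: `build_sync_map` discovers blocks newest-first, but because the map is keyed by `BlockHeight`, `sync_forward_with_map` iterates it oldest-first with no extra sort. A self-contained illustration with toy values (not faucet code):

```rust
use std::collections::BTreeMap;

fn main() {
    let mut height_to_hash = BTreeMap::new();
    // Inserted newest-first, as the backward traversal does.
    for (height, hash) in [(3u64, "c3"), (2, "b2"), (1, "a1")] {
        height_to_hash.insert(height, hash);
    }
    // Iterated oldest-first, which is what the forward replay needs.
    let replay: Vec<_> = height_to_hash.into_iter().collect();
    assert_eq!(replay, [(1, "a1"), (2, "b2"), (3, "c3")]);
}
```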
+
+    /// Extracts OpenChain operations from a certificate and returns (owner, chain_id) pairs.
+    async fn extract_opened_chains(
+        &self,
+        certificate: &ConfirmedBlockCertificate,
+    ) -> anyhow::Result<Vec<(AccountOwner, ChainId)>> {
+        let mut chains = Vec::new();
+        let block = certificate.block();
+
+        // Parse chain descriptions from the block's blobs.
+        let blobs = block.body.blobs.iter().flatten();
+        let chain_descriptions = blobs
+            .map(|blob| bcs::from_bytes::<ChainDescription>(blob.bytes()))
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let mut chain_desc_iter = chain_descriptions.into_iter();
+
+        // Examine each transaction in the block.
+        for transaction in &block.body.transactions {
+            if let Transaction::ExecuteOperation(Operation::System(system_op)) = transaction {
+                if let SystemOperation::OpenChain(config) = system_op.as_ref() {
+                    // Extract the owner from the OpenChain operation.
+                    // We expect single-owner chains from the faucet.
+                    let mut owners = config.ownership.all_owners();
+                    if let Some(owner) = owners.next() {
+                        // Verify it's a single-owner chain (faucet only creates these).
+                        if owners.next().is_none() {
+                            // Get the corresponding chain description from the blobs.
+                            if let Some(description) = chain_desc_iter.next() {
+                                chains.push((*owner, description.id()));
+                                debug!(
+                                    "Found OpenChain operation for owner {} creating chain {}",
+                                    owner,
+                                    description.id()
+                                );
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(chains)
+    }
+
+    /// Gets the chain ID for an owner if it exists.
+    pub async fn get_chain_id(&self, owner: &AccountOwner) -> anyhow::Result<Option<ChainId>> {
+        let owner_str = owner.to_string();
+
+        let Some(row) = sqlx::query("SELECT chain_id FROM chains WHERE owner = ?")
+            .bind(&owner_str)
+            .fetch_optional(&self.pool)
+            .await?
+        else {
+            return Ok(None);
+        };
+        let chain_id_str: String = row.get("chain_id");
+        let chain_id: ChainId = chain_id_str.parse()?;
+        Ok(Some(chain_id))
+    }
+
+    /// Stores multiple chain mappings in a single transaction.
+    pub async fn store_chains_batch(
+        &self,
+        chains: Vec<(AccountOwner, ChainId)>,
+    ) -> anyhow::Result<()> {
+        let mut tx = self.pool.begin().await?;
+
+        for (owner, chain_id) in chains {
+            let owner_str = owner.to_string();
+            let chain_id_str = chain_id.to_string();
+
+            sqlx::query(
+                r#"
+                INSERT OR REPLACE INTO chains (owner, chain_id)
+                VALUES (?, ?)
+                "#,
+            )
+            .bind(&owner_str)
+            .bind(&chain_id_str)
+            .execute(&mut *tx)
+            .await?;
+        }
+
+        tx.commit().await?;
+        Ok(())
+    }
+}
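Because `store_chains_batch` uses `INSERT OR REPLACE` keyed on `owner` inside one transaction, replaying blocks the sync has already processed is idempotent. A standalone check of that property against an in-memory SQLite pool (schema reduced to the two relevant columns; a sketch assuming sqlx with the `sqlite` feature plus tokio and anyhow, not a test from this PR):

```rust
use sqlx::{sqlite::SqlitePoolOptions, Row};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;
    sqlx::query("CREATE TABLE chains (owner TEXT PRIMARY KEY NOT NULL, chain_id TEXT NOT NULL)")
        .execute(&pool)
        .await?;
    // Storing the same mapping twice must leave exactly one row.
    for _ in 0..2 {
        sqlx::query("INSERT OR REPLACE INTO chains (owner, chain_id) VALUES (?, ?)")
            .bind("owner-1")
            .bind("chain-1")
            .execute(&pool)
            .await?;
    }
    let row = sqlx::query("SELECT COUNT(*) AS n FROM chains")
        .fetch_one(&pool)
        .await?;
    assert_eq!(row.get::<i64, _>("n"), 1);
    Ok(())
}
```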
diff --git a/linera-faucet/server/src/lib.rs b/linera-faucet/server/src/lib.rs
index e5688eccd74..daf01d4c798 100644
--- a/linera-faucet/server/src/lib.rs
+++ b/linera-faucet/server/src/lib.rs
@@ -5,14 +5,9 @@
 //! The server component of the Linera faucet.
 
-use std::{
-    collections::{HashMap, VecDeque},
-    future::IntoFuture,
-    io,
-    net::SocketAddr,
-    path::PathBuf,
-    sync::Arc,
-};
+mod database;
+
+use std::{collections::VecDeque, future::IntoFuture, net::SocketAddr, path::PathBuf, sync::Arc};
 
 use anyhow::Context as _;
 use async_graphql::{EmptySubscription, Error, Schema, SimpleObject};
@@ -23,7 +18,7 @@ use linera_base::{
     bcs,
     crypto::{CryptoHash, ValidatorPublicKey},
     data_types::{Amount, ApplicationPermissions, ChainDescription, Timestamp},
-    identifiers::{AccountOwner, ChainId},
+    identifiers::{AccountOwner, BlobId, BlobType, ChainId},
     ownership::ChainOwnership,
 };
 use linera_chain::{ChainError, ChainExecutionContext};
@@ -44,12 +39,14 @@ use linera_execution::{
 #[cfg(feature = "metrics")]
 use linera_metrics::monitoring_server;
 use linera_storage::{Clock as _, Storage};
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 use tokio::sync::{oneshot, Notify};
 use tokio_util::sync::CancellationToken;
 use tower_http::cors::CorsLayer;
 use tracing::info;
 
+use crate::database::FaucetDatabase;
+
 /// Returns an HTML response constructing the GraphiQL web page for the given URI.
 pub(crate) async fn graphiql(uri: axum::http::Uri) -> impl axum::response::IntoResponse {
     axum::response::Html(
@@ -70,10 +67,11 @@ pub struct QueryRoot {
 }
 
 /// The root GraphQL mutation type.
-pub struct MutationRoot {
-    faucet_storage: Arc<Mutex<FaucetStorage>>,
+pub struct MutationRoot<S> {
+    faucet_storage: Arc<FaucetDatabase>,
     pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
     request_notifier: Arc<Notify>,
+    storage: S,
 }
 
 /// The result of a successful `claim` mutation.
@@ -91,83 +89,32 @@ pub struct Validator {
     pub network_address: String,
 }
 
-/// A pending chain creation request
+/// A pending chain creation request.
 #[derive(Debug)]
 struct PendingRequest {
     owner: AccountOwner,
     responder: oneshot::Sender<Result<ChainDescription, Error>>,
 }
 
-/// Configuration for the batch processor
+/// Configuration for the batch processor.
 struct BatchProcessorConfig {
     amount: Amount,
     end_timestamp: Timestamp,
     start_timestamp: Timestamp,
     start_balance: Amount,
-    storage_path: PathBuf,
     max_batch_size: usize,
 }
 
-/// Batching coordinator for processing chain creation requests
+/// Batching coordinator for processing chain creation requests.
 struct BatchProcessor<C: ClientContext> {
     config: BatchProcessorConfig,
     context: Arc<Mutex<C>>,
     client: ChainClient<C::Environment>,
-    faucet_storage: Arc<Mutex<FaucetStorage>>,
+    faucet_storage: Arc<FaucetDatabase>,
     pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
     request_notifier: Arc<Notify>,
 }
 
-/// Persistent mapping of account owners to chain descriptions
-#[derive(Debug, Clone, Serialize, Deserialize, Default)]
-struct FaucetStorage {
-    /// Maps account owners to their corresponding chain descriptions
-    owner_to_chain: HashMap<AccountOwner, ChainDescription>,
-}
-
-impl FaucetStorage {
-    /// Loads the faucet storage from disk, creating a new one if it doesn't exist
-    async fn load(storage_path: &PathBuf) -> Result<Self, io::Error> {
-        match tokio::fs::read(storage_path).await {
-            Ok(data) => serde_json::from_slice(&data).map_err(|e| {
-                io::Error::new(
-                    io::ErrorKind::InvalidData,
-                    format!("Failed to deserialize faucet storage: {}", e),
-                )
-            }),
-            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(Self::default()),
-            Err(e) => Err(e),
-        }
-    }
-
-    /// Saves the faucet storage to disk
-    async fn save(&self, storage_path: &PathBuf) -> Result<(), io::Error> {
-        let data = serde_json::to_vec_pretty(self).map_err(|e| {
-            io::Error::new(
-                io::ErrorKind::InvalidData,
-                format!("Failed to serialize faucet storage: {}", e),
-            )
-        })?;
-
-        // Create parent directory if it doesn't exist
-        if let Some(parent) = storage_path.parent() {
-            tokio::fs::create_dir_all(parent).await?;
-        }
-
-        tokio::fs::write(storage_path, data).await
-    }
-
-    /// Gets the chain description for an owner if it exists
-    fn get_chain(&self, owner: &AccountOwner) -> Option<&ChainDescription> {
-        self.owner_to_chain.get(owner)
-    }
-
-    /// Stores a new mapping from owner to chain description
-    fn store_chain(&mut self, owner: AccountOwner, description: ChainDescription) {
-        self.owner_to_chain.insert(owner, description);
-    }
-}
-
 #[async_graphql::Object(cache_control(no_cache))]
 impl<C> QueryRoot<C>
 where
@@ -203,27 +150,36 @@
 }
 
 #[async_graphql::Object(cache_control(no_cache))]
-impl MutationRoot {
+impl<S> MutationRoot<S>
+where
+    S: Storage + Send + Sync + 'static,
+{
     /// Creates a new chain with the given authentication key, and transfers tokens to it.
     async fn claim(&self, owner: AccountOwner) -> Result<ChainDescription, Error> {
         self.do_claim(owner).await
     }
 }
 
-impl MutationRoot {
+impl<S> MutationRoot<S>
+where
+    S: Storage + Send + Sync + 'static,
+{
     async fn do_claim(&self, owner: AccountOwner) -> Result<ChainDescription, Error> {
-        // Check if this owner already has a chain
+        // Check if this owner already has a chain.
+        if let Some(existing_chain_id) = self
+            .faucet_storage
+            .get_chain_id(&owner)
+            .await
+            .map_err(|e| Error::new(e.to_string()))?
         {
-            let storage = self.faucet_storage.lock().await;
-            if let Some(existing_description) = storage.get_chain(&owner) {
-                return Ok(existing_description.clone());
-            }
+            // Retrieve the chain description from local storage.
+            return get_chain_description_from_storage(&self.storage, existing_chain_id).await;
         }
 
-        // Create a oneshot channel to receive the result
+        // Create a oneshot channel to receive the result.
         let (tx, rx) = oneshot::channel();
 
-        // Add request to the queue
+        // Add request to the queue.
         {
             let mut requests = self.pending_requests.lock().await;
             requests.push_back(PendingRequest {
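`do_claim` is now a thin producer: it enqueues a `PendingRequest` carrying a oneshot responder, wakes the batch processor through the `Notify`, and awaits the reply. A minimal, self-contained sketch of that handoff pattern (toy payload type; the real queue holds `PendingRequest` values and uses `futures::lock::Mutex`):

```rust
use std::{collections::VecDeque, sync::Arc};

use tokio::sync::{oneshot, Mutex, Notify};

#[tokio::main]
async fn main() {
    let queue: Arc<Mutex<VecDeque<oneshot::Sender<u32>>>> =
        Arc::new(Mutex::new(VecDeque::new()));
    let notify = Arc::new(Notify::new());

    // Consumer: wait for a wake-up, then answer every queued request.
    let consumer = {
        let (queue, notify) = (queue.clone(), notify.clone());
        tokio::spawn(async move {
            notify.notified().await;
            while let Some(responder) = queue.lock().await.pop_front() {
                let _ = responder.send(42);
            }
        })
    };

    // Producer: enqueue, notify, then await the oneshot reply.
    let (tx, rx) = oneshot::channel();
    queue.lock().await.push_back(tx);
    notify.notify_one();
    assert_eq!(rx.await.unwrap(), 42);
    consumer.await.unwrap();
}
```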
@@ -232,7 +188,7 @@
             });
         }
 
-        // Notify the batch processor that there's a new request
+        // Notify the batch processor that there's a new request.
         self.request_notifier.notify_one();
 
         // Wait for the result
@@ -250,16 +206,67 @@ fn multiply(a: u128, b: u64) -> [u64; 3] {
     [(a1 >> 64) as u64, (a1 & lower) as u64, (a0 & lower) as u64]
 }
 
+/// Retrieves a chain description from storage by reading the blob directly.
+///
+/// Failures are logged and converted into GraphQL-compatible errors.
+async fn get_chain_description_from_storage<S>(
+    storage: &S,
+    chain_id: ChainId,
+) -> Result<ChainDescription, Error>
+where
+    S: Storage,
+{
+    // Create the blob ID from the chain ID - the chain ID is the hash of the
+    // chain description blob.
+    let blob_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
+
+    // Read the blob directly from storage.
+    let blob = storage
+        .read_blob(blob_id)
+        .await
+        .map_err(|e| {
+            tracing::error!(
+                "Failed to read chain description blob for {}: {}",
+                chain_id,
+                e
+            );
+            Error::new(format!(
+                "Storage error while reading chain description: {e}"
+            ))
+        })?
+        .ok_or_else(|| {
+            tracing::error!("Chain description blob not found for chain {}", chain_id);
+            Error::new(format!(
+                "Chain description not found for chain {}",
+                chain_id
+            ))
+        })?;
+
+    // Deserialize the chain description from the blob bytes.
+    let description = bcs::from_bytes::<ChainDescription>(blob.bytes()).map_err(|e| {
+        tracing::error!(
+            "Failed to deserialize chain description for {}: {}",
+            chain_id,
+            e
+        );
+        Error::new(format!(
+            "Invalid chain description data for chain {}",
+            chain_id
+        ))
+    })?;
+
+    Ok(description)
+}
+
 impl<C> BatchProcessor<C>
 where
     C: ClientContext + 'static,
 {
-    /// Creates a new batch processor
+    /// Creates a new batch processor.
     fn new(
         config: BatchProcessorConfig,
         context: Arc<Mutex<C>>,
         client: ChainClient<C::Environment>,
-        faucet_storage: Arc<Mutex<FaucetStorage>>,
+        faucet_storage: Arc<FaucetDatabase>,
         pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
         request_notifier: Arc<Notify>,
     ) -> Self {
@@ -273,7 +280,7 @@
         }
     }
 
-    /// Runs the batch processor loop
+    /// Runs the batch processor loop.
     async fn run(&mut self, cancellation_token: CancellationToken) {
         loop {
             tokio::select! {
@@ -322,32 +329,47 @@
         }
     }
 
-    /// Executes a batch of chain creation requests
+    /// Executes a batch of chain creation requests.
     async fn execute_batch(
         &mut self,
         batch_requests: Vec<PendingRequest>,
     ) -> Result<(), anyhow::Error> {
-        // Pre-validate: check rate limiting and existing chains
+        // Pre-validate: check existing chains.
         let mut valid_requests = Vec::new();
 
         for request in batch_requests {
-            // Check if this owner already has a chain
-            {
-                let storage = self.faucet_storage.lock().await;
-                if let Some(existing_description) = storage.get_chain(&request.owner) {
-                    let _ = request.responder.send(Ok(existing_description.clone()));
+            // Check if this owner already has a chain. Otherwise, send a response immediately.
+            let response = match self.faucet_storage.get_chain_id(&request.owner).await {
+                Ok(None) => {
+                    valid_requests.push(request);
                     continue;
                 }
+                Ok(Some(existing_chain_id)) => {
+                    // Retrieve the chain description from local storage.
+                    get_chain_description_from_storage(
+                        self.client.storage_client(),
+                        existing_chain_id,
+                    )
+                    .await
+                }
+                Err(err) => {
+                    tracing::error!("Database error: {err}");
+                    Err(Error::new(err.to_string()))
+                }
+            };
+            if let Err(response) = request.responder.send(response) {
+                tracing::error!(
+                    "Receiver dropped while sending response {response:?} for request from {}",
+                    request.owner
+                );
             }
-
-            valid_requests.push(request);
         }
 
         if valid_requests.is_empty() {
             return Ok(());
         }
 
-        // Rate limiting check for the batch
+        // Rate limiting check for the batch.
         if self.config.start_timestamp < self.config.end_timestamp {
             let local_time = self.client.storage_client().clock().current_time();
             if local_time < self.config.end_timestamp {
@@ -474,23 +496,31 @@
             for request in valid_requests {
                 let _ = request.responder.send(Err(Error::new(error_msg.clone())));
             }
-            return Err(anyhow::anyhow!(error_msg));
+            anyhow::bail!(error_msg);
         }
 
-        // Store results and respond to requests
+        // Store results.
+        let chains_to_store = valid_requests
+            .iter()
+            .zip(&chain_descriptions)
+            .map(|(request, description)| (request.owner, description.id()))
+            .collect();
+
+        if let Err(e) = self
+            .faucet_storage
+            .store_chains_batch(chains_to_store)
+            .await
         {
-            let mut storage = self.faucet_storage.lock().await;
-            for (request, description) in valid_requests
-                .into_iter()
-                .zip(chain_descriptions.into_iter())
-            {
-                storage.store_chain(request.owner, description.clone());
-                let _ = request.responder.send(Ok(description));
+            let error_msg = format!("Failed to save chains to database: {}", e);
+            for request in valid_requests {
+                let _ = request.responder.send(Err(Error::new(error_msg.clone())));
             }
+            anyhow::bail!(error_msg);
+        }
 
-            if let Err(e) = storage.save(&self.config.storage_path).await {
-                tracing::warn!("Failed to save faucet storage: {}", e);
-            }
+        // Respond to requests.
+        for (request, description) in valid_requests.into_iter().zip(chain_descriptions) {
+            let _ = request.responder.send(Ok(description));
         }
 
         Ok(())
@@ -515,10 +545,8 @@ where
     end_timestamp: Timestamp,
     start_timestamp: Timestamp,
     start_balance: Amount,
-    faucet_storage: Arc<Mutex<FaucetStorage>>,
+    faucet_storage: Arc<FaucetDatabase>,
     storage_path: PathBuf,
-    /// Temporary directory handle to keep it alive (if using temporary storage)
-    _temp_dir: Option<Arc<tempfile::TempDir>>,
     /// Batching components
     pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
     request_notifier: Arc<Notify>,
@@ -546,7 +574,6 @@
             start_balance: self.start_balance,
             faucet_storage: Arc::clone(&self.faucet_storage),
             storage_path: self.storage_path.clone(),
-            _temp_dir: self._temp_dir.clone(),
             pending_requests: Arc::clone(&self.pending_requests),
             request_notifier: Arc::clone(&self.request_notifier),
             max_batch_size: self.max_batch_size,
@@ -563,7 +590,7 @@
     pub end_timestamp: Timestamp,
     pub genesis_config: Arc<GenesisConfig>,
     pub chain_listener_config: ChainListenerConfig,
-    pub storage_path: Option<PathBuf>,
+    pub storage_path: PathBuf,
     pub max_batch_size: usize,
 }
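Making `storage_path` a plain `PathBuf` (here and in the `faucet` subcommand below) turns `--storage-path` into a required flag under clap's derive rules. A sketch of that effect, with the struct reduced to the one field (not the actual `ClientCommand` definition; assumes clap with the `derive` feature):

```rust
use std::path::PathBuf;

use clap::Parser;

#[derive(Parser)]
struct FaucetArgs {
    /// Path to the persistent storage file for faucet mappings.
    #[arg(long)]
    storage_path: PathBuf,
}

fn main() {
    // Missing --storage-path is now a parse error rather than `None`.
    assert!(FaucetArgs::try_parse_from(["faucet"]).is_err());
    let args =
        FaucetArgs::try_parse_from(["faucet", "--storage-path", "/tmp/faucet.sqlite"]).unwrap();
    assert_eq!(args.storage_path, PathBuf::from("/tmp/faucet.sqlite"));
}
```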
@@ -583,22 +610,20 @@ where
         client.process_inbox().await?;
         let start_balance = client.local_balance().await?;
 
-        // Create storage path: use provided path or create temporary directory
-        let (storage_path, temp_dir) = match config.storage_path {
-            Some(path) => (path, None),
-            None => {
-                let temp_dir = tempfile::tempdir()
-                    .context("Failed to create temporary directory for faucet storage")?;
-                let storage_path = temp_dir.path().join("faucet_storage.json");
-                (storage_path, Some(Arc::new(temp_dir)))
-            }
-        };
+        // Use provided storage path
+        let storage_path = config.storage_path.clone();
 
-        // Load the faucet storage
-        let faucet_storage = FaucetStorage::load(&storage_path)
+        // Initialize database.
+        let faucet_storage = FaucetDatabase::new(&storage_path)
             .await
-            .context("Failed to load faucet storage")?;
-        let faucet_storage = Arc::new(Mutex::new(faucet_storage));
+            .context("Failed to initialize faucet database")?;
+
+        // Synchronize database with blockchain history.
+        if let Err(e) = faucet_storage.sync_with_blockchain(&client).await {
+            tracing::warn!("Failed to synchronize database with blockchain: {}", e);
+        }
+
+        let faucet_storage = Arc::new(faucet_storage);
 
         // Initialize batching components
         let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
@@ -620,18 +645,24 @@
             start_balance,
             faucet_storage,
             storage_path,
-            _temp_dir: temp_dir,
             pending_requests,
             request_notifier,
             max_batch_size: config.max_batch_size,
         })
     }
 
-    fn schema(&self) -> Schema<QueryRoot<C>, MutationRoot, EmptySubscription> {
+    fn schema(
+        &self,
+    ) -> Schema<
+        QueryRoot<C>,
+        MutationRoot<<C::Environment as linera_core::Environment>::Storage>,
+        EmptySubscription,
+    > {
         let mutation_root = MutationRoot {
             faucet_storage: Arc::clone(&self.faucet_storage),
             pending_requests: Arc::clone(&self.pending_requests),
             request_notifier: Arc::clone(&self.request_notifier),
+            storage: self.storage.clone(),
         };
         let query_root = QueryRoot {
             genesis_config: Arc::clone(&self.genesis_config),
@@ -669,7 +700,6 @@
             end_timestamp: self.end_timestamp,
             start_timestamp: self.start_timestamp,
             start_balance: self.start_balance,
-            storage_path: self.storage_path.clone(),
             max_batch_size: self.max_batch_size,
         };
         let mut batch_processor = BatchProcessor::new(
diff --git a/linera-faucet/server/src/tests.rs b/linera-faucet/server/src/tests.rs
index ddf63a25695..f0f23c16677 100644
--- a/linera-faucet/server/src/tests.rs
+++ b/linera-faucet/server/src/tests.rs
@@ -3,7 +3,7 @@
 
 #![allow(clippy::large_futures)]
 
-use std::{collections::VecDeque, path::PathBuf, sync::Arc};
+use std::{collections::VecDeque, sync::Arc};
 
 use futures::lock::Mutex;
 use linera_base::{
@@ -22,6 +22,8 @@
 use tempfile::tempdir;
 use tokio::sync::{oneshot, Notify};
 use tokio_util::sync::CancellationToken;
 
+use crate::database::FaucetDatabase;
+
 struct ClientContext {
     client: ChainClient,
     update_calls: usize,
 }
@@ -88,8 +90,12 @@
         update_calls: 0,
     };
     let context = Arc::new(Mutex::new(context));
-    let faucet_storage = Arc::new(Mutex::new(super::FaucetStorage::default()));
-    let storage_path = PathBuf::from("/tmp/test_faucet_rate_limiting.json");
+    let temp_dir = tempdir().unwrap();
+    let faucet_storage = Arc::new(
+        FaucetDatabase::new(&temp_dir.path().join("test_faucet_rate_limiting.sqlite"))
+            .await
+            .unwrap(),
+    );
 
     // Set up the batching components
     let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
@@ -100,6 +106,7 @@
         faucet_storage: Arc::clone(&faucet_storage),
         pending_requests: Arc::clone(&pending_requests),
         request_notifier: Arc::clone(&request_notifier),
+        storage: client.storage_client().clone(),
     };
 
     // Create the BatchProcessor configuration and instance
@@ -108,7 +115,6 @@
         end_timestamp: Timestamp::from(6000),
         start_timestamp: Timestamp::from(0),
         start_balance: Amount::from_tokens(6),
-        storage_path,
         max_batch_size: 1,
     };
 
@@ -193,7 +199,7 @@
 
     // Set up test environment
     let temp_dir = tempdir().unwrap();
-    let storage_path = temp_dir.path().join("test_batch_reduction.json");
+    let storage_path = temp_dir.path().join("test_batch_reduction.sqlite");
 
     let storage_builder = MemoryStorageBuilder::default();
     let keys = InMemorySigner::new(None);
@@ -219,7 +225,7 @@
         update_calls: 0,
     }));
 
-    let faucet_storage = Arc::new(Mutex::new(super::FaucetStorage::default()));
+    let faucet_storage = Arc::new(FaucetDatabase::new(&storage_path).await.unwrap());
 
     let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
     let request_notifier = Arc::new(Notify::new());
@@ -230,7 +236,6 @@
         start_balance: Amount::from_tokens(100),
         start_timestamp: Timestamp::from(1000), // start > end disables rate limiting
         end_timestamp: Timestamp::from(999),
-        storage_path: storage_path.clone(),
         max_batch_size: initial_batch_size,
     };
 
@@ -271,3 +276,415 @@
     // Now the batch size should be reduced.
     assert!(batch_processor.config.max_batch_size < initial_batch_size);
 }
+
+#[tokio::test]
+async fn test_faucet_persistence() {
+    // Test that the faucet correctly persists chain IDs and retrieves them after restart.
+    // This ensures the database is working correctly across sessions.
+
+    let storage_builder = MemoryStorageBuilder::default();
+    let keys = InMemorySigner::new(None);
+    let clock = storage_builder.clock().clone();
+    clock.set(Timestamp::from(0));
+    let mut builder = TestBuilder::new(storage_builder, 4, 1, keys).await.unwrap();
+    let client = builder
+        .add_root_chain(1, Amount::from_tokens(6))
+        .await
+        .unwrap();
+
+    let temp_dir = tempdir().unwrap();
+    let storage_path = temp_dir.path().join("test_faucet_persistence.sqlite");
+
+    // Create first faucet instance
+    let faucet_storage = Arc::new(FaucetDatabase::new(&storage_path).await.unwrap());
+
+    let context = ClientContext {
+        client: client.clone(),
+        update_calls: 0,
+    };
+    let context = Arc::new(Mutex::new(context));
+
+    // Set up the first MutationRoot instance
+    let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
+    let request_notifier = Arc::new(Notify::new());
+
+    let root = super::MutationRoot {
+        faucet_storage: Arc::clone(&faucet_storage),
+        pending_requests: Arc::clone(&pending_requests),
+        request_notifier: Arc::clone(&request_notifier),
+        storage: client.storage_client().clone(),
+    };
+
+    // Create the BatchProcessor configuration and instance
+    let batch_config = super::BatchProcessorConfig {
+        amount: Amount::from_tokens(1),
+        end_timestamp: Timestamp::from(6000),
+        start_timestamp: Timestamp::from(0),
+        start_balance: Amount::from_tokens(6),
+        max_batch_size: 1,
+    };
+
+    let batch_processor = super::BatchProcessor::new(
+        batch_config,
+        Arc::clone(&context),
+        client.clone(),
+        Arc::clone(&faucet_storage),
+        Arc::clone(&pending_requests),
+        Arc::clone(&request_notifier),
+    );
+
+    // Start the batch processor
+    let cancellation_token = CancellationToken::new();
+    let processor_task = {
+        let mut batch_processor = batch_processor;
+        let token = cancellation_token.clone();
+        tokio::spawn(async move { batch_processor.run(token).await })
+    };
+
+    // Set time to allow claims
+    clock.set(Timestamp::from(1000));
+
+    // Make first claim with a specific owner
+    let test_owner_1 = AccountPublicKey::test_key(42).into();
+    let test_owner_2 = AccountPublicKey::test_key(43).into();
+
+    // Claim chains for two different owners
+    let chain_1 = root
+        .do_claim(test_owner_1)
+        .await
+        .expect("First claim should succeed");
+
+    clock.set(Timestamp::from(2000));
+    let chain_2 = root
+        .do_claim(test_owner_2)
+        .await
+        .expect("Second claim should succeed");
+
+    // Verify that immediate re-claims return the same chains
+    let chain_1_again = root
+        .do_claim(test_owner_1)
+        .await
+        .expect("Re-claim should return existing chain");
+    assert_eq!(
+        chain_1.id(),
+        chain_1_again.id(),
+        "Should return same chain for same owner"
+    );
+
+    let chain_2_again = root
+        .do_claim(test_owner_2)
+        .await
+        .expect("Re-claim should return existing chain");
+    assert_eq!(
+        chain_2.id(),
+        chain_2_again.id(),
+        "Should return same chain for same owner"
+    );
+
+    // Store the chain IDs for later comparison
+    let chain_1_id = chain_1.id();
+    let chain_2_id = chain_2.id();
+
+    // Stop the batch processor
+    cancellation_token.cancel();
+    let _ = processor_task.await;
+
+    // Drop the first faucet instance to simulate shutdown
+    drop(root);
+    drop(faucet_storage);
+
+    // Create a new faucet instance with the same database path (simulating restart)
+    let faucet_storage_2 = Arc::new(FaucetDatabase::new(&storage_path).await.unwrap());
+
+    // Set up the new MutationRoot instance
+    let pending_requests_2 = Arc::new(Mutex::new(VecDeque::new()));
+    let request_notifier_2 = Arc::new(Notify::new());
+
+    let root_2 = super::MutationRoot {
+        faucet_storage: Arc::clone(&faucet_storage_2),
+        pending_requests: Arc::clone(&pending_requests_2),
+        request_notifier: Arc::clone(&request_notifier_2),
+        storage: client.storage_client().clone(),
+    };
+
+    // Create new batch processor for the second instance
+    let batch_config_2 = super::BatchProcessorConfig {
+        amount: Amount::from_tokens(1),
+        end_timestamp: Timestamp::from(6000),
+        start_timestamp: Timestamp::from(0),
+        start_balance: Amount::from_tokens(6),
+        max_batch_size: 1,
+    };
+    let batch_processor_2 = super::BatchProcessor::new(
+        batch_config_2,
+        Arc::clone(&context),
+        client.clone(),
+        Arc::clone(&faucet_storage_2),
+        Arc::clone(&pending_requests_2),
+        Arc::clone(&request_notifier_2),
+    );
+
+    // Start the new batch processor
+    let cancellation_token_2 = CancellationToken::new();
+    let processor_task_2 = {
+        let mut batch_processor = batch_processor_2;
+        let token = cancellation_token_2.clone();
+        tokio::spawn(async move { batch_processor.run(token).await })
+    };
+
+    // Verify that the new instance returns the same chain IDs for the same owners
+    let chain_1_after_restart = root_2
+        .do_claim(test_owner_1)
+        .await
+        .expect("Should return existing chain after restart");
+    assert_eq!(
+        chain_1_id,
+        chain_1_after_restart.id(),
+        "Chain ID should be preserved after restart for owner 1"
+    );
+
+    let chain_2_after_restart = root_2
+        .do_claim(test_owner_2)
+        .await
+        .expect("Should return existing chain after restart");
+    assert_eq!(
+        chain_2_id,
+        chain_2_after_restart.id(),
+        "Chain ID should be preserved after restart for owner 2"
+    );
+
+    // Verify that a new owner can still claim a new chain after restart
+    clock.set(Timestamp::from(3000));
+    let test_owner_3 = AccountPublicKey::test_key(44).into();
+    let chain_3 = root_2
+        .do_claim(test_owner_3)
+        .await
+        .expect("New owner should be able to claim after restart");
+
+    // Verify the new chain is different from the existing ones
+    assert_ne!(
+        chain_3.id(),
+        chain_1_id,
+        "New chain should have different ID"
+    );
+    assert_ne!(
+        chain_3.id(),
+        chain_2_id,
+        "New chain should have different ID"
+    );
+
+    // Clean up
+    cancellation_token_2.cancel();
+    let _ = processor_task_2.await;
+}
+
+#[tokio::test]
+async fn test_blockchain_sync_after_database_deletion() {
+    // Test that the faucet correctly syncs with the blockchain after database deletion.
+    // This verifies that the blockchain synchronization can restore chain mappings from
+    // the blockchain history when the database is lost or corrupted.
+
+    let storage_builder = MemoryStorageBuilder::default();
+    let keys = InMemorySigner::new(None);
+    let clock = storage_builder.clock().clone();
+    clock.set(Timestamp::from(0));
+    let mut builder = TestBuilder::new(storage_builder, 4, 1, keys).await.unwrap();
+    let client = builder
+        .add_root_chain(1, Amount::from_tokens(6))
+        .await
+        .unwrap();
+
+    let temp_dir = tempdir().unwrap();
+    let storage_path = temp_dir.path().join("test_blockchain_sync.sqlite");
+
+    // === PHASE 1: Create chains with first faucet instance ===
+    let faucet_storage = Arc::new(FaucetDatabase::new(&storage_path).await.unwrap());
+    let context = ClientContext {
+        client: client.clone(),
+        update_calls: 0,
+    };
+    let context = Arc::new(Mutex::new(context));
+
+    // Set up the first MutationRoot instance
+    let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
+    let request_notifier = Arc::new(Notify::new());
+    let root = super::MutationRoot {
+        faucet_storage: Arc::clone(&faucet_storage),
+        pending_requests: Arc::clone(&pending_requests),
+        request_notifier: Arc::clone(&request_notifier),
+        storage: client.storage_client().clone(),
+    };
+
+    // Create the BatchProcessor configuration
+    let batch_config = super::BatchProcessorConfig {
+        amount: Amount::from_tokens(1),
+        end_timestamp: Timestamp::from(6000),
+        start_timestamp: Timestamp::from(0),
+        start_balance: Amount::from_tokens(6),
+        max_batch_size: 1,
+    };
+    let batch_processor = super::BatchProcessor::new(
+        batch_config,
+        Arc::clone(&context),
+        client.clone(),
+        Arc::clone(&faucet_storage),
+        Arc::clone(&pending_requests),
+        Arc::clone(&request_notifier),
+    );
+
+    // Start the batch processor
+    let cancellation_token = CancellationToken::new();
+    let processor_task = {
+        let mut batch_processor = batch_processor;
+        let token = cancellation_token.clone();
+        tokio::spawn(async move { batch_processor.run(token).await })
+    };
+
+    // Set time to allow claims
+    clock.set(Timestamp::from(1000));
+
+    // Make claims with specific owners to create chain mappings
+    let test_owner_1 = AccountPublicKey::test_key(100).into();
+    let test_owner_2 = AccountPublicKey::test_key(101).into();
+
+    // Claim chains for two different owners
+    let chain_1 = root
+        .do_claim(test_owner_1)
+        .await
+        .expect("First claim should succeed");
+
+    clock.set(Timestamp::from(2000));
+    let chain_2 = root
+        .do_claim(test_owner_2)
+        .await
+        .expect("Second claim should succeed");
+
+    // Store the chain IDs for later comparison
+    let chain_1_id = chain_1.id();
+    let chain_2_id = chain_2.id();
+
+    // Verify initial state works correctly
+    let chain_1_again = root
+        .do_claim(test_owner_1)
+        .await
+        .expect("Re-claim should return existing chain");
+    assert_eq!(
+        chain_1_id,
+        chain_1_again.id(),
+        "Should return same chain for same owner initially"
+    );
+
+    // Stop the batch processor and clean up first instance
+    cancellation_token.cancel();
+    let _ = processor_task.await;
+    drop(root);
+    drop(faucet_storage);
+
+    // === PHASE 2: Delete the database file (simulate data loss) ===
+    std::fs::remove_file(&storage_path).expect("Should be able to delete database file");
+
+    // === PHASE 3: Create new faucet instance (should sync from blockchain) ===
+    let faucet_storage_2 = Arc::new(FaucetDatabase::new(&storage_path).await.unwrap());
+
+    // CRITICAL: Trigger blockchain sync before using the faucet
+    faucet_storage_2
+        .sync_with_blockchain(&client)
+        .await
+        .expect("Blockchain sync should succeed");
+
+    // Set up the new MutationRoot instance
+    let pending_requests_2 = Arc::new(Mutex::new(VecDeque::new()));
+    let request_notifier_2 = Arc::new(Notify::new());
+
+    let root_2 = super::MutationRoot {
+        faucet_storage: Arc::clone(&faucet_storage_2),
+        pending_requests: Arc::clone(&pending_requests_2),
+        request_notifier: Arc::clone(&request_notifier_2),
+        storage: client.storage_client().clone(),
+    };
+
+    // Create new batch processor for the second instance
+    let batch_config_2 = super::BatchProcessorConfig {
+        amount: Amount::from_tokens(1),
+        end_timestamp: Timestamp::from(6000),
+        start_timestamp: Timestamp::from(0),
+        start_balance: Amount::from_tokens(6),
+        max_batch_size: 1,
+    };
+    let batch_processor_2 = super::BatchProcessor::new(
+        batch_config_2,
+        Arc::clone(&context),
+        client.clone(),
+        Arc::clone(&faucet_storage_2),
+        Arc::clone(&pending_requests_2),
+        Arc::clone(&request_notifier_2),
+    );
+
+    // Start the new batch processor
+    let cancellation_token_2 = CancellationToken::new();
+    let processor_task_2 = {
+        let mut batch_processor = batch_processor_2;
+        let token = cancellation_token_2.clone();
+        tokio::spawn(async move { batch_processor.run(token).await })
+    };
+
+    // === PHASE 4: Verify blockchain sync restored the correct mappings ===
+
+    // Test that the blockchain sync correctly restored the chain mappings
+    let chain_1_after_sync = root_2
+        .do_claim(test_owner_1)
+        .await
+        .expect("Should return existing chain after blockchain sync");
+    assert_eq!(
+        chain_1_id,
+        chain_1_after_sync.id(),
+        "Chain ID should be correctly restored from blockchain for owner 1"
+    );
+
+    let chain_2_after_sync = root_2
+        .do_claim(test_owner_2)
+        .await
+        .expect("Should return existing chain after blockchain sync");
+    assert_eq!(
+        chain_2_id,
+        chain_2_after_sync.id(),
+        "Chain ID should be correctly restored from blockchain for owner 2"
+    );
+
+    // === PHASE 5: Verify new claims still work correctly ===
+
+    // Verify that a completely new owner can still claim a new chain
+    clock.set(Timestamp::from(3000));
+    let test_owner_3 = AccountPublicKey::test_key(102).into();
+    let chain_3 = root_2
+        .do_claim(test_owner_3)
+        .await
+        .expect("New owner should be able to claim after sync");
+
+    // Verify the new chain is different from the existing ones
+    assert_ne!(
+        chain_3.id(),
+        chain_1_id,
+        "New chain should have different ID from synced chains"
+    );
+    assert_ne!(
+        chain_3.id(),
+        chain_2_id,
+        "New chain should have different ID from synced chains"
+    );
+
+    // Verify that the new chain mapping is also persisted
+    let chain_3_again = root_2
+        .do_claim(test_owner_3)
+        .await
+        .expect("Re-claim should return the new chain");
+    assert_eq!(
+        chain_3.id(),
+        chain_3_again.id(),
+        "New chain should be persistent after sync"
+    );
+
+    // Clean up
+    cancellation_token_2.cancel();
+    let _ = processor_task_2.await;
+}
diff --git a/linera-indexer/lib/Cargo.toml b/linera-indexer/lib/Cargo.toml
index 5c3f0edfc57..37373db55ac 100644
--- a/linera-indexer/lib/Cargo.toml
+++ b/linera-indexer/lib/Cargo.toml
@@ -42,7 +42,7 @@ linera-version.workspace = true
 linera-views.workspace = true
 prost.workspace = true
 reqwest.workspace = true
-sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "sqlite"] }
+sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
 thiserror.workspace = true
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
 tokio-stream = "0.1"
diff --git a/linera-service/src/cli/command.rs b/linera-service/src/cli/command.rs
index 2eedd5c8153..a8f4bf5a215 100644
--- a/linera-service/src/cli/command.rs
+++ b/linera-service/src/cli/command.rs
@@ -765,7 +765,7 @@ pub enum ClientCommand {
 
         /// Path to the persistent storage file for faucet mappings.
         #[arg(long)]
-        storage_path: Option<PathBuf>,
+        storage_path: PathBuf,
 
         /// Maximum number of operations to include in a single block (default: 100).
         #[arg(long, default_value = "100")]
diff --git a/linera-service/src/cli_wrappers/wallet.rs b/linera-service/src/cli_wrappers/wallet.rs
index 3437a333423..150bced41ce 100644
--- a/linera-service/src/cli_wrappers/wallet.rs
+++ b/linera-service/src/cli_wrappers/wallet.rs
@@ -544,12 +544,19 @@ impl ClientWrapper {
         amount: Amount,
     ) -> Result<FaucetService> {
         let port = port.into().unwrap_or(8080);
+        let temp_dir = tempfile::tempdir()
+            .context("Failed to create temporary directory for faucet storage")?;
+        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
         let mut command = self.command().await?;
         let child = command
             .arg("faucet")
             .arg(chain_id.to_string())
             .args(["--port".to_string(), port.to_string()])
             .args(["--amount".to_string(), amount.to_string()])
+            .args([
+                "--storage-path".to_string(),
+                storage_path.to_string_lossy().to_string(),
+            ])
             .spawn_into()?;
         let client = reqwest_client();
         for i in 0..10 {
@@ -560,7 +567,7 @@
                 .await;
             if request.is_ok() {
                 info!("Faucet has started");
-                return Ok(FaucetService::new(port, child));
+                return Ok(FaucetService::new(port, child, temp_dir));
             } else {
                 warn!("Waiting for faucet to start");
             }
@@ -1530,11 +1537,16 @@
 pub struct FaucetService {
     port: u16,
     child: Child,
+    _temp_dir: tempfile::TempDir,
 }
 
 impl FaucetService {
-    fn new(port: u16, child: Child) -> Self {
-        Self { port, child }
+    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
+        Self {
+            port,
+            child,
+            _temp_dir: temp_dir,
+        }
     }
 
     pub async fn terminate(mut self) -> Result<()> {
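The new `_temp_dir` field on `FaucetService` is load-bearing: `tempfile::TempDir` deletes its directory when dropped, so the handle must outlive the child faucet process that keeps its SQLite file there. A small demonstration of that drop behavior (assumes the `tempfile` crate):

```rust
fn main() -> std::io::Result<()> {
    let dir = tempfile::tempdir()?;
    let db_path = dir.path().join("faucet_storage.sqlite");
    std::fs::write(&db_path, b"")?;
    assert!(db_path.exists());

    drop(dir); // The directory and the file inside it are removed here.
    assert!(!db_path.exists());
    Ok(())
}
```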