|
| 1 | +// Copyright (c) Zefchain Labs, Inc. |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +//! SQLite database module for storing chain assignments. |
| 5 | +
|
| 6 | +use std::{collections::BTreeMap, path::PathBuf}; |
| 7 | + |
| 8 | +use anyhow::Context as _; |
| 9 | +use linera_base::{ |
| 10 | + bcs, |
| 11 | + crypto::CryptoHash, |
| 12 | + data_types::{BlockHeight, ChainDescription}, |
| 13 | + identifiers::{AccountOwner, ChainId}, |
| 14 | +}; |
| 15 | +use linera_chain::{data_types::Transaction, types::ConfirmedBlockCertificate}; |
| 16 | +use linera_core::client::ChainClient; |
| 17 | +use linera_execution::{system::SystemOperation, Operation}; |
| 18 | +use linera_storage::Storage; |
| 19 | +use sqlx::{ |
| 20 | + sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}, |
| 21 | + Row, |
| 22 | +}; |
| 23 | +use tracing::{debug, info}; |
| 24 | + |
| 25 | +/// SQLite database for persistent storage of chain assignments. |
| 26 | +pub struct FaucetDatabase { |
| 27 | + pool: SqlitePool, |
| 28 | +} |
| 29 | + |
| 30 | +/// Schema for creating the chains table. |
| 31 | +const CREATE_CHAINS_TABLE: &str = r#" |
| 32 | +CREATE TABLE IF NOT EXISTS chains ( |
| 33 | + owner TEXT PRIMARY KEY NOT NULL, |
| 34 | + chain_id TEXT NOT NULL, |
| 35 | + created_at DATETIME DEFAULT CURRENT_TIMESTAMP |
| 36 | +); |
| 37 | +
|
| 38 | +CREATE INDEX IF NOT EXISTS idx_chains_chain_id ON chains(chain_id); |
| 39 | +"#; |
| 40 | + |
| 41 | +impl FaucetDatabase { |
| 42 | + /// Creates a new SQLite database connection. |
| 43 | + pub async fn new(database_path: &PathBuf) -> anyhow::Result<Self> { |
| 44 | + // Create parent directory if it doesn't exist. |
| 45 | + if let Some(parent) = database_path.parent() { |
| 46 | + tokio::fs::create_dir_all(parent) |
| 47 | + .await |
| 48 | + .context("Failed to create database directory")?; |
| 49 | + } |
| 50 | + |
| 51 | + let database_url = format!("sqlite:{}", database_path.display()); |
| 52 | + info!(?database_url, "Connecting to SQLite database"); |
| 53 | + |
| 54 | + let options = SqliteConnectOptions::new() |
| 55 | + .filename(database_path) |
| 56 | + .create_if_missing(true); |
| 57 | + |
| 58 | + let pool = SqlitePoolOptions::new() |
| 59 | + .max_connections(5) |
| 60 | + .connect_with(options) |
| 61 | + .await |
| 62 | + .context("Failed to connect to SQLite database")?; |
| 63 | + |
| 64 | + let db = Self { pool }; |
| 65 | + db.initialize_schema().await?; |
| 66 | + Ok(db) |
| 67 | + } |
| 68 | + |
| 69 | + /// Initializes the database schema. |
| 70 | + async fn initialize_schema(&self) -> anyhow::Result<()> { |
| 71 | + sqlx::query(CREATE_CHAINS_TABLE) |
| 72 | + .execute(&self.pool) |
| 73 | + .await |
| 74 | + .context("Failed to create chains table")?; |
| 75 | + info!("Database schema initialized"); |
| 76 | + Ok(()) |
| 77 | + } |
| 78 | + |
| 79 | + /// Synchronizes the database with the blockchain by traversing block history. |
| 80 | + pub async fn sync_with_blockchain<E>(&self, client: &ChainClient<E>) -> anyhow::Result<()> |
| 81 | + where |
| 82 | + E: linera_core::Environment, |
| 83 | + E::Storage: Storage, |
| 84 | + { |
| 85 | + info!("Starting database synchronization with blockchain"); |
| 86 | + |
| 87 | + // Build height->hash map and find sync point in a single traversal. |
| 88 | + let height_to_hash = self.build_sync_map(client).await?; |
| 89 | + |
| 90 | + info!( |
| 91 | + "Found sync point at height {}, processing {} blocks", |
| 92 | + height_to_hash.keys().next().unwrap_or(&BlockHeight::ZERO), |
| 93 | + height_to_hash.len() |
| 94 | + ); |
| 95 | + |
| 96 | + // Sync forward from the sync point using the pre-built map. |
| 97 | + self.sync_forward_with_map(client, height_to_hash).await?; |
| 98 | + |
| 99 | + info!("Database synchronization completed"); |
| 100 | + Ok(()) |
| 101 | + } |
| 102 | + |
| 103 | + /// Builds a height->hash map and finds the sync point in a single blockchain traversal. |
| 104 | + /// Returns (sync_point, height_to_hash_map). |
| 105 | + async fn build_sync_map<E>( |
| 106 | + &self, |
| 107 | + client: &ChainClient<E>, |
| 108 | + ) -> anyhow::Result<BTreeMap<BlockHeight, CryptoHash>> |
| 109 | + where |
| 110 | + E: linera_core::Environment, |
| 111 | + E::Storage: Storage, |
| 112 | + { |
| 113 | + let info = client.chain_info().await?; |
| 114 | + let end_height = info.next_block_height; |
| 115 | + |
| 116 | + if end_height == BlockHeight::ZERO { |
| 117 | + info!("Chain is empty, no synchronization needed"); |
| 118 | + return Ok(BTreeMap::new()); |
| 119 | + } |
| 120 | + |
| 121 | + let mut height_to_hash = BTreeMap::new(); |
| 122 | + let mut current_hash = info.block_hash; |
| 123 | + |
| 124 | + // Traverse backwards to build the height -> hash mapping and find sync point |
| 125 | + while let Some(hash) = current_hash { |
| 126 | + let certificate = client |
| 127 | + .storage_client() |
| 128 | + .read_certificate(hash) |
| 129 | + .await? |
| 130 | + .ok_or_else(|| anyhow::anyhow!("Certificate not found for hash {}", hash))?; |
| 131 | + let current_height = certificate.block().header.height; |
| 132 | + |
| 133 | + // Check if this block's chains are already in our database |
| 134 | + let chains_in_block = self.extract_opened_chains(&certificate).await?; |
| 135 | + |
| 136 | + if !chains_in_block.is_empty() { |
| 137 | + let mut all_chains_exist = true; |
| 138 | + for (owner, _chain_id) in &chains_in_block { |
| 139 | + if self.get_chain_id(owner).await?.is_none() { |
| 140 | + all_chains_exist = false; |
| 141 | + break; |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + if all_chains_exist { |
| 146 | + // All chains from this block are already in the database. |
| 147 | + break; |
| 148 | + } |
| 149 | + } |
| 150 | + |
| 151 | + // Add to our height->hash map |
| 152 | + height_to_hash.insert(current_height, hash); |
| 153 | + |
| 154 | + // Move to the previous block |
| 155 | + current_hash = certificate.block().header.previous_block_hash; |
| 156 | + } |
| 157 | + |
| 158 | + Ok(height_to_hash) |
| 159 | + } |
| 160 | + |
| 161 | + /// Syncs the database using a pre-built height->hash map. |
| 162 | + async fn sync_forward_with_map<E>( |
| 163 | + &self, |
| 164 | + client: &ChainClient<E>, |
| 165 | + height_to_hash: BTreeMap<BlockHeight, CryptoHash>, |
| 166 | + ) -> anyhow::Result<()> |
| 167 | + where |
| 168 | + E: linera_core::Environment, |
| 169 | + E::Storage: Storage, |
| 170 | + { |
| 171 | + if height_to_hash.is_empty() { |
| 172 | + return Ok(()); |
| 173 | + } |
| 174 | + |
| 175 | + // Process blocks in chronological order (forward) |
| 176 | + for (height, hash) in height_to_hash { |
| 177 | + let certificate = client |
| 178 | + .storage_client() |
| 179 | + .read_certificate(hash) |
| 180 | + .await? |
| 181 | + .ok_or_else(|| anyhow::anyhow!("Certificate not found for hash {}", hash))?; |
| 182 | + |
| 183 | + let chains_to_store = self.extract_opened_chains(&certificate).await?; |
| 184 | + |
| 185 | + if !chains_to_store.is_empty() { |
| 186 | + info!( |
| 187 | + "Processing block at height {height} with {} new chains", |
| 188 | + chains_to_store.len() |
| 189 | + ); |
| 190 | + self.store_chains_batch(chains_to_store).await?; |
| 191 | + } |
| 192 | + } |
| 193 | + |
| 194 | + Ok(()) |
| 195 | + } |
| 196 | + |
| 197 | + /// Extracts OpenChain operations from a certificate and returns (owner, chain_id) pairs. |
| 198 | + async fn extract_opened_chains( |
| 199 | + &self, |
| 200 | + certificate: &ConfirmedBlockCertificate, |
| 201 | + ) -> anyhow::Result<Vec<(AccountOwner, ChainId)>> { |
| 202 | + let mut chains = Vec::new(); |
| 203 | + let block = certificate.block(); |
| 204 | + |
| 205 | + // Parse chain descriptions from the block's blobs |
| 206 | + let blobs = block.body.blobs.iter().flatten(); |
| 207 | + let chain_descriptions = blobs |
| 208 | + .map(|blob| bcs::from_bytes::<ChainDescription>(blob.bytes())) |
| 209 | + .collect::<Result<Vec<ChainDescription>, _>>()?; |
| 210 | + |
| 211 | + let mut chain_desc_iter = chain_descriptions.into_iter(); |
| 212 | + |
| 213 | + // Examine each transaction in the block |
| 214 | + for transaction in &block.body.transactions { |
| 215 | + if let Transaction::ExecuteOperation(Operation::System(system_op)) = transaction { |
| 216 | + if let SystemOperation::OpenChain(config) = system_op.as_ref() { |
| 217 | + // Extract the owner from the OpenChain operation |
| 218 | + // We expect single-owner chains from the faucet |
| 219 | + let mut owners = config.ownership.all_owners(); |
| 220 | + if let Some(owner) = owners.next() { |
| 221 | + // Verify it's a single-owner chain (faucet only creates these) |
| 222 | + if owners.next().is_none() { |
| 223 | + // Get the corresponding chain description from the blobs |
| 224 | + if let Some(description) = chain_desc_iter.next() { |
| 225 | + chains.push((*owner, description.id())); |
| 226 | + debug!( |
| 227 | + "Found OpenChain operation for owner {} creating chain {}", |
| 228 | + owner, |
| 229 | + description.id() |
| 230 | + ); |
| 231 | + } |
| 232 | + } |
| 233 | + } |
| 234 | + } |
| 235 | + } |
| 236 | + } |
| 237 | + |
| 238 | + Ok(chains) |
| 239 | + } |
| 240 | + |
| 241 | + /// Gets the chain ID for an owner if it exists. |
| 242 | + pub async fn get_chain_id(&self, owner: &AccountOwner) -> anyhow::Result<Option<ChainId>> { |
| 243 | + let owner_str = owner.to_string(); |
| 244 | + |
| 245 | + let Some(row) = sqlx::query("SELECT chain_id FROM chains WHERE owner = ?") |
| 246 | + .bind(&owner_str) |
| 247 | + .fetch_optional(&self.pool) |
| 248 | + .await? |
| 249 | + else { |
| 250 | + return Ok(None); |
| 251 | + }; |
| 252 | + let chain_id_str: String = row.get("chain_id"); |
| 253 | + let chain_id: ChainId = chain_id_str.parse()?; |
| 254 | + Ok(Some(chain_id)) |
| 255 | + } |
| 256 | + |
| 257 | + /// Stores multiple chain mappings in a single transaction |
| 258 | + pub async fn store_chains_batch( |
| 259 | + &self, |
| 260 | + chains: Vec<(AccountOwner, ChainId)>, |
| 261 | + ) -> anyhow::Result<()> { |
| 262 | + let mut tx = self.pool.begin().await?; |
| 263 | + |
| 264 | + for (owner, chain_id) in chains { |
| 265 | + let owner_str = owner.to_string(); |
| 266 | + let chain_id_str = chain_id.to_string(); |
| 267 | + |
| 268 | + sqlx::query( |
| 269 | + r#" |
| 270 | + INSERT OR REPLACE INTO chains (owner, chain_id) |
| 271 | + VALUES (?, ?) |
| 272 | + "#, |
| 273 | + ) |
| 274 | + .bind(&owner_str) |
| 275 | + .bind(&chain_id_str) |
| 276 | + .execute(&mut *tx) |
| 277 | + .await?; |
| 278 | + } |
| 279 | + |
| 280 | + tx.commit().await?; |
| 281 | + Ok(()) |
| 282 | + } |
| 283 | +} |
0 commit comments