Skip to content

Commit c697016

Browse files
ma2bdndr-dsafck
committed
backport faucet PRs #4408 and #4631 (#4633)
## Motivation Solve the issue of the forgetful faucet ## Proposal backport #4408 and #4631 ## Test Plan CI --------- Co-authored-by: Andre da Silva <[email protected]> Co-authored-by: Andreas Fackler <[email protected]>
1 parent 84aecfb commit c697016

File tree

11 files changed

+910
-142
lines changed

11 files changed

+910
-142
lines changed

.github/workflows/rust.yml

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,26 @@ jobs:
7171
- name: Run the storage-service instance
7272
run: |
7373
cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
74+
- name: Wait for storage service to be ready
75+
run: |
76+
until nc -z 127.0.0.1 1235; do sleep 1; done
77+
- name: Build binaries
78+
run: |
79+
cargo build --features storage-service --bin linera-server --bin linera-proxy --bin linera
7480
- name: Run the validators
7581
run: |
76-
cargo build --features storage-service
7782
mkdir /tmp/local-linera-net
78-
cargo run --features storage-service --bin linera -- net up --storage service:tcp:$LINERA_STORAGE_SERVICE:table --policy-config testnet --path /tmp/local-linera-net --validators 4 --shards 4 &
83+
cargo run --features storage-service --bin linera -- net up --storage service:tcp:$LINERA_STORAGE_SERVICE:table --policy-config testnet --path /tmp/local-linera-net --validators 2 --shards 2 &
7984
- name: Create two epochs and run the faucet
8085
# See https://github.com/linera-io/linera-protocol/pull/2835 for details.
8186
run: |
82-
cargo build --bin linera
87+
mkdir /tmp/linera-faucet
8388
cargo run --bin linera -- resource-control-policy --http-request-timeout-ms 1000
8489
cargo run --bin linera -- resource-control-policy --http-request-timeout-ms 500
85-
cargo run --bin linera -- faucet --amount 1000 --port 8079 &
90+
cargo run --bin linera -- faucet --storage-path /tmp/linera-faucet/faucet_storage.sqlite --amount 1000 --port 8079 &
91+
- name: Wait for faucet to be ready
92+
run: |
93+
until curl -s http://localhost:8079 >/dev/null; do sleep 1; done
8694
- name: Run the remote-net tests
8795
run: |
8896
cargo test -p linera-service remote_net_grpc --features remote-net
@@ -193,6 +201,9 @@ jobs:
193201
- name: Run the storage-service instance
194202
run: |
195203
cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
204+
- name: Wait for storage service to be ready
205+
run: |
206+
until nc -z 127.0.0.1 1235; do sleep 1; done
196207
- name: Run the benchmark test
197208
run: |
198209
cargo build --locked -p linera-service --bin linera-benchmark --features storage-service
@@ -250,6 +261,9 @@ jobs:
250261
- name: Run the storage-service instance
251262
run: |
252263
cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
264+
- name: Wait for storage service to be ready
265+
run: |
266+
until nc -z 127.0.0.1 1235; do sleep 1; done
253267
- name: Run Ethereum tests
254268
run: |
255269
cargo test -p linera-ethereum --features ethereum
@@ -274,9 +288,14 @@ jobs:
274288
run: |
275289
cd examples
276290
cargo build --locked --release --target wasm32-unknown-unknown
277-
- name: Run the storage-service instance and the storage-service tests
291+
- name: Run the storage-service instance
278292
run: |
279293
cargo run --release -p linera-storage-service -- memory --endpoint $LINERA_STORAGE_SERVICE &
294+
- name: Wait for storage service to be ready
295+
run: |
296+
until nc -z 127.0.0.1 1235; do sleep 1; done
297+
- name: Run the storage-service tests
298+
run: |
280299
cargo test --features storage-service -- storage_service --nocapture
281300
282301
web:

CLI.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,7 @@ Run a GraphQL service to explore and extend the chains of the wallet
727727

728728
Run a GraphQL service that exposes a faucet where users can claim tokens. This gives away the chain's tokens, and is mainly intended for testing
729729

730-
**Usage:** `linera faucet [OPTIONS] --amount <AMOUNT> [CHAIN_ID]`
730+
**Usage:** `linera faucet [OPTIONS] --amount <AMOUNT> --storage-path <STORAGE_PATH> [CHAIN_ID]`
731731

732732
###### **Arguments:**
733733

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ serde_with = { version = "3", default-features = false, features = [
230230
serde_yaml = "0.9.34"
231231
sha3 = "0.10.8"
232232
similar-asserts = "1.5.0"
233+
sqlx = "0.8"
233234
static_assertions = "1.1.0"
234235
stdext = "0.3.3"
235236
syn = "2.0.52"

linera-faucet/server/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ linera-version.workspace = true
4040
prometheus = { workspace = true, optional = true }
4141
serde.workspace = true
4242
serde_json.workspace = true
43+
sqlx = { workspace = true, features = [
44+
"runtime-tokio-rustls",
45+
"sqlite",
46+
"migrate",
47+
] }
4348
tempfile.workspace = true
4449
tokio.workspace = true
4550
tokio-util.workspace = true
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// Copyright (c) Zefchain Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! SQLite database module for storing chain assignments.
5+
6+
use std::{collections::BTreeMap, path::PathBuf};
7+
8+
use anyhow::Context as _;
9+
use linera_base::{
10+
bcs,
11+
crypto::CryptoHash,
12+
data_types::{BlockHeight, ChainDescription},
13+
identifiers::{AccountOwner, ChainId},
14+
};
15+
use linera_chain::{data_types::Transaction, types::ConfirmedBlockCertificate};
16+
use linera_core::client::ChainClient;
17+
use linera_execution::{system::SystemOperation, Operation};
18+
use linera_storage::Storage;
19+
use sqlx::{
20+
sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions},
21+
Row,
22+
};
23+
use tracing::{debug, info};
24+
25+
/// SQLite database for persistent storage of chain assignments.
26+
pub struct FaucetDatabase {
27+
pool: SqlitePool,
28+
}
29+
30+
/// Schema for creating the chains table.
31+
const CREATE_CHAINS_TABLE: &str = r#"
32+
CREATE TABLE IF NOT EXISTS chains (
33+
owner TEXT PRIMARY KEY NOT NULL,
34+
chain_id TEXT NOT NULL,
35+
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
36+
);
37+
38+
CREATE INDEX IF NOT EXISTS idx_chains_chain_id ON chains(chain_id);
39+
"#;
40+
41+
impl FaucetDatabase {
42+
/// Creates a new SQLite database connection.
43+
pub async fn new(database_path: &PathBuf) -> anyhow::Result<Self> {
44+
// Create parent directory if it doesn't exist.
45+
if let Some(parent) = database_path.parent() {
46+
tokio::fs::create_dir_all(parent)
47+
.await
48+
.context("Failed to create database directory")?;
49+
}
50+
51+
let database_url = format!("sqlite:{}", database_path.display());
52+
info!(?database_url, "Connecting to SQLite database");
53+
54+
let options = SqliteConnectOptions::new()
55+
.filename(database_path)
56+
.create_if_missing(true);
57+
58+
let pool = SqlitePoolOptions::new()
59+
.max_connections(5)
60+
.connect_with(options)
61+
.await
62+
.context("Failed to connect to SQLite database")?;
63+
64+
let db = Self { pool };
65+
db.initialize_schema().await?;
66+
Ok(db)
67+
}
68+
69+
/// Initializes the database schema.
70+
async fn initialize_schema(&self) -> anyhow::Result<()> {
71+
sqlx::query(CREATE_CHAINS_TABLE)
72+
.execute(&self.pool)
73+
.await
74+
.context("Failed to create chains table")?;
75+
info!("Database schema initialized");
76+
Ok(())
77+
}
78+
79+
/// Synchronizes the database with the blockchain by traversing block history.
80+
pub async fn sync_with_blockchain<E>(&self, client: &ChainClient<E>) -> anyhow::Result<()>
81+
where
82+
E: linera_core::Environment,
83+
E::Storage: Storage,
84+
{
85+
info!("Starting database synchronization with blockchain");
86+
87+
// Build height->hash map and find sync point in a single traversal.
88+
let height_to_hash = self.build_sync_map(client).await?;
89+
90+
info!(
91+
"Found sync point at height {}, processing {} blocks",
92+
height_to_hash.keys().next().unwrap_or(&BlockHeight::ZERO),
93+
height_to_hash.len()
94+
);
95+
96+
// Sync forward from the sync point using the pre-built map.
97+
self.sync_forward_with_map(client, height_to_hash).await?;
98+
99+
info!("Database synchronization completed");
100+
Ok(())
101+
}
102+
103+
/// Builds a height->hash map and finds the sync point in a single blockchain traversal.
104+
/// Returns (sync_point, height_to_hash_map).
105+
async fn build_sync_map<E>(
106+
&self,
107+
client: &ChainClient<E>,
108+
) -> anyhow::Result<BTreeMap<BlockHeight, CryptoHash>>
109+
where
110+
E: linera_core::Environment,
111+
E::Storage: Storage,
112+
{
113+
let info = client.chain_info().await?;
114+
let end_height = info.next_block_height;
115+
116+
if end_height == BlockHeight::ZERO {
117+
info!("Chain is empty, no synchronization needed");
118+
return Ok(BTreeMap::new());
119+
}
120+
121+
let mut height_to_hash = BTreeMap::new();
122+
let mut current_hash = info.block_hash;
123+
124+
// Traverse backwards to build the height -> hash mapping and find sync point
125+
while let Some(hash) = current_hash {
126+
let certificate = client
127+
.storage_client()
128+
.read_certificate(hash)
129+
.await?
130+
.ok_or_else(|| anyhow::anyhow!("Certificate not found for hash {}", hash))?;
131+
let current_height = certificate.block().header.height;
132+
133+
// Check if this block's chains are already in our database
134+
let chains_in_block = self.extract_opened_chains(&certificate).await?;
135+
136+
if !chains_in_block.is_empty() {
137+
let mut all_chains_exist = true;
138+
for (owner, _chain_id) in &chains_in_block {
139+
if self.get_chain_id(owner).await?.is_none() {
140+
all_chains_exist = false;
141+
break;
142+
}
143+
}
144+
145+
if all_chains_exist {
146+
// All chains from this block are already in the database.
147+
break;
148+
}
149+
}
150+
151+
// Add to our height->hash map
152+
height_to_hash.insert(current_height, hash);
153+
154+
// Move to the previous block
155+
current_hash = certificate.block().header.previous_block_hash;
156+
}
157+
158+
Ok(height_to_hash)
159+
}
160+
161+
/// Syncs the database using a pre-built height->hash map.
162+
async fn sync_forward_with_map<E>(
163+
&self,
164+
client: &ChainClient<E>,
165+
height_to_hash: BTreeMap<BlockHeight, CryptoHash>,
166+
) -> anyhow::Result<()>
167+
where
168+
E: linera_core::Environment,
169+
E::Storage: Storage,
170+
{
171+
if height_to_hash.is_empty() {
172+
return Ok(());
173+
}
174+
175+
// Process blocks in chronological order (forward)
176+
for (height, hash) in height_to_hash {
177+
let certificate = client
178+
.storage_client()
179+
.read_certificate(hash)
180+
.await?
181+
.ok_or_else(|| anyhow::anyhow!("Certificate not found for hash {}", hash))?;
182+
183+
let chains_to_store = self.extract_opened_chains(&certificate).await?;
184+
185+
if !chains_to_store.is_empty() {
186+
info!(
187+
"Processing block at height {height} with {} new chains",
188+
chains_to_store.len()
189+
);
190+
self.store_chains_batch(chains_to_store).await?;
191+
}
192+
}
193+
194+
Ok(())
195+
}
196+
197+
/// Extracts OpenChain operations from a certificate and returns (owner, chain_id) pairs.
198+
async fn extract_opened_chains(
199+
&self,
200+
certificate: &ConfirmedBlockCertificate,
201+
) -> anyhow::Result<Vec<(AccountOwner, ChainId)>> {
202+
let mut chains = Vec::new();
203+
let block = certificate.block();
204+
205+
// Parse chain descriptions from the block's blobs
206+
let blobs = block.body.blobs.iter().flatten();
207+
let chain_descriptions = blobs
208+
.map(|blob| bcs::from_bytes::<ChainDescription>(blob.bytes()))
209+
.collect::<Result<Vec<ChainDescription>, _>>()?;
210+
211+
let mut chain_desc_iter = chain_descriptions.into_iter();
212+
213+
// Examine each transaction in the block
214+
for transaction in &block.body.transactions {
215+
if let Transaction::ExecuteOperation(Operation::System(system_op)) = transaction {
216+
if let SystemOperation::OpenChain(config) = system_op.as_ref() {
217+
// Extract the owner from the OpenChain operation
218+
// We expect single-owner chains from the faucet
219+
let mut owners = config.ownership.all_owners();
220+
if let Some(owner) = owners.next() {
221+
// Verify it's a single-owner chain (faucet only creates these)
222+
if owners.next().is_none() {
223+
// Get the corresponding chain description from the blobs
224+
if let Some(description) = chain_desc_iter.next() {
225+
chains.push((*owner, description.id()));
226+
debug!(
227+
"Found OpenChain operation for owner {} creating chain {}",
228+
owner,
229+
description.id()
230+
);
231+
}
232+
}
233+
}
234+
}
235+
}
236+
}
237+
238+
Ok(chains)
239+
}
240+
241+
/// Gets the chain ID for an owner if it exists.
242+
pub async fn get_chain_id(&self, owner: &AccountOwner) -> anyhow::Result<Option<ChainId>> {
243+
let owner_str = owner.to_string();
244+
245+
let Some(row) = sqlx::query("SELECT chain_id FROM chains WHERE owner = ?")
246+
.bind(&owner_str)
247+
.fetch_optional(&self.pool)
248+
.await?
249+
else {
250+
return Ok(None);
251+
};
252+
let chain_id_str: String = row.get("chain_id");
253+
let chain_id: ChainId = chain_id_str.parse()?;
254+
Ok(Some(chain_id))
255+
}
256+
257+
/// Stores multiple chain mappings in a single transaction
258+
pub async fn store_chains_batch(
259+
&self,
260+
chains: Vec<(AccountOwner, ChainId)>,
261+
) -> anyhow::Result<()> {
262+
let mut tx = self.pool.begin().await?;
263+
264+
for (owner, chain_id) in chains {
265+
let owner_str = owner.to_string();
266+
let chain_id_str = chain_id.to_string();
267+
268+
sqlx::query(
269+
r#"
270+
INSERT OR REPLACE INTO chains (owner, chain_id)
271+
VALUES (?, ?)
272+
"#,
273+
)
274+
.bind(&owner_str)
275+
.bind(&chain_id_str)
276+
.execute(&mut *tx)
277+
.await?;
278+
}
279+
280+
tx.commit().await?;
281+
Ok(())
282+
}
283+
}

0 commit comments

Comments
 (0)