diff --git a/.github/workflows/butterflynet.yml b/.github/workflows/butterflynet.yml new file mode 100644 index 000000000000..0139f0efc912 --- /dev/null +++ b/.github/workflows/butterflynet.yml @@ -0,0 +1,44 @@ +name: Butterflynet checks +concurrency: + group: "${{ github.workflow }}-${{ github.ref }}" + cancel-in-progress: "${{ github.ref != 'refs/heads/main' }}" +"on": + workflow_dispatch: + # TODO: disable for PR + pull_request: + branches: + - main +env: + CI: 1 + CARGO_INCREMENTAL: 0 + CACHE_TIMEOUT_MINUTES: 5 + SCRIPT_TIMEOUT_MINUTES: 30 + AWS_ACCESS_KEY_ID: "${{ secrets.AWS_ACCESS_KEY_ID }}" + AWS_SECRET_ACCESS_KEY: "${{ secrets.AWS_SECRET_ACCESS_KEY }}" + RUSTC_WRAPPER: sccache + CC: sccache clang + CXX: sccache clang++ + FIL_PROOFS_PARAMETER_CACHE: /var/tmp/filecoin-proof-parameters + SHELL_IMAGE: busybox +jobs: + butterflynet-checks: + name: Butterflynet checks + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout Sources + uses: actions/checkout@v4 + - name: Setup sccache + uses: mozilla-actions/sccache-action@v0.0.7 + timeout-minutes: "${{ fromJSON(env.CACHE_TIMEOUT_MINUTES) }}" + continue-on-error: true + - uses: actions/setup-go@v5 + with: + go-version-file: "go.work" + - name: Build and install Forest binaries + env: + # To minimize compile times: https://nnethercote.github.io/perf-book/build-configuration.html#minimizing-compile-times + RUSTFLAGS: "-C linker=clang -C link-arg=-fuse-ld=lld" + run: make install-slim-quick + - name: Run butterflynet checks + run: ./scripts/tests/butterflynet_check.sh + timeout-minutes: "${{ fromJSON(env.SCRIPT_TIMEOUT_MINUTES) }}" diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ed806290734..791f3d0040d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,8 @@ - [#4954](https://github.com/ChainSafe/forest/issues/4954) Add `--format json` to `forest-cli chain head` command. +- [#5232](https://github.com/ChainSafe/forest/issues/5232) Support `CARv2` stream decoding. 
+ - [#5230](https://github.com/ChainSafe/forest/issues/5230) Add `CARv2` support to `forest-tool archive` command. ### Changed diff --git a/Cargo.lock b/Cargo.lock index 7f7e1449b03d..b078dff51c1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2943,6 +2943,7 @@ dependencies = [ "digest 0.10.7", "directories", "displaydoc", + "either", "ethereum-types", "ez-jsonrpc-types", "fil_actor_account_state", diff --git a/Cargo.toml b/Cargo.toml index 8fe21144c809..d1e647fa35b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ dialoguer = "0.11" digest = "0.10" directories = "6" displaydoc = "0.2" +either = "1" ethereum-types = { version = "0.15", features = ["ethbloom"] } ez-jsonrpc-types = "0.5" fil_actor_account_state = { version = "19" } diff --git a/build/bootstrap/butterflynet b/build/bootstrap/butterflynet index 822167597df6..8eef6793e1a4 100644 --- a/build/bootstrap/butterflynet +++ b/build/bootstrap/butterflynet @@ -1,2 +1 @@ -/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWFfuFm4eWfMR2Xk9jQ8VsxVqt9nyZXZRdi6WqNrPcg8qk -/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWEgUQub6ZS5M7hfiZPNf4kFFmgsiyYKNcdAk5oKMUcyft +/dnsaddr/bootstrap.butterfly.fildev.network diff --git a/build/manifest.json b/build/manifest.json index ffbfe4496745..c321194d38a8 100644 --- a/build/manifest.json +++ b/build/manifest.json @@ -1062,69 +1062,69 @@ "network": { "type": "butterflynet" }, - "version": "v16.0.0-dev", - "bundle_cid": "bafy2bzaced3uzaynkdi7wiqiwbje7l3lllpwbokuf7slak627gx7bk5pryjpa", + "version": "v16.0.0-dev1", + "bundle_cid": "bafy2bzaced3ucmgzcrkvaw6qgqdzl56t3rctlb7i66vrj23dacgeath7jcxne", "manifest": { "actors": [ [ "system", 1, - "bafk2bzacebzc4fklfws2ri4jv6mcu4xgs52ve3gqzgm476hkohfahj7s5lig2" + "bafk2bzacea5dls7jx2bhbhdkwl2c3qgyv22ldbeoc2me5z7qe5kbutf7tjeow" ], [ "init", 2, - "bafk2bzacebccioskxaudlyoydyy4yoswgfcffpadhbu6midk3edlhuzt3osu4" + "bafk2bzaced6b6odw6vt3ak7z7idhatex6gjsxck57wkum2yud6wubhpbwtm5e" ], [ "cron", 3, - 
"bafk2bzaceadysuxpaeqxwbzg7ksyl2olzizj4kgeq2b2oldlca4et55huauwo" + "bafk2bzacec7zpiconapx4veuplh5hk3iumnigsbx7yxhxfz7hirqbf2vpexqa" ], [ "account", 4, - "bafk2bzacebyxmngiugdhhio7zn7i67jzsythgy3jqqbqyan735g2rkalufhr6" + "bafk2bzaceaw5z2sungnovruolbxdc43xbsksx7ajlr2bw7insd55nho3gfbne" ], [ "storagepower", 5, - "bafk2bzaceauwj75apfux75zz4owkcjeggu4cp2ldn7weoxakzb5zsz55kthbe" + "bafk2bzacebnq5klygudstkq5tx6y7xusn2frygtotahiagmda3zy6rvzmyqme" ], [ "storageminer", 6, - "bafk2bzacedz6zjhfuyexwdco7zmpkypjzllk5v2sv4kdoz4gfqdzxuyq5uz5u" + "bafk2bzaceca27d33cwxwwbhbmijt453wkc3dvrnfkrdsuznol4cq33z3oqxbk" ], [ "storagemarket", 7, - "bafk2bzacedb6p35ax2s2muxvgir73vmdkbxn7uc5aw6n64dewb32drqils5zq" + "bafk2bzaceblznz3yqthmh2jbrknljiqu3kdfr7x7j2wn62abnszs4ziettqmm" ], [ "paymentchannel", 8, - "bafk2bzaceaigyawy2ywyfdqjccfyi62xtn456gavqrwdilpltiqxbeo7zsjf4" + "bafk2bzacebmpquxfvdh2lmgi7huqcln3ey56run7hkrsuhi6lgcwekbozhxac" ], [ "multisig", 9, - "bafk2bzacedcpbmiblrhh43kthgrkctrklh27jkvjcorzfreusd7fsjdw2llzq" + "bafk2bzacebius3sex65rxav4oo2qbbm6vuv5pcer3shgutqyyxy3vvcgezayg" ], [ "reward", 10, - "bafk2bzaceagt6mvup6z3atlaftepdex6f45ncml57zsuxy5puwtzcge5vy4wm" + "bafk2bzaceagmmgu3wt7fozbp3uhd6aepdl6c2ykt7xbpbldh2lvmmvvmt56gw" ], [ "verifiedregistry", 11, - "bafk2bzaceasoa42xnnbu2uftlfvlzhbok3q3nqetv6bxcw7vydbxbmwn53ad6" + "bafk2bzacecqbljsk5utms7pe4g3uy7zvrpwmwgop4spx6pjrpi4tjx663gkq2" ], [ "datacap", 12, - "bafk2bzaceals5hcpbvzm24dmmoddqpr2tcpolwwey3qvjf3okzk7ihf75gngu" + "bafk2bzaceb4owttyigypvl6pguxhqwe45rgfjubgpoitqhiyzumhlwwu6buge" ], [ "placeholder", @@ -1134,20 +1134,20 @@ [ "evm", 14, - "bafk2bzacebxlvhz665s2kbace6nzeqy5maasqixgirzn4xhbjx42xi2hkc5gk" + "bafk2bzacebdhgopsxunxykgehkbwtj5iyyvbqygi5uuvhtm7m4vsz3vcsp5iw" ], [ "eam", 15, - "bafk2bzacec2gt4teegjdhbpfwl6qbxjojffxgkmzhua2m2a4lks52abnjyypw" + "bafk2bzaceapofadtnyiulmdc5k3nujthqwyht67xu2pohatqjcexojm34j7ng" ], [ "ethaccount", 16, - "bafk2bzacec627lshgjxvfzjledk2wph4u7n47got2ultaijbh4v5wdyhjpxse" + 
"bafk2bzacebtz62oxftksx4f6efbuh6i5wb5nvuo447uefkbz5lis4rcw7djw2" ] ], - "actor_list_cid": "bafy2bzacebls3q4yivgxner4v3sltf4mk4sxmyr7lu65nic5manmsafqu3qkm" + "actor_list_cid": "bafy2bzacednuely5c7x43ykvspowkcbzrqym7wlfvgn4ceoqakkkxhu3g5i6m" } }, { diff --git a/documentation/src/offline-forest.md b/documentation/src/offline-forest.md index 0e712a1d31af..23d8f84aee88 100644 --- a/documentation/src/offline-forest.md +++ b/documentation/src/offline-forest.md @@ -9,7 +9,9 @@ chain's archive state without syncing, and various testing scenarios. ```bash forest-tool api serve --help ``` + Sample output (may vary depending on the version): + ```console Usage: forest-tool api serve [OPTIONS] [SNAPSHOT_FILES]... @@ -45,7 +47,9 @@ height: 1859736. ```bash forest-tool api serve --chain calibnet ~/Downloads/forest_snapshot_calibnet_2024-08-08_height_1859736.forest.car.zst ``` + Sample output: + ```console 2024-08-12T12:29:16.624698Z INFO forest::tool::offline_server::server: Configuring Offline RPC Server 2024-08-12T12:29:16.640402Z INFO forest::tool::offline_server::server: Using chain config for calibnet @@ -63,7 +67,9 @@ curl --silent -X POST -H "Content-Type: application/json" \ --data '{"jsonrpc":"2.0","id":2,"method":"Filecoin.ChainHead","param":"null"}' \ "http://127.0.0.1:2345/rpc/v0" | jq ``` + Sample output: + ```json { "jsonrpc": "2.0", @@ -101,7 +107,9 @@ curl --silent -X POST -H "Content-Type: application/json" \ --data '{"jsonrpc":"2.0","id":2,"method":"Filecoin.StateGetNetworkParams","param":"null"}' \ "http://127.0.0.1:2345/rpc/v0" | jq ``` + Sample output: + ```json { "jsonrpc": "2.0", diff --git a/scripts/tests/butterflynet_check.sh b/scripts/tests/butterflynet_check.sh new file mode 100755 index 000000000000..9be676edb50e --- /dev/null +++ b/scripts/tests/butterflynet_check.sh @@ -0,0 +1,28 @@ +#!/bin/bash +set -euxo pipefail + +# This script tests Forest is able to catch up the butterflynet. 
+ +source "$(dirname "$0")/harness.sh" + +function shutdown { + kill -KILL $FOREST_NODE_PID +} + +trap shutdown EXIT + +function call_forest_chain_head { + curl --silent -X POST -H "Content-Type: application/json" \ + --data '{"jsonrpc":"2.0","id":2,"method":"Filecoin.ChainHead","param":"null"}' \ + "http://127.0.0.1:2345/rpc/v1" +} + +$FOREST_PATH --chain butterflynet --encrypt-keystore false & +FOREST_NODE_PID=$! + +until call_forest_chain_head; do + echo "Forest RPC endpoint is unavailable - sleeping for 1s" + sleep 1 +done + +forest_wait_for_sync diff --git a/src/cli/subcommands/healthcheck_cmd.rs b/src/cli/subcommands/healthcheck_cmd.rs index 658b57748d5c..0404cf102d3c 100644 --- a/src/cli/subcommands/healthcheck_cmd.rs +++ b/src/cli/subcommands/healthcheck_cmd.rs @@ -23,6 +23,24 @@ pub enum HealthcheckCommand { #[arg(long, default_value_t=DEFAULT_HEALTHCHECK_PORT)] healthcheck_port: u16, }, + /// Display live status + Live { + /// Don't exit until node is live + #[arg(long)] + wait: bool, + /// Healthcheck port + #[arg(long, default_value_t=DEFAULT_HEALTHCHECK_PORT)] + healthcheck_port: u16, + }, + /// Display healthy status + Healthy { + /// Don't exit until node is healthy + #[arg(long)] + wait: bool, + /// Healthcheck port + #[arg(long, default_value_t=DEFAULT_HEALTHCHECK_PORT)] + healthcheck_port: u16, + }, } impl HealthcheckCommand { @@ -31,42 +49,56 @@ impl HealthcheckCommand { Self::Ready { wait, healthcheck_port, - } => { - let ticker = Ticker::new(0.., Duration::from_secs(1)); - let mut stdout = stdout(); + } => Self::check(&client, "readyz", healthcheck_port, wait).await, + Self::Live { + wait, + healthcheck_port, + } => Self::check(&client, "livez", healthcheck_port, wait).await, + Self::Healthy { + wait, + healthcheck_port, + } => Self::check(&client, "healthz", healthcheck_port, wait).await, + } + } + + async fn check( + client: &rpc::Client, + endpoint: &str, + healthcheck_port: u16, + wait: bool, + ) -> anyhow::Result<()> { + let ticker = 
Ticker::new(0.., Duration::from_secs(1)); + let mut stdout = stdout(); - let url = format!( - "http://{}:{}/readyz?verbose", - client.base_url().host_str().unwrap_or("localhost"), - healthcheck_port, - ); + let url = format!( + "http://{}:{healthcheck_port}/{endpoint}?verbose", + client.base_url().host_str().unwrap_or("localhost"), + ); - for _ in ticker { - let response = reqwest::get(&url).await?; - let status = response.status(); - let text = response.text().await?; + for _ in ticker { + let response = reqwest::get(&url).await?; + let status = response.status(); + let text = response.text().await?; - println!("{}", text); + println!("{}", text); - if !wait { - break; - } - if status == StatusCode::OK { - println!("Done!"); - break; - } + if !wait { + break; + } + if status == StatusCode::OK { + println!("Done!"); + break; + } - for _ in 0..(text.matches('\n').count() + 1) { - write!( - stdout, - "\r{}{}", - anes::MoveCursorUp(1), - anes::ClearLine::All, - )?; - } - } - Ok(()) + for _ in 0..(text.matches('\n').count() + 1) { + write!( + stdout, + "\r{}{}", + anes::MoveCursorUp(1), + anes::ClearLine::All, + )?; } } + Ok(()) } } diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs index 53bd66d9bcd7..025c93ba8666 100644 --- a/src/daemon/bundle.rs +++ b/src/daemon/bundle.rs @@ -13,8 +13,9 @@ use ahash::HashSet; use anyhow::ensure; use cid::Cid; use futures::{stream::FuturesUnordered, TryStreamExt}; +use std::io::Cursor; use std::mem::discriminant; -use std::{io::Cursor, path::Path}; +use std::path::Path; use tokio::io::BufReader; use tracing::{info, warn}; @@ -53,7 +54,7 @@ pub async fn load_actor_bundles_from_path( .await?; // Validate the bundle - let roots = HashSet::from_iter(car_stream.header.roots.iter()); + let roots = HashSet::from_iter(car_stream.header_v1.roots.iter()); for ActorBundleInfo { manifest, network, .. 
} in ACTOR_BUNDLES.iter().filter(|bundle| { @@ -109,7 +110,7 @@ pub async fn load_actor_bundles_from_server( while let Some(block) = stream.try_next().await? { db.put_keyed_persistent(&block.cid, &block.data)?; } - let header = stream.header; + let header = stream.header_v1; ensure!(header.roots.len() == 1); ensure!(header.roots.first() == root); Ok(*header.roots.first()) diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index f4aa1771c0f4..1b17481d5e8e 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -240,7 +240,7 @@ async fn transcode_into_forest_car(from: &Path, to: &Path) -> anyhow::Result<()> tokio::fs::File::open(from).await?, )) .await?; - let roots = car_stream.header.roots.clone(); + let roots = car_stream.header_v1.roots.clone(); let mut writer = tokio::io::BufWriter::new(tokio::fs::File::create(to).await?); let frames = crate::db::car::forest::Encoder::compress_stream_default( diff --git a/src/db/car/plain.rs b/src/db/car/plain.rs index 73b5d8ac0008..5dd0ec4f50e9 100644 --- a/src/db/car/plain.rs +++ b/src/db/car/plain.rs @@ -481,11 +481,15 @@ where #[cfg(test)] mod tests { use super::PlainCar; - use crate::utils::db::car_util::load_car; - use futures::executor::block_on; - use fvm_ipld_blockstore::{Blockstore as _, MemoryBlockstore}; + use crate::utils::db::{ + car_stream::{CarStream, CarV1Header}, + car_util::load_car, + }; + use futures::{executor::block_on, TryStreamExt as _}; + use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore}; use once_cell::sync::Lazy; - use tokio::io::AsyncBufRead; + use std::io::Cursor; + use tokio::io::{AsyncBufRead, AsyncSeek, BufReader}; #[test] fn test_uncompressed_v1() { @@ -496,11 +500,17 @@ mod tests { assert_eq!(car_backed.roots().len(), 1); assert_eq!(car_backed.cids().len(), 1222); - let reference = reference(car); + let reference_car = reference(Cursor::new(car)); + let reference_car_zst = reference(Cursor::new(chain4_car_zst())); + let reference_car_zst_unsafe = 
reference_unsafe(chain4_car_zst()); for cid in car_backed.cids() { - let expected = reference.get(&cid).unwrap().unwrap(); + let expected = reference_car.get(&cid).unwrap().unwrap(); + let expected2 = reference_car_zst.get(&cid).unwrap().unwrap(); + let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap(); let actual = car_backed.get(&cid).unwrap().unwrap(); assert_eq!(expected, actual); + assert_eq!(expected2, actual); + assert_eq!(expected3, actual); } } @@ -513,23 +523,50 @@ mod tests { assert_eq!(car_backed.roots().len(), 1); assert_eq!(car_backed.cids().len(), 7153); - // Uncomment below lines once CarStream supports CARv2 - // let reference = reference(car); - // for cid in car_backed.cids() { - // let expected = reference.get(&cid).unwrap().unwrap(); - // let actual = car_backed.get(&cid).unwrap().unwrap(); - // assert_eq!(expected, actual); - // } + let reference_car = reference(Cursor::new(car)); + let reference_car_zst = reference(Cursor::new(carv2_car_zst())); + let reference_car_zst_unsafe = reference_unsafe(carv2_car_zst()); + for cid in car_backed.cids() { + let expected = reference_car.get(&cid).unwrap().unwrap(); + let expected2 = reference_car_zst.get(&cid).unwrap().unwrap(); + let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap(); + let actual = car_backed.get(&cid).unwrap().unwrap(); + assert_eq!(expected, actual); + assert_eq!(expected2, actual); + assert_eq!(expected3, actual); + } } - fn reference(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore { + fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore { let blockstore = MemoryBlockstore::new(); block_on(load_car(&blockstore, reader)).unwrap(); blockstore } + fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore { + let blockstore = MemoryBlockstore::new(); + block_on(load_car_unsafe(&blockstore, reader)).unwrap(); + blockstore + } + + pub async fn load_car_unsafe(db: &impl Blockstore, reader: R) -> anyhow::Result 
+ where + R: AsyncBufRead + Unpin, + { + let mut stream = CarStream::new_unsafe(BufReader::new(reader)).await?; + while let Some(block) = stream.try_next().await? { + db.put_keyed(&block.cid, &block.data)?; + } + Ok(stream.header_v1) + } + + fn chain4_car_zst() -> &'static [u8] { + include_bytes!("../../../test-snapshots/chain4.car.zst") + } + fn chain4_car() -> &'static [u8] { - include_bytes!("../../../test-snapshots/chain4.car") + static CAR: Lazy> = Lazy::new(|| zstd::decode_all(chain4_car_zst()).unwrap()); + CAR.as_slice() } fn carv2_car_zst() -> &'static [u8] { diff --git a/src/genesis/mod.rs b/src/genesis/mod.rs index 57ecf853e454..80a92175b4e2 100644 --- a/src/genesis/mod.rs +++ b/src/genesis/mod.rs @@ -1,14 +1,17 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::path::Path; +use std::{io::Cursor, path::Path}; use crate::blocks::CachingBlockHeader; use crate::state_manager::StateManager; use crate::utils::db::car_util::load_car; use anyhow::Context as _; use fvm_ipld_blockstore::Blockstore; -use tokio::{fs::File, io::AsyncBufRead, io::BufReader}; +use tokio::{ + fs::File, + io::{AsyncBufRead, AsyncSeek, BufReader}, +}; use tracing::{debug, info}; #[cfg(test)] @@ -33,7 +36,7 @@ where None => { debug!("No specified genesis in config. Using default genesis."); let genesis_bytes = genesis_bytes.context("No default genesis.")?; - process_car(genesis_bytes, db).await? + process_car(Cursor::new(genesis_bytes), db).await? 
} }; @@ -57,7 +60,7 @@ where async fn process_car(reader: R, db: &BS) -> Result where - R: AsyncBufRead + Unpin, + R: AsyncBufRead + AsyncSeek + Unpin, BS: Blockstore, { // Load genesis state into the database and get the Cid @@ -98,6 +101,6 @@ mod tests { async fn load_header_from_car(genesis_bytes: &[u8]) -> CachingBlockHeader { let db = crate::db::MemoryDB::default(); - process_car(genesis_bytes, &db).await.unwrap() + process_car(Cursor::new(genesis_bytes), &db).await.unwrap() } } diff --git a/src/health/endpoints.rs b/src/health/endpoints.rs index 7351f1edf874..63a3da25867c 100644 --- a/src/health/endpoints.rs +++ b/src/health/endpoints.rs @@ -18,6 +18,7 @@ const VERBOSE_PARAM: &str = "verbose"; /// In our case, we require: /// - The node is not in an error state (i.e., boot-looping) /// - At least 1 peer is connected (without peers, the node is isolated and cannot sync) +/// - The RPC server is running if not disabled /// /// If any of these conditions are not met, the node is **not** healthy. If this happens for a prolonged period of time, the application should be restarted. 
pub(crate) async fn livez( @@ -29,6 +30,7 @@ pub(crate) async fn livez( let mut lively = true; lively &= check_sync_state_not_error(&state, &mut acc); lively &= check_peers_connected(&state, &mut acc); + lively &= check_rpc_server_running(&state, &mut acc).await; if lively { Ok(acc.result_ok()) @@ -42,7 +44,7 @@ pub(crate) async fn livez( /// In our case, we require: /// - The node is in sync with the network /// - The current epoch of the node is not too far behind the network -/// - The RPC server is running +/// - The RPC server is running if not disabled /// - The Ethereum mapping is up to date /// - The F3 side car is running if enabled /// diff --git a/src/libp2p/chain_exchange/provider.rs b/src/libp2p/chain_exchange/provider.rs index ed7552158ee9..bba808cb8051 100644 --- a/src/libp2p/chain_exchange/provider.rs +++ b/src/libp2p/chain_exchange/provider.rs @@ -156,12 +156,12 @@ mod tests { use crate::shim::address::Address; use crate::utils::db::car_util::load_car; use nunny::Vec as NonEmpty; - use std::sync::Arc; + use std::{io::Cursor, sync::Arc}; async fn populate_db() -> (NonEmpty, Arc) { let db = Arc::new(MemoryDB::default()); // The cids are the tipset cids of the most recent tipset (39th) - let header = load_car(&db, EXPORT_SR_40).await.unwrap(); + let header = load_car(&db, Cursor::new(EXPORT_SR_40)).await.unwrap(); (header.roots, db) } diff --git a/src/networks/actors_bundle.rs b/src/networks/actors_bundle.rs index 9aa42b9905a7..e1b45b187433 100644 --- a/src/networks/actors_bundle.rs +++ b/src/networks/actors_bundle.rs @@ -83,7 +83,7 @@ pub static ACTOR_BUNDLES: Lazy> = Lazy::new(|| { "bafy2bzacebq3hncszqpojglh2dkwekybq4zn6qpc4gceqbx36wndps5qehtau" @ "v14.0.0-rc.1" for "calibrationnet", "bafy2bzaceax5zkysst7vtyup4whdxwzlpnaya3qp34rnoi6gyt4pongps7obw" @ "v15.0.0-rc1" for "calibrationnet", "bafy2bzacearjal5rsmzloz3ny7aoju2rgw66wgxdrydgg27thcsazbmf5qihq" @ "v15.0.0-rc1" for "butterflynet", - "bafy2bzaced3uzaynkdi7wiqiwbje7l3lllpwbokuf7slak627gx7bk5pryjpa" 
@ "v16.0.0-dev" for "butterflynet", + "bafy2bzaced3ucmgzcrkvaw6qgqdzl56t3rctlb7i66vrj23dacgeath7jcxne" @ "v16.0.0-dev1" for "butterflynet", "bafy2bzacedozk3jh2j4nobqotkbofodq4chbrabioxbfrygpldgoxs3zwgggk" @ "v9.0.3" for "devnet", "bafy2bzacebzz376j5kizfck56366kdz5aut6ktqrvqbi3efa2d4l2o2m653ts" @ "v10.0.0" for "devnet", "bafy2bzaceay35go4xbjb45km6o46e5bib3bi46panhovcbedrynzwmm3drr4i" @ "v11.0.0" for "devnet", @@ -181,9 +181,9 @@ pub async fn generate_actor_bundle(output: &Path) -> anyhow::Result<()> { }; let bytes = response.bytes().await?; let car = CarStream::new(Cursor::new(bytes)).await?; - ensure!(car.header.version == 1); - ensure!(car.header.roots.len() == 1); - ensure!(car.header.roots.first() == root); + ensure!(car.header_v1.version == 1); + ensure!(car.header_v1.roots.len() == 1); + ensure!(car.header_v1.roots.first() == root); anyhow::Ok((*root, car.try_collect::>().await?)) }, )) @@ -286,11 +286,11 @@ mod tests { let car_secondary = CarStream::new(Cursor::new(alt)).await?; assert_eq!( - car_primary.header.roots, car_secondary.header.roots, + car_primary.header_v1.roots, car_secondary.header_v1.roots, "Roots for {url} and {alt_url} do not match" ); assert_eq!( - car_primary.header.roots.first(), + car_primary.header_v1.roots.first(), manifest, "Manifest for {url} and {alt_url} does not match" ); diff --git a/src/networks/butterflynet/mod.rs b/src/networks/butterflynet/mod.rs index 58cb8d6db701..ec26eb1fa4b3 100644 --- a/src/networks/butterflynet/mod.rs +++ b/src/networks/butterflynet/mod.rs @@ -40,12 +40,12 @@ pub async fn fetch_genesis(db: &DB) -> anyhow::Result /// Genesis CID pub static GENESIS_CID: Lazy = Lazy::new(|| { - Cid::from_str("bafy2bzacedz7udmigfoijokj3tnvw66bviygircxak7or2zrxzde2zsaouypi").unwrap() + Cid::from_str("bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4").unwrap() }); /// Compressed genesis file. It is compressed with zstd and cuts the download size by 80% (from 10 MB to 2 MB). 
static GENESIS_URL: Lazy = Lazy::new(|| { - "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacedz7udmigfoijokj3tnvw66bviygircxak7or2zrxzde2zsaouypi.car.zst" + "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/genesis/butterflynet-bafy2bzacecm7xklkq3hkc2kgm5wnb5shlxmffino6lzhh7lte5acytb7sssr4.car.zst" .parse() .expect("hard-coded URL must parse") }); @@ -55,7 +55,7 @@ static GENESIS_URL: Lazy = Lazy::new(|| { /// The genesis file does not live on the `master` branch, currently on `butterfly/v24` branch. /// `` static GENESIS_URL_ALT: Lazy = Lazy::new(|| { - "https://github.com/filecoin-project/lotus/raw/35052f6d1502d838ed36539996df9e70fe5a8fc4/build/genesis/butterflynet.car".parse().expect("hard-coded URL must parse") + "https://github.com/filecoin-project/lotus/raw/b15b3c40b9649e3bc52aa15968d558aa4514ba6a/build/genesis/butterflynet.car.zst".parse().expect("hard-coded URL must parse") }); pub(crate) const MINIMUM_CONSENSUS_POWER: i64 = 2 << 30; @@ -102,7 +102,7 @@ pub static HEIGHT_INFOS: Lazy> = Lazy::new(|| { make_height!(Phoenix, i64::MIN), make_height!(Waffle, -26), make_height!(TukTuk, -27, get_bundle_cid("v15.0.0-rc1")), - make_height!(Teep, 300, get_bundle_cid("v16.0.0-dev")), + make_height!(Teep, 100, get_bundle_cid("v16.0.0-dev1")), ]) }); diff --git a/src/networks/mod.rs b/src/networks/mod.rs index 3134c5a5404b..e0e518e20496 100644 --- a/src/networks/mod.rs +++ b/src/networks/mod.rs @@ -353,7 +353,7 @@ impl ChainConfig { ), f3_enabled: true, f3_consensus: true, - f3_bootstrap_epoch: 1000, + f3_bootstrap_epoch: -1, f3_initial_power_table: Default::default(), f3_manifest_server: Some( "12D3KooWJr9jy4ngtJNR7JC1xgLFra3DjEtyxskRYWvBK9TC3Yn6" diff --git a/src/tool/subcommands/benchmark_cmd.rs b/src/tool/subcommands/benchmark_cmd.rs index 85a2ff0ba8da..b89447030ab5 100644 --- a/src/tool/subcommands/benchmark_cmd.rs +++ b/src/tool/subcommands/benchmark_cmd.rs @@ -198,7 +198,7 @@ async fn benchmark_forest_encoding( let 
mut block_stream = CarStream::new(file).await?; let roots = std::mem::replace( - &mut block_stream.header.roots, + &mut block_stream.header_v1.roots, nunny::vec![Default::default()], ); diff --git a/src/tool/subcommands/car_cmd.rs b/src/tool/subcommands/car_cmd.rs index 846a20bfbc6a..bfba409704cc 100644 --- a/src/tool/subcommands/car_cmd.rs +++ b/src/tool/subcommands/car_cmd.rs @@ -59,7 +59,7 @@ impl CarCommands { let all_roots = NonEmpty::new( car_streams .iter() - .flat_map(|it| it.header.roots.iter()) + .flat_map(|it| it.header_v1.roots.iter()) .unique() .cloned() .collect_vec(), diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs index 9f77f1cef586..63f517e209b0 100644 --- a/src/tool/subcommands/snapshot_cmd.rs +++ b/src/tool/subcommands/snapshot_cmd.rs @@ -247,7 +247,7 @@ impl SnapshotCommands { let mut block_stream = CarStream::new(file).await?; let roots = std::mem::replace( - &mut block_stream.header.roots, + &mut block_stream.header_v1.roots, nunny::vec![Default::default()], ); diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs index eb831758778b..fd91e11139d0 100644 --- a/src/utils/db/car_stream.rs +++ b/src/utils/db/car_stream.rs @@ -1,5 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT + +use crate::db::car::plain::read_v2_header; use crate::utils::multihash::prelude::*; use async_compression::tokio::bufread::ZstdDecoder; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -11,10 +13,13 @@ use integer_encoding::VarInt; use nunny::Vec as NonEmpty; use pin_project_lite::pin_project; use serde::{Deserialize, Serialize}; -use std::io; +use std::io::{self, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite}; +use tokio::io::{ + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, + Take, +}; use tokio_util::codec::Encoder; use 
tokio_util::codec::FramedRead; use tokio_util::either::Either; @@ -94,8 +99,9 @@ pin_project! { /// automatically be decompressed. pub struct CarStream { #[pin] - reader: FramedRead>, UviBytes>, - pub header: CarV1Header, + reader: FramedRead>>, UviBytes>, + pub header_v1: CarV1Header, + pub header_v2: Option, first_block: Option, } } @@ -108,18 +114,50 @@ fn is_zstd(buf: &[u8]) -> bool { } impl CarStream { - pub async fn new(mut reader: ReaderT) -> io::Result { + /// Create a stream with automatic but unsafe CARv2 header extraction. + /// + /// Note that if the input is zstd compressed, the CARv2 header extraction + /// is on a best-effort basis, because this method does not require the input + /// to be [`tokio::io::AsyncSeek`]. When `reader.fill_buf()` is insufficient for + /// decoding the first zstd frame, extraction fails and the input is treated as CARv1. + /// It's recommended to use [`CarStream::new`] for zstd compressed CARv2 input. + #[allow(dead_code)] + pub async fn new_unsafe(mut reader: ReaderT) -> io::Result { + let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?) 
+ // treat input as CARv1 if zstd decoding failed + .ok() + .flatten(); + Self::new_with_header_v2(reader, header_v2).await + } + + /// Create a stream with pre-extracted CARv2 header + pub async fn new_with_header_v2( + mut reader: ReaderT, + header_v2: Option, + ) -> io::Result { let is_compressed = is_zstd(reader.fill_buf().await?); let mut reader = if is_compressed { let mut zstd = ZstdDecoder::new(reader); zstd.multiple_members(true); - FramedRead::new(Either::Right(zstd), UviBytes::default()) + Either::Right(zstd) } else { - FramedRead::new(Either::Left(reader), UviBytes::default()) + Either::Left(reader) }; - let header = read_v1_header(&mut reader) + + // Skip v2 header bytes + if let Some(header_v2) = &header_v2 { + let mut to_skip = vec![0; header_v2.data_offset as usize]; + reader.read_exact(&mut to_skip).await?; + } + + let max_car_v1_bytes = header_v2 + .as_ref() + .map(|h| h.data_size as u64) + .unwrap_or(u64::MAX); + let mut reader = FramedRead::new(reader.take(max_car_v1_bytes), UviBytes::default()); + let header_v1 = read_v1_header(&mut reader) .await - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid header block"))?; + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid v1 header block"))?; // Read the first block and check if it is valid. This check helps to // catch invalid CAR files as soon as we open. @@ -133,17 +171,72 @@ impl CarStream { } Ok(CarStream { reader, - header, + header_v1, + header_v2, first_block: Some(block), }) } else { Ok(CarStream { reader, - header, + header_v1, + header_v2, first_block: None, }) } } + + /// Extracts CARv2 header from the input, returns the reader and CARv2 header. + /// + /// Note that position of the input reader has to be reset before calling [`CarStream::new_with_header_v2`]. + /// Use [`CarStream::extract_header_v2_and_reset_reader_position`] to automatically reset stream position. 
+ pub async fn extract_header_v2( + mut reader: ReaderT, + ) -> io::Result<(ReaderT, Option)> { + let is_compressed = is_zstd(reader.fill_buf().await?); + let mut reader = if is_compressed { + let mut zstd = ZstdDecoder::new(reader); + zstd.multiple_members(true); + Either::Right(zstd) + } else { + Either::Left(reader) + }; + let mut possible_header_bytes = [0; 51]; + reader.read_exact(&mut possible_header_bytes).await?; + let header_v2 = read_v2_header(possible_header_bytes.as_slice())?; + let reader = match reader { + Either::Left(reader) => reader, + Either::Right(zstd) => zstd.into_inner(), + }; + Ok((reader, header_v2)) + } + + fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result> { + let is_compressed = is_zstd(fill_buf); + let fill_buf_reader = if is_compressed { + either::Either::Right(zstd::Decoder::new(fill_buf)?) + } else { + either::Either::Left(fill_buf) + }; + read_v2_header(fill_buf_reader) + } +} + +impl CarStream { + /// Create a stream with automatic CARv2 header extraction. + pub async fn new(reader: ReaderT) -> io::Result { + let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?; + Self::new_with_header_v2(reader, header_v2).await + } + + /// Extracts CARv2 header from the input, resets the reader position and returns the reader and CARv2 header. 
+ pub async fn extract_header_v2_and_reset_reader_position( + mut reader: ReaderT, + ) -> io::Result<(ReaderT, Option)> { + let stream_position = reader.stream_position().await?; + let (mut reader, header_v2) = Self::extract_header_v2(reader).await?; + reader.seek(SeekFrom::Start(stream_position)).await?; + Ok((reader, header_v2)) + } } impl Stream for CarStream { @@ -220,6 +313,8 @@ async fn read_v1_header( #[cfg(test)] mod tests { + use std::io::Cursor; + use super::*; use crate::networks::{calibnet, mainnet}; use futures::TryStreamExt; @@ -243,9 +338,23 @@ mod tests { } } + #[tokio::test] + async fn stream_calibnet_genesis_unsafe() { + let stream = CarStream::new_unsafe(calibnet::DEFAULT_GENESIS) + .await + .unwrap(); + let blocks: Vec = stream.try_collect().await.unwrap(); + assert_eq!(blocks.len(), 1207); + for block in blocks { + block.validate().unwrap(); + } + } + #[tokio::test] async fn stream_calibnet_genesis() { - let stream = CarStream::new(calibnet::DEFAULT_GENESIS).await.unwrap(); + let stream = CarStream::new(Cursor::new(calibnet::DEFAULT_GENESIS)) + .await + .unwrap(); let blocks: Vec = stream.try_collect().await.unwrap(); assert_eq!(blocks.len(), 1207); for block in blocks { @@ -253,9 +362,23 @@ mod tests { } } + #[tokio::test] + async fn stream_mainnet_genesis_unsafe() { + let stream = CarStream::new_unsafe(mainnet::DEFAULT_GENESIS) + .await + .unwrap(); + let blocks: Vec = stream.try_collect().await.unwrap(); + assert_eq!(blocks.len(), 1222); + for block in blocks { + block.validate().unwrap(); + } + } + #[tokio::test] async fn stream_mainnet_genesis() { - let stream = CarStream::new(mainnet::DEFAULT_GENESIS).await.unwrap(); + let stream = CarStream::new(Cursor::new(mainnet::DEFAULT_GENESIS)) + .await + .unwrap(); let blocks: Vec = stream.try_collect().await.unwrap(); assert_eq!(blocks.len(), 1222); for block in blocks { diff --git a/src/utils/db/car_util.rs b/src/utils/db/car_util.rs index 193a7c039c76..0555e398f329 100644 --- 
a/src/utils/db/car_util.rs +++ b/src/utils/db/car_util.rs @@ -1,7 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt as _, TryStreamExt as _}; use fvm_ipld_blockstore::Blockstore; use tokio::io::{AsyncBufRead, AsyncSeek, BufReader}; @@ -12,13 +12,13 @@ use crate::utils::db::car_stream::{CarBlock, CarStream, CarV1Header}; /// The block store is not restored to its original state in case of errors. pub async fn load_car(db: &impl Blockstore, reader: R) -> anyhow::Result where - R: AsyncBufRead + Unpin, + R: AsyncBufRead + AsyncSeek + Unpin, { let mut stream = CarStream::new(BufReader::new(reader)).await?; while let Some(block) = stream.try_next().await? { db.put_keyed(&block.cid, &block.data)?; } - Ok(stream.header) + Ok(stream.header_v1) } pub fn merge_car_streams( @@ -39,6 +39,8 @@ pub fn dedup_block_stream( #[cfg(test)] mod tests { + use std::io::Cursor; + use super::*; use crate::block_on; use crate::utils::db::car_stream::CarWriter; @@ -116,7 +118,7 @@ mod tests { fn blocks_roundtrip(blocks: Blocks) -> anyhow::Result<()> { block_on(async move { let car = blocks.into_forest_car_zst_bytes().await; - let reader = CarStream::new(std::io::Cursor::new(&car)).await?; + let reader = CarStream::new(Cursor::new(car.as_slice())).await?; let blocks2 = Blocks(reader.try_collect().await?); let car2 = blocks2.into_forest_car_zst_bytes().await; @@ -130,7 +132,7 @@ mod tests { fn car_writer_roundtrip(blocks1: Blocks) -> anyhow::Result<()> { block_on(async move { let (all_roots, car) = blocks1.clone().into_forest_car_zst_bytes_with_roots().await; - let reader = CarStream::new(std::io::Cursor::new(&car)).await?; + let reader = CarStream::new(Cursor::new(car)).await?; let mut buff: Vec = vec![]; let zstd_encoder = ZstdEncoder::new(&mut buff); @@ -138,7 +140,7 @@ mod tests { .forward(CarWriter::new_carv1(all_roots, zstd_encoder)?) 
.await?; - let stream = CarStream::new(std::io::Cursor::new(buff)).await?; + let stream = CarStream::new(Cursor::new(buff)).await?; let blocks2 = Blocks(stream.try_collect().await?); assert_eq!(blocks1.0, blocks2.0);