diff --git a/.changes/added/3116.md b/.changes/added/3116.md new file mode 100644 index 00000000000..96d67f653dc --- /dev/null +++ b/.changes/added/3116.md @@ -0,0 +1 @@ +Complete coverage of proto block types to cover all cases \ No newline at end of file diff --git a/.changes/fixed/3112.md b/.changes/fixed/3112.md new file mode 100644 index 00000000000..7efa291b31c --- /dev/null +++ b/.changes/fixed/3112.md @@ -0,0 +1 @@ +Use Protobuf types in serialization rather than opaque bytes \ No newline at end of file diff --git a/.gitignore b/.gitignore index 42a58004912..5ec17be005d 100644 --- a/.gitignore +++ b/.gitignore @@ -18,5 +18,6 @@ package-lock.json package.json bin/fuel-core/chainspec/local-testnet/state_transition_bytecode.wasm .DS_Store +.fueldb/ local-testnet/ diff --git a/.typos.toml b/.typos.toml index d6069bd48a2..a6c9bbab2a3 100644 --- a/.typos.toml +++ b/.typos.toml @@ -4,4 +4,6 @@ extend-ignore-identifiers-re = [ "tro", "Tro", "typ", + "aloc", + "ALOC", ] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2636573c7da..f297766eb7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3740,6 +3740,7 @@ dependencies = [ "log", "num_enum", "postcard", + "proptest", "prost 0.14.1", "rand 0.8.5", "serde", @@ -4375,6 +4376,7 @@ dependencies = [ "k256", "parking_lot", "postcard", + "proptest", "rand 0.8.5", "secrecy", "serde", diff --git a/crates/services/block_aggregator_api/Cargo.toml b/crates/services/block_aggregator_api/Cargo.toml index 312e9e3fcfc..03342654df9 100644 --- a/crates/services/block_aggregator_api/Cargo.toml +++ b/crates/services/block_aggregator_api/Cargo.toml @@ -10,6 +10,9 @@ repository = { workspace = true } rust-version = { workspace = true } build = "build.rs" +[features] +fault-proving = ["fuel-core-types/fault-proving"] + [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } @@ -22,7 +25,7 @@ futures = { workspace = true } log = "0.4.27" num_enum = { workspace = true } postcard = { workspace = true } -prost = { workspace = true } +prost = { workspace = true, features = ["derive"] } rand = { workspace = true } serde = { workspace = true, features = ["derive"] } strum = { workspace = true } @@ -41,5 +44,6 @@ tonic-prost-build = { workspace = true } fuel-core-services = { workspace = true, features = ["test-helpers"] } fuel-core-storage = { workspace = true, features = ["test-helpers"] } fuel-core-types = { workspace = true, features = ["std", "test-helpers"] } +proptest = { workspace = true } tokio-stream = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/services/block_aggregator_api/build.rs b/crates/services/block_aggregator_api/build.rs index c438a06453f..190a1538000 100644 --- a/crates/services/block_aggregator_api/build.rs +++ b/crates/services/block_aggregator_api/build.rs @@ -1,4 +1,7 @@ fn main() -> Result<(), Box> { - tonic_prost_build::compile_protos("proto/api.proto")?; + tonic_prost_build::configure() + .type_attribute(".", "#[derive(serde::Serialize,serde::Deserialize)]") + .type_attribute(".", "#[allow(clippy::large_enum_variant)]") + .compile_protos(&["proto/api.proto"], &["proto/"])?; Ok(()) } diff --git a/crates/services/block_aggregator_api/proptest-regressions/blocks/importer_and_db_source/serializer_adapter.txt b/crates/services/block_aggregator_api/proptest-regressions/blocks/importer_and_db_source/serializer_adapter.txt new file mode 100644 index 00000000000..45867ce5dfb --- /dev/null +++ b/crates/services/block_aggregator_api/proptest-regressions/blocks/importer_and_db_source/serializer_adapter.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 3d8a1dc0826956e2454ff1a3d6b8d75c5b5b0eebe2986c5668745ffb2bb9b0e4 # shrinks to block = V1(BlockV1 { header: V1(BlockHeaderV1 { application: ApplicationHeader { da_height: DaBlockHeight(0), consensus_parameters_version: 0, state_transition_bytecode_version: 31, generated: GeneratedApplicationFieldsV1 { transactions_count: 0, message_receipt_count: 0, transactions_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, message_outbox_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, event_inbox_root: 0000000000000000000000000000000000000000000000000000000000000000 } }, consensus: ConsensusHeader { prev_root: 0000000000000000000000000000000000000000000000000000000000000000, height: 00000000, time: Tai64(4611686018427387914), generated: GeneratedConsensusFields { application_hash: cda084575be17b88d98713807263d2f5b4ffbe79ba9a4fbf544bf6493a1d641a } }, metadata: Some(BlockHeaderMetadata { id: BlockId(c636fad695fad5e9211cd08b2cb66c024d7b972572cb1005c6ab56aeca4f34b4) }) }), transactions: [Script(ChargeableTransaction { body: ScriptBody { script_gas_limit: 0, receipts_root: 0000000000000000000000000000000000000000000000000000000000000000, script: ScriptCode { bytes: Bytes(24400000) }, script_data: Bytes() }, policies: Policies { bits: PoliciesBits(WitnessLimit | Maturity), values: [0, 10000, 0, 0, 0, 0] }, inputs: [], outputs: [], witnesses: [], metadata: None })] }) diff --git a/crates/services/block_aggregator_api/proto/api.proto b/crates/services/block_aggregator_api/proto/api.proto index 1e34a8fa8de..b478c8b69b2 100644 --- a/crates/services/block_aggregator_api/proto/api.proto +++ b/crates/services/block_aggregator_api/proto/api.proto @@ -14,9 +14,653 @@ message BlockRangeRequest { } message Block { - bytes data = 1; + oneof versioned_block { + V1Block v1 = 1; + } +} + +message V1Block { + Header header = 1; + repeated Transaction transactions = 2; +} + +message Header { + oneof versioned_header { + V1Header v1 = 1; + V2Header v2 = 2; + } +} + +// pub struct BlockHeaderV1 { +// /// The application header. +// pub(crate) application: ApplicationHeader, +// /// The consensus header. +// pub(crate) consensus: ConsensusHeader, +// /// The header metadata calculated during creation. +// /// The field is pub(crate) to enforce the use of the [`PartialBlockHeader::generate`] method. +// #[cfg_attr(feature = "serde", serde(skip))] +// #[educe(PartialEq(ignore))] +// pub(crate) metadata: Option, +//} +// pub struct ApplicationHeader { +// /// The layer 1 height of messages and events to include since the last layer 1 block number. +// /// This is not meant to represent the layer 1 block this was committed to. Validators will need +// /// to have some rules in place to ensure the block number was chosen in a reasonable way. For +// /// example, they should verify that the block number satisfies the finality requirements of the +// /// layer 1 chain. They should also verify that the block number isn't too stale and is increasing. +// /// Some similar concerns are noted in this issue: +// pub da_height: DaBlockHeight, +// /// The version of the consensus parameters used to execute this block. +// pub consensus_parameters_version: ConsensusParametersVersion, +// /// The version of the state transition bytecode used to execute this block. +// pub state_transition_bytecode_version: StateTransitionBytecodeVersion, +// /// Generated application fields. +// pub generated: Generated, +//} +// pub struct GeneratedApplicationFieldsV1 { +// /// Number of transactions in this block. +// pub transactions_count: u16, +// /// Number of message receipts in this block. +// pub message_receipt_count: u32, +// /// Merkle root of transactions. +// pub transactions_root: Bytes32, +// /// Merkle root of message receipts in this block. +// pub message_outbox_root: Bytes32, +// /// Root hash of all imported events from L1 +// pub event_inbox_root: Bytes32, +//} +// pub struct ConsensusHeader { +// /// Merkle root of all previous block header hashes. +// pub prev_root: Bytes32, +// /// Fuel block height. +// pub height: BlockHeight, +// /// The block producer time. +// pub time: Tai64, +// /// generated consensus fields. +// pub generated: Generated, +//} +// pub struct GeneratedConsensusFields { +// /// Hash of the application header. +// pub application_hash: Bytes32, +//} +// pub struct BlockHeaderMetadata { +// /// Hash of the header. +// id: BlockId, +//} +message V1Header { + uint64 da_height = 1; + uint32 consensus_parameters_version = 2; + uint32 state_transition_bytecode_version = 3; + uint32 transactions_count = 4; + uint32 message_receipt_count = 5; + bytes transactions_root = 6; + bytes message_outbox_root = 7; + bytes event_inbox_root = 8; + bytes prev_root = 9; + uint32 height = 10; + uint64 time = 11; + bytes application_hash = 12; + optional bytes block_id = 13; +} + +// pub struct GeneratedApplicationFieldsV2 { +// /// Number of transactions in this block. +// pub transactions_count: u16, +// /// Number of message receipts in this block. +// pub message_receipt_count: u32, +// /// Merkle root of transactions. +// pub transactions_root: Bytes32, +// /// Merkle root of message receipts in this block. +// pub message_outbox_root: Bytes32, +// /// Root hash of all imported events from L1 +// pub event_inbox_root: Bytes32, +// /// TxID commitment +// pub tx_id_commitment: Bytes32, +//} +message V2Header { + uint64 da_height = 1; + uint32 consensus_parameters_version = 2; + uint32 state_transition_bytecode_version = 3; + uint32 transactions_count = 4; + uint32 message_receipt_count = 5; + bytes transactions_root = 6; + bytes message_outbox_root = 7; + bytes event_inbox_root = 8; + bytes tx_id_commitment = 9; + bytes prev_root = 10; + uint32 height = 11; + uint64 time = 12; + bytes application_hash = 13; + optional bytes block_id = 14; +} + +message Transaction { + oneof variant { + ScriptTransaction script = 1; + CreateTransaction create = 2; + MintTransaction mint = 3; + UpgradeTransaction upgrade = 4; + UploadTransaction upload = 5; + BlobTransaction blob = 6; + } +} + +// pub struct ChargeableTransaction +//where +// Body: BodyConstraints, +//{ +// pub(crate) body: Body, +// pub(crate) policies: Policies, +// pub(crate) inputs: Vec, +// pub(crate) outputs: Vec, +// pub(crate) witnesses: Vec, +// #[serde(skip)] +// #[cfg_attr(feature = "da-compression", compress(skip))] +// #[educe(PartialEq(ignore))] +// #[educe(Hash(ignore))] +// #[canonical(skip)] +// pub(crate) metadata: Option>, +//} +// pub struct ScriptBody { +// pub(crate) script_gas_limit: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub(crate) receipts_root: Bytes32, +// pub(crate) script: ScriptCode, +// #[educe(Debug(method(fmt_truncated_hex::<16>)))] +// pub(crate) script_data: Vec, +//} +// #[derive(Default, Debug, Clone, PartialEq, Eq, Hash)] +//pub struct ScriptMetadata { +// pub script_data_offset: usize, +//} +message ScriptTransaction { + uint64 script_gas_limit = 1; + bytes receipts_root = 2; + bytes script = 3; + bytes script_data = 4; + Policies policies = 5; + repeated Input inputs = 6; + repeated Output outputs = 7; + repeated bytes witnesses = 8; + ScriptMetadata metadata = 9; +} + +message CreateTransaction { + uint32 bytecode_witness_index = 1; + bytes salt = 2; + repeated StorageSlot storage_slots = 3; + Policies policies = 4; + repeated Input inputs = 5; + repeated Output outputs = 6; + repeated bytes witnesses = 7; + CreateMetadata metadata = 8; +} + +message MintTransaction { + TxPointer tx_pointer = 1; + ContractInput input_contract = 2; + ContractOutput output_contract = 3; + uint64 mint_amount = 4; + bytes mint_asset_id = 5; + uint64 gas_price = 6; + MintMetadata metadata = 7; +} + +message UpgradeTransaction { + UpgradePurpose purpose = 1; + Policies policies = 2; + repeated Input inputs = 3; + repeated Output outputs = 4; + repeated bytes witnesses = 5; + UpgradeMetadata metadata = 6; +} + +message UploadTransaction { + bytes root = 1; + uint32 witness_index = 2; + uint32 subsection_index = 3; + uint32 subsections_number = 4; + repeated bytes proof_set = 5; + Policies policies = 6; + repeated Input inputs = 7; + repeated Output outputs = 8; + repeated bytes witnesses = 9; + UploadMetadata metadata = 10; } +message BlobTransaction { + bytes blob_id = 1; + uint32 witness_index = 2; + Policies policies = 3; + repeated Input inputs = 4; + repeated Output outputs = 5; + repeated bytes witnesses = 6; + BlobMetadata metadata = 7; +} + +// pub struct Policies { +// /// A bitmask that indicates what policies are set. +// bits: PoliciesBits, +// /// The array of policy values. +// values: [Word; POLICIES_NUMBER], +//} +message Policies { + uint32 bits = 1; + repeated uint64 values = 2; +} + +// pub enum Input { +// CoinSigned(CoinSigned), +// CoinPredicate(CoinPredicate), +// Contract(Contract), +// MessageCoinSigned(MessageCoinSigned), +// MessageCoinPredicate(MessageCoinPredicate), +// MessageDataSigned(MessageDataSigned), +// MessageDataPredicate(MessageDataPredicate), +//} +message Input { + oneof variant { + CoinSignedInput coin_signed = 1; + CoinPredicateInput coin_predicate = 2; + ContractInput contract = 3; + MessageCoinSignedInput message_coin_signed = 4; + MessageCoinPredicateInput message_coin_predicate = 5; + MessageDataSignedInput message_data_signed = 6; + MessageDataPredicateInput message_data_predicate = 7; + } +} + +// pub struct Coin +//where +// Specification: CoinSpecification, +//{ +// pub utxo_id: UtxoId, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub owner: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub amount: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub asset_id: AssetId, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub tx_pointer: TxPointer, +// #[educe(Debug(method(fmt_as_field)))] +// pub witness_index: Specification::Witness, +// /// Exact amount of gas used by the predicate. +// /// If the predicate consumes different amount of gas, +// /// it's considered to be false. +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_gas_used: Specification::PredicateGasUsed, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate: Specification::Predicate, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_data: Specification::PredicateData, +//} +// impl CoinSpecification for Signed { +// type Predicate = Empty; +// type PredicateData = Empty>; +// type PredicateGasUsed = Empty; +// type Witness = u16; +//} +message CoinSignedInput { + UtxoId utxo_id = 1; + bytes owner = 2; + uint64 amount = 3; + bytes asset_id = 4; + TxPointer tx_pointer = 5; + uint32 witness_index = 6; + uint64 predicate_gas_used = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +//impl CoinSpecification for Predicate { +// type Predicate = PredicateCode; +// type PredicateData = Vec; +// type PredicateGasUsed = Word; +// type Witness = Empty; +//} +message CoinPredicateInput { + UtxoId utxo_id = 1; + bytes owner = 2; + uint64 amount = 3; + bytes asset_id = 4; + TxPointer tx_pointer = 5; + uint32 witness_index = 6; + uint64 predicate_gas_used = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub struct Contract { +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub utxo_id: UtxoId, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub balance_root: Bytes32, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub state_root: Bytes32, +// /// Pointer to transaction that last modified the contract state. +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub tx_pointer: TxPointer, +// pub contract_id: ContractId, +//} +message ContractInput { + UtxoId utxo_id = 1; + bytes balance_root = 2; + bytes state_root = 3; + TxPointer tx_pointer = 4; + bytes contract_id = 5; +} + +// pub struct Message +//where +// Specification: MessageSpecification, +//{ +// /// The sender from the L1 chain. +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub sender: Address, +// /// The receiver on the `Fuel` chain. +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub recipient: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub amount: Word, +// // Unique identifier of the message +// pub nonce: Nonce, +// #[educe(Debug(method(fmt_as_field)))] +// pub witness_index: Specification::Witness, +// /// Exact amount of gas used by the predicate. +// /// If the predicate consumes different amount of gas, +// /// it's considered to be false. +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_gas_used: Specification::PredicateGasUsed, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// #[educe(Debug(method(fmt_as_field)))] +// pub data: Specification::Data, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate: Specification::Predicate, +// #[educe(Debug(method(fmt_as_field)))] +// pub predicate_data: Specification::PredicateData, +//} +// pub struct MessageCoin(core::marker::PhantomData); +// +// impl MessageSpecification for MessageCoin { +// type Data = Empty>; +// type Predicate = Empty; +// type PredicateData = Empty>; +// type PredicateGasUsed = Empty; +// type Witness = u16; +// } +message MessageCoinSignedInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + bytes nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// impl MessageSpecification for MessageCoin { +// type Data = Empty>; +// type Predicate = PredicateCode; +// type PredicateData = Vec; +// type PredicateGasUsed = Word; +// type Witness = Empty; +// } +message MessageCoinPredicateInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + bytes nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub type MessageDataSigned = Message>; +message MessageDataSignedInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + bytes nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub type MessageDataPredicate = +// Message>; +message MessageDataPredicateInput { + bytes sender = 1; + bytes recipient = 2; + uint64 amount = 3; + bytes nonce = 4; + uint32 witness_index = 5; + uint64 predicate_gas_used = 6; + bytes data = 7; + bytes predicate = 8; + bytes predicate_data = 9; +} + +// pub enum Output { +// Coin { +// to: Address, +// amount: Word, +// asset_id: AssetId, +// }, +// +// Contract(Contract), +// +// Change { +// to: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// amount: Word, +// asset_id: AssetId, +// }, +// +// Variable { +// #[cfg_attr(feature = "da-compression", compress(skip))] +// to: Address, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// amount: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// asset_id: AssetId, +// }, +// +// ContractCreated { +// contract_id: ContractId, +// state_root: Bytes32, +// }, +//} +message Output { + oneof variant { + CoinOutput coin = 1; + ContractOutput contract = 2; + ChangeOutput change = 3; + VariableOutput variable = 4; + ContractCreatedOutput contract_created = 5; + } +} +message CoinOutput { + bytes to = 1; + uint64 amount = 2; + bytes asset_id = 3; +} +message ContractOutput { + uint32 input_index = 1; + bytes balance_root = 2; + bytes state_root = 3; +} +message ChangeOutput { + bytes to = 1; + uint64 amount = 2; + bytes asset_id = 3; +} +message VariableOutput { + bytes to = 1; + uint64 amount = 2; + bytes asset_id = 3; +} +message ContractCreatedOutput { + bytes contract_id = 1; + bytes state_root = 2; +} + +// pub struct UtxoId { +// /// transaction id +// tx_id: TxId, +// /// output index +// output_index: u16, +//} +message UtxoId { + bytes tx_id = 1; + uint32 output_index = 2; +} + +message TxPointer { + uint32 block_height = 1; + uint32 tx_index = 2; +} + +message StorageSlot { + bytes key = 1; + bytes value = 2; +} + + +// #[derive(Debug, Clone, PartialEq, Eq, Hash)] +//pub struct ChargeableMetadata { +// pub common: CommonMetadata, +// pub body: Body, +//} +// pub struct ScriptBody { +// pub(crate) script_gas_limit: Word, +// #[cfg_attr(feature = "da-compression", compress(skip))] +// pub(crate) receipts_root: Bytes32, +// pub(crate) script: ScriptCode, +// #[educe(Debug(method(fmt_truncated_hex::<16>)))] +// pub(crate) script_data: Vec, +//} +// #[derive(Debug, Clone, PartialEq, Eq, Hash)] +//pub struct CommonMetadata { +// pub id: Bytes32, +// pub inputs_offset: usize, +// pub inputs_offset_at: Vec, +// pub inputs_predicate_offset_at: Vec>, +// pub outputs_offset: usize, +// pub outputs_offset_at: Vec, +// pub witnesses_offset: usize, +// pub witnesses_offset_at: Vec, +//} + +message ScriptMetadata { + bytes id = 1; + uint32 inputs_offset = 2; + repeated uint32 inputs_offset_at = 3; + repeated PredicateOffset inputs_predicate_offset_at = 4; + uint32 outputs_offset = 5; + repeated uint32 outputs_offset_at = 6; + uint32 witnesses_offset = 7; + repeated uint32 witnesses_offset_at = 8; + uint64 script_gas_limit = 9; + bytes receipts_root = 10; + bytes script = 11; + bytes script_data = 12; +} + +message CreateMetadata { + bytes id = 1; + uint32 inputs_offset = 2; + repeated uint32 inputs_offset_at = 3; + repeated PredicateOffset inputs_predicate_offset_at = 4; + uint32 outputs_offset = 5; + repeated uint32 outputs_offset_at = 6; + uint32 witnesses_offset = 7; + repeated uint32 witnesses_offset_at = 8; + bytes contract_id = 9; + bytes contract_root = 10; + bytes state_root = 11; +} + +message MintMetadata { + bytes id = 1; +} + +message UpgradePurpose { + oneof variant { + UpgradeConsensusParameters consensus_parameters = 1; + UpgradeStateTransition state_transition = 2; + } +} + +message UpgradeConsensusParameters { + uint32 witness_index = 1; + bytes checksum = 2; +} + +message UpgradeStateTransition { + bytes root = 1; +} + +message UpgradeMetadata { + bytes id = 1; + uint32 inputs_offset = 2; + repeated uint32 inputs_offset_at = 3; + repeated PredicateOffset inputs_predicate_offset_at = 4; + uint32 outputs_offset = 5; + repeated uint32 outputs_offset_at = 6; + uint32 witnesses_offset = 7; + repeated uint32 witnesses_offset_at = 8; + oneof variant { + UpgradeConsensusParametersMetadata consensus_parameters = 9; + UpgradeStateTransitionMetadata state_transition = 10; + } +} + +message UpgradeConsensusParametersMetadata { + bytes consensus_parameters = 1; + bytes calculated_checksum = 2; +} + +message UpgradeStateTransitionMetadata {} + +message UploadMetadata { + bytes id = 1; + uint32 inputs_offset = 2; + repeated uint32 inputs_offset_at = 3; + repeated PredicateOffset inputs_predicate_offset_at = 4; + uint32 outputs_offset = 5; + repeated uint32 outputs_offset_at = 6; + uint32 witnesses_offset = 7; + repeated uint32 witnesses_offset_at = 8; +} + +message BlobMetadata { + bytes id = 1; + uint32 inputs_offset = 2; + repeated uint32 inputs_offset_at = 3; + repeated PredicateOffset inputs_predicate_offset_at = 4; + uint32 outputs_offset = 5; + repeated uint32 outputs_offset_at = 6; + uint32 witnesses_offset = 7; + repeated uint32 witnesses_offset_at = 8; +} + +message PredicateOffset { + optional InnerPredicateOffset offset = 1; +} + +message InnerPredicateOffset { + uint32 offset = 1; + uint32 length = 2; +} + + message BlockResponse { oneof payload { Block literal = 1; @@ -30,4 +674,4 @@ service BlockAggregator { rpc GetBlockHeight (BlockHeightRequest) returns (BlockHeightResponse); rpc GetBlockRange (BlockRangeRequest) returns (stream BlockResponse); rpc NewBlockSubscription (NewBlockSubscriptionRequest) returns (stream BlockResponse); -} \ No newline at end of file +} diff --git a/crates/services/block_aggregator_api/src/api.rs b/crates/services/block_aggregator_api/src/api.rs index 3cc652bdd09..4beb51c47f3 100644 --- a/crates/services/block_aggregator_api/src/api.rs +++ b/crates/services/block_aggregator_api/src/api.rs @@ -1,7 +1,4 @@ -use crate::{ - NewBlock, - result::Result, -}; +use crate::result::Result; use fuel_core_types::fuel_types::BlockHeight; use std::fmt; @@ -11,14 +8,17 @@ pub mod protobuf_adapter; pub trait BlockAggregatorApi: Send + Sync { /// The type of the block range response. type BlockRangeResponse; + type Block; /// Awaits the next query to the block aggregator service. fn await_query( &mut self, - ) -> impl Future>> + Send; + ) -> impl Future< + Output = Result>, + > + Send; } -pub enum BlockAggregatorQuery { +pub enum BlockAggregatorQuery { GetBlockRange { first: BlockHeight, last: BlockHeight, @@ -29,11 +29,11 @@ pub enum BlockAggregatorQuery { }, // TODO: Do we need a way to unsubscribe or can we just see that the receiver is dropped? NewBlockSubscription { - response: tokio::sync::mpsc::Sender, + response: tokio::sync::mpsc::Sender, }, } -impl fmt::Debug for BlockAggregatorQuery { +impl fmt::Debug for BlockAggregatorQuery { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { BlockAggregatorQuery::GetBlockRange { first, last, .. } => f @@ -52,7 +52,7 @@ impl fmt::Debug for BlockAggregatorQuery { } #[cfg(test)] -impl BlockAggregatorQuery { +impl BlockAggregatorQuery { pub fn get_block_range>( first: H, last: H, @@ -74,7 +74,7 @@ impl BlockAggregatorQuery { (query, receiver) } - pub fn new_block_subscription() -> (Self, tokio::sync::mpsc::Receiver) { + pub fn new_block_subscription() -> (Self, tokio::sync::mpsc::Receiver) { const ARBITRARY_CHANNEL_SIZE: usize = 10; let (sender, receiver) = tokio::sync::mpsc::channel(ARBITRARY_CHANNEL_SIZE); let query = Self::NewBlockSubscription { response: sender }; diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs index 03d5c468655..c944e199917 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs @@ -4,28 +4,42 @@ use crate::{ BlockAggregatorQuery, }, block_range_response::BlockRangeResponse, - result::Result, + protobuf_types::{ + Block as ProtoBlock, + BlockHeightRequest as ProtoBlockHeightRequest, + BlockHeightResponse as ProtoBlockHeightResponse, + BlockRangeRequest as ProtoBlockRangeRequest, + BlockResponse as ProtoBlockResponse, + NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest, + block_aggregator_server::{ + BlockAggregator, + BlockAggregatorServer as ProtoBlockAggregatorServer, + }, + block_response as proto_block_response, + }, + result::{ + Error, + Result, + }, }; use async_trait::async_trait; use futures::StreamExt; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -tonic::include_proto!("blockaggregator"); - -use crate::result::Error; -use block_aggregator_server::BlockAggregator; - #[cfg(test)] mod tests; pub struct Server { - query_sender: tokio::sync::mpsc::Sender>, + query_sender: + tokio::sync::mpsc::Sender>, } impl Server { pub fn new( - query_sender: tokio::sync::mpsc::Sender>, + query_sender: tokio::sync::mpsc::Sender< + BlockAggregatorQuery, + >, ) -> Self { Self { query_sender } } @@ -35,8 +49,8 @@ impl Server { impl BlockAggregator for Server { async fn get_block_height( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { tracing::debug!("get_block_height: {:?}", request); let (response, receiver) = tokio::sync::oneshot::channel(); let query = BlockAggregatorQuery::GetCurrentHeight { response }; @@ -45,7 +59,7 @@ impl BlockAggregator for Server { })?; let res = receiver.await; match res { - Ok(height) => Ok(tonic::Response::new(BlockHeightResponse { + Ok(height) => Ok(tonic::Response::new(ProtoBlockHeightResponse { height: *height, })), Err(e) => Err(tonic::Status::internal(format!( @@ -54,13 +68,13 @@ impl BlockAggregator for Server { ))), } } - type GetBlockRangeStream = ReceiverStream>; + type GetBlockRangeStream = ReceiverStream>; async fn get_block_range( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - tracing::debug!("get_block_range: {:?}", request); + const ARB_LITERAL_BLOCK_BUFFER_SIZE: usize = 100; let req = request.into_inner(); let (response, receiver) = tokio::sync::oneshot::channel(); let query = BlockAggregatorQuery::GetBlockRange { @@ -76,17 +90,15 @@ impl BlockAggregator for Server { match res { Ok(block_range_response) => match block_range_response { BlockRangeResponse::Literal(inner) => { - let (tx, rx) = - tokio::sync::mpsc::channel::>(16); + let (tx, rx) = tokio::sync::mpsc::channel::< + Result, + >(ARB_LITERAL_BLOCK_BUFFER_SIZE); tokio::spawn(async move { let mut s = inner; - while let Some(block) = s.next().await { - let pb = Block { - data: block.bytes().to_vec(), - }; - let response = BlockResponse { - payload: Some(block_response::Payload::Literal(pb)), + while let Some(pb) = s.next().await { + let response = ProtoBlockResponse { + payload: Some(proto_block_response::Payload::Literal(pb)), }; if tx.send(Ok(response)).await.is_err() { break; @@ -108,11 +120,11 @@ impl BlockAggregator for Server { } } - type NewBlockSubscriptionStream = ReceiverStream>; + type NewBlockSubscriptionStream = ReceiverStream>; async fn new_block_subscription( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { const ARB_CHANNEL_SIZE: usize = 100; tracing::warn!("get_block_range: {:?}", request); @@ -126,11 +138,8 @@ impl BlockAggregator for Server { let (task_sender, task_receiver) = tokio::sync::mpsc::channel(ARB_CHANNEL_SIZE); tokio::spawn(async move { while let Some(nb) = receiver.recv().await { - let block = Block { - data: nb.block.bytes().to_vec(), - }; - let response = BlockResponse { - payload: Some(block_response::Payload::Literal(block)), + let response = ProtoBlockResponse { + payload: Some(proto_block_response::Payload::Literal(nb)), }; if task_sender.send(Ok(response)).await.is_err() { break; @@ -145,19 +154,21 @@ impl BlockAggregator for Server { pub struct ProtobufAPI { _server_task_handle: tokio::task::JoinHandle<()>, shutdown_sender: Option>, - query_receiver: tokio::sync::mpsc::Receiver>, + query_receiver: + tokio::sync::mpsc::Receiver>, } impl ProtobufAPI { pub fn new(url: String) -> Self { - let (query_sender, query_receiver) = - tokio::sync::mpsc::channel::>(100); + let (query_sender, query_receiver) = tokio::sync::mpsc::channel::< + BlockAggregatorQuery, + >(100); let server = Server::new(query_sender); let addr = url.parse().unwrap(); let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>(); let _server_task_handle = tokio::spawn(async move { let service = tonic::transport::Server::builder() - .add_service(block_aggregator_server::BlockAggregatorServer::new(server)); + .add_service(ProtoBlockAggregatorServer::new(server)); tokio::select! { res = service.serve(addr) => { if let Err(e) = res { @@ -181,10 +192,11 @@ impl ProtobufAPI { impl BlockAggregatorApi for ProtobufAPI { type BlockRangeResponse = BlockRangeResponse; + type Block = ProtoBlock; async fn await_query( &mut self, - ) -> Result> { + ) -> Result> { let query = self .query_receiver .recv() diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs index 1617090a7dd..7807ac02180 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs @@ -1,24 +1,32 @@ #![allow(non_snake_case)] use crate::{ - NewBlock, api::{ BlockAggregatorApi, BlockAggregatorQuery, - protobuf_adapter::{ - BlockHeightRequest, - BlockRangeRequest, - NewBlockSubscriptionRequest, - ProtobufAPI, - block_aggregator_client::BlockAggregatorClient, - block_response::Payload, - }, + protobuf_adapter::ProtobufAPI, }, block_range_response::BlockRangeResponse, - blocks::Block, + blocks::importer_and_db_source::{ + BlockSerializer, + serializer_adapter::SerializerAdapter, + }, + protobuf_types::{ + Block as ProtoBlock, + BlockHeightRequest, + BlockRangeRequest, + NewBlockSubscriptionRequest, + block_aggregator_client::{ + BlockAggregatorClient as ProtoBlockAggregatorClient, + BlockAggregatorClient, + }, + block_response::Payload, + }, +}; +use fuel_core_types::{ + blockchain::block::Block as FuelBlock, + fuel_types::BlockHeight, }; -use bytes::Bytes; -use fuel_core_types::fuel_types::BlockHeight; use futures::{ StreamExt, TryStreamExt, @@ -40,7 +48,7 @@ async fn await_query__get_current_height__client_receives_expected_value() { // call get current height endpoint with client let url = format!("http://{}", path); - let mut client = BlockAggregatorClient::connect(url.to_string()) + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) .await .expect("could not connect to server"); let handle = tokio::spawn(async move { @@ -77,7 +85,7 @@ async fn await_query__get_block_range__client_receives_expected_value() { // call get current height endpoint with client let url = format!("http://{}", path); - let mut client = BlockAggregatorClient::connect(url.to_string()) + let mut client = ProtoBlockAggregatorClient::connect(url.to_string()) .await .expect("could not connect to server"); let request = BlockRangeRequest { start: 0, end: 1 }; @@ -94,8 +102,17 @@ async fn await_query__get_block_range__client_receives_expected_value() { let query = api.await_query().await.unwrap(); // then - let block1 = Block::new(Bytes::from(vec![0u8; 100])); - let block2 = Block::new(Bytes::from(vec![1u8; 100])); + let serializer_adapter = SerializerAdapter; + let fuel_block_1 = FuelBlock::default(); + let mut fuel_block_2 = FuelBlock::default(); + let block_height_2 = fuel_block_1.header().height().succ().unwrap(); + fuel_block_2.header_mut().set_block_height(block_height_2); + let block1 = serializer_adapter + .serialize_block(&fuel_block_1) + .expect("could not serialize block"); + let block2 = serializer_adapter + .serialize_block(&fuel_block_2) + .expect("could not serialize block"); let list = vec![block1, block2]; // return response through query's channel if let BlockAggregatorQuery::GetBlockRange { @@ -115,8 +132,8 @@ async fn await_query__get_block_range__client_receives_expected_value() { } tracing::info!("awaiting query"); let response = handle.await.unwrap(); - let expected: Vec> = list.iter().map(|b| b.bytes().to_vec()).collect(); - let actual: Vec> = response + let expected = list; + let actual: Vec = response .into_inner() .try_collect::>() .await @@ -124,7 +141,7 @@ async fn await_query__get_block_range__client_receives_expected_value() { .into_iter() .map(|b| { if let Some(Payload::Literal(inner)) = b.payload { - inner.data.to_vec() + inner } else { panic!("unexpected response type") } @@ -162,22 +179,30 @@ async fn await_query__new_block_stream__client_receives_expected_value() { // then let height1 = BlockHeight::new(0); let height2 = BlockHeight::new(1); - let block1 = Block::new(Bytes::from(vec![0u8; 100])); - let block2 = Block::new(Bytes::from(vec![1u8; 100])); - let list = vec![(height1, block1), (height2, block2)]; + let serializer_adapter = SerializerAdapter; + let mut fuel_block_1 = FuelBlock::default(); + fuel_block_1.header_mut().set_block_height(height1); + let mut fuel_block_2 = FuelBlock::default(); + fuel_block_2.header_mut().set_block_height(height2); + let block1 = serializer_adapter + .serialize_block(&fuel_block_1) + .expect("could not serialize block"); + let block2 = serializer_adapter + .serialize_block(&fuel_block_2) + .expect("could not serialize block"); + let list = vec![block1, block2]; if let BlockAggregatorQuery::NewBlockSubscription { response } = query { tracing::info!("correct query received, sending response"); - for (height, block) in list.clone() { - let new_block = NewBlock::new(height, block); - response.send(new_block).await.unwrap(); + for block in list.clone() { + response.send(block).await.unwrap(); } } else { panic!("expected GetBlockRange query"); } tracing::info!("awaiting query"); let response = handle.await.unwrap(); - let expected: Vec> = list.iter().map(|(_, b)| b.bytes().to_vec()).collect(); - let actual: Vec> = response + let expected = list; + let actual: Vec = response .into_inner() .try_collect::>() .await @@ -185,7 +210,7 @@ async fn await_query__new_block_stream__client_receives_expected_value() { .into_iter() .map(|b| { if let Some(Payload::Literal(inner)) = b.payload { - inner.data.to_vec() + inner } else { panic!("unexpected response type") } diff --git a/crates/services/block_aggregator_api/src/block_aggregator.rs b/crates/services/block_aggregator_api/src/block_aggregator.rs index a271c129b8e..4fde80d22b7 100644 --- a/crates/services/block_aggregator_api/src/block_aggregator.rs +++ b/crates/services/block_aggregator_api/src/block_aggregator.rs @@ -1,6 +1,5 @@ use crate::{ BlockAggregator, - NewBlock, api::{ BlockAggregatorApi, BlockAggregatorQuery, @@ -17,11 +16,12 @@ use fuel_core_services::{ }; use fuel_core_types::fuel_types::BlockHeight; -impl BlockAggregator +impl BlockAggregator where Api: BlockAggregatorApi, - DB: BlockAggregatorDB, + DB: BlockAggregatorDB, Blocks: BlockSource, + ::Block: Clone + std::fmt::Debug, BlockRangeResponse: Send, { pub fn new(query: Api, database: DB, block_source: Blocks) -> Self { @@ -40,7 +40,9 @@ where pub async fn handle_query( &mut self, - res: crate::result::Result>, + res: crate::result::Result< + BlockAggregatorQuery, + >, ) -> TaskNextAction { tracing::debug!("Handling query: {res:?}"); let query = try_or_stop!(res, |e| { @@ -98,7 +100,7 @@ where async fn handle_new_block_subscription( &mut self, - response: tokio::sync::mpsc::Sender, + response: tokio::sync::mpsc::Sender, ) -> TaskNextAction { self.new_block_subscriptions.push(response); TaskNextAction::Continue @@ -106,8 +108,11 @@ where pub async fn handle_block( &mut self, - res: crate::result::Result, - ) -> TaskNextAction { + res: crate::result::Result::Block>>, + ) -> TaskNextAction + where + ::Block: std::fmt::Debug, + { tracing::debug!("Handling block: {res:?}"); let event = try_or_stop!(res, |e| { tracing::error!("Error receiving block from source: {e:?}"); @@ -115,7 +120,7 @@ where let (id, block) = match event { BlockSourceEvent::NewBlock(id, block) => { self.new_block_subscriptions.retain_mut(|sub| { - let send_res = sub.try_send(NewBlock::new(id, block.clone())); + let send_res = sub.try_send(block.clone()); match send_res { Ok(_) => true, Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { diff --git a/crates/services/block_aggregator_api/src/block_range_response.rs b/crates/services/block_aggregator_api/src/block_range_response.rs index 5e071bc3328..24e78af6ff4 100644 --- a/crates/services/block_aggregator_api/src/block_range_response.rs +++ b/crates/services/block_aggregator_api/src/block_range_response.rs @@ -1,4 +1,4 @@ -use crate::blocks::Block; +use crate::protobuf_types::Block as ProtoBlock; use fuel_core_services::stream::Stream; pub type BoxStream = core::pin::Pin + Send + 'static>>; @@ -6,7 +6,7 @@ pub type BoxStream = core::pin::Pin + Send + 'static /// The response to a block range query, either as a literal stream of blocks or as a remote URL pub enum BlockRangeResponse { /// A literal stream of blocks - Literal(BoxStream), + Literal(BoxStream), /// A remote URL where the blocks can be fetched Remote(String), } diff --git a/crates/services/block_aggregator_api/src/blocks.rs b/crates/services/block_aggregator_api/src/blocks.rs index de56f280975..fb8dc76a9c1 100644 --- a/crates/services/block_aggregator_api/src/blocks.rs +++ b/crates/services/block_aggregator_api/src/blocks.rs @@ -7,17 +7,20 @@ pub mod importer_and_db_source; /// Source from which blocks can be gathered for aggregation pub trait BlockSource: Send + Sync { + type Block; /// Asynchronously fetch the next block and its height - fn next_block(&mut self) -> impl Future> + Send; + fn next_block( + &mut self, + ) -> impl Future>> + Send; /// Drain any remaining blocks from the source fn drain(&mut self) -> impl Future> + Send; } #[derive(Debug, Eq, PartialEq, Hash)] -pub enum BlockSourceEvent { - NewBlock(BlockHeight, Block), - OldBlock(BlockHeight, Block), +pub enum BlockSourceEvent { + NewBlock(BlockHeight, B), + OldBlock(BlockHeight, B), } #[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs index 48450b118f1..892b2b40120 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source.rs @@ -1,6 +1,5 @@ use crate::{ blocks::{ - Block, BlockSource, BlockSourceEvent, importer_and_db_source::importer_service::ImporterTask, @@ -37,21 +36,23 @@ mod tests; pub mod serializer_adapter; pub trait BlockSerializer { - fn serialize_block(&self, block: &FuelBlock) -> Result; + type Block; + fn serialize_block(&self, block: &FuelBlock) -> Result; } pub struct ImporterAndDbSource where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: Send + Sync + 'static, DB: StorageInspect, DB: StorageInspect, E: std::fmt::Debug + Send, { - importer_task: ServiceRunner>, - sync_task: ServiceRunner>, + importer_task: ServiceRunner>, + sync_task: ServiceRunner>, /// Receive blocks from the importer and sync tasks - receiver: tokio::sync::mpsc::Receiver, + receiver: tokio::sync::mpsc::Receiver>, _error_marker: std::marker::PhantomData, } @@ -59,6 +60,7 @@ where impl ImporterAndDbSource where Serializer: BlockSerializer + Clone + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: StorageInspect + Send + Sync, DB: StorageInspect + Send + 'static, E: std::fmt::Debug + Send, @@ -103,12 +105,15 @@ where impl BlockSource for ImporterAndDbSource where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: Send + Sync, DB: StorageInspect, DB: StorageInspect, E: std::fmt::Debug + Send + Sync, { - async fn next_block(&mut self) -> Result { + type Block = Serializer::Block; + + async fn next_block(&mut self) -> Result> { tracing::debug!("awaiting next block"); tokio::select! { block_res = self.receiver.recv() => { diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs index 500d7d0de08..74151e2a0c7 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/importer_service.rs @@ -18,21 +18,22 @@ use fuel_core_types::{ use futures::StreamExt; use tokio::sync::mpsc::Sender; -pub struct ImporterTask { +pub struct ImporterTask { importer: BoxStream, serializer: Serializer, - block_return_sender: Sender, + block_return_sender: Sender>, new_end_sender: Option>, } -impl ImporterTask +impl ImporterTask where Serializer: BlockSerializer + Send, + ::Block: Send, { pub fn new( importer: BoxStream, serializer: Serializer, - block_return: Sender, + block_return: Sender>, new_end_sender: Option>, ) -> Self { Self { @@ -43,9 +44,10 @@ where } } } -impl RunnableTask for ImporterTask +impl RunnableTask for ImporterTask where Serializer: BlockSerializer + Send + Sync, + ::Block: Send, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { @@ -61,7 +63,7 @@ where } } -impl ImporterTask +impl ImporterTask where Serializer: BlockSerializer + Send + Sync, { @@ -110,9 +112,10 @@ where } #[async_trait::async_trait] -impl RunnableService for ImporterTask +impl RunnableService for ImporterTask where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + 'static, { const NAME: &'static str = "BlockSourceImporterTask"; type SharedData = (); diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs index 028c66081bb..fa7e7db2d8f 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/serializer_adapter.rs @@ -1,23 +1,1524 @@ +#[cfg(feature = "fault-proving")] +use crate::protobuf_types::V2Header as ProtoV2Header; use crate::{ - blocks::{ - Block, - importer_and_db_source::BlockSerializer, + blocks::importer_and_db_source::BlockSerializer, + protobuf_types::{ + BlobTransaction as ProtoBlobTx, + Block as ProtoBlock, + ChangeOutput as ProtoChangeOutput, + CoinOutput as ProtoCoinOutput, + CoinPredicateInput as ProtoCoinPredicateInput, + CoinSignedInput as ProtoCoinSignedInput, + ContractCreatedOutput as ProtoContractCreatedOutput, + ContractInput as ProtoContractInput, + ContractOutput as ProtoContractOutput, + CreateTransaction as ProtoCreateTx, + Header as ProtoHeader, + Input as ProtoInput, + MessageCoinPredicateInput as ProtoMessageCoinPredicateInput, + MessageCoinSignedInput as ProtoMessageCoinSignedInput, + MessageDataPredicateInput as ProtoMessageDataPredicateInput, + MessageDataSignedInput as ProtoMessageDataSignedInput, + MintTransaction as ProtoMintTx, + Output as ProtoOutput, + Policies as ProtoPolicies, + ScriptTransaction as ProtoScriptTx, + StorageSlot as ProtoStorageSlot, + Transaction as ProtoTransaction, + TxPointer as ProtoTxPointer, + UpgradeConsensusParameters as ProtoUpgradeConsensusParameters, + UpgradePurpose as ProtoUpgradePurpose, + UpgradeStateTransition as ProtoUpgradeStateTransition, + UpgradeTransaction as ProtoUpgradeTx, + UploadTransaction as ProtoUploadTx, + UtxoId as ProtoUtxoId, + V1Block as ProtoV1Block, + V1Header as ProtoV1Header, + VariableOutput as ProtoVariableOutput, + block::VersionedBlock as ProtoVersionedBlock, + header::VersionedHeader as ProtoVersionedHeader, + input::Variant as ProtoInputVariant, + output::Variant as ProtoOutputVariant, + transaction::Variant as ProtoTransactionVariant, + upgrade_purpose::Variant as ProtoUpgradePurposeVariant, + }, + result::{ + Error, + Result, }, - result::Error, }; - use anyhow::anyhow; -use fuel_core_types::blockchain::block::Block as FuelBlock; -use postcard::to_allocvec; +#[cfg(feature = "fault-proving")] +use fuel_core_types::{ + blockchain::header::BlockHeaderV2, + fuel_types::ChainId, +}; + +use fuel_core_types::{ + blockchain::{ + block::Block as FuelBlock, + header::{ + ApplicationHeader, + BlockHeader, + BlockHeaderV1, + ConsensusHeader, + GeneratedConsensusFields, + PartialBlockHeader, + }, + primitives::{ + BlockId, + DaBlockHeight, + Empty, + }, + }, + fuel_tx::{ + Address, + BlobBody, + Bytes32, + Input, + Output, + StorageSlot, + Transaction as FuelTransaction, + TxPointer, + UpgradePurpose, + UploadBody, + UtxoId, + Witness, + field::{ + BlobId as _, + BytecodeRoot as _, + BytecodeWitnessIndex as _, + InputContract as _, + Inputs, + MintAmount as _, + MintAssetId as _, + MintGasPrice as _, + OutputContract as _, + Outputs, + Policies as _, + ProofSet as _, + ReceiptsRoot as _, + Salt as _, + Script as _, + ScriptData as _, + ScriptGasLimit as _, + StorageSlots as _, + SubsectionIndex as _, + SubsectionsNumber as _, + TxPointer as TxPointerField, + UpgradePurpose as UpgradePurposeField, + Witnesses as _, + }, + policies::{ + Policies as FuelPolicies, + PoliciesBits, + PolicyType, + }, + }, + tai64, +}; #[derive(Clone)] pub struct SerializerAdapter; impl BlockSerializer for SerializerAdapter { - fn serialize_block(&self, block: &FuelBlock) -> crate::result::Result { - let bytes_vec = to_allocvec(block).map_err(|e| { - Error::BlockSource(anyhow!("failed to serialize block: {}", e)) - })?; - Ok(crate::blocks::Block::from(bytes_vec)) + type Block = ProtoBlock; + + fn serialize_block(&self, block: &FuelBlock) -> crate::result::Result { + // TODO: Should this be owned to begin with? + let (header, txs) = block.clone().into_inner(); + let proto_header = proto_header_from_header(header); + match &block { + FuelBlock::V1(_) => { + let proto_v1_block = ProtoV1Block { + header: Some(proto_header), + transactions: txs.into_iter().map(proto_tx_from_tx).collect(), + }; + Ok(ProtoBlock { + versioned_block: Some(ProtoVersionedBlock::V1(proto_v1_block)), + }) + } + } + } +} + +fn proto_header_from_header(header: BlockHeader) -> ProtoHeader { + let block_id = header.id(); + let consensus = *header.consensus(); + let versioned_header = match header { + BlockHeader::V1(header) => { + let proto_v1_header = + proto_v1_header_from_v1_header(consensus, block_id, header); + ProtoVersionedHeader::V1(proto_v1_header) + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + let proto_v2_header = + proto_v2_header_from_v2_header(consensus, block_id, header); + ProtoVersionedHeader::V2(proto_v2_header) + } + }; + + ProtoHeader { + versioned_header: Some(versioned_header), + } +} + +fn proto_v1_header_from_v1_header( + consensus: ConsensusHeader, + block_id: BlockId, + header: BlockHeaderV1, +) -> ProtoV1Header { + let application = header.application(); + let generated = application.generated; + + ProtoV1Header { + da_height: application.da_height.0, + consensus_parameters_version: application.consensus_parameters_version, + state_transition_bytecode_version: application.state_transition_bytecode_version, + transactions_count: u32::from(generated.transactions_count), + message_receipt_count: generated.message_receipt_count, + transactions_root: bytes32_to_vec(&generated.transactions_root), + message_outbox_root: bytes32_to_vec(&generated.message_outbox_root), + event_inbox_root: bytes32_to_vec(&generated.event_inbox_root), + prev_root: bytes32_to_vec(&consensus.prev_root), + height: u32::from(consensus.height), + time: consensus.time.0, + application_hash: bytes32_to_vec(&consensus.generated.application_hash), + block_id: Some(block_id.as_slice().to_vec()), + } +} + +#[cfg(feature = "fault-proving")] +fn proto_v2_header_from_v2_header( + consensus: ConsensusHeader, + block_id: BlockId, + header: BlockHeaderV2, +) -> ProtoV2Header { + let application = *header.application(); + let generated = application.generated; + + ProtoV2Header { + da_height: application.da_height.0, + consensus_parameters_version: application.consensus_parameters_version, + state_transition_bytecode_version: application.state_transition_bytecode_version, + transactions_count: u32::from(generated.transactions_count), + message_receipt_count: generated.message_receipt_count, + transactions_root: bytes32_to_vec(&generated.transactions_root), + message_outbox_root: bytes32_to_vec(&generated.message_outbox_root), + event_inbox_root: bytes32_to_vec(&generated.event_inbox_root), + tx_id_commitment: bytes32_to_vec(&generated.tx_id_commitment), + prev_root: bytes32_to_vec(&consensus.prev_root), + height: u32::from(consensus.height), + time: consensus.time.0, + application_hash: bytes32_to_vec(&consensus.generated.application_hash), + block_id: Some(block_id.as_slice().to_vec()), + } +} + +fn proto_tx_from_tx(tx: FuelTransaction) -> ProtoTransaction { + match tx { + FuelTransaction::Script(script) => { + let proto_script = ProtoScriptTx { + script_gas_limit: *script.script_gas_limit(), + receipts_root: bytes32_to_vec(script.receipts_root()), + script: script.script().clone(), + script_data: script.script_data().clone(), + policies: Some(proto_policies_from_policies(script.policies())), + inputs: script + .inputs() + .iter() + .cloned() + .map(proto_input_from_input) + .collect(), + outputs: script + .outputs() + .iter() + .cloned() + .map(proto_output_from_output) + .collect(), + witnesses: script + .witnesses() + .iter() + .map(|witness| witness.as_ref().to_vec()) + .collect(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Script(proto_script)), + } + } + FuelTransaction::Create(create) => { + let proto_create = ProtoCreateTx { + bytecode_witness_index: u32::from(*create.bytecode_witness_index()), + salt: create.salt().as_ref().to_vec(), + storage_slots: create + .storage_slots() + .iter() + .map(proto_storage_slot_from_storage_slot) + .collect(), + policies: Some(proto_policies_from_policies(create.policies())), + inputs: create + .inputs() + .iter() + .cloned() + .map(proto_input_from_input) + .collect(), + outputs: create + .outputs() + .iter() + .cloned() + .map(proto_output_from_output) + .collect(), + witnesses: create + .witnesses() + .iter() + .map(|witness| witness.as_ref().to_vec()) + .collect(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Create(proto_create)), + } + } + FuelTransaction::Mint(mint) => { + let proto_mint = ProtoMintTx { + tx_pointer: Some(proto_tx_pointer(mint.tx_pointer())), + input_contract: Some(proto_contract_input_from_contract( + mint.input_contract(), + )), + output_contract: Some(proto_contract_output_from_contract( + mint.output_contract(), + )), + mint_amount: *mint.mint_amount(), + mint_asset_id: mint.mint_asset_id().as_ref().to_vec(), + gas_price: *mint.gas_price(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Mint(proto_mint)), + } + } + FuelTransaction::Upgrade(upgrade) => { + let proto_upgrade = ProtoUpgradeTx { + purpose: Some(proto_upgrade_purpose(upgrade.upgrade_purpose())), + policies: Some(proto_policies_from_policies(upgrade.policies())), + inputs: upgrade + .inputs() + .iter() + .cloned() + .map(proto_input_from_input) + .collect(), + outputs: upgrade + .outputs() + .iter() + .cloned() + .map(proto_output_from_output) + .collect(), + witnesses: upgrade + .witnesses() + .iter() + .map(|witness| witness.as_ref().to_vec()) + .collect(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Upgrade(proto_upgrade)), + } + } + FuelTransaction::Upload(upload) => { + let proto_upload = ProtoUploadTx { + root: bytes32_to_vec(upload.bytecode_root()), + witness_index: u32::from(*upload.bytecode_witness_index()), + subsection_index: u32::from(*upload.subsection_index()), + subsections_number: u32::from(*upload.subsections_number()), + proof_set: upload.proof_set().iter().map(bytes32_to_vec).collect(), + policies: Some(proto_policies_from_policies(upload.policies())), + inputs: upload + .inputs() + .iter() + .cloned() + .map(proto_input_from_input) + .collect(), + outputs: upload + .outputs() + .iter() + .cloned() + .map(proto_output_from_output) + .collect(), + witnesses: upload + .witnesses() + .iter() + .map(|witness| witness.as_ref().to_vec()) + .collect(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Upload(proto_upload)), + } + } + FuelTransaction::Blob(blob) => { + let proto_blob = ProtoBlobTx { + blob_id: blob.blob_id().as_ref().to_vec(), + witness_index: u32::from(*blob.bytecode_witness_index()), + policies: Some(proto_policies_from_policies(blob.policies())), + inputs: blob + .inputs() + .iter() + .cloned() + .map(proto_input_from_input) + .collect(), + outputs: blob + .outputs() + .iter() + .cloned() + .map(proto_output_from_output) + .collect(), + witnesses: blob + .witnesses() + .iter() + .map(|witness| witness.as_ref().to_vec()) + .collect(), + metadata: None, + }; + + ProtoTransaction { + variant: Some(ProtoTransactionVariant::Blob(proto_blob)), + } + } + } +} + +fn proto_input_from_input(input: Input) -> ProtoInput { + match input { + Input::CoinSigned(coin_signed) => ProtoInput { + variant: Some(ProtoInputVariant::CoinSigned(ProtoCoinSignedInput { + utxo_id: Some(proto_utxo_id_from_utxo_id(&coin_signed.utxo_id)), + owner: coin_signed.owner.as_ref().to_vec(), + amount: coin_signed.amount, + asset_id: coin_signed.asset_id.as_ref().to_vec(), + tx_pointer: Some(proto_tx_pointer(&coin_signed.tx_pointer)), + witness_index: coin_signed.witness_index.into(), + predicate_gas_used: 0, + predicate: vec![], + predicate_data: vec![], + })), + }, + Input::CoinPredicate(coin_predicate) => ProtoInput { + variant: Some(ProtoInputVariant::CoinPredicate(ProtoCoinPredicateInput { + utxo_id: Some(proto_utxo_id_from_utxo_id(&coin_predicate.utxo_id)), + owner: coin_predicate.owner.as_ref().to_vec(), + amount: coin_predicate.amount, + asset_id: coin_predicate.asset_id.as_ref().to_vec(), + tx_pointer: Some(proto_tx_pointer(&coin_predicate.tx_pointer)), + witness_index: 0, + predicate_gas_used: coin_predicate.predicate_gas_used, + predicate: coin_predicate.predicate.as_ref().to_vec(), + predicate_data: coin_predicate.predicate_data.as_ref().to_vec(), + })), + }, + Input::Contract(contract) => ProtoInput { + variant: Some(ProtoInputVariant::Contract(ProtoContractInput { + utxo_id: Some(proto_utxo_id_from_utxo_id(&contract.utxo_id)), + balance_root: bytes32_to_vec(&contract.balance_root), + state_root: bytes32_to_vec(&contract.state_root), + tx_pointer: Some(proto_tx_pointer(&contract.tx_pointer)), + contract_id: contract.contract_id.as_ref().to_vec(), + })), + }, + Input::MessageCoinSigned(message) => ProtoInput { + variant: Some(ProtoInputVariant::MessageCoinSigned( + ProtoMessageCoinSignedInput { + sender: message.sender.as_ref().to_vec(), + recipient: message.recipient.as_ref().to_vec(), + amount: message.amount, + nonce: message.nonce.as_ref().to_vec(), + witness_index: message.witness_index.into(), + predicate_gas_used: 0, + data: Vec::new(), + predicate: Vec::new(), + predicate_data: Vec::new(), + }, + )), + }, + Input::MessageCoinPredicate(message) => ProtoInput { + variant: Some(ProtoInputVariant::MessageCoinPredicate( + ProtoMessageCoinPredicateInput { + sender: message.sender.as_ref().to_vec(), + recipient: message.recipient.as_ref().to_vec(), + amount: message.amount, + nonce: message.nonce.as_ref().to_vec(), + witness_index: 0, + predicate_gas_used: message.predicate_gas_used, + data: Vec::new(), + predicate: message.predicate.as_ref().to_vec(), + predicate_data: message.predicate_data.as_ref().to_vec(), + }, + )), + }, + Input::MessageDataSigned(message) => ProtoInput { + variant: Some(ProtoInputVariant::MessageDataSigned( + ProtoMessageDataSignedInput { + sender: message.sender.as_ref().to_vec(), + recipient: message.recipient.as_ref().to_vec(), + amount: message.amount, + nonce: message.nonce.as_ref().to_vec(), + witness_index: message.witness_index.into(), + predicate_gas_used: 0, + data: message.data.as_ref().to_vec(), + predicate: Vec::new(), + predicate_data: Vec::new(), + }, + )), + }, + Input::MessageDataPredicate(message) => ProtoInput { + variant: Some(ProtoInputVariant::MessageDataPredicate( + ProtoMessageDataPredicateInput { + sender: message.sender.as_ref().to_vec(), + recipient: message.recipient.as_ref().to_vec(), + amount: message.amount, + nonce: message.nonce.as_ref().to_vec(), + witness_index: 0, + predicate_gas_used: message.predicate_gas_used, + data: message.data.as_ref().to_vec(), + predicate: message.predicate.as_ref().to_vec(), + predicate_data: message.predicate_data.as_ref().to_vec(), + }, + )), + }, + } +} + +fn proto_utxo_id_from_utxo_id(utxo_id: &UtxoId) -> ProtoUtxoId { + ProtoUtxoId { + tx_id: utxo_id.tx_id().as_ref().to_vec(), + output_index: utxo_id.output_index().into(), + } +} + +fn proto_tx_pointer(tx_pointer: &TxPointer) -> ProtoTxPointer { + ProtoTxPointer { + block_height: tx_pointer.block_height().into(), + tx_index: tx_pointer.tx_index().into(), + } +} + +fn proto_storage_slot_from_storage_slot(slot: &StorageSlot) -> ProtoStorageSlot { + ProtoStorageSlot { + key: slot.key().as_ref().to_vec(), + value: slot.value().as_ref().to_vec(), + } +} + +fn proto_contract_input_from_contract( + contract: &fuel_core_types::fuel_tx::input::contract::Contract, +) -> ProtoContractInput { + ProtoContractInput { + utxo_id: Some(proto_utxo_id_from_utxo_id(&contract.utxo_id)), + balance_root: bytes32_to_vec(&contract.balance_root), + state_root: bytes32_to_vec(&contract.state_root), + tx_pointer: Some(proto_tx_pointer(&contract.tx_pointer)), + contract_id: contract.contract_id.as_ref().to_vec(), + } +} + +fn proto_contract_output_from_contract( + contract: &fuel_core_types::fuel_tx::output::contract::Contract, +) -> ProtoContractOutput { + ProtoContractOutput { + input_index: u32::from(contract.input_index), + balance_root: bytes32_to_vec(&contract.balance_root), + state_root: bytes32_to_vec(&contract.state_root), + } +} + +fn proto_output_from_output(output: Output) -> ProtoOutput { + let variant = match output { + Output::Coin { + to, + amount, + asset_id, + } => ProtoOutputVariant::Coin(ProtoCoinOutput { + to: to.as_ref().to_vec(), + amount, + asset_id: asset_id.as_ref().to_vec(), + }), + Output::Contract(contract) => { + ProtoOutputVariant::Contract(proto_contract_output_from_contract(&contract)) + } + Output::Change { + to, + amount, + asset_id, + } => ProtoOutputVariant::Change(ProtoChangeOutput { + to: to.as_ref().to_vec(), + amount, + asset_id: asset_id.as_ref().to_vec(), + }), + Output::Variable { + to, + amount, + asset_id, + } => ProtoOutputVariant::Variable(ProtoVariableOutput { + to: to.as_ref().to_vec(), + amount, + asset_id: asset_id.as_ref().to_vec(), + }), + Output::ContractCreated { + contract_id, + state_root, + } => ProtoOutputVariant::ContractCreated(ProtoContractCreatedOutput { + contract_id: contract_id.as_ref().to_vec(), + state_root: bytes32_to_vec(&state_root), + }), + }; + + ProtoOutput { + variant: Some(variant), } } + +fn proto_upgrade_purpose(purpose: &UpgradePurpose) -> ProtoUpgradePurpose { + let variant = match purpose { + UpgradePurpose::ConsensusParameters { + witness_index, + checksum, + } => ProtoUpgradePurposeVariant::ConsensusParameters( + ProtoUpgradeConsensusParameters { + witness_index: u32::from(*witness_index), + checksum: checksum.as_ref().to_vec(), + }, + ), + UpgradePurpose::StateTransition { root } => { + ProtoUpgradePurposeVariant::StateTransition(ProtoUpgradeStateTransition { + root: root.as_ref().to_vec(), + }) + } + }; + + ProtoUpgradePurpose { + variant: Some(variant), + } +} + +fn proto_policies_from_policies( + policies: &fuel_core_types::fuel_tx::policies::Policies, +) -> ProtoPolicies { + let mut values = [0u64; 6]; + if policies.is_set(PolicyType::Tip) { + values[0] = policies.get(PolicyType::Tip).unwrap_or_default(); + } + if policies.is_set(PolicyType::WitnessLimit) { + let value = policies.get(PolicyType::WitnessLimit).unwrap_or_default(); + values[1] = value; + } + if policies.is_set(PolicyType::Maturity) { + let value = policies.get(PolicyType::Maturity).unwrap_or_default(); + values[2] = value; + } + if policies.is_set(PolicyType::MaxFee) { + values[3] = policies.get(PolicyType::MaxFee).unwrap_or_default(); + } + if policies.is_set(PolicyType::Expiration) { + values[4] = policies.get(PolicyType::Expiration).unwrap_or_default(); + } + if policies.is_set(PolicyType::Owner) { + values[5] = policies.get(PolicyType::Owner).unwrap_or_default(); + } + let bits = policies.bits(); + ProtoPolicies { + bits, + values: values.to_vec(), + } +} + +fn tx_pointer_from_proto(proto: &ProtoTxPointer) -> Result { + let block_height = proto.block_height.into(); + #[allow(clippy::useless_conversion)] + let tx_index = proto.tx_index.try_into().map_err(|e| { + Error::Serialization(anyhow!("Could not convert tx_index to target type: {}", e)) + })?; + Ok(TxPointer::new(block_height, tx_index)) +} + +fn storage_slot_from_proto(proto: &ProtoStorageSlot) -> Result { + let key = Bytes32::try_from(proto.key.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert storage slot key to Bytes32: {}", + e + )) + })?; + let value = Bytes32::try_from(proto.value.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert storage slot value to Bytes32: {}", + e + )) + })?; + Ok(StorageSlot::new(key, value)) +} + +fn contract_input_from_proto( + proto: &ProtoContractInput, +) -> Result { + let utxo_proto = proto.utxo_id.as_ref().ok_or_else(|| { + Error::Serialization(anyhow!("Missing utxo_id on contract input")) + })?; + let utxo_id = utxo_id_from_proto(utxo_proto)?; + let balance_root = Bytes32::try_from(proto.balance_root.as_slice()).map_err(|e| { + Error::Serialization(anyhow!("Could not convert balance_root to Bytes32: {}", e)) + })?; + let state_root = Bytes32::try_from(proto.state_root.as_slice()).map_err(|e| { + Error::Serialization(anyhow!("Could not convert state_root to Bytes32: {}", e)) + })?; + let tx_pointer_proto = proto.tx_pointer.as_ref().ok_or_else(|| { + Error::Serialization(anyhow!("Missing tx_pointer on contract input")) + })?; + let tx_pointer = tx_pointer_from_proto(tx_pointer_proto)?; + let contract_id = + fuel_core_types::fuel_types::ContractId::try_from(proto.contract_id.as_slice()) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + + Ok(fuel_core_types::fuel_tx::input::contract::Contract { + utxo_id, + balance_root, + state_root, + tx_pointer, + contract_id, + }) +} + +fn contract_output_from_proto( + proto: &ProtoContractOutput, +) -> Result { + let input_index = u16::try_from(proto.input_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert contract output input_index to u16: {}", + e + )) + })?; + let balance_root = Bytes32::try_from(proto.balance_root.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert contract output balance_root to Bytes32: {}", + e + )) + })?; + let state_root = Bytes32::try_from(proto.state_root.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert contract output state_root to Bytes32: {}", + e + )) + })?; + + Ok(fuel_core_types::fuel_tx::output::contract::Contract { + input_index, + balance_root, + state_root, + }) +} + +fn output_from_proto_output(proto_output: &ProtoOutput) -> Result { + match proto_output + .variant + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing output variant")))? + { + ProtoOutputVariant::Coin(coin) => { + let to = Address::try_from(coin.to.as_slice()) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let asset_id = + fuel_core_types::fuel_types::AssetId::try_from(coin.asset_id.as_slice()) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + Ok(Output::coin(to, coin.amount, asset_id)) + } + ProtoOutputVariant::Contract(contract) => { + let contract = contract_output_from_proto(contract)?; + Ok(Output::Contract(contract)) + } + ProtoOutputVariant::Change(change) => { + let to = Address::try_from(change.to.as_slice()) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let asset_id = fuel_core_types::fuel_types::AssetId::try_from( + change.asset_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + Ok(Output::change(to, change.amount, asset_id)) + } + ProtoOutputVariant::Variable(variable) => { + let to = Address::try_from(variable.to.as_slice()) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let asset_id = fuel_core_types::fuel_types::AssetId::try_from( + variable.asset_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + Ok(Output::variable(to, variable.amount, asset_id)) + } + ProtoOutputVariant::ContractCreated(contract_created) => { + let contract_id = fuel_core_types::fuel_types::ContractId::try_from( + contract_created.contract_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let state_root = Bytes32::try_from(contract_created.state_root.as_slice()) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert state_root to Bytes32: {}", + e + )) + })?; + Ok(Output::contract_created(contract_id, state_root)) + } + } +} + +fn upgrade_purpose_from_proto(proto: &ProtoUpgradePurpose) -> Result { + match proto + .variant + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing upgrade purpose variant")))? + { + ProtoUpgradePurposeVariant::ConsensusParameters(consensus) => { + let witness_index = u16::try_from(consensus.witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert witness_index to u16: {}", + e + )) + })?; + let checksum = + Bytes32::try_from(consensus.checksum.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert checksum to Bytes32: {}", + e + )) + })?; + Ok(UpgradePurpose::ConsensusParameters { + witness_index, + checksum, + }) + } + ProtoUpgradePurposeVariant::StateTransition(state) => { + let root = Bytes32::try_from(state.root.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert state transition root to Bytes32: {}", + e + )) + })?; + Ok(UpgradePurpose::StateTransition { root }) + } + } +} + +fn utxo_id_from_proto(proto_utxo: &ProtoUtxoId) -> Result { + let tx_id = Bytes32::try_from(proto_utxo.tx_id.as_slice()).map_err(|e| { + Error::Serialization(anyhow!("Could not convert tx_id to Bytes32: {}", e)) + })?; + let output_index = u16::try_from(proto_utxo.output_index).map_err(|e| { + Error::Serialization(anyhow!("Could not convert output_index to u16: {}", e)) + })?; + Ok(UtxoId::new(tx_id, output_index)) +} + +fn bytes32_to_vec(bytes: &fuel_core_types::fuel_types::Bytes32) -> Vec { + bytes.as_ref().to_vec() +} + +pub fn fuel_block_from_protobuf( + proto_block: ProtoBlock, + msg_ids: &[fuel_core_types::fuel_tx::MessageId], + event_inbox_root: Bytes32, +) -> Result { + let versioned_block = proto_block + .versioned_block + .ok_or_else(|| anyhow::anyhow!("Missing protobuf versioned_block")) + .map_err(Error::Serialization)?; + let partial_header = match &versioned_block { + ProtoVersionedBlock::V1(v1_block) => { + let proto_header = v1_block + .header + .clone() + .ok_or_else(|| anyhow::anyhow!("Missing protobuf header")) + .map_err(Error::Serialization)?; + partial_header_from_proto_header(proto_header)? + } + }; + let txs = match versioned_block { + ProtoVersionedBlock::V1(v1_inner) => v1_inner + .transactions + .iter() + .map(tx_from_proto_tx) + .collect::>()?, + }; + FuelBlock::new( + partial_header, + txs, + msg_ids, + event_inbox_root, + #[cfg(feature = "fault-proving")] + &ChainId::default(), + ) + .map_err(|e| anyhow!(e)) + .map_err(Error::Serialization) +} + +pub fn partial_header_from_proto_header( + proto_header: ProtoHeader, +) -> Result { + let partial_header = PartialBlockHeader { + consensus: proto_header_to_empty_consensus_header(&proto_header)?, + application: proto_header_to_empty_application_header(&proto_header)?, + }; + Ok(partial_header) +} + +pub fn tx_from_proto_tx(proto_tx: &ProtoTransaction) -> Result { + let variant = proto_tx + .variant + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing transaction variant")))?; + + match variant { + ProtoTransactionVariant::Script(proto_script) => { + let policies = proto_script + .policies + .clone() + .map(policies_from_proto_policies) + .unwrap_or_default(); + let inputs = proto_script + .inputs + .iter() + .map(input_from_proto_input) + .collect::>>()?; + let outputs = proto_script + .outputs + .iter() + .map(output_from_proto_output) + .collect::>>()?; + let witnesses = proto_script + .witnesses + .iter() + .map(|w| Ok(Witness::from(w.clone()))) + .collect::>>()?; + let mut script_tx = FuelTransaction::script( + proto_script.script_gas_limit, + proto_script.script.clone(), + proto_script.script_data.clone(), + policies, + inputs, + outputs, + witnesses, + ); + *script_tx.receipts_root_mut() = Bytes32::try_from( + proto_script.receipts_root.as_slice(), + ) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert receipts_root to Bytes32: {}", + e + )) + })?; + + Ok(FuelTransaction::Script(script_tx)) + } + ProtoTransactionVariant::Create(proto_create) => { + let policies = proto_create + .policies + .clone() + .map(policies_from_proto_policies) + .unwrap_or_default(); + let inputs = proto_create + .inputs + .iter() + .map(input_from_proto_input) + .collect::>>()?; + let outputs = proto_create + .outputs + .iter() + .map(output_from_proto_output) + .collect::>>()?; + let witnesses = proto_create + .witnesses + .iter() + .map(|w| Ok(Witness::from(w.clone()))) + .collect::>>()?; + let storage_slots = proto_create + .storage_slots + .iter() + .map(storage_slot_from_proto) + .collect::>>()?; + let salt = + fuel_core_types::fuel_types::Salt::try_from(proto_create.salt.as_slice()) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let bytecode_witness_index = + u16::try_from(proto_create.bytecode_witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert bytecode_witness_index to u16: {}", + e + )) + })?; + + let create_tx = FuelTransaction::create( + bytecode_witness_index, + policies, + salt, + storage_slots, + inputs, + outputs, + witnesses, + ); + + Ok(FuelTransaction::Create(create_tx)) + } + ProtoTransactionVariant::Mint(proto_mint) => { + let tx_pointer_proto = proto_mint.tx_pointer.as_ref().ok_or_else(|| { + Error::Serialization(anyhow!("Missing tx_pointer on mint transaction")) + })?; + let tx_pointer = tx_pointer_from_proto(tx_pointer_proto)?; + let input_contract_proto = + proto_mint.input_contract.as_ref().ok_or_else(|| { + Error::Serialization(anyhow!( + "Missing input_contract on mint transaction" + )) + })?; + let input_contract = contract_input_from_proto(input_contract_proto)?; + let output_contract_proto = + proto_mint.output_contract.as_ref().ok_or_else(|| { + Error::Serialization(anyhow!( + "Missing output_contract on mint transaction" + )) + })?; + let output_contract = contract_output_from_proto(output_contract_proto)?; + let mint_asset_id = fuel_core_types::fuel_types::AssetId::try_from( + proto_mint.mint_asset_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + + let mint_tx = FuelTransaction::mint( + tx_pointer, + input_contract, + output_contract, + proto_mint.mint_amount, + mint_asset_id, + proto_mint.gas_price, + ); + + Ok(FuelTransaction::Mint(mint_tx)) + } + ProtoTransactionVariant::Upgrade(proto_upgrade) => { + let purpose_proto = proto_upgrade.purpose.as_ref().ok_or_else(|| { + Error::Serialization(anyhow!("Missing purpose on upgrade transaction")) + })?; + let upgrade_purpose = upgrade_purpose_from_proto(purpose_proto)?; + let policies = proto_upgrade + .policies + .clone() + .map(policies_from_proto_policies) + .unwrap_or_default(); + let inputs = proto_upgrade + .inputs + .iter() + .map(input_from_proto_input) + .collect::>>()?; + let outputs = proto_upgrade + .outputs + .iter() + .map(output_from_proto_output) + .collect::>>()?; + let witnesses = proto_upgrade + .witnesses + .iter() + .map(|w| Ok(Witness::from(w.clone()))) + .collect::>>()?; + + let upgrade_tx = FuelTransaction::upgrade( + upgrade_purpose, + policies, + inputs, + outputs, + witnesses, + ); + + Ok(FuelTransaction::Upgrade(upgrade_tx)) + } + ProtoTransactionVariant::Upload(proto_upload) => { + let policies = proto_upload + .policies + .clone() + .map(policies_from_proto_policies) + .unwrap_or_default(); + let inputs = proto_upload + .inputs + .iter() + .map(input_from_proto_input) + .collect::>>()?; + let outputs = proto_upload + .outputs + .iter() + .map(output_from_proto_output) + .collect::>>()?; + let witnesses = proto_upload + .witnesses + .iter() + .map(|w| Ok(Witness::from(w.clone()))) + .collect::>>()?; + let root = Bytes32::try_from(proto_upload.root.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert upload root to Bytes32: {}", + e + )) + })?; + let witness_index = + u16::try_from(proto_upload.witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert witness_index to u16: {}", + e + )) + })?; + let subsection_index = + u16::try_from(proto_upload.subsection_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert subsection_index to u16: {}", + e + )) + })?; + let subsections_number = u16::try_from(proto_upload.subsections_number) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert subsections_number to u16: {}", + e + )) + })?; + let proof_set = proto_upload + .proof_set + .iter() + .map(|entry| { + Bytes32::try_from(entry.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert proof_set entry to Bytes32: {}", + e + )) + }) + }) + .collect::>>()?; + + let body = UploadBody { + root, + witness_index, + subsection_index, + subsections_number, + proof_set, + }; + + let upload_tx = + FuelTransaction::upload(body, policies, inputs, outputs, witnesses); + + Ok(FuelTransaction::Upload(upload_tx)) + } + ProtoTransactionVariant::Blob(proto_blob) => { + let policies = proto_blob + .policies + .clone() + .map(policies_from_proto_policies) + .unwrap_or_default(); + let inputs = proto_blob + .inputs + .iter() + .map(input_from_proto_input) + .collect::>>()?; + let outputs = proto_blob + .outputs + .iter() + .map(output_from_proto_output) + .collect::>>()?; + let witnesses = proto_blob + .witnesses + .iter() + .map(|w| Ok(Witness::from(w.clone()))) + .collect::>>()?; + let blob_id = fuel_core_types::fuel_types::BlobId::try_from( + proto_blob.blob_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let witness_index = u16::try_from(proto_blob.witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert blob witness_index to u16: {}", + e + )) + })?; + let body = BlobBody { + id: blob_id, + witness_index, + }; + + let blob_tx = + FuelTransaction::blob(body, policies, inputs, outputs, witnesses); + + Ok(FuelTransaction::Blob(blob_tx)) + } + } +} + +fn input_from_proto_input(proto_input: &ProtoInput) -> Result { + let variant = proto_input + .variant + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing input variant")))?; + + match variant { + ProtoInputVariant::CoinSigned(proto_coin_signed) => { + let utxo_proto = proto_coin_signed + .utxo_id + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing utxo_id")))?; + let utxo_id = utxo_id_from_proto(utxo_proto)?; + let owner = + Address::try_from(proto_coin_signed.owner.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert owner to Address: {}", + e + )) + })?; + let asset_id = fuel_core_types::fuel_types::AssetId::try_from( + proto_coin_signed.asset_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let tx_pointer_proto = proto_coin_signed + .tx_pointer + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing tx_pointer")))?; + let tx_pointer = tx_pointer_from_proto(tx_pointer_proto)?; + let witness_index = + u16::try_from(proto_coin_signed.witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert witness_index to u16: {}", + e + )) + })?; + + Ok(Input::coin_signed( + utxo_id, + owner, + proto_coin_signed.amount, + asset_id, + tx_pointer, + witness_index, + )) + } + ProtoInputVariant::CoinPredicate(proto_coin_predicate) => { + let utxo_proto = proto_coin_predicate + .utxo_id + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing utxo_id")))?; + let utxo_id = utxo_id_from_proto(utxo_proto)?; + let owner = Address::try_from(proto_coin_predicate.owner.as_slice()) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert owner to Address: {}", + e + )) + })?; + let asset_id = fuel_core_types::fuel_types::AssetId::try_from( + proto_coin_predicate.asset_id.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let tx_pointer_proto = proto_coin_predicate + .tx_pointer + .as_ref() + .ok_or_else(|| Error::Serialization(anyhow!("Missing tx_pointer")))?; + let tx_pointer = tx_pointer_from_proto(tx_pointer_proto)?; + + Ok(Input::coin_predicate( + utxo_id, + owner, + proto_coin_predicate.amount, + asset_id, + tx_pointer, + proto_coin_predicate.predicate_gas_used, + proto_coin_predicate.predicate.clone(), + proto_coin_predicate.predicate_data.clone(), + )) + } + ProtoInputVariant::Contract(proto_contract) => { + let contract = contract_input_from_proto(proto_contract)?; + Ok(Input::Contract(contract)) + } + ProtoInputVariant::MessageCoinSigned(proto_message) => { + let sender = + Address::try_from(proto_message.sender.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert sender to Address: {}", + e + )) + })?; + let recipient = Address::try_from(proto_message.recipient.as_slice()) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert recipient to Address: {}", + e + )) + })?; + let nonce = fuel_core_types::fuel_types::Nonce::try_from( + proto_message.nonce.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let witness_index = + u16::try_from(proto_message.witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert witness_index to u16: {}", + e + )) + })?; + + Ok(Input::message_coin_signed( + sender, + recipient, + proto_message.amount, + nonce, + witness_index, + )) + } + ProtoInputVariant::MessageCoinPredicate(proto_message) => { + let sender = + Address::try_from(proto_message.sender.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert sender to Address: {}", + e + )) + })?; + let recipient = Address::try_from(proto_message.recipient.as_slice()) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert recipient to Address: {}", + e + )) + })?; + let nonce = fuel_core_types::fuel_types::Nonce::try_from( + proto_message.nonce.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + + Ok(Input::message_coin_predicate( + sender, + recipient, + proto_message.amount, + nonce, + proto_message.predicate_gas_used, + proto_message.predicate.clone(), + proto_message.predicate_data.clone(), + )) + } + ProtoInputVariant::MessageDataSigned(proto_message) => { + let sender = + Address::try_from(proto_message.sender.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert sender to Address: {}", + e + )) + })?; + let recipient = Address::try_from(proto_message.recipient.as_slice()) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert recipient to Address: {}", + e + )) + })?; + let nonce = fuel_core_types::fuel_types::Nonce::try_from( + proto_message.nonce.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + let witness_index = + u16::try_from(proto_message.witness_index).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert witness_index to u16: {}", + e + )) + })?; + + Ok(Input::message_data_signed( + sender, + recipient, + proto_message.amount, + nonce, + witness_index, + proto_message.data.clone(), + )) + } + ProtoInputVariant::MessageDataPredicate(proto_message) => { + let sender = + Address::try_from(proto_message.sender.as_slice()).map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert sender to Address: {}", + e + )) + })?; + let recipient = Address::try_from(proto_message.recipient.as_slice()) + .map_err(|e| { + Error::Serialization(anyhow!( + "Could not convert recipient to Address: {}", + e + )) + })?; + let nonce = fuel_core_types::fuel_types::Nonce::try_from( + proto_message.nonce.as_slice(), + ) + .map_err(|e| Error::Serialization(anyhow!(e)))?; + + Ok(Input::message_data_predicate( + sender, + recipient, + proto_message.amount, + nonce, + proto_message.predicate_gas_used, + proto_message.data.clone(), + proto_message.predicate.clone(), + proto_message.predicate_data.clone(), + )) + } + } +} + +fn policies_from_proto_policies(proto_policies: ProtoPolicies) -> FuelPolicies { + let ProtoPolicies { bits, values } = proto_policies; + let mut policies = FuelPolicies::default(); + let bits = + PoliciesBits::from_bits(bits).expect("Should be able to create from `u32`"); + if bits.contains(PoliciesBits::Tip) + && let Some(tip) = values.first() + { + policies.set(PolicyType::Tip, Some(*tip)); + } + if bits.contains(PoliciesBits::WitnessLimit) + && let Some(witness_limit) = values.get(1) + { + policies.set(PolicyType::WitnessLimit, Some(*witness_limit)); + } + if bits.contains(PoliciesBits::Maturity) + && let Some(maturity) = values.get(2) + { + policies.set(PolicyType::Maturity, Some(*maturity)); + } + if bits.contains(PoliciesBits::MaxFee) + && let Some(max_fee) = values.get(3) + { + policies.set(PolicyType::MaxFee, Some(*max_fee)); + } + if bits.contains(PoliciesBits::Expiration) + && let Some(expiration) = values.get(4) + { + policies.set(PolicyType::Expiration, Some(*expiration)); + } + if bits.contains(PoliciesBits::Owner) + && let Some(owner) = values.get(5) + { + policies.set(PolicyType::Owner, Some(*owner)); + } + policies +} + +pub fn proto_header_to_empty_application_header( + proto_header: &ProtoHeader, +) -> Result> { + match proto_header.versioned_header.clone() { + Some(ProtoVersionedHeader::V1(header)) => { + let app_header = ApplicationHeader { + da_height: DaBlockHeight::from(header.da_height), + consensus_parameters_version: header.consensus_parameters_version, + state_transition_bytecode_version: header + .state_transition_bytecode_version, + generated: Empty {}, + }; + Ok(app_header) + } + Some(ProtoVersionedHeader::V2(header)) => { + if cfg!(feature = "fault-proving") { + let app_header = ApplicationHeader { + da_height: DaBlockHeight::from(header.da_height), + consensus_parameters_version: header.consensus_parameters_version, + state_transition_bytecode_version: header + .state_transition_bytecode_version, + generated: Empty {}, + }; + Ok(app_header) + } else { + Err(anyhow!("V2 headers require the 'fault-proving' feature")) + .map_err(Error::Serialization) + } + } + None => Err(anyhow!("Missing protobuf versioned_header")) + .map_err(Error::Serialization), + } +} + +/// Alias the consensus header into an empty one. +pub fn proto_header_to_empty_consensus_header( + proto_header: &ProtoHeader, +) -> Result> { + match proto_header.versioned_header.clone() { + Some(ProtoVersionedHeader::V1(header)) => { + let consensus_header = ConsensusHeader { + prev_root: *Bytes32::from_bytes_ref_checked(&header.prev_root).ok_or( + Error::Serialization(anyhow!("Could create `Bytes32` from bytes")), + )?, + height: header.height.into(), + time: tai64::Tai64(header.time), + generated: Empty {}, + }; + Ok(consensus_header) + } + Some(ProtoVersionedHeader::V2(header)) => { + if cfg!(feature = "fault-proving") { + let consensus_header = ConsensusHeader { + prev_root: *Bytes32::from_bytes_ref_checked(&header.prev_root) + .ok_or(Error::Serialization(anyhow!( + "Could create `Bytes32` from bytes" + )))?, + height: header.height.into(), + time: tai64::Tai64(header.time), + generated: Empty {}, + }; + Ok(consensus_header) + } else { + Err(anyhow!("V2 headers require the 'fault-proving' feature")) + .map_err(Error::Serialization) + } + } + None => Err(anyhow!("Missing protobuf versioned_header")) + .map_err(Error::Serialization), + } +} + +// TODO: Add coverage for V2 Block stuff +// https://github.com/FuelLabs/fuel-core/issues/3139 +#[cfg(not(feature = "fault-proving"))] +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use super::*; + use fuel_core_types::test_helpers::arb_block; + use proptest::prelude::*; + + proptest! { + #![proptest_config(ProptestConfig { + cases: 100, .. ProptestConfig::default() + })] + #[test] + fn serialize_block__roundtrip((block, msg_ids, event_inbox_root) in arb_block()) { + // given + let serializer = SerializerAdapter; + + // when + let proto_block = serializer.serialize_block(&block).unwrap(); + + // then + let deserialized_block = fuel_block_from_protobuf(proto_block, &msg_ids, event_inbox_root).unwrap(); + assert_eq!(block, deserialized_block); + + } + } + + #[test] + #[ignore] + fn _dummy() {} +} diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs index af0cbf801be..be8b6b19e94 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/sync_service.rs @@ -28,16 +28,16 @@ use fuel_core_types::{ use std::time::Duration; use tokio::sync::mpsc::Sender; -pub struct SyncTask { +pub struct SyncTask { serializer: Serializer, - block_return_sender: Sender, + block_return_sender: Sender>, db: DB, next_height: BlockHeight, maybe_stop_height: Option, new_ending_height: tokio::sync::oneshot::Receiver, } -impl SyncTask +impl SyncTask where Serializer: BlockSerializer + Send, DB: StorageInspect + Send + 'static, @@ -46,7 +46,7 @@ where { pub fn new( serializer: Serializer, - block_return: Sender, + block_return: Sender>, db: DB, db_starting_height: BlockHeight, db_ending_height: Option, @@ -107,9 +107,10 @@ where } } -impl RunnableTask for SyncTask +impl RunnableTask for SyncTask where Serializer: BlockSerializer + Send + Sync, + Serializer::Block: Send + Sync + 'static, DB: Send + Sync + 'static, DB: StorageInspect + Send + 'static, DB: StorageInspect + Send + 'static, @@ -152,9 +153,10 @@ where } #[async_trait::async_trait] -impl RunnableService for SyncTask +impl RunnableService for SyncTask where Serializer: BlockSerializer + Send + Sync + 'static, + ::Block: Send + Sync + 'static, DB: Send + Sync + 'static, DB: StorageInspect + Send + 'static, DB: StorageInspect + Send + 'static, diff --git a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs index 92e04d69e5f..64d0256dbae 100644 --- a/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs +++ b/crates/services/block_aggregator_api/src/blocks/importer_and_db_source/tests.rs @@ -1,6 +1,7 @@ #![allow(non_snake_case)] use super::*; +use crate::blocks::Block; use ::postcard::to_allocvec; use fuel_core_services::stream::{ IntoBoxStream, @@ -34,6 +35,8 @@ use std::sync::Arc; pub struct MockSerializer; impl BlockSerializer for MockSerializer { + type Block = Block; + fn serialize_block(&self, block: &FuelBlock) -> Result { let bytes_vec = to_allocvec(block).map_err(|e| { Error::BlockSource(anyhow!("failed to serialize block: {}", e)) @@ -46,7 +49,6 @@ fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() } -// let block_stream = tokio_stream::iter(blocks).chain(pending()).into_boxed(); fn stream_with_pending(items: Vec) -> BoxStream { tokio_stream::iter(items).chain(pending()).into_boxed() } diff --git a/crates/services/block_aggregator_api/src/db.rs b/crates/services/block_aggregator_api/src/db.rs index 13a0bcc8489..d664bd13932 100644 --- a/crates/services/block_aggregator_api/src/db.rs +++ b/crates/services/block_aggregator_api/src/db.rs @@ -1,13 +1,11 @@ -use crate::{ - blocks::Block, - result::Result, -}; +use crate::result::Result; use fuel_core_types::fuel_types::BlockHeight; pub mod storage_db; /// The definition of the block aggregator database. pub trait BlockAggregatorDB: Send + Sync { + type Block; /// The type used to report a range of blocks type BlockRangeResponse; @@ -15,7 +13,7 @@ pub trait BlockAggregatorDB: Send + Sync { fn store_block( &mut self, height: BlockHeight, - block: Block, + block: Self::Block, ) -> impl Future> + Send; /// Retrieves a range of blocks from the database diff --git a/crates/services/block_aggregator_api/src/db/storage_db.rs b/crates/services/block_aggregator_api/src/db/storage_db.rs index cac501b2ddf..7aeac0a91d1 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db.rs @@ -1,10 +1,10 @@ use crate::{ block_range_response::BlockRangeResponse, - blocks::Block, db::{ BlockAggregatorDB, storage_db::table::Column, }, + protobuf_types::Block as ProtoBlock, result::{ Error, Result, @@ -105,9 +105,14 @@ where T: Unpin + Send + Sync + KeyValueInspect + 'static + std::fmt::Debug, StorageTransaction: StorageInspect, { + type Block = ProtoBlock; type BlockRangeResponse = BlockRangeResponse; - async fn store_block(&mut self, height: BlockHeight, block: Block) -> Result<()> { + async fn store_block( + &mut self, + height: BlockHeight, + block: ProtoBlock, + ) -> Result<()> { self.update_highest_contiguous_block(height); let mut tx = self.storage.write_transaction(); tx.storage_as_mut::() @@ -156,7 +161,7 @@ where S: Unpin + ReadTransaction + std::fmt::Debug, for<'a> StorageTransaction<&'a S>: StorageInspect, { - type Item = Block; + type Item = ProtoBlock; fn poll_next( self: Pin<&mut Self>, diff --git a/crates/services/block_aggregator_api/src/db/storage_db/table.rs b/crates/services/block_aggregator_api/src/db/storage_db/table.rs index 525645100e8..be11785c7af 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db/table.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db/table.rs @@ -1,4 +1,4 @@ -use crate::blocks::Block; +use crate::protobuf_types::Block as ProtoBlock; use fuel_core_storage::{ Mappable, blueprint::plain::Plain, @@ -51,7 +51,7 @@ impl Mappable for Blocks { type Key = Self::OwnedKey; type OwnedKey = BlockHeight; type Value = Self::OwnedValue; - type OwnedValue = Block; + type OwnedValue = ProtoBlock; } impl TableWithBlueprint for Blocks { diff --git a/crates/services/block_aggregator_api/src/db/storage_db/tests.rs b/crates/services/block_aggregator_api/src/db/storage_db/tests.rs index f09cdaafc2b..593839e406a 100644 --- a/crates/services/block_aggregator_api/src/db/storage_db/tests.rs +++ b/crates/services/block_aggregator_api/src/db/storage_db/tests.rs @@ -1,31 +1,43 @@ #![allow(non_snake_case)] use super::*; -use crate::db::storage_db::table::Column; +use crate::{ + blocks::importer_and_db_source::{ + BlockSerializer, + serializer_adapter::SerializerAdapter, + }, + db::storage_db::table::Column, +}; use fuel_core_storage::{ StorageAsRef, structured_storage::test::InMemoryStorage, transactional::IntoTransaction, }; use fuel_core_types::{ - ed25519::signature::rand_core::SeedableRng, + blockchain::block::Block as FuelBlock, + fuel_tx::Transaction, fuel_types::BlockHeight, }; use futures::StreamExt; -use rand::rngs::StdRng; fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() } +fn proto_block_with_height(height: BlockHeight) -> ProtoBlock { + let serializer_adapter = SerializerAdapter; + let mut default_block = FuelBlock::::default(); + default_block.header_mut().set_block_height(height); + serializer_adapter.serialize_block(&default_block).unwrap() +} + #[tokio::test] async fn store_block__adds_to_storage() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let mut adapter = StorageDB::new(db); let height = BlockHeight::from(1u32); - let expected = Block::random(&mut rng); + let expected = proto_block_with_height(height); // when adapter.store_block(height, expected.clone()).await.unwrap(); @@ -43,15 +55,15 @@ async fn store_block__adds_to_storage() { #[tokio::test] async fn get_block__can_get_expected_range() { - let mut rng = StdRng::seed_from_u64(666); // given let mut db = database(); let height_1 = BlockHeight::from(1u32); let height_2 = BlockHeight::from(2u32); let height_3 = BlockHeight::from(3u32); - let expected_1 = Block::random(&mut rng); - let expected_2 = Block::random(&mut rng); - let expected_3 = Block::random(&mut rng); + + let expected_1 = proto_block_with_height(height_1); + let expected_2 = proto_block_with_height(height_2); + let expected_3 = proto_block_with_height(height_3); let mut tx = db.write_transaction(); tx.storage_as_mut::() @@ -82,12 +94,11 @@ async fn get_block__can_get_expected_range() { #[tokio::test] async fn store_block__updates_the_highest_continuous_block_if_contiguous() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let mut adapter = StorageDB::new_with_height(db, BlockHeight::from(0u32)); let height = BlockHeight::from(1u32); - let expected = Block::random(&mut rng); + let expected = proto_block_with_height(height); // when adapter.store_block(height, expected.clone()).await.unwrap(); @@ -100,13 +111,12 @@ async fn store_block__updates_the_highest_continuous_block_if_contiguous() { #[tokio::test] async fn store_block__does_not_update_the_highest_continuous_block_if_not_contiguous() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let starting_height = BlockHeight::from(0u32); let mut adapter = StorageDB::new_with_height(db, starting_height); let height = BlockHeight::from(2u32); - let expected = Block::random(&mut rng); + let expected = proto_block_with_height(height); // when adapter.store_block(height, expected.clone()).await.unwrap(); @@ -119,7 +129,6 @@ async fn store_block__does_not_update_the_highest_continuous_block_if_not_contig #[tokio::test] async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { - let mut rng = StdRng::seed_from_u64(666); // given let db = database(); let starting_height = BlockHeight::from(0u32); @@ -129,7 +138,7 @@ async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { for height in 2..=10u32 { let height = BlockHeight::from(height); orphaned_height = Some(height); - let block = Block::random(&mut rng); + let block = proto_block_with_height(height); adapter.store_block(height, block).await.unwrap(); } let expected = starting_height; @@ -138,8 +147,11 @@ async fn store_block__updates_the_highest_continuous_block_if_filling_a_gap() { // when let height = BlockHeight::from(1u32); - let expected = Block::random(&mut rng); - adapter.store_block(height, expected.clone()).await.unwrap(); + let some_block = proto_block_with_height(height); + adapter + .store_block(height, some_block.clone()) + .await + .unwrap(); // then let expected = orphaned_height.unwrap(); diff --git a/crates/services/block_aggregator_api/src/lib.rs b/crates/services/block_aggregator_api/src/lib.rs index 900e1c56087..e3e9057d7d7 100644 --- a/crates/services/block_aggregator_api/src/lib.rs +++ b/crates/services/block_aggregator_api/src/lib.rs @@ -1,9 +1,6 @@ use crate::{ api::BlockAggregatorApi, - blocks::{ - Block, - BlockSource, - }, + blocks::BlockSource, db::BlockAggregatorDB, }; use fuel_core_services::{ @@ -13,6 +10,8 @@ use fuel_core_services::{ TaskNextAction, }; use fuel_core_types::fuel_types::BlockHeight; +use protobuf_types::Block as ProtoBlock; +use std::fmt::Debug; pub mod api; pub mod blocks; @@ -21,6 +20,8 @@ pub mod result; pub mod block_range_response; +pub mod protobuf_types; + pub mod integration { use crate::{ BlockAggregator, @@ -33,6 +34,7 @@ pub mod integration { ImporterAndDbSource, }, db::BlockAggregatorDB, + protobuf_types::Block as ProtoBlock, }; use fuel_core_services::{ ServiceRunner, @@ -56,6 +58,7 @@ pub mod integration { pub addr: SocketAddr, } + #[allow(clippy::type_complexity)] pub fn new_service( config: &Config, db: DB, @@ -63,13 +66,19 @@ pub mod integration { onchain_db: OnchainDB, importer: BoxStream, ) -> ServiceRunner< - BlockAggregator>, + BlockAggregator< + ProtobufAPI, + DB, + ImporterAndDbSource, + ProtoBlock, + >, > where DB: BlockAggregatorDB< BlockRangeResponse = ::BlockRangeResponse, + Block = ProtoBlock, >, - S: BlockSerializer + Clone + Send + Sync + 'static, + S: BlockSerializer + Clone + Send + Sync + 'static, OnchainDB: Send + Sync, OnchainDB: StorageInspect, OnchainDB: StorageInspect, @@ -104,33 +113,35 @@ pub mod block_aggregator; // but we can change the name later /// The Block Aggregator service, which aggregates blocks from a source and stores them in a database /// Queries can be made to the service to retrieve data from the `DB` -pub struct BlockAggregator { +pub struct BlockAggregator { query: Api, database: DB, block_source: Blocks, - new_block_subscriptions: Vec>, + new_block_subscriptions: Vec>, } pub struct NewBlock { height: BlockHeight, - block: Block, + block: ProtoBlock, } impl NewBlock { - pub fn new(height: BlockHeight, block: Block) -> Self { + pub fn new(height: BlockHeight, block: ProtoBlock) -> Self { Self { height, block } } - pub fn into_inner(self) -> (BlockHeight, Block) { + pub fn into_inner(self) -> (BlockHeight, ProtoBlock) { (self.height, self.block) } } -impl RunnableTask for BlockAggregator +impl RunnableTask + for BlockAggregator where - Api: BlockAggregatorApi, - DB: BlockAggregatorDB, + Api: BlockAggregatorApi, + DB: BlockAggregatorDB, Blocks: BlockSource, + ::Block: Clone + std::fmt::Debug + Send, BlockRange: Send, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { @@ -151,12 +162,15 @@ where } #[async_trait::async_trait] -impl RunnableService for BlockAggregator +impl RunnableService + for BlockAggregator where - Api: BlockAggregatorApi, - DB: BlockAggregatorDB, + Api: + BlockAggregatorApi + Send, + DB: BlockAggregatorDB + Send, Blocks: BlockSource, BlockRange: Send, + ::Block: Clone + Debug + Send, { const NAME: &'static str = "BlockAggregatorService"; type SharedData = (); diff --git a/crates/services/block_aggregator_api/src/protobuf_types.rs b/crates/services/block_aggregator_api/src/protobuf_types.rs new file mode 100644 index 00000000000..648ac0e278d --- /dev/null +++ b/crates/services/block_aggregator_api/src/protobuf_types.rs @@ -0,0 +1 @@ +tonic::include_proto!("blockaggregator"); diff --git a/crates/services/block_aggregator_api/src/result.rs b/crates/services/block_aggregator_api/src/result.rs index b687f1ec6cc..ab91f71ece0 100644 --- a/crates/services/block_aggregator_api/src/result.rs +++ b/crates/services/block_aggregator_api/src/result.rs @@ -7,6 +7,8 @@ pub enum Error { BlockSource(anyhow::Error), #[error("Database error: {0}")] DB(anyhow::Error), + #[error("Serialization error: {0}")] + Serialization(anyhow::Error), } pub type Result = core::result::Result; diff --git a/crates/services/block_aggregator_api/src/tests.rs b/crates/services/block_aggregator_api/src/tests.rs index ac069687760..d8b9a8744e5 100644 --- a/crates/services/block_aggregator_api/src/tests.rs +++ b/crates/services/block_aggregator_api/src/tests.rs @@ -36,21 +36,22 @@ use tokio::{ type BlockRangeResponse = BoxStream; -struct FakeApi { - receiver: Receiver>, +struct FakeApi { + receiver: Receiver>, } -impl FakeApi { - fn new() -> (Self, Sender>) { +impl FakeApi { + fn new() -> (Self, Sender>) { let (sender, receiver) = tokio::sync::mpsc::channel(1); let api = Self { receiver }; (api, sender) } } -impl BlockAggregatorApi for FakeApi { +impl BlockAggregatorApi for FakeApi { type BlockRangeResponse = T; - async fn await_query(&mut self) -> Result> { + type Block = B; + async fn await_query(&mut self) -> Result> { Ok(self.receiver.recv().await.unwrap()) } } @@ -75,6 +76,7 @@ impl FakeDB { } impl BlockAggregatorDB for FakeDB { + type Block = Block; type BlockRangeResponse = BlockRangeResponse; async fn store_block(&mut self, id: BlockHeight, block: Block) -> Result<()> { @@ -111,11 +113,11 @@ impl BlockAggregatorDB for FakeDB { } struct FakeBlockSource { - blocks: Receiver, + blocks: Receiver>, } impl FakeBlockSource { - fn new() -> (Self, Sender) { + fn new() -> (Self, Sender>) { let (_sender, receiver) = tokio::sync::mpsc::channel(1); let _self = Self { blocks: receiver }; (_self, _sender) @@ -123,7 +125,9 @@ impl FakeBlockSource { } impl BlockSource for FakeBlockSource { - async fn next_block(&mut self) -> Result { + type Block = Block; + + async fn next_block(&mut self) -> Result> { self.blocks .recv() .await @@ -243,12 +247,8 @@ async fn run__new_block_subscription__sends_new_block() { let _ = srv.run(&mut watcher).await; // then - let (actual_height, actual_block) = await_response_with_timeout(response) - .await - .unwrap() - .into_inner(); + let actual_block = await_response_with_timeout(response).await.unwrap(); assert_eq!(expected_block, actual_block); - assert_eq!(expected_height, actual_height); // cleanup drop(source_sender); diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index ae85771e9de..f23b0e2126c 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -35,7 +35,7 @@ std = [ "ed25519-dalek/std", ] random = ["dep:rand", "fuel-vm-private/random"] -test-helpers = ["random", "fuel-vm-private/test-helpers"] +test-helpers = ["random", "fuel-vm-private/test-helpers", "dep:proptest"] aws-kms = ["dep:aws-sdk-kms"] fault-proving = [] @@ -53,6 +53,7 @@ fuel-vm-private = { workspace = true, default-features = false, features = [ ] } k256 = { version = "0.13", default-features = false, features = ["ecdsa"] } parking_lot = { workspace = true } +proptest = { workspace = true, optional = true } rand = { workspace = true, optional = true } secrecy = "0.8" serde = { workspace = true, features = ["derive"], optional = true } diff --git a/crates/types/src/blockchain/header.rs b/crates/types/src/blockchain/header.rs index 836c7874b60..896cab86c3a 100644 --- a/crates/types/src/blockchain/header.rs +++ b/crates/types/src/blockchain/header.rs @@ -79,6 +79,17 @@ impl BlockHeader { } } + /// Get the application portion of the header. + pub fn application_v1( + &self, + ) -> Option<&ApplicationHeader> { + match self { + BlockHeader::V1(header) => Some(header.application()), + #[cfg(feature = "fault-proving")] + BlockHeader::V2(_header) => None, + } + } + /// Get the consensus portion of the header. pub fn consensus(&self) -> &ConsensusHeader { match self { @@ -133,6 +144,20 @@ impl BlockHeader { } } + /// Setter for the transactions count + #[cfg(feature = "test-helpers")] + pub fn set_transactions_count(&mut self, count: u16) { + match self { + BlockHeader::V1(header) => { + header.application_mut().generated.transactions_count = count + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + header.application_mut().generated.transactions_count = count + } + } + } + /// Getter for the message receipt count pub fn message_receipt_count(&self) -> u32 { match self { @@ -195,6 +220,25 @@ impl BlockHeader { }, } } + + /// Alias the consensus header into an empty one. + pub fn as_empty_consensus_header(&self) -> ConsensusHeader { + match self { + BlockHeader::V1(header) => ConsensusHeader { + prev_root: header.consensus().prev_root, + height: header.consensus().height, + time: header.consensus().time, + generated: Empty {}, + }, + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => ConsensusHeader { + prev_root: header.consensus().prev_root, + height: header.consensus().height, + time: header.consensus().time, + generated: Empty {}, + }, + } + } } #[cfg(any(test, feature = "test-helpers"))] @@ -269,6 +313,45 @@ impl BlockHeader { } } + /// Set the message outbox root for the header + pub fn set_message_outbox_root(&mut self, root: Bytes32) { + match self { + BlockHeader::V1(header) => { + header.set_message_outbox_root(root); + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + header.set_message_outbox_root(root); + } + } + } + + /// Set the message receipt count + pub fn set_message_receipt_count(&mut self, count: u32) { + match self { + BlockHeader::V1(header) => { + header.set_message_receipt_count(count); + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + header.set_message_receipt_count(count); + } + } + } + + /// Set the event inbox root for the header + pub fn set_event_inbox_root(&mut self, root: Bytes32) { + match self { + BlockHeader::V1(header) => { + header.set_event_inbox_root(root); + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + header.set_event_inbox_root(root); + } + } + } + /// Set the consensus parameters version pub fn set_consensus_parameters_version( &mut self, @@ -285,6 +368,22 @@ impl BlockHeader { } } + /// Set the state transition bytecode version + pub fn set_state_transition_bytecode_version( + &mut self, + version: StateTransitionBytecodeVersion, + ) { + match self { + BlockHeader::V1(header) => { + header.set_stf_version(version); + } + #[cfg(feature = "fault-proving")] + BlockHeader::V2(header) => { + header.set_stf_version(version); + } + } + } + /// Set the stf version pub fn set_stf_version(&mut self, version: StateTransitionBytecodeVersion) { match self { @@ -674,7 +773,8 @@ impl PartialBlockHeader { } } -fn generate_txns_root(transactions: &[Transaction]) -> Bytes32 { +/// Generate the transactions root from the list of transactions. +pub fn generate_txns_root(transactions: &[Transaction]) -> Bytes32 { let transaction_ids = transactions.iter().map(|tx| tx.to_bytes()); // Generate the transaction merkle root. let mut transaction_tree = diff --git a/crates/types/src/blockchain/header/v1.rs b/crates/types/src/blockchain/header/v1.rs index 0d450a664e2..ac1e76cb7fd 100644 --- a/crates/types/src/blockchain/header/v1.rs +++ b/crates/types/src/blockchain/header/v1.rs @@ -132,6 +132,21 @@ impl BlockHeaderV1 { self.recalculate_metadata(); } + pub(crate) fn set_message_outbox_root(&mut self, root: crate::fuel_tx::Bytes32) { + self.application_mut().generated.message_outbox_root = root; + self.recalculate_metadata(); + } + + pub(crate) fn set_message_receipt_count(&mut self, count: u32) { + self.application_mut().generated.message_receipt_count = count; + self.recalculate_metadata(); + } + + pub(crate) fn set_event_inbox_root(&mut self, event_inbox_root: Bytes32) { + self.application_mut().generated.event_inbox_root = event_inbox_root; + self.recalculate_metadata(); + } + pub(crate) fn set_da_height( &mut self, da_height: crate::blockchain::primitives::DaBlockHeight, diff --git a/crates/types/src/blockchain/header/v2.rs b/crates/types/src/blockchain/header/v2.rs index 9aea3a2a614..b464fc7464d 100644 --- a/crates/types/src/blockchain/header/v2.rs +++ b/crates/types/src/blockchain/header/v2.rs @@ -155,6 +155,21 @@ impl BlockHeaderV2 { self.recalculate_metadata(); } + pub(crate) fn set_message_outbox_root(&mut self, root: crate::fuel_tx::Bytes32) { + self.application_mut().generated.message_outbox_root = root; + self.recalculate_metadata(); + } + + pub(crate) fn set_message_receipt_count(&mut self, count: u32) { + self.application_mut().generated.message_receipt_count = count; + self.recalculate_metadata(); + } + + pub(crate) fn set_event_inbox_root(&mut self, root: crate::fuel_tx::Bytes32) { + self.application_mut().generated.event_inbox_root = root; + self.recalculate_metadata(); + } + pub(crate) fn set_da_height( &mut self, da_height: crate::blockchain::primitives::DaBlockHeight, diff --git a/crates/types/src/test_helpers.rs b/crates/types/src/test_helpers.rs index 3885007eb5d..37e187f6b81 100644 --- a/crates/types/src/test_helpers.rs +++ b/crates/types/src/test_helpers.rs @@ -1,17 +1,52 @@ use crate::{ + blockchain::{ + block::Block, + header::{ + GeneratedConsensusFields, + generate_txns_root, + }, + primitives::DaBlockHeight, + }, + fuel_merkle::binary::root_calculator::MerkleRootCalculator, fuel_tx::{ + BlobBody, + BlobIdExt, + Bytes32, ContractId, Create, Finalizable, + Input, + MessageId, Output, + StorageSlot, + Transaction, TransactionBuilder, + TxPointer, + UpgradePurpose, + UploadBody, + UtxoId, + Witness, + field::ReceiptsRoot, + input::{ + coin::CoinSigned, + contract::Contract as InputContract, + }, + output::contract::Contract as OutputContract, + policies::Policies, + }, + fuel_types::{ + BlobId, + BlockHeight, + Nonce, }, fuel_vm::{ Contract, Salt, }, }; +use proptest::prelude::*; use rand::Rng; +use tai64::Tai64; /// Helper function to create a contract creation transaction /// from a given contract bytecode. @@ -36,3 +71,603 @@ pub fn create_contract( .finalize(); (tx, contract_id) } + +#[allow(unused)] +fn arb_txs() -> impl Strategy> { + prop::collection::vec(arb_transaction(), 0..10) +} + +fn arb_script_transaction() -> impl Strategy { + ( + 1..10000u64, + any::<[u8; 32]>(), + prop::collection::vec(any::(), 0..100), + prop::collection::vec(any::(), 0..100), + arb_policies(), + arb_inputs(), + arb_outputs(), + prop::collection::vec(arb_witness(), 0..4), + ) + .prop_map( + |( + script_gas_limit, + receipts_root, + script_bytes, + script_data, + policies, + inputs, + outputs, + witnesses, + )| { + let mut script = crate::fuel_tx::Transaction::script( + script_gas_limit, + script_bytes, + script_data, + policies, + inputs, + outputs, + witnesses, + ); + *script.receipts_root_mut() = receipts_root.into(); + Transaction::Script(script) + }, + ) +} + +prop_compose! { + fn arb_storage_slot()( + key in any::<[u8; 32]>(), + value in any::<[u8; 32]>(), + ) -> StorageSlot { + StorageSlot::new(key.into(), value.into()) + } +} + +prop_compose! { + fn arb_coin_output()( + to in arb_address(), + amount in any::(), + asset in arb_asset_id(), + ) -> Output { + Output::coin(to, amount, asset) + } +} + +prop_compose! { + fn arb_contract_output()( + input_index in any::(), + balance_root in any::<[u8; 32]>(), + state_root in any::<[u8; 32]>(), + ) -> Output { + Output::Contract(OutputContract { + input_index, + balance_root: balance_root.into(), + state_root: state_root.into(), + }) + } +} + +prop_compose! { + fn arb_change_output()( + to in arb_address(), + amount in any::(), + asset in arb_asset_id(), + ) -> Output { + Output::change(to, amount, asset) + } +} + +prop_compose! { + fn arb_variable_output()( + to in arb_address(), + amount in any::(), + asset in arb_asset_id(), + ) -> Output { + Output::variable(to, amount, asset) + } +} + +prop_compose! { + fn arb_contract_created_output()( + contract_id in any::<[u8; 32]>(), + state_root in any::<[u8; 32]>(), + ) -> Output { + Output::contract_created(ContractId::new(contract_id), state_root.into()) + } +} + +fn arb_output_any() -> impl Strategy { + prop_oneof![ + arb_coin_output(), + arb_contract_output(), + arb_change_output(), + arb_variable_output(), + arb_contract_created_output(), + ] +} + +fn arb_outputs() -> impl Strategy> { + prop::collection::vec(arb_output_any(), 0..10) +} + +fn arb_witness() -> impl Strategy { + prop::collection::vec(any::(), 0..128).prop_map(Witness::from) +} + +prop_compose! { + fn arb_policies()( + tip in prop::option::of(any::()), + witness_limit in prop::option::of(any::()), + maturity in prop::option::of(0..100u32), + max_fee in prop::option::of(any::()), + expiration in prop::option::of(0..100u32), + owner in prop::option::of(any::()), + ) -> Policies { + let mut policies = Policies::new(); + if let Some(tip) = tip { + policies = policies.with_tip(tip); + } + if let Some(witness_limit) = witness_limit { + policies = policies.with_witness_limit(witness_limit); + } + if let Some(value) = maturity { + policies = policies.with_maturity(BlockHeight::new(value)); + } + if let Some(max_fee) = max_fee { + policies = policies.with_max_fee(max_fee); + } + if let Some(value) = expiration { + policies = policies.with_expiration(BlockHeight::new(value)); + } + if let Some(owner) = owner { + policies = policies.with_owner(owner); + } + policies + } +} + +prop_compose! { + fn arb_msg_id()(inner in any::<[u8; 32]>()) -> MessageId { + MessageId::new(inner) + } +} + +#[allow(unused)] +fn arb_inputs() -> impl Strategy> { + prop::collection::vec(arb_input_any(), 0..10) +} + +prop_compose! { + fn arb_coin_signed()( + utxo_id in arb_utxo_id(), + owner in arb_address(), + amount in 1..1_000_000u64, + asset_id in arb_asset_id(), + tx_pointer in arb_tx_pointer(), + witness_index in 0..1000u16, + ) -> Input { + let inner = CoinSigned { + utxo_id, + owner, + amount, + asset_id, + tx_pointer, + witness_index, + predicate_gas_used: Default::default(), + predicate: Default::default(), + predicate_data: Default::default(), + }; + Input::CoinSigned(inner) + } +} + +prop_compose! { + fn arb_coin_predicate()( + utxo_id in arb_utxo_id(), + owner in arb_address(), + amount in 1..1_000_000u64, + asset_id in arb_asset_id(), + tx_pointer in arb_tx_pointer(), + predicate_gas_used in any::(), + predicate in prop::collection::vec(any::(), 0..100), + predicate_data in prop::collection::vec(any::(), 0..100), + ) -> Input { + let inner = crate::fuel_tx::input::coin::CoinPredicate { + utxo_id, + owner, + amount, + asset_id, + tx_pointer, + witness_index: Default::default(), + predicate_gas_used, + predicate: predicate.into(), + predicate_data: predicate_data.into(), + }; + Input::CoinPredicate(inner) + } +} + +prop_compose! { + fn arb_contract_input_variant()( + utxo_id in arb_utxo_id(), + balance_root in any::<[u8; 32]>(), + state_root in any::<[u8; 32]>(), + tx_pointer in arb_tx_pointer(), + contract_id in any::<[u8; 32]>(), + ) -> Input { + let contract = InputContract { + utxo_id, + balance_root: balance_root.into(), + state_root: state_root.into(), + tx_pointer, + contract_id: ContractId::new(contract_id), + }; + Input::Contract(contract) + } +} + +prop_compose! { + fn arb_nonce()(bytes in any::<[u8; 32]>()) -> Nonce { + Nonce::new(bytes) + } +} + +prop_compose! { + fn arb_message_coin_signed_input()( + sender in arb_address(), + recipient in arb_address(), + amount in any::(), + nonce in arb_nonce(), + witness_index in any::(), + ) -> Input { + Input::message_coin_signed(sender, recipient, amount, nonce, witness_index) + } +} + +prop_compose! { + fn arb_message_coin_predicate_input()( + sender in arb_address(), + recipient in arb_address(), + amount in any::(), + nonce in arb_nonce(), + predicate_gas_used in any::(), + predicate in prop::collection::vec(any::(), 0..64), + predicate_data in prop::collection::vec(any::(), 0..64), + ) -> Input { + Input::message_coin_predicate( + sender, + recipient, + amount, + nonce, + predicate_gas_used, + predicate, + predicate_data, + ) + } +} + +prop_compose! { + fn arb_message_data_signed_input()( + sender in arb_address(), + recipient in arb_address(), + amount in any::(), + nonce in arb_nonce(), + witness_index in any::(), + data in prop::collection::vec(any::(), 0..128), + ) -> Input { + Input::message_data_signed(sender, recipient, amount, nonce, witness_index, data) + } +} + +prop_compose! { + fn arb_message_data_predicate_input()( + sender in arb_address(), + recipient in arb_address(), + amount in any::(), + nonce in arb_nonce(), + predicate_gas_used in any::(), + data in prop::collection::vec(any::(), 0..128), + predicate in prop::collection::vec(any::(), 0..64), + predicate_data in prop::collection::vec(any::(), 0..64), + ) -> Input { + Input::message_data_predicate( + sender, + recipient, + amount, + nonce, + predicate_gas_used, + data, + predicate, + predicate_data, + ) + } +} + +fn arb_input_any() -> impl Strategy { + prop_oneof![ + arb_coin_signed(), + arb_coin_predicate(), + arb_contract_input_variant(), + arb_message_coin_signed_input(), + arb_message_coin_predicate_input(), + arb_message_data_signed_input(), + arb_message_data_predicate_input(), + ] +} + +prop_compose! { + fn arb_utxo_id()( + inner in any::<[u8; 32]>(), + index in any::(), + ) -> UtxoId { + let tx_id = inner.into(); + UtxoId::new(tx_id, index) + } +} + +prop_compose! { + fn arb_address()(inner in any::<[u8; 32]>()) -> crate::fuel_types::Address { + crate::fuel_types::Address::new(inner) + } +} + +prop_compose! { + fn arb_asset_id()(inner in any::<[u8; 32]>()) -> crate::fuel_types::AssetId { + crate::fuel_types::AssetId::new(inner) + } +} + +prop_compose! { + fn arb_tx_pointer()( + block_height in 0..1_000_000u32, + tx_index in 0..1_000u16, + ) -> TxPointer { + let block_height = block_height.into(); + TxPointer::new(block_height, tx_index) + } +} + +fn arb_msg_ids() -> impl Strategy> { + prop::collection::vec(arb_msg_id(), 0..10usize) +} + +fn arb_transaction() -> impl Strategy { + prop_oneof![ + arb_script_transaction(), + arb_create_transaction(), + arb_mint_transaction(), + arb_upgrade_transaction(), + arb_upload_transaction(), + arb_blob_transaction(), + ] +} + +prop_compose! { + fn arb_input_contract_core()( + utxo_id in arb_utxo_id(), + balance_root in any::<[u8; 32]>(), + state_root in any::<[u8; 32]>(), + tx_pointer in arb_tx_pointer(), + contract_id in any::<[u8; 32]>(), + ) -> InputContract { + InputContract { + utxo_id, + balance_root: balance_root.into(), + state_root: state_root.into(), + tx_pointer, + contract_id: ContractId::new(contract_id), + } + } +} + +prop_compose! { + fn arb_output_contract_core()( + input_index in any::(), + balance_root in any::<[u8; 32]>(), + state_root in any::<[u8; 32]>(), + ) -> OutputContract { + OutputContract { + input_index, + balance_root: balance_root.into(), + state_root: state_root.into(), + } + } +} + +fn arb_create_transaction() -> impl Strategy { + ( + arb_policies(), + any::<[u8; 32]>(), + prop::collection::vec(arb_storage_slot(), 0..4), + arb_inputs(), + arb_outputs(), + prop::collection::vec(arb_witness(), 1..4), + ) + .prop_map( + |(policies, salt_bytes, storage_slots, inputs, outputs, witnesses)| { + let create = crate::fuel_tx::Transaction::create( + 0, + policies, + Salt::from(salt_bytes), + storage_slots, + inputs, + outputs, + witnesses, + ); + Transaction::Create(create) + }, + ) +} + +fn arb_mint_transaction() -> impl Strategy { + ( + arb_tx_pointer(), + arb_input_contract_core(), + arb_output_contract_core(), + any::(), + arb_asset_id(), + any::(), + ) + .prop_map( + |( + tx_pointer, + input_contract, + output_contract, + mint_amount, + mint_asset_id, + gas_price, + )| { + let mint = crate::fuel_tx::Transaction::mint( + tx_pointer, + input_contract, + output_contract, + mint_amount, + mint_asset_id, + gas_price, + ); + Transaction::Mint(mint) + }, + ) +} + +fn arb_upgrade_transaction() -> impl Strategy { + ( + arb_policies(), + arb_inputs(), + arb_outputs(), + prop::collection::vec(arb_witness(), 1..4), + any::<[u8; 32]>(), + ) + .prop_map(|(policies, inputs, outputs, witnesses, checksum_bytes)| { + let purpose = UpgradePurpose::ConsensusParameters { + witness_index: 0, + checksum: checksum_bytes.into(), + }; + let upgrade = crate::fuel_tx::Transaction::upgrade( + purpose, policies, inputs, outputs, witnesses, + ); + Transaction::Upgrade(upgrade) + }) +} + +fn arb_upload_transaction() -> impl Strategy { + ( + arb_policies(), + arb_inputs(), + arb_outputs(), + prop::collection::vec(arb_witness(), 1..4), + any::<[u8; 32]>(), + prop::collection::vec(any::<[u8; 32]>(), 0..4), + 1u16..=4, + any::(), + ) + .prop_map( + |( + policies, + inputs, + outputs, + witnesses, + root_bytes, + proof_entries, + subsections_number, + subsection_index_candidate, + )| { + let proof_set = proof_entries + .into_iter() + .map(Bytes32::from) + .collect::>(); + let subsections_number = subsections_number.max(1); + let body = UploadBody { + root: root_bytes.into(), + witness_index: 0, + subsection_index: subsection_index_candidate, + subsections_number, + proof_set, + }; + let upload = crate::fuel_tx::Transaction::upload( + body, policies, inputs, outputs, witnesses, + ); + Transaction::Upload(upload) + }, + ) +} + +fn arb_blob_transaction() -> impl Strategy { + ( + arb_policies(), + arb_inputs(), + arb_outputs(), + prop::collection::vec(arb_witness(), 0..3), + prop::collection::vec(any::(), 0..256), + ) + .prop_map(|(policies, inputs, outputs, witnesses, payload)| { + let blob_id = BlobId::compute(&payload); + let body = BlobBody { + id: blob_id, + witness_index: 0, + }; + let blob = crate::fuel_tx::Transaction::blob( + body, policies, inputs, outputs, witnesses, + ); + Transaction::Blob(blob) + }) +} + +prop_compose! { + fn arb_consensus_header()( + prev_root in any::<[u8; 32]>(), + time in any::(), + ) -> crate::blockchain::header::ConsensusHeader { + crate::blockchain::header::ConsensusHeader { + prev_root: prev_root.into(), + height: BlockHeight::new(0), + time: Tai64(time), + generated: GeneratedConsensusFields::default(), + } + } +} + +prop_compose! { + /// Generate an arbitrary block with a variable number of transactions + pub fn arb_block()( + txs in arb_txs(), + da_height in any::(), + consensus_parameter_version in any::(), + state_transition_bytecode_version in any::(), + msg_ids in arb_msg_ids(), + event_root in any::<[u8; 32]>(), + mut consensus_header in arb_consensus_header(), + ) -> (Block, Vec, Bytes32) { + let mut fuel_block = Block::default(); + + *fuel_block.transactions_mut() = txs; + + fuel_block.header_mut().set_da_height(DaBlockHeight(da_height)); + fuel_block.header_mut().set_consensus_parameters_version(consensus_parameter_version); + fuel_block.header_mut().set_state_transition_bytecode_version(state_transition_bytecode_version); + + let count = fuel_block.transactions().len().try_into().expect("we shouldn't have more than u16::MAX transactions"); + let msg_root = msg_ids + .iter() + .fold(MerkleRootCalculator::new(), |mut tree, id| { + tree.push(id.as_ref()); + tree + }) + .root() + .into(); + let tx_root = generate_txns_root(fuel_block.transactions()); + let event_root = event_root.into(); + fuel_block.header_mut().set_transactions_count(count); + fuel_block.header_mut().set_message_receipt_count(msg_ids.len().try_into().expect("we shouldn't have more than u32::MAX messages")); + fuel_block.header_mut().set_transaction_root(tx_root); + fuel_block.header_mut().set_message_outbox_root(msg_root); + fuel_block.header_mut().set_event_inbox_root(event_root); + + // Consensus + // TODO: Include V2 Application with V2 Header + let application_hash = fuel_block.header().application_v1().unwrap().hash(); + consensus_header.generated.application_hash = application_hash; + fuel_block.header_mut().set_consensus_header(consensus_header); + (fuel_block, msg_ids, event_root) + } +} diff --git a/tests/Cargo.toml b/tests/Cargo.toml index d68dcf22481..377d92b1740 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -19,6 +19,7 @@ aws-kms = ["dep:aws-config", "dep:aws-sdk-kms", "fuel-core-bin/aws-kms"] fault-proving = [ "fuel-core/fault-proving", "fuel-core-types/fault-proving", + "fuel-core-block-aggregator-api/fault-proving", "fuel-core-storage/fault-proving", "fuel-core-upgradable-executor/fault-proving", "fuel-core-poa/fault-proving", diff --git a/tests/tests/rpc.rs b/tests/tests/rpc.rs index c2aecedecc1..aa6c564834b 100644 --- a/tests/tests/rpc.rs +++ b/tests/tests/rpc.rs @@ -1,5 +1,4 @@ #![allow(non_snake_case)] - use fuel_core::{ database::Database, service::{ @@ -7,16 +6,17 @@ use fuel_core::{ FuelService, }, }; -use fuel_core_block_aggregator_api::api::protobuf_adapter::{ - block_aggregator_client::BlockAggregatorClient, - block_response::Payload, +use fuel_core_block_aggregator_api::protobuf_types::{ + BlockHeightRequest as ProtoBlockHeightRequest, + BlockRangeRequest as ProtoBlockRangeRequest, + NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest, + block::VersionedBlock as ProtoVersionedBlock, + block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient, + block_response::Payload as ProtoPayload, + header::VersionedHeader as ProtoVersionedHeader, }; use fuel_core_client::client::FuelClient; -use fuel_core_types::{ - blockchain::block::Block, - fuel_tx::*, - fuel_types::BlockHeight, -}; +use fuel_core_types::fuel_tx::*; use futures::StreamExt; use test_helpers::client_ext::ClientExt; @@ -35,7 +35,7 @@ async fn get_block_range__can_get_serialized_block_from_rpc() { let _ = graphql_client.submit_and_await_commit(&tx).await.unwrap(); let rpc_url = format!("http://{}", rpc_url); - let mut rpc_client = BlockAggregatorClient::connect(rpc_url) + let mut rpc_client = ProtoBlockAggregatorClient::connect(rpc_url) .await .expect("could not connect to server"); @@ -44,15 +44,11 @@ async fn get_block_range__can_get_serialized_block_from_rpc() { .await .unwrap() .unwrap(); - let header = expected_block.header; + let expected_header = expected_block.header; // when - let request = - fuel_core_block_aggregator_api::api::protobuf_adapter::BlockRangeRequest { - start: 1, - end: 1, - }; - let actual_bytes = if let Some(Payload::Literal(block)) = rpc_client + let request = ProtoBlockRangeRequest { start: 1, end: 1 }; + let actual_block = if let Some(ProtoPayload::Literal(block)) = rpc_client .get_block_range(request) .await .unwrap() @@ -63,23 +59,18 @@ async fn get_block_range__can_get_serialized_block_from_rpc() { .unwrap() .payload { - block.data + block } else { panic!("expected literal block payload"); }; - let actual_block: Block = postcard::from_bytes(&actual_bytes).unwrap(); + let ProtoVersionedBlock::V1(v1_block) = actual_block.versioned_block.unwrap(); + let actual_height = match v1_block.header.unwrap().versioned_header.unwrap() { + ProtoVersionedHeader::V1(v1_header) => v1_header.height, + ProtoVersionedHeader::V2(v2_header) => v2_header.height, + }; // then - assert_eq!( - BlockHeight::from(header.height.0), - *actual_block.header().height() - ); - // check txs - let actual_tx = actual_block.transactions().first().unwrap(); - let expected_opaque_tx = expected_block.transactions.first().unwrap().to_owned(); - let expected_tx: Transaction = expected_opaque_tx.try_into().unwrap(); - - assert_eq!(&expected_tx, actual_tx); + assert_eq!(expected_header.height.0, actual_height); } #[tokio::test(flavor = "multi_thread")] @@ -98,13 +89,12 @@ async fn get_block_height__can_get_value_from_rpc() { let _ = graphql_client.submit_and_await_commit(&tx).await.unwrap(); let rpc_url = format!("http://{}", rpc_url); - let mut rpc_client = BlockAggregatorClient::connect(rpc_url) + let mut rpc_client = ProtoBlockAggregatorClient::connect(rpc_url) .await .expect("could not connect to server"); // when - let request = - fuel_core_block_aggregator_api::api::protobuf_adapter::BlockHeightRequest {}; + let request = ProtoBlockHeightRequest {}; let expected_height = 1; let actual_height = rpc_client .get_block_height(request) @@ -131,12 +121,11 @@ async fn new_block_subscription__can_get_expect_block() { let tx = Transaction::default_test_tx(); let rpc_url = format!("http://{}", rpc_url); - let mut rpc_client = BlockAggregatorClient::connect(rpc_url) + let mut rpc_client = ProtoBlockAggregatorClient::connect(rpc_url) .await .expect("could not connect to server"); - let request = - fuel_core_block_aggregator_api::api::protobuf_adapter::NewBlockSubscriptionRequest {}; + let request = ProtoNewBlockSubscriptionRequest {}; let mut stream = rpc_client .new_block_subscription(request) .await @@ -148,29 +137,20 @@ async fn new_block_subscription__can_get_expect_block() { let next = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next()) .await .unwrap(); - let actual_bytes = - if let Some(Payload::Literal(block)) = next.unwrap().unwrap().payload { - block.data + let actual_block = + if let Some(ProtoPayload::Literal(block)) = next.unwrap().unwrap().payload { + block } else { panic!("expected literal block payload"); }; + let ProtoVersionedBlock::V1(v1_block) = actual_block.versioned_block.unwrap(); + let actual_height = match v1_block.header.unwrap().versioned_header.unwrap() { + ProtoVersionedHeader::V1(v1_header) => v1_header.height, + ProtoVersionedHeader::V2(v2_header) => v2_header.height, + }; + // then - let expected_block = graphql_client - .full_block_by_height(1) - .await - .unwrap() - .unwrap(); - let header = expected_block.header; - let actual_block: Block = postcard::from_bytes(&actual_bytes).unwrap(); - assert_eq!( - BlockHeight::from(header.height.0), - *actual_block.header().height() - ); - // check txs - let actual_tx = actual_block.transactions().first().unwrap(); - let expected_opaque_tx = expected_block.transactions.first().unwrap().to_owned(); - let expected_tx: Transaction = expected_opaque_tx.try_into().unwrap(); - - assert_eq!(&expected_tx, actual_tx); + let expected_height = 1; + assert_eq!(expected_height, actual_height); }