diff --git a/core-processor/src/precharge.rs b/core-processor/src/precharge.rs index 167419e54f6..6e2620ddbad 100644 --- a/core-processor/src/precharge.rs +++ b/core-processor/src/precharge.rs @@ -29,7 +29,7 @@ use core::marker::PhantomData; use gear_core::{ code::{CodeMetadata, InstantiatedSectionSizes, SectionName}, costs::{BytesAmount, ProcessCosts}, - gas::{ChargeResult, GasAllowanceCounter, GasCounter}, + gas::{GasAllowanceCounter, GasCounter}, ids::ActorId, message::IncomingDispatch, }; @@ -61,6 +61,27 @@ pub enum PreChargeGasOperation { /// Obtain program allocations. #[display("obtain program allocations")] Allocations, + /// Other operations. + #[display("{description}")] + Other { + /// Operation description. + description: &'static str, + + /// Whether the operation affects block gas allowance. + affects_allowance: bool, + }, +} + +impl PreChargeGasOperation { + /// Checks whether the operation affects block gas allowance. + pub const fn affects_allowance(&self) -> bool { + match self { + &Self::Other { + affects_allowance, .. + } => affects_allowance, + _ => true, + } + } } /// Defines result variants of the precharge functions. @@ -142,13 +163,34 @@ impl ContextCharged { self.gas_counter.left() } + /// Charges gas for a non-standard operation. + pub fn charge_extra_fee( + self, + description: &'static str, + affects_allowance: bool, + amount: u64, + ) -> PrechargeResult { + self.charge_gas( + PreChargeGasOperation::Other { + description, + affects_allowance, + }, + amount, + ) + } + /// Charges gas for the operation. fn charge_gas( mut self, operation: PreChargeGasOperation, amount: u64, ) -> PrechargeResult> { - if self.gas_allowance_counter.charge_if_enough(amount) != ChargeResult::Enough { + if operation.affects_allowance() + && self + .gas_allowance_counter + .charge_if_enough(amount) + .is_not_enough() + { let gas_burned = self.gas_counter.burned(); return Err(process_allowance_exceed( @@ -158,7 +200,7 @@ impl ContextCharged { )); } - if self.gas_counter.charge_if_enough(amount) != ChargeResult::Enough { + if self.gas_counter.charge_if_enough(amount).is_not_enough() { let gas_burned = self.gas_counter.burned(); let system_reservation_ctx = SystemReservationContext::from_dispatch(&self.dispatch); diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index e7587ada452..cd0ce12cb23 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -122,6 +122,7 @@ use ethexe_runtime_common::{ use gear_core::gas::GasAllowanceCounter; use gprimitives::{ActorId, H256}; use itertools::Itertools; +use std::collections::HashSet; #[derive(Debug, Clone)] pub struct RunnerConfig { @@ -163,11 +164,16 @@ pub async fn run( let mut allowance_counter = GasAllowanceCounter::new(config.block_gas_limit); let chunk_size = config.chunk_processing_threads; + // Set of programs which has already processed + // their queue. Used to charge first message fee. + let mut processed_first_queue = HashSet::new(); + // Start with injected queues processing. let is_out_of_gas_for_block = run_inner( &mut run_ctx, db.clone(), instance_creator.clone(), + &mut processed_first_queue, &mut allowance_counter, chunk_size, MessageType::Injected, @@ -180,6 +186,7 @@ pub async fn run( &mut run_ctx, db, instance_creator, + &mut processed_first_queue, &mut allowance_counter, chunk_size, MessageType::Canonical, @@ -203,6 +210,7 @@ pub async fn run_overlaid( &mut run_ctx, db, instance_creator, + &mut HashSet::new(), &mut allowance_counter, chunk_size, MessageType::Canonical, @@ -217,6 +225,7 @@ async fn run_inner( run_ctx: &mut C, db: Database, instance_creator: InstanceCreator, + processed_first_queue: &mut HashSet, allowance_counter: &mut GasAllowanceCounter, chunk_size: usize, processing_queue_type: MessageType, @@ -232,6 +241,7 @@ async fn run_inner( chunk_size, states, run_ctx, + processed_first_queue, processing_queue_type, ); @@ -246,6 +256,7 @@ async fn run_inner( chunk, db.clone(), instance_creator.clone(), + processed_first_queue, allowance_counter.left().min(CHUNK_PROCESSING_GAS_LIMIT), processing_queue_type, ) @@ -304,6 +315,7 @@ pub(crate) trait RunContext { &self, execution_chunks: &mut chunks_splitting::ExecutionChunks, actor_state: chunks_splitting::ActorStateHashWithQueueSize, + is_first_queue: bool, queue_type: MessageType, ) { let chunks_splitting::ActorStateHashWithQueueSize { @@ -319,7 +331,14 @@ pub(crate) trait RunContext { }; let chunk_idx = execution_chunks.chunk_idx(queue_size); - execution_chunks.insert_into(chunk_idx, actor_id, hash); + execution_chunks.insert_into( + chunk_idx, + chunks_splitting::ChunkItem { + actor_id, + hash, + is_first_queue, + }, + ); } /// Checks whether queues for specified program must not be executed in the current run. @@ -401,6 +420,7 @@ impl<'a> RunContext for OverlaidRunContext<'a> { &self, execution_chunks: &mut chunks_splitting::ExecutionChunks, actor_state: chunks_splitting::ActorStateHashWithQueueSize, + is_first_queue: bool, queue_type: MessageType, ) { let chunks_splitting::ActorStateHashWithQueueSize { @@ -415,14 +435,20 @@ impl<'a> RunContext for OverlaidRunContext<'a> { MessageType::Injected => injected_queue_size, }; + let chunk_item = chunks_splitting::ChunkItem { + actor_id, + hash, + is_first_queue, + }; + if self.overlaid_ctx.base_program() == actor_id { // Insert base program into heaviest chunk, which is going to be executed first. // This is done to get faster reply from the target dispatch for which overlaid // executor was created. - execution_chunks.insert_into_heaviest(actor_id, hash); + execution_chunks.insert_into_heaviest(chunk_item); } else { let chunk_idx = execution_chunks.chunk_idx(queue_size); - execution_chunks.insert_into(chunk_idx, actor_id, hash); + execution_chunks.insert_into(chunk_idx, chunk_item); } } @@ -463,8 +489,15 @@ fn states( mod chunks_splitting { use super::*; + #[derive(Debug, Clone, Copy)] + pub(super) struct ChunkItem { + pub actor_id: ActorId, + pub hash: H256, + pub is_first_queue: bool, + } + // An alias introduced for better readability of the chunks splitting steps. - type Chunks = Vec>; + pub(super) type Chunk = Vec; // `prepare_execution_chunks` is not exactly sorting (sorting usually `n*log(n)` this one is `O(n)`), // but rather partitioning into subsets (chunks) of programs with approximately similar queue sizes. @@ -472,12 +505,20 @@ mod chunks_splitting { chunk_size: usize, states: Vec, run_ctx: &mut R, + processed_first_queue: &HashSet, processing_queue_type: MessageType, - ) -> Chunks { + ) -> Vec { let mut execution_chunks = ExecutionChunks::new(chunk_size, states.len()); for state in states { - run_ctx.handle_chunk_data(&mut execution_chunks, state, processing_queue_type); + let is_first_queue = !processed_first_queue.contains(&state.actor_id); + + run_ctx.handle_chunk_data( + &mut execution_chunks, + state, + is_first_queue, + processing_queue_type, + ); } execution_chunks.arrange_execution_chunks(run_ctx) @@ -506,7 +547,7 @@ mod chunks_splitting { /// A helper struct to manage execution chunks during their preparation. pub(crate) struct ExecutionChunks { chunk_size: usize, - chunks: Chunks, + chunks: Vec, } impl ExecutionChunks { @@ -527,29 +568,29 @@ mod chunks_splitting { } /// Inserts chunk execution data into the specified chunk index. - pub(super) fn insert_into(&mut self, idx: usize, actor_id: ActorId, hash: H256) { - if let Some(chunk) = self.chunks.get_mut(idx) { - chunk.push((actor_id, hash)); - } else { + pub(super) fn insert_into(&mut self, idx: usize, item: ChunkItem) { + let Some(chunk) = self.chunks.get_mut(idx) else { panic!( "Chunk index {idx} out of bounds: chunks number - {}", self.chunks.len() - ); - } + ) + }; + + chunk.push(item); } /// Insert chunk execution data into the heaviest chunk (most prior, the last one). - pub(super) fn insert_into_heaviest(&mut self, actor_id: ActorId, hash: H256) { - if let Some(chunk) = self.chunks.last_mut() { - chunk.push((actor_id, hash)); - } else { + pub(super) fn insert_into_heaviest(&mut self, item: ChunkItem) { + let Some(chunk) = self.chunks.last_mut() else { panic!("Chunks are empty, cannot insert into heaviest chunk"); - } + }; + + chunk.push(item); } /// Arranges execution chunks by merging uneven chunks and reversing their order, /// so the heaviest chunks are processed first. - fn arrange_execution_chunks(self, run_ctx: &mut R) -> Chunks { + fn arrange_execution_chunks(self, run_ctx: &mut R) -> Vec { self.chunks .into_iter() // Merge uneven chunks @@ -566,7 +607,7 @@ mod chunks_splitting { // earlier the function will nullify it and skip spawning the job for the program queue as it's empty. If the queue // was already nullified, the function will return `false` and the job will be spawned as usual. // For more info, see impl of the [`OverlaidContext`]. - .filter(|&(program_id, _)| !run_ctx.check_task_no_run(program_id)) + .filter(|&chunk_item| !run_ctx.check_task_no_run(chunk_item.actor_id)) .collect() }) .collect() @@ -576,6 +617,7 @@ mod chunks_splitting { mod chunk_execution_spawn { use super::*; + use chunks_splitting::ChunkItem; use rayon::iter::{IntoParallelIterator, ParallelIterator}; /// An alias introduced for better readability of the chunks execution steps. @@ -588,31 +630,43 @@ mod chunk_execution_spawn { /// executed concurrently, then each of the program should have received a reference to the global gas allowance counter /// and charge gas from it concurrently. pub(super) async fn spawn_chunk_execution( - chunk: Vec<(ActorId, H256)>, + chunk: Vec, db: Database, instance_creator: InstanceCreator, + processed_first_queue: &mut HashSet, gas_allowance_for_chunk: u64, processing_queue_type: MessageType, ) -> Vec { + for chunk_item in &chunk { + processed_first_queue.insert(chunk_item.actor_id); + } + tokio::task::spawn_blocking(move || { chunk .into_par_iter() - .map(|(program_id, state_hash)| { - let db = db.clone(); - let mut executor = instance_creator - .instantiate() - .expect("Failed to instantiate executor"); - - let (jn, new_state_hash, gas_spent) = run_runtime( - db, - &mut executor, - program_id, - state_hash, - processing_queue_type, - gas_allowance_for_chunk, - ); - (program_id, new_state_hash, jn, gas_spent) - }) + .map( + |ChunkItem { + actor_id: program_id, + hash: state_hash, + is_first_queue, + }| { + let db = db.clone(); + let mut executor = instance_creator + .instantiate() + .expect("Failed to instantiate executor"); + + let (jn, new_state_hash, gas_spent) = run_runtime( + db, + &mut executor, + program_id, + state_hash, + processing_queue_type, + is_first_queue, + gas_allowance_for_chunk, + ); + (program_id, new_state_hash, jn, gas_spent) + }, + ) .collect() }) .await @@ -625,6 +679,7 @@ mod chunk_execution_spawn { program_id: ActorId, state_hash: H256, queue_type: MessageType, + is_first_queue: bool, gas_allowance: u64, ) -> (ProgramJournals, H256, u64) { let code_id = db.program_code_id(program_id).expect("Code ID must be set"); @@ -640,6 +695,7 @@ mod chunk_execution_spawn { queue_type, instrumented_code, code_metadata, + is_first_queue, gas_allowance, ) .expect("Some error occurs while running program in instance") @@ -788,10 +844,13 @@ mod tests { let mut common_run_context = CommonRunContext { in_block_transitions: &mut InBlockTransitions::default(), }; + + let processed_first_queue = HashSet::from([2, 3, 5].map(ActorId::from)); let chunks = chunks_splitting::prepare_execution_chunks( CHUNK_PROCESSING_THREADS, states, &mut common_run_context, + &processed_first_queue, MessageType::Canonical, ); @@ -801,9 +860,9 @@ mod tests { .map(|chunk| { chunk .into_iter() - .map(|(_, hash)| { + .map(|chunk_item| { states_to_queue_size - .get(&hash) + .get(&chunk_item.hash) .expect("State hash must be in the map") }) .sum::() diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index 2bfa36914d8..315e2191937 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -160,6 +160,7 @@ impl InstanceWrapper { queue_type: MessageType, maybe_instrumented_code: Option, maybe_code_metadata: Option, + is_first_queue: bool, gas_allowance: u64, ) -> Result<(ProgramJournals, H256, u64)> { let chain_head = self.chain_head.expect("chain head must be set before run"); @@ -171,6 +172,7 @@ impl InstanceWrapper { queue_type, maybe_instrumented_code, maybe_code_metadata, + is_first_queue, gas_allowance, ); diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index a5863fdf78f..676bb68e6bb 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -116,6 +116,21 @@ impl TransitionController<'_, S> { } } +/// Configuration for Ethereum-related constants. +pub struct EthereumConfig { + /// The amount of gas charged for the first message in announce. + first_message_fee: u64, +} + +/// Processes the program message queue of given type. +/// +/// Panics if the queue is empty. It's needed to guarantee +/// that the function always charges for the first message. +/// +/// Returns journals and the amount of gas burned. +// +// TODO: refactor the function to reduce the number of arguments (#5100) +#[allow(clippy::too_many_arguments)] pub fn process_queue( program_id: ActorId, mut program_state: ProgramState, @@ -123,6 +138,7 @@ pub fn process_queue( instrumented_code: Option, code_metadata: Option, ri: &RI, + is_first_queue: bool, gas_allowance: u64, ) -> (ProgramJournals, u64) where @@ -139,10 +155,10 @@ where MessageType::Injected => program_state.injected_queue.hash.is_empty(), }; - if is_queue_empty { - // Queue is empty, nothing to process. - return (Vec::new(), 0); - } + assert!( + !is_queue_empty, + "the function must not be run with empty queues" + ); let queue = program_state .queue_from_msg_type(queue_type) @@ -196,21 +212,31 @@ where reserve_for: 0, }; + // TODO: must be set somewhere by some runtime configuration + let ethereum_config = EthereumConfig { + // TODO: the value of the fee must be something sensible, + // not just some arbitrary number. + first_message_fee: 1000, + }; + let mut mega_journal = Vec::new(); let mut queue_gas_allowance_counter = GasAllowanceCounter::new(gas_allowance); ri.init_lazy_pages(); - for dispatch in queue { + for (i, dispatch) in queue.into_iter().enumerate() { let origin = dispatch.message_type; let call_reply = dispatch.call; + let is_first_message = i == 0 && is_first_queue; let is_first_execution = dispatch.context.is_none(); - let journal = process_dispatch( + let (Ok(journal) | Err(journal)) = process_dispatch( dispatch, &block_config, + ðereum_config, program_id, &program_state, + is_first_message, &instrumented_code, &code_metadata, ri, @@ -246,17 +272,20 @@ where (mega_journal, gas_spent) } +// TODO: refactor the function to reduce the number of arguments (#5100) #[allow(clippy::too_many_arguments)] fn process_dispatch( dispatch: Dispatch, block_config: &BlockConfig, + ethereum_config: &EthereumConfig, program_id: ActorId, program_state: &ProgramState, + is_first_message: bool, instrumented_code: &Option, code_metadata: &Option, ri: &RI, gas_allowance: u64, -) -> Vec +) -> Result, Vec> where S: Storage, RI: RuntimeInterface, @@ -285,22 +314,27 @@ where let dispatch = IncomingDispatch::new(kind, incoming_message, context); - let context = ContextCharged::new(program_id, dispatch, gas_allowance); + let mut context = ContextCharged::new(program_id, dispatch, gas_allowance); - let context = match context.charge_for_program(block_config) { - Ok(context) => context, - Err(journal) => return journal, - }; + if is_first_message { + context = context.charge_extra_fee( + "process the first message", + false, + ethereum_config.first_message_fee, + )?; + } + + let context = context.charge_for_program(block_config)?; let active_state = match &program_state.program { state::Program::Active(state) => state, state::Program::Terminated(program_id) => { log::trace!("Program {program_id} has failed init"); - return core_processor::process_failed_init(context); + return Err(core_processor::process_failed_init(context)); } state::Program::Exited(program_id) => { log::trace!("Program {program_id} has exited"); - return core_processor::process_program_exited(context, *program_id); + return Err(core_processor::process_program_exited(context, *program_id)); } }; @@ -319,13 +353,10 @@ where log::trace!( "Program {program_id} is not yet finished initialization, so cannot process handle message" ); - return core_processor::process_uninitialized(context); + return Err(core_processor::process_uninitialized(context)); } - let context = match context.charge_for_code_metadata(block_config) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = context.charge_for_code_metadata(block_config)?; let code = instrumented_code .as_ref() @@ -334,11 +365,7 @@ where .as_ref() .expect("Code metadata must be provided if program is active"); - let context = - match context.charge_for_instrumented_code(block_config, code.bytes().len() as u32) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = context.charge_for_instrumented_code(block_config, code.bytes().len() as u32)?; let allocations = active_state.allocations_hash.map_or_default(|hash| { ri.storage() @@ -346,10 +373,7 @@ where .expect("Cannot get allocations") }); - let context = match context.charge_for_allocations(block_config, allocations.tree_len()) { - Ok(context) => context, - Err(journal) => return journal, - }; + let context = context.charge_for_allocations(block_config, allocations.tree_len())?; let actor_data = ExecutableActorData { allocations: allocations.into(), @@ -357,15 +381,12 @@ where memory_infix: active_state.memory_infix, }; - let context = match context.charge_for_module_instantiation( + let context = context.charge_for_module_instantiation( block_config, actor_data, code.instantiated_section_sizes(), code_metadata, - ) { - Ok(context) => context, - Err(journal) => return journal, - }; + )?; let execution_context = ProcessExecutionContext::new( context, @@ -378,8 +399,10 @@ where let random_data = ri.random_data(); - core_processor::process::>(block_config, execution_context, random_data) - .unwrap_or_else(|err| unreachable!("{err}")) + Ok( + core_processor::process::>(block_config, execution_context, random_data) + .unwrap_or_else(|err| unreachable!("{err}")), + ) } pub const fn pack_u32_to_i64(low: u32, high: u32) -> i64 { diff --git a/ethexe/runtime/src/wasm/api/mod.rs b/ethexe/runtime/src/wasm/api/mod.rs index 5b32242b3aa..75a28a24464 100644 --- a/ethexe/runtime/src/wasm/api/mod.rs +++ b/ethexe/runtime/src/wasm/api/mod.rs @@ -50,6 +50,7 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { queue_kind, maybe_instrumented_code, maybe_code_metadata, + is_first_queue, gas_allowance, ) = Decode::decode(&mut get_slice(arg_ptr, arg_len)).unwrap(); @@ -59,6 +60,7 @@ fn _run(arg_ptr: i32, arg_len: i32) -> i64 { queue_kind, maybe_instrumented_code, maybe_code_metadata, + is_first_queue, gas_allowance, ); diff --git a/ethexe/runtime/src/wasm/api/run.rs b/ethexe/runtime/src/wasm/api/run.rs index 0dd9260dc25..b845e5dbd60 100644 --- a/ethexe/runtime/src/wasm/api/run.rs +++ b/ethexe/runtime/src/wasm/api/run.rs @@ -32,6 +32,7 @@ pub fn run( queue_type: MessageType, maybe_instrumented_code: Option, code_metadata: Option, + is_first_queue: bool, gas_allowance: u64, ) -> (ProgramJournals, u64) { log::debug!("You're calling 'run(..)'"); @@ -55,6 +56,7 @@ pub fn run( maybe_instrumented_code, code_metadata, &ri, + is_first_queue, gas_allowance, );