diff --git a/Cargo.lock b/Cargo.lock index 505887cacf8..70d6cfbd6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5080,6 +5080,7 @@ dependencies = [ name = "ethexe-compute" version = "1.10.0" dependencies = [ + "demo-ping", "derive_more 2.1.1", "ethexe-common", "ethexe-db", @@ -5276,7 +5277,9 @@ dependencies = [ "ethexe-db", "ethexe-runtime", "ethexe-runtime-common", + "futures", "gear-core", + "gear-core-errors", "gear-core-processor", "gear-lazy-pages", "gear-runtime-interface", @@ -5288,6 +5291,7 @@ dependencies = [ "parity-scale-codec", "rand 0.8.5", "rayon", + "scopeguard", "sp-allocator", "sp-wasm-interface", "thiserror 2.0.17", @@ -10966,13 +10970,12 @@ dependencies = [ [[package]] name = "metrics-derive" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3dbdd96ed57d565ec744cba02862d707acf373c5772d152abae6ec5c4e24f6c" +checksum = "161ab904c2c62e7bda0f7562bf22f96440ca35ff79e66c800cbac298f2f4f5ec" dependencies = [ "proc-macro2", "quote", - "regex", "syn 2.0.114", ] diff --git a/ethexe/common/src/gear.rs b/ethexe/common/src/gear.rs index c1bf9edff86..5871df68540 100644 --- a/ethexe/common/src/gear.rs +++ b/ethexe/common/src/gear.rs @@ -430,7 +430,20 @@ impl ToDigest for ValueClaim { } } -#[derive(Clone, Copy, Debug, Encode, Decode, PartialEq, Eq, Default, PartialOrd, Ord, Hash)] +#[derive( + Clone, + Copy, + Debug, + Encode, + Decode, + PartialEq, + Eq, + Default, + PartialOrd, + Ord, + Hash, + derive_more::IsVariant, +)] #[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] pub enum MessageType { #[default] diff --git a/ethexe/common/src/mock.rs b/ethexe/common/src/mock.rs index a133ecde58c..3734b5a0ce8 100644 --- a/ethexe/common/src/mock.rs +++ b/ethexe/common/src/mock.rs @@ -19,8 +19,8 @@ pub use tap::Tap; use crate::{ - Announce, BlockData, BlockHeader, CodeBlobInfo, ComputedAnnounce, Digest, HashOf, - ProgramStates, ProtocolTimelines, Schedule, SimpleBlockData, ValidatorsVec, + Announce, BlockData, BlockHeader, CodeBlobInfo, Digest, HashOf, ProgramStates, + ProtocolTimelines, Schedule, SimpleBlockData, ValidatorsVec, consensus::BatchCommitmentValidationRequest, db::*, ecdsa::{PrivateKey, SignedMessage}, @@ -632,21 +632,3 @@ impl BlockData { self } } - -impl Mock for ComputedAnnounce { - fn mock(_: ()) -> Self { - Self { - announce_hash: HashOf::random(), - promises: Default::default(), - } - } -} - -impl Mock> for ComputedAnnounce { - fn mock(announce_hash: HashOf) -> Self { - Self { - announce_hash, - promises: Default::default(), - } - } -} diff --git a/ethexe/common/src/primitives.rs b/ethexe/common/src/primitives.rs index d1dfb702178..cc6ee1d7697 100644 --- a/ethexe/common/src/primitives.rs +++ b/ethexe/common/src/primitives.rs @@ -17,9 +17,8 @@ // along with this program. If not, see . use crate::{ - DEFAULT_BLOCK_GAS_LIMIT, HashOf, ToDigest, - events::BlockEvent, - injected::{Promise, SignedInjectedTransaction}, + DEFAULT_BLOCK_GAS_LIMIT, HashOf, ToDigest, events::BlockEvent, + injected::SignedInjectedTransaction, }; use alloc::{ collections::{btree_map::BTreeMap, btree_set::BTreeSet}, @@ -130,24 +129,19 @@ impl ToDigest for Announce { } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ComputedAnnounce { - pub announce_hash: HashOf, - pub promises: Vec, +/// [`PromisePolicy`] tells processor whether should it emits promises or not. +#[derive(Clone, Debug, Copy, Default, PartialEq, Eq, Encode, Decode, derive_more::IsVariant)] +pub enum PromisePolicy { + /// Emits promises in execution process. + Enabled, + // Do not emit promises in execution process. + #[default] + Disabled, } -impl ComputedAnnounce { - pub fn from_announce_hash(announce_hash: HashOf) -> Self { - Self { - announce_hash, - promises: Default::default(), - } - } - - pub fn merge_promises(&mut self, other: ComputedAnnounce) { - self.promises.extend(other.promises); - } -} +// Producer -> (announce, PromisePolicy::Enabled) +// Subordinate -> (announce, PromisePolicy::Disabled) +// ConnectNode -> (announce, PromisePolicy::Disabled) #[derive(PartialEq, Eq, Hash, Debug, Clone, Copy, Default, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "std", derive(serde::Serialize))] diff --git a/ethexe/compute/Cargo.toml b/ethexe/compute/Cargo.toml index f457a556a4b..f92be57ca7c 100644 --- a/ethexe/compute/Cargo.toml +++ b/ethexe/compute/Cargo.toml @@ -34,4 +34,5 @@ wat.workspace = true wasmparser.workspace = true ethexe-common = { workspace = true, features = ["mock"] } ntest.workspace = true - +# test examples +demo-ping.workspace = true diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index a858a6ab2c7..e62d16b5b28 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -16,24 +16,26 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{ComputeError, ProcessorExt, Result, service::SubService}; +use crate::{ComputeError, ComputeEvent, ProcessorExt, Result, service::SubService}; use ethexe_common::{ - Announce, ComputedAnnounce, HashOf, SimpleBlockData, + Announce, HashOf, PromisePolicy, SimpleBlockData, db::{ AnnounceStorageRO, AnnounceStorageRW, BlockMetaStorageRO, CodesStorageRW, LatestDataStorageRO, LatestDataStorageRW, OnChainStorageRO, }, events::BlockEvent, + injected::Promise, }; use ethexe_db::Database; -use ethexe_processor::ExecutableData; +use ethexe_processor::{ExecutableData, event_stream::Event}; use ethexe_runtime_common::FinalizedBlockTransitions; -use futures::{FutureExt, future::BoxFuture}; +use futures::{FutureExt, StreamExt, TryStreamExt, future::BoxFuture}; use gprimitives::H256; use std::{ collections::VecDeque, task::{Context, Poll}, }; +use tokio::sync::mpsc; #[derive(Debug, Clone, Copy)] pub struct ComputeConfig { @@ -71,7 +73,8 @@ impl ComputeConfig { } /// Type alias for computation future with timing. -type ComputationFuture = future_timing::Timed>>; +type ComputationFuture = future_timing::Timed>>>; +type ComputationStream = futures::stream::BoxStream>; pub struct ComputeSubService { db: Database, @@ -79,8 +82,12 @@ pub struct ComputeSubService { config: ComputeConfig, metrics: Metrics, - input: VecDeque, - computation: Option, + input: VecDeque<(Announce, PromisePolicy)>, + + // computation: Option, + computation_stream: Option>, + // promises_stream: Option, + // pending_event: Option>, } impl ComputeSubService

{ @@ -91,29 +98,385 @@ impl ComputeSubService

{ config, metrics: Metrics::default(), input: VecDeque::new(), - computation: None, + computation_stream: None + // computation: None, + // promises_stream: None, + // pending_event: None, } } - pub fn receive_announce_to_compute(&mut self, announce: Announce) { - self.input.push_back(announce); + pub fn receive_announce_to_compute( + &mut self, + announce: Announce, + promise_policy: PromisePolicy, + ) { + self.input.push_back((announce, promise_policy)); } - async fn compute( + // async fn compute( + // db: Database, + // config: ComputeConfig, + // mut processor: P, + // announce: Announce, + // promise_policy: PromisePolicy, + // ) -> Result> { + // let announce_hash = announce.to_hash(); + // let block_hash = announce.block_hash; + + // if !db.block_meta(block_hash).prepared { + // return Err(ComputeError::BlockNotPrepared(block_hash)); + // } + + // let not_computed_announces = utils::find_parent_not_computed_announces(&announce, &db)?; + // if !not_computed_announces.is_empty() { + // log::trace!( + // "compute-sub-service: announce({announce_hash}) contains a {} previous not computed announce, start computing...", + // not_computed_announces.len(), + // ); + + // for (announce_hash, announce) in not_computed_announces { + // // Set the promise_out_tx = None, because we want to receive the promises only from target announce. + // Self::compute_one( + // &db, + // &mut processor, + // config, + // announce_hash, + // announce, + // PromisePolicy::Disabled, + // ) + // .await?; + // } + // } + + // // Compute the target announce + // Self::compute_one( + // &db, + // &mut processor, + // config, + // announce_hash, + // announce, + // promise_policy, + // ) + // .await + // } + + fn computation_stream( db: Database, + processor: P, config: ComputeConfig, - mut processor: P, + promise_policy: PromisePolicy, + ) -> Result> { + let stream = match promise_policy { + PromisePolicy::Disabled => { + futures::stream::once(processor.process_announce(executable)) + .map(|result| { + result.map(|transitions| { + utils::update_db_from_transitions(transitions, &db); + ComputeEvent::AnnounceComputed(announce_hash) + }) + }) + .boxed() + } + PromisePolicy::Enabled => processor + .process_announce_with_promises(executable)? + .map(|event| match event { + Event::Promise(promise) => Ok(ComputeEvent::Promise(promise, announce_hash)), + Event::BlockTransitions(result) => result.map(|transitions| { + utils::update_db_from_transitions(transitions, &db); + ComputeEvent::AnnounceComputed(announce_hash) + }), + }) + .map_err(Into::into) + .boxed(), + }; + + Ok(utils::ComputationStream::new( + db, + processor, + config, + not_computed, + stream, + )) + } + + // async fn compute_announce( + // db: &Database, + // processor: P, + // announce: Announce, + // config: ComputeConfig, + // ) { + // let executable = + // utils::prepare_executable_for_announce(db, announce, config.canonical_quarantine())?; + // let r = processor + // .process_announce(executable) + // .map(|result| { + // result.map(|transitions| { + // utils::update_db_from_transitions(transitions, db); + // ComputeEvent::AnnounceComputed(announce_hash) + // }) + // }) + // .await; + // } + + // async fn compute_one( + // db: &Database, + // processor: &mut P, + // config: ComputeConfig, + // announce_hash: HashOf, + // announce: Announce, + // promise_policy: PromisePolicy, + // ) -> Result> { + // let executable = + // utils::prepare_executable_for_announce(db, announce, config.canonical_quarantine())?; + + // // TODO: check here the promise_policy + // let one_stream = + // futures::stream::once(processor.process_announce(executable)).map(|result| { + // result.map(|transitions| { + // utils::update_db_from_transitions(transitions, db); + // ComputeEvent::AnnounceComputed(announce_hash) + // }) + // }); + // let processor_stream = processor + // .process_announce_with_promises(executable)? + // .map(|event| match event { + // Event::Promise(promise) => Ok(ComputeEvent::Promise(promise, announce_hash)), + // Event::BlockTransitions(result) => result.map(|transitions| { + // utils::update_db_from_transitions(transitions, db); + // ComputeEvent::AnnounceComputed(announce_hash) + // }), + // }); + + // Ok(announce_hash) + // } +} + +impl SubService for ComputeSubService

{ + type Output = ComputeEvent; + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.computation_stream.is_none() + && let Some((announce, promise_policy)) = self.input.pop_front() + { + // let maybe_promise_out_tx = match promise_policy { + // PromisePolicy::Enabled => { + // let (sender, receiver) = mpsc::unbounded_channel(); + // self.promises_stream = Some(utils::AnnouncePromisesStream::new( + // receiver, + // announce.to_hash(), + // )); + + // Some(sender) + // } + // PromisePolicy::Disabled => None, + // }; + + // self.computation_stream = Some(future_timing::timed( + // Self::compute( + // self.db.clone(), + // self.config, + // self.processor.clone(), + // announce, + // promise_policy, + // ) + // .boxed(), + // )); + self.computation_stream = Some(Self::computation_stream( + self.db.clone(), + self.processor.clone(), + self.config.clone(), + promise_policy, + )?); + } + + // if let Some(ref mut stream) = self.promises_stream + // && let Poll::Ready(maybe_event) = stream.poll_next_unpin(cx) + // { + // match maybe_event { + // Some(event) => return Poll::Ready(Ok(event)), + // None => { + // log::trace!("announce's promises stream is ended"); + // self.promises_stream = None; + + // // Checking for possible event of finishing announce computation. + // if let Some(event) = self.pending_event.take() { + // return Poll::Ready(event); + // } + // } + // } + // } + + // if let Some(ref mut computation) = self.computation + // && let Poll::Ready(timing_result) = computation.poll_unpin(cx) + // { + // let (timing, result) = timing_result.into_parts(); + // self.metrics + // .announce_processing_latency + // .record((timing.busy() + timing.idle()).as_secs_f64()); + + // self.computation = None; + + // match self.promises_stream.is_some() { + // true => { + // // We cannot return [`ComputeEvent::AnnounceComputed`] before all promises will be given. + // self.pending_event = Some(result.map(Into::into)); + // } + // false => { + // return Poll::Ready(result.map(Into::into)); + // } + // } + // } + + if let Some(ref mut stream) = self.computation_stream + && let Poll::Ready(maybe_result) = stream.poll_next_unpin(cx) + { + match maybe_result { + Some(Ok(event)) => return Poll::Ready(Ok(event)), + Some(Err(err)) => { + self.computation_stream = None; + return Poll::Ready(Err(err)); + } + None => { + unimplemented!("TODO: FIX THIS CASE"); + self.computation_stream = None; + } + } + } + + Poll::Pending + } +} + +/// Utils for [`ComputeSubService`]. +pub(crate) mod utils { + use super::*; + use futures::{Stream, stream::BoxStream}; + use std::pin::Pin; + + type ComputationFuture = future_timing::Timed>>; + + pub(super) struct ComputationStream

{ + not_computed_announces: VecDeque<(HashOf, Announce)>, + event_stream: BoxStream>, + + db: Database, + processor: P, + config: ComputeConfig, + announce_computing_future: Option, + } + + impl

ComputationStream

{ + pub fn new( + db: Database, + processor: P, + config: ComputeConfig, + not_computed: VecDeque<(HashOf, Announce)>, + stream: BoxStream>, + ) -> Self { + Self { + not_computed_announces: not_computed, + event_stream: stream, + + db, + processor, + config, + announce_computing_future: None, + } + } + } + + impl Stream for ComputationStream

{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.announce_computing_future.is_none() + && let Some((announce_hash, announce)) = self.not_computed_announces.pop_front() + { + let canonical_quarantine = self.config.canonical_quarantine(); + match utils::prepare_executable_for_announce( + &self.db, + announce, + canonical_quarantine, + ) { + Ok(executable) => { + let future = self.processor.process_announce(executable).map(|result| { + result.map(|transitions| { + utils::update_db_from_transitions(transitions, &self.db); + }) + }); + self.announce_computing_future = Some(future_timing::timed(future.boxed())) + } + Err(e) => return Poll::Ready(Some(Err(e))), + }; + } + + if let Some(ref mut future) = self.announce_computing_future { + let (timing, result) = futures::ready!(future.poll_unpin(cx)).into_parts(); + match result { + Ok(()) => { + self.announce_computing_future = None; + cx.waker().wake_by_ref(); + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + self.event_stream.poll_next_unpin(cx) + } + } + + pub fn prepare_executable_for_announce( + db: &Database, announce: Announce, - ) -> Result { - let announce_hash = announce.to_hash(); + canonical_quarantine: u8, + ) -> Result { let block_hash = announce.block_hash; - if !db.block_meta(block_hash).prepared { - return Err(ComputeError::BlockNotPrepared(block_hash)); - } + let matured_events = + find_canonical_events_post_quarantine(db, block_hash, canonical_quarantine)?; + + let events = matured_events + .into_iter() + .filter_map(|event| event.to_request()) + .collect(); + + Ok(ExecutableData { + block: SimpleBlockData { + hash: block_hash, + header: db + .block_header(block_hash) + .ok_or(ComputeError::BlockHeaderNotFound(block_hash))?, + }, + program_states: db + .announce_program_states(announce.parent) + .ok_or(ComputeError::ProgramStatesNotFound(announce.parent))?, + schedule: db + .announce_schedule(announce.parent) + .ok_or(ComputeError::ScheduleNotFound(announce.parent))?, + injected_transactions: announce + .injected_transactions + .into_iter() + .map(|tx| tx.into_verified()) + .collect(), + gas_allowance: announce.gas_allowance, + events, + }) + } + pub(super) fn find_parent_not_computed_announces( + announce: &Announce, + db: &DB, + ) -> Result, Announce)>> + where + DB: AnnounceStorageRO + LatestDataStorageRO, + { let mut parent_hash = announce.parent; - let mut announces_chain: VecDeque<_> = [(announce_hash, announce)].into(); + let mut announces_chain = VecDeque::new(); + let start_announce_hash = db + .latest_data() + .ok_or_else(|| ComputeError::LatestDataNotFound)? + .start_announce_hash; + loop { if db.announce_meta(parent_hash).computed { break; @@ -126,40 +489,58 @@ impl ComputeSubService

{ let next_parent_hash = parent_announce.parent; announces_chain.push_front((parent_hash, parent_announce)); + // This was a start announce, no need to go further. + if parent_hash == start_announce_hash { + break; + } + parent_hash = next_parent_hash; } - let mut computed_announce = ComputedAnnounce::from_announce_hash(announce_hash); - if announces_chain.is_empty() { - log::trace!("All announces are already computed"); - return Ok(computed_announce); - } + Ok(announces_chain) + } + + /// Finds events from Ethereum in database which can be processed in current block. + pub fn find_canonical_events_post_quarantine( + db: &Database, + mut block_hash: H256, + canonical_quarantine: u8, + ) -> Result> { + let genesis_block = db + .latest_data() + .ok_or_else(|| ComputeError::LatestDataNotFound)? + .genesis_block_hash; + + let mut block_header = db + .block_header(block_hash) + .ok_or_else(|| ComputeError::BlockHeaderNotFound(block_hash))?; + + for _ in 0..canonical_quarantine { + if block_hash == genesis_block { + return Ok(Default::default()); + } - for (announce_hash, announce) in announces_chain { - computed_announce.merge_promises( - Self::compute_one(&db, &mut processor, announce_hash, announce, config).await?, - ); + let parent_hash = block_header.parent_hash; + let parent_header = db + .block_header(parent_hash) + .ok_or(ComputeError::BlockHeaderNotFound(parent_hash))?; + + block_hash = parent_hash; + block_header = parent_header; } - Ok(computed_announce) + db.block_events(block_hash) + .ok_or(ComputeError::BlockEventsNotFound(block_hash)) } - async fn compute_one( + pub fn update_db_from_transitions( + transitions: FinalizedBlockTransitions, db: &Database, - processor: &mut P, - announce_hash: HashOf, - announce: Announce, - config: ComputeConfig, - ) -> Result { - let executable = - prepare_executable_for_announce(db, announce, config.canonical_quarantine())?; - let processing_result = processor.process_announce(executable).await?; - + ) -> Result<()> { let FinalizedBlockTransitions { transitions, states, schedule, - promises, program_creations, } = processing_result; @@ -180,125 +561,128 @@ impl ComputeSubService

{ data.computed_announce_hash = announce_hash; }) .ok_or(ComputeError::LatestDataNotFound)?; - - Ok(ComputedAnnounce { - announce_hash, - promises, - }) + Ok(()) } } -impl SubService for ComputeSubService

{ - type Output = ComputedAnnounce; +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + ComputeService, + tests::{MockProcessor, PROCESSOR_RESULT}, + }; + use ethexe_common::{ + DEFAULT_BLOCK_GAS_LIMIT, + db::OnChainStorageRW, + events::{ + RouterEvent, mirror::ExecutableBalanceTopUpRequestedEvent, router::ProgramCreatedEvent, + }, + gear::StateTransition, + mock::*, + }; + use ethexe_processor::Processor; + use gear_core::{ + message::{ReplyCode, SuccessReplyReason}, + rpc::ReplyInfo, + }; + use gprimitives::{ActorId, H256}; - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.computation.is_none() - && let Some(announce) = self.input.pop_front() - { - self.computation = Some(future_timing::timed( - Self::compute( - self.db.clone(), - self.config, - self.processor.clone(), - announce, - ) - .boxed(), - )); + mod test_utils { + use crate::CodeAndIdUnchecked; + use ethexe_common::{ + PrivateKey, SignedMessage, + events::{MirrorEvent, mirror::MessageQueueingRequestedEvent}, + injected::{InjectedTransaction, SignedInjectedTransaction}, + }; + use ethexe_processor::ValidCodeInfo; + use ethexe_runtime_common::RUNTIME_ID; + use gear_core::ids::prelude::CodeIdExt; + use gprimitives::{CodeId, MessageId}; + + use super::*; + + const USER_ID: ActorId = ActorId::new([1u8; 32]); + + pub fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId { + let code_id = CodeId::generate(code); + + let ValidCodeInfo { + code, + instrumented_code, + code_metadata, + } = processor + .process_code(CodeAndIdUnchecked { + code: code.to_vec(), + code_id, + }) + .expect("failed to process code") + .valid + .expect("code is invalid"); + + db.set_original_code(&code); + db.set_instrumented_code(RUNTIME_ID, code_id, instrumented_code); + db.set_code_metadata(code_id, code_metadata); + db.set_code_valid(code_id, true); + + code_id } - if let Some(computation) = &mut self.computation - && let Poll::Ready(timing_result) = computation.poll_unpin(cx) - { - let (timing, result) = timing_result.into_parts(); - self.metrics - .announce_processing_latency - .record((timing.busy() + timing.idle()).as_secs_f64()); - self.computation = None; - return Poll::Ready(result); + pub fn block_events(len: usize, actor_id: ActorId, payload: Vec) -> Vec { + (0..len) + .map(|_| canonical_event(actor_id, payload.clone())) + .collect() } - Poll::Pending - } -} - -pub fn prepare_executable_for_announce( - db: &Database, - announce: Announce, - canonical_quarantine: u8, -) -> Result { - let block_hash = announce.block_hash; - - let matured_events = - find_canonical_events_post_quarantine(db, block_hash, canonical_quarantine)?; - - let events = matured_events - .into_iter() - .filter_map(|event| event.to_request()) - .collect(); - - Ok(ExecutableData { - block: SimpleBlockData { - hash: block_hash, - header: db - .block_header(block_hash) - .ok_or(ComputeError::BlockHeaderNotFound(block_hash))?, - }, - program_states: db - .announce_program_states(announce.parent) - .ok_or(ComputeError::ProgramStatesNotFound(announce.parent))?, - schedule: db - .announce_schedule(announce.parent) - .ok_or(ComputeError::ScheduleNotFound(announce.parent))?, - injected_transactions: announce - .injected_transactions - .into_iter() - .map(|tx| tx.into_verified()) - .collect(), - gas_allowance: announce.gas_allowance, - events, - }) -} - -/// Finds events from Ethereum in database which can be processed in current block. -fn find_canonical_events_post_quarantine( - db: &Database, - mut block_hash: H256, - canonical_quarantine: u8, -) -> Result> { - let genesis_block = db - .latest_data() - .ok_or_else(|| ComputeError::LatestDataNotFound)? - .genesis_block_hash; - - let mut block_header = db - .block_header(block_hash) - .ok_or_else(|| ComputeError::BlockHeaderNotFound(block_hash))?; - - for _ in 0..canonical_quarantine { - if block_hash == genesis_block { - return Ok(Default::default()); + pub fn canonical_event(actor_id: ActorId, payload: Vec) -> BlockEvent { + BlockEvent::Mirror { + actor_id, + event: MirrorEvent::MessageQueueingRequested(MessageQueueingRequestedEvent { + id: MessageId::new(H256::random().0), + source: USER_ID, + value: 0, + payload, + call_reply: false, + }), + } } - let parent_hash = block_header.parent_hash; - let parent_header = db - .block_header(parent_hash) - .ok_or(ComputeError::BlockHeaderNotFound(parent_hash))?; + pub fn create_program_events(actor_id: ActorId, code_id: CodeId) -> Vec { + let created_event = + BlockEvent::Router(RouterEvent::ProgramCreated(ProgramCreatedEvent { + actor_id, + code_id, + })); + + let top_up_event = BlockEvent::Mirror { + actor_id, + event: MirrorEvent::ExecutableBalanceTopUpRequested( + ExecutableBalanceTopUpRequestedEvent { + value: 500_000_000_000_000, + }, + ), + }; + + vec![created_event, top_up_event] + } - block_hash = parent_hash; - block_header = parent_header; + pub fn injected_tx( + destination: ActorId, + payload: Vec, + ref_block: H256, + ) -> SignedInjectedTransaction { + let tx = InjectedTransaction { + destination, + payload: payload.into(), + value: 0, + reference_block: ref_block, + salt: H256::random().0.to_vec().into(), + }; + let pk = PrivateKey::random(); + SignedMessage::create(pk, tx).unwrap() + } } - db.block_events(block_hash) - .ok_or(ComputeError::BlockEventsNotFound(block_hash)) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::tests::{MockProcessor, PROCESSOR_RESULT}; - use ethexe_common::{gear::StateTransition, mock::*}; - use gprimitives::{ActorId, H256}; - #[tokio::test] #[ntest::timeout(3000)] async fn test_compute() { @@ -330,9 +714,12 @@ mod tests { // Set the PROCESSOR_RESULT to return non-empty result PROCESSOR_RESULT.with_borrow_mut(|r| *r = non_empty_result.clone()); - service.receive_announce_to_compute(announce); + service.receive_announce_to_compute(announce, PromisePolicy::Disabled); - assert_eq!(service.next().await.unwrap().announce_hash, announce_hash); + assert_eq!( + service.next().await.unwrap().unwrap_announce_computed(), + announce_hash + ); // Verify block was marked as computed assert!(db.announce_meta(announce_hash).computed); @@ -349,4 +736,218 @@ mod tests { announce_hash ); } + + #[tokio::test] + #[ntest::timeout(30000)] + async fn test_compute_with_promises() { + gear_utils::init_default_logger(); + const BLOCKCHAIN_LEN: usize = 10; + + let db = Database::memory(); + let mut processor = Processor::new(db.clone()).unwrap(); + let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db); + let ping_id = ActorId::from(0x10000); + + let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db); + + // Setup first announce. + let start_announce_hash = { + let mut announce = blockchain.block_top_announce(0).announce.clone(); + announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); + + let announce_hash = db.set_announce(announce); + db.mutate_announce_meta(announce_hash, |meta| meta.computed = true); + db.mutate_latest_data(|data| { + data.start_announce_hash = announce_hash; + }); + db.set_announce_program_states(announce_hash, Default::default()); + db.set_announce_schedule(announce_hash, Default::default()); + + announce_hash + }; + + // Setup announces and events. + let mut parent_announce = start_announce_hash; + let announces_chain = (1..BLOCKCHAIN_LEN) + .map(|i| { + let announce = { + let mut announce = blockchain.block_top_announce(i).announce.clone(); + announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); + announce.parent = parent_announce; + + let block = announce.block_hash; + let txs = if i != 1 { + vec![test_utils::injected_tx(ping_id, b"PING".into(), block)] + } else { + Default::default() + }; + + announce.injected_transactions = txs; + announce + }; + + let announce_hash = db.set_announce(announce.clone()); + db.mutate_announce_meta(announce_hash, |meta| meta.computed = false); + + let mut block_events = if i == 1 { + test_utils::create_program_events(ping_id, ping_code_id) + } else { + Default::default() + }; + block_events.extend(test_utils::block_events(5, ping_id, b"PING".into())); + db.set_block_events(announce.block_hash, &block_events); + + parent_announce = announce_hash; + announce + }) + .collect::>(); + + let mut compute_service = + ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); + + // Send announces for computation. + compute_service.compute_announce( + announces_chain.get(2).unwrap().clone(), + PromisePolicy::Enabled, + ); + compute_service.compute_announce( + announces_chain.get(5).unwrap().clone(), + PromisePolicy::Enabled, + ); + compute_service.compute_announce( + announces_chain.get(8).unwrap().clone(), + PromisePolicy::Enabled, + ); + + let mut expected_announces = vec![ + announces_chain.get(2).unwrap().to_hash(), + announces_chain.get(5).unwrap().to_hash(), + announces_chain.get(8).unwrap().to_hash(), + ]; + + let mut expected_promises = expected_announces + .iter() + .map(|hash| { + let announce = db.announce(*hash).unwrap(); + let tx = announce.injected_transactions[0].clone().into_data(); + Promise { + tx_hash: tx.to_hash(), + reply: ReplyInfo { + payload: b"PONG".into(), + value: 0, + code: ReplyCode::Success(SuccessReplyReason::Manual), + }, + } + }) + .collect::>(); + + while !expected_announces.is_empty() || !expected_promises.is_empty() { + match compute_service.next().await.unwrap().unwrap() { + ComputeEvent::AnnounceComputed(hash) => { + if *expected_announces.first().unwrap() == hash { + expected_announces.remove(0); + } + } + ComputeEvent::Promise(promise, announce) => { + if *expected_announces.first().unwrap() == announce + && expected_promises.first().unwrap().clone() == promise + { + expected_promises.remove(0); + } + } + _ => unreachable!("unexpected event for current test"), + } + } + } + + #[tokio::test] + async fn test_compute_with_early_break() { + gear_utils::init_default_logger(); + + let db = Database::memory(); + let mut processor = Processor::new(db.clone()).unwrap(); + + let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db); + let ping_id = ActorId::from(0x10000); + + let blockchain = BlockChain::mock(3).setup(&db); + + let first_announce_hash = { + let mut announce = blockchain.block_top_announce(1).announce.clone(); + announce.gas_allowance = Some(DEFAULT_BLOCK_GAS_LIMIT); + + let mut canonical_events = test_utils::create_program_events(ping_id, ping_code_id); + canonical_events.push(test_utils::canonical_event(ping_id, b"PING".into())); + + db.set_block_events(announce.block_hash, &canonical_events); + db.set_announce(announce) + }; + + let (announce, announce_hash) = { + let mut announce = blockchain.block_top_announce(2).announce.clone(); + announce.gas_allowance = Some(400_000); + announce.parent = first_announce_hash; + + let ref_block = announce.block_hash; + let txs = (0..300) + .map(|_| test_utils::injected_tx(ping_id, b"PING".into(), ref_block)) + .collect::>(); + announce.injected_transactions = txs; + let hash = db.set_announce(announce.clone()); + (announce, hash) + }; + + let mut compute_service = + ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); + compute_service.compute_announce(announce, PromisePolicy::Enabled); + + loop { + let event = compute_service.next().await.unwrap().unwrap(); + if event == ComputeEvent::AnnounceComputed(announce_hash) { + break; + } + } + } + + #[test] + fn find_not_computed_announces_work_correctly() { + const BLOCKCHAIN_LEN: usize = 10; + + let db = Database::memory(); + let mut blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db); + + // Setup announces except the head to not-computed state. + blockchain + .announces + .iter_mut() + .enumerate() + .for_each(|(idx, (announce_hash, _))| { + // Set the announces to not computed state + if idx != BLOCKCHAIN_LEN - 1 { + db.mutate_announce_meta(*announce_hash, |meta| { + meta.computed = false; + }); + } + }); + + let expected_not_computed_announces = (0..BLOCKCHAIN_LEN - 1) + .map(|idx| blockchain.block_top_announce(idx).announce.to_hash()) + .collect::>(); + + let head_announce = blockchain + .block_top_announce(BLOCKCHAIN_LEN - 1) + .announce + .clone(); + let not_computed_announces = utils::find_parent_not_computed_announces(&head_announce, &db) + .unwrap() + .into_iter() + .map(|v| v.0) + .collect::>(); + + assert_eq!( + expected_not_computed_announces.len(), + not_computed_announces.len() + ); + assert_eq!(expected_not_computed_announces, not_computed_announces); + } } diff --git a/ethexe/compute/src/lib.rs b/ethexe/compute/src/lib.rs index beb30deef4a..a63f9ef6435 100644 --- a/ethexe/compute/src/lib.rs +++ b/ethexe/compute/src/lib.rs @@ -16,14 +16,21 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use ethexe_common::{Announce, CodeAndIdUnchecked, ComputedAnnounce, HashOf}; -use ethexe_processor::{ExecutableData, ProcessedCodeInfo, Processor, ProcessorError}; +pub use compute::{ + ComputeConfig, ComputeSubService, + utils::{find_canonical_events_post_quarantine, prepare_executable_for_announce}, +}; +use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, injected::Promise}; +use ethexe_processor::{ + ExecutableData, ProcessedCodeInfo, Processor, ProcessorError, + event_stream::ProcessorEventStream, +}; use ethexe_runtime_common::FinalizedBlockTransitions; +use futures::TryFutureExt; use gprimitives::{CodeId, H256}; -use std::collections::HashSet; - -pub use compute::{ComputeConfig, ComputeSubService, prepare_executable_for_announce}; pub use service::ComputeService; +use std::collections::HashSet; +use tokio::sync::mpsc; mod codes; mod compute; @@ -38,12 +45,13 @@ pub struct BlockProcessed { pub block_hash: H256, } -#[derive(Debug, Clone, Eq, PartialEq, derive_more::Unwrap)] +#[derive(Debug, Clone, Eq, PartialEq, derive_more::Unwrap, derive_more::From)] pub enum ComputeEvent { RequestLoadCodes(HashSet), CodeProcessed(CodeId), BlockPrepared(H256), - AnnounceComputed(ComputedAnnounce), + AnnounceComputed(HashOf), + Promise(Promise, HashOf), } #[derive(thiserror::Error, Debug)] @@ -83,6 +91,8 @@ pub enum ComputeError { ProgramStatesNotFound(HashOf), #[error("Schedule not found for computed Announce {0:?}")] ScheduleNotFound(HashOf), + #[error("Promise sender dropped")] + PromiseSenderDropped, #[error(transparent)] Processor(#[from] ProcessorError), @@ -96,6 +106,10 @@ pub trait ProcessorExt: Sized + Unpin + Send + Clone + 'static { &mut self, executable: ExecutableData, ) -> impl Future> + Send; + fn process_announce_with_promises( + &mut self, + executable: ExecutableData, + ) -> Result; fn process_upload_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result; } @@ -108,6 +122,14 @@ impl ProcessorExt for Processor { self.process_programs(executable).await.map_err(Into::into) } + fn process_announce_with_promises( + &mut self, + executable: ExecutableData, + ) -> Result { + self.process_programs_with_promises(executable) + .map_err(Into::into) + } + fn process_upload_code( &mut self, code_and_id: CodeAndIdUnchecked, diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index f6b4ce76997..0a082784738 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -16,13 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +#[cfg(test)] +use crate::tests::MockProcessor; use crate::{ ComputeEvent, ProcessorExt, Result, codes::CodesSubService, compute::{ComputeConfig, ComputeSubService}, prepare::PrepareSubService, }; -use ethexe_common::{Announce, CodeAndIdUnchecked}; +use ethexe_common::{Announce, CodeAndIdUnchecked, PromisePolicy}; use ethexe_db::Database; use ethexe_processor::Processor; use futures::{Stream, stream::FusedStream}; @@ -39,7 +41,7 @@ pub struct ComputeService { } impl ComputeService

{ - // TODO #4550: consider to create Processor inside ComputeService + /// Creates new compute service. pub fn new(config: ComputeConfig, db: Database, processor: P) -> Self { Self { prepare_sub_service: PrepareSubService::new(db.clone()), @@ -47,6 +49,27 @@ impl ComputeService

{ codes_sub_service: CodesSubService::new(db, processor), } } +} + +#[cfg(test)] +impl ComputeService { + /// Creates the processor with default [`ComputeConfig::without_quarantine`] and [`Processor`] with default config. + pub fn new_with_defaults(db: Database) -> Self { + let config = ComputeConfig::without_quarantine(); + let processor = Processor::new(db.clone()).unwrap(); + Self::new(config, db, processor) + } +} + +#[cfg(test)] +impl ComputeService { + pub fn new_mock_processor(db: Database) -> Self { + Self::new(ComputeConfig::without_quarantine(), db, MockProcessor) + } +} + +impl ComputeService

{ + // TODO #4550: consider to create Processor inside ComputeService pub fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) { self.codes_sub_service.receive_code_to_process(code_and_id); @@ -56,9 +79,9 @@ impl ComputeService

{ self.prepare_sub_service.receive_block_to_prepare(block); } - pub fn compute_announce(&mut self, announce: Announce) { + pub fn compute_announce(&mut self, announce: Announce, promise_policy: PromisePolicy) { self.compute_sub_service - .receive_announce_to_compute(announce); + .receive_announce_to_compute(announce, promise_policy); } } @@ -82,8 +105,8 @@ impl Stream for ComputeService

{ return Poll::Ready(Some(result.map(ComputeEvent::from))); }; - if let Poll::Ready(result) = self.compute_sub_service.poll_next(cx) { - return Poll::Ready(Some(result.map(ComputeEvent::AnnounceComputed))); + if let Poll::Ready(event) = self.compute_sub_service.poll_next(cx) { + return Poll::Ready(Some(event)); }; Poll::Pending @@ -109,9 +132,9 @@ pub(crate) trait SubService: Unpin + Send + 'static { #[cfg(test)] mod tests { + use super::*; - use crate::tests::MockProcessor; - use ethexe_common::{CodeAndIdUnchecked, ComputedAnnounce, db::*, mock::*}; + use ethexe_common::{CodeAndIdUnchecked, db::*, mock::*}; use ethexe_db::Database as DB; use futures::StreamExt; use gear_core::ids::prelude::CodeIdExt; @@ -123,9 +146,7 @@ mod tests { gear_utils::init_default_logger(); let db = DB::memory(); - let processor = MockProcessor; - let config = ComputeConfig::without_quarantine(); - let mut service = ComputeService::new(config, db.clone(), processor); + let mut service = ComputeService::new_mock_processor(db.clone()); let chain = BlockChain::mock(1).setup(&db); let block = chain.blocks[1].to_simple().next_block().setup(&db); @@ -147,10 +168,8 @@ mod tests { gear_utils::init_default_logger(); let db = DB::memory(); - let processor = MockProcessor; + let mut service = ComputeService::new_mock_processor(db.clone()); - let config = ComputeConfig::without_quarantine(); - let mut service = ComputeService::new(config, db.clone(), processor); let chain = BlockChain::mock(1).setup(&db); let block = chain.blocks[1].to_simple().next_block().setup(&db); @@ -167,14 +186,11 @@ mod tests { injected_transactions: vec![], }; let announce_hash = announce.to_hash(); - service.compute_announce(announce); + service.compute_announce(announce, PromisePolicy::Disabled); // Poll service to process the block let event = service.next().await.unwrap().unwrap(); - assert_eq!( - event, - ComputeEvent::AnnounceComputed(ComputedAnnounce::mock(announce_hash)) - ); + assert_eq!(event, ComputeEvent::AnnounceComputed(announce_hash)); // Verify block is marked as computed in DB assert!(db.announce_meta(announce_hash).computed); @@ -186,9 +202,7 @@ mod tests { gear_utils::init_default_logger(); let db = DB::memory(); - let processor = MockProcessor; - let config = ComputeConfig::without_quarantine(); - let mut service = ComputeService::new(config, db.clone(), processor); + let mut service = ComputeService::new_mock_processor(db.clone()); // Create test code let code = vec![0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00]; // Simple WASM header diff --git a/ethexe/compute/src/tests.rs b/ethexe/compute/src/tests.rs index 892e512f500..da4bbaffd66 100644 --- a/ethexe/compute/src/tests.rs +++ b/ethexe/compute/src/tests.rs @@ -18,7 +18,7 @@ use super::*; use ethexe_common::{ - CodeBlobInfo, + CodeBlobInfo, PromisePolicy, db::*, events::{ BlockEvent, RouterEvent, @@ -27,7 +27,6 @@ use ethexe_common::{ mock::*, }; use ethexe_db::Database; -use ethexe_processor::Processor; use futures::StreamExt; use gear_core::{ code::{CodeMetadata, InstantiatedSectionSizes, InstrumentedCode}, @@ -41,7 +40,6 @@ thread_local! { transitions: Vec::new(), states: BTreeMap::new(), schedule: BTreeMap::new(), - promises: Vec::new(), program_creations: Vec::new(), } ) }; @@ -62,7 +60,6 @@ impl ProcessorExt for MockProcessor { transitions: vec![], states: BTreeMap::new(), schedule: BTreeMap::new(), - promises: vec![], program_creations: vec![], } }); @@ -70,6 +67,14 @@ impl ProcessorExt for MockProcessor { Ok(result) } + fn process_announce_with_promises( + &mut self, + executable: ExecutableData, + ) -> Result { + // TODO: FIXME + unimplemented!() + } + fn process_upload_code( &mut self, code_and_id: CodeAndIdUnchecked, @@ -169,8 +174,7 @@ impl TestEnv { mark_as_not_prepared(&mut chain); chain = chain.setup(&db); - let config = ComputeConfig::without_quarantine(); - let compute = ComputeService::new(config, db.clone(), Processor::new(db.clone()).unwrap()); + let compute = ComputeService::new_with_defaults(db.clone()); TestEnv { db, compute, chain } } @@ -221,7 +225,8 @@ impl TestEnv { async fn compute_and_assert_announce(&mut self, announce: Announce) { let announce_hash = announce.to_hash(); - self.compute.compute_announce(announce.clone()); + self.compute + .compute_announce(announce.clone(), PromisePolicy::Disabled); let event = self .compute @@ -230,8 +235,8 @@ impl TestEnv { .unwrap() .expect("expect block will be processing"); - let computed_data = event.unwrap_announce_computed(); - assert_eq!(computed_data.announce_hash, announce_hash); + let computed_announce = event.unwrap_announce_computed(); + assert_eq!(computed_announce, announce_hash); self.db.mutate_block_meta(announce.block_hash, |meta| { meta.announces.get_or_insert_default().insert(announce_hash); diff --git a/ethexe/consensus/src/connect/mod.rs b/ethexe/consensus/src/connect/mod.rs index 2b3e57b127b..081be6187b0 100644 --- a/ethexe/consensus/src/connect/mod.rs +++ b/ethexe/consensus/src/connect/mod.rs @@ -27,10 +27,10 @@ use crate::{ }; use anyhow::{Result, anyhow}; use ethexe_common::{ - Address, Announce, ComputedAnnounce, SimpleBlockData, + Address, Announce, HashOf, PromisePolicy, SimpleBlockData, consensus::{VerifiedAnnounce, VerifiedValidationRequest}, db::OnChainStorageRO, - injected::SignedInjectedTransaction, + injected::{Promise, SignedInjectedTransaction}, network::{AnnouncesRequest, AnnouncesResponse}, }; use ethexe_db::Database; @@ -161,8 +161,10 @@ impl ConnectService { AnnounceStatus::Accepted(announce_hash) => { self.output .push_back(ConsensusEvent::AnnounceAccepted(announce_hash)); - self.output - .push_back(ConsensusEvent::ComputeAnnounce(announce)); + self.output.push_back(ConsensusEvent::ComputeAnnounce( + announce, + PromisePolicy::Disabled, + )); } } @@ -262,7 +264,7 @@ impl ConsensusService for ConnectService { Ok(()) } - fn receive_computed_announce(&mut self, _computed_data: ComputedAnnounce) -> Result<()> { + fn receive_computed_announce(&mut self, _announce_hash: HashOf) -> Result<()> { Ok(()) } @@ -285,6 +287,18 @@ impl ConsensusService for ConnectService { Ok(()) } + fn receive_promise_for_signing( + &mut self, + promise: Promise, + announce_hash: HashOf, + ) -> Result<()> { + tracing::error!( + "Connected consensus node receives the promise for signing, but it not responsible for promises providing: \ + promise={promise:?}, announce_hash={announce_hash}" + ); + Ok(()) + } + fn receive_injected_transaction(&mut self, tx: SignedInjectedTransaction) -> Result<()> { // In "connect-node" we do not process injected transactions. tracing::trace!("Received injected transaction: {tx:?}. Ignoring it."); diff --git a/ethexe/consensus/src/lib.rs b/ethexe/consensus/src/lib.rs index b9c4e7eef8a..77709c6a56c 100644 --- a/ethexe/consensus/src/lib.rs +++ b/ethexe/consensus/src/lib.rs @@ -34,9 +34,9 @@ use anyhow::Result; use ethexe_common::{ - Announce, ComputedAnnounce, Digest, HashOf, SimpleBlockData, + Announce, Digest, HashOf, PromisePolicy, SimpleBlockData, consensus::{BatchCommitmentValidationReply, VerifiedAnnounce, VerifiedValidationRequest}, - injected::{SignedInjectedTransaction, SignedPromise}, + injected::{Promise, SignedInjectedTransaction, SignedPromise}, network::{AnnouncesRequest, AnnouncesResponse, SignedValidatorMessage}, }; use futures::{Stream, stream::FusedStream}; @@ -71,11 +71,18 @@ pub trait ConsensusService: fn receive_prepared_block(&mut self, block: H256) -> Result<()>; /// Process a computed block received - fn receive_computed_announce(&mut self, computed_data: ComputedAnnounce) -> Result<()>; + fn receive_computed_announce(&mut self, computed_data: HashOf) -> Result<()>; /// Process a received producer announce fn receive_announce(&mut self, announce: VerifiedAnnounce) -> Result<()>; + /// Receives the raw promise for signing. + fn receive_promise_for_signing( + &mut self, + promise: Promise, + announce_hash: HashOf, + ) -> Result<()>; + /// Process a received validation request fn receive_validation_request(&mut self, request: VerifiedValidationRequest) -> Result<()>; @@ -109,8 +116,7 @@ pub enum ConsensusEvent { /// Announce from producer was rejected AnnounceRejected(HashOf), /// Outer service have to compute announce - #[from] - ComputeAnnounce(Announce), + ComputeAnnounce(Announce, PromisePolicy), /// Outer service have to publish signed message #[from] PublishMessage(SignedValidatorMessage), @@ -120,9 +126,8 @@ pub enum ConsensusEvent { /// Informational event: commitment was successfully submitted #[from] CommitmentSubmitted(CommitmentSubmitted), + #[from] + SignedPromise(SignedPromise), /// Informational event: during service processing, a warning situation was detected Warning(String), - /// Promises for [`ethexe_common::injected::InjectedTransaction`]s execution in some announce. - #[from] - Promises(Vec), } diff --git a/ethexe/consensus/src/validator/mod.rs b/ethexe/consensus/src/validator/mod.rs index 2c303ae7e74..fc1785374ec 100644 --- a/ethexe/consensus/src/validator/mod.rs +++ b/ethexe/consensus/src/validator/mod.rs @@ -54,11 +54,11 @@ use anyhow::{Result, anyhow}; pub use core::BatchCommitter; use derive_more::{Debug, From}; use ethexe_common::{ - Address, ComputedAnnounce, SimpleBlockData, ToDigest, + Address, Announce, HashOf, SimpleBlockData, consensus::{VerifiedAnnounce, VerifiedValidationRequest}, db::OnChainStorageRO, - ecdsa::{PublicKey, SignedMessage}, - injected::SignedInjectedTransaction, + ecdsa::PublicKey, + injected::{Promise, SignedInjectedTransaction}, network::AnnouncesResponse, }; use ethexe_db::Database; @@ -69,7 +69,7 @@ use futures::{ stream::{FusedStream, FuturesUnordered}, }; use gprimitives::H256; -use gsigner::secp256k1::{Secp256k1SignerExt, Signer}; +use gsigner::secp256k1::Signer; use initial::Initial; use std::{ collections::VecDeque, @@ -210,14 +210,22 @@ impl ConsensusService for ValidatorService { self.update_inner(|inner| inner.process_prepared_block(block)) } - fn receive_computed_announce(&mut self, computed_data: ComputedAnnounce) -> Result<()> { - self.update_inner(|inner| inner.process_computed_announce(computed_data)) + fn receive_computed_announce(&mut self, announce_hash: HashOf) -> Result<()> { + self.update_inner(|inner| inner.process_computed_announce(announce_hash)) } fn receive_announce(&mut self, announce: VerifiedAnnounce) -> Result<()> { self.update_inner(|inner| inner.process_announce(announce)) } + fn receive_promise_for_signing( + &mut self, + promise: Promise, + announce_hash: HashOf, + ) -> Result<()> { + self.update_inner(|inner| inner.process_raw_promise(promise, announce_hash)) + } + fn receive_validation_request(&mut self, batch: VerifiedValidationRequest) -> Result<()> { self.update_inner(|inner| inner.process_validation_request(batch)) } @@ -314,14 +322,22 @@ where DefaultProcessing::prepared_block(self.into(), block) } - fn process_computed_announce(self, computed_data: ComputedAnnounce) -> Result { - DefaultProcessing::computed_announce(self.into(), computed_data) + fn process_computed_announce(self, announce_hash: HashOf) -> Result { + DefaultProcessing::computed_announce(self.into(), announce_hash) } fn process_announce(self, announce: VerifiedAnnounce) -> Result { DefaultProcessing::announce_from_producer(self, announce) } + fn process_raw_promise( + self, + promise: Promise, + announce_hash: HashOf, + ) -> Result { + DefaultProcessing::promise_for_signing(self, promise, announce_hash) + } + fn process_validation_request( self, request: VerifiedValidationRequest, @@ -402,14 +418,22 @@ impl StateHandler for ValidatorState { delegate_call!(self => process_prepared_block(block)) } - fn process_computed_announce(self, computed_data: ComputedAnnounce) -> Result { - delegate_call!(self => process_computed_announce(computed_data)) + fn process_computed_announce(self, announce_hash: HashOf) -> Result { + delegate_call!(self => process_computed_announce(announce_hash)) } fn process_announce(self, verified_announce: VerifiedAnnounce) -> Result { delegate_call!(self => process_announce(verified_announce)) } + fn process_raw_promise( + self, + promise: Promise, + announce_hash: HashOf, + ) -> Result { + delegate_call!(self => process_raw_promise(promise, announce_hash)) + } + fn process_validation_request( self, request: VerifiedValidationRequest, @@ -458,12 +482,21 @@ impl DefaultProcessing { fn computed_announce( s: impl Into, - computed_data: ComputedAnnounce, + announce_hash: HashOf, + ) -> Result { + let mut s = s.into(); + s.warning(format!("unexpected computed announce: {}", announce_hash)); + Ok(s) + } + + fn promise_for_signing( + s: impl Into, + promise: Promise, + announce_hash: HashOf, ) -> Result { let mut s = s.into(); s.warning(format!( - "unexpected computed announce: {}", - computed_data.announce_hash + "unexpected promise for signing: promise={promise:?}, announce_hash={announce_hash:?}" )); Ok(s) } @@ -547,11 +580,4 @@ impl ValidatorContext { pub fn pending(&mut self, event: impl Into) { self.pending_events.push_front(event.into()); } - - pub fn sign_message(&self, data: T) -> Result> { - Ok(self - .core - .signer - .signed_message(self.core.pub_key, data, None)?) - } } diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 34091430069..f42bb9e9f01 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -24,11 +24,11 @@ use crate::{ announces::{self, DBAnnouncesExt}, validator::DefaultProcessing, }; -use anyhow::{Context as _, Result, anyhow}; +use anyhow::{Result, anyhow}; use derive_more::{Debug, Display}; use ethexe_common::{ - Announce, ComputedAnnounce, HashOf, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, - gear::BatchCommitment, network::ValidatorMessage, + Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, + gear::BatchCommitment, injected::Promise, network::ValidatorMessage, }; use ethexe_service_utils::Timer; use futures::{FutureExt, future::BoxFuture}; @@ -75,26 +75,10 @@ impl StateHandler for Producer { fn process_computed_announce( mut self, - computed_data: ComputedAnnounce, + announce_hash: HashOf, ) -> Result { match &self.state { - State::WaitingAnnounceComputed(expected) - if *expected == computed_data.announce_hash => - { - if !computed_data.promises.is_empty() { - let signed_promises = computed_data - .promises - .into_iter() - .map(|promise| { - self.ctx - .sign_message(promise) - .context("producer: failed to sign promise") - }) - .collect::>()?; - - self.ctx.output(ConsensusEvent::Promises(signed_promises)); - } - + State::WaitingAnnounceComputed(expected) if *expected == announce_hash => { // Aggregate commitment for the block and use `announce_hash` as head for chain commitment. // `announce_hash` is computed and included in the db already, so it's safe to use it. self.state = State::AggregateBatchCommitment { @@ -102,7 +86,7 @@ impl StateHandler for Producer { .ctx .core .clone() - .aggregate_batch_commitment(self.block, computed_data.announce_hash) + .aggregate_batch_commitment(self.block, announce_hash) .boxed(), }; @@ -111,12 +95,36 @@ impl StateHandler for Producer { State::WaitingAnnounceComputed(expected) => { self.warning(format!( "Computed announce {} is not expected, expected {expected}", - computed_data.announce_hash + announce_hash )); Ok(self.into()) } - _ => DefaultProcessing::computed_announce(self, computed_data), + _ => DefaultProcessing::computed_announce(self, announce_hash), + } + } + + fn process_raw_promise( + mut self, + promise: Promise, + announce_hash: HashOf, + ) -> Result { + match &self.state { + State::WaitingAnnounceComputed(expected) if *expected == announce_hash => { + let tx_hash = promise.tx_hash; + + let signed_promise = + self.ctx + .core + .signer + .signed_message(self.ctx.core.pub_key, promise, None)?; + self.ctx.output(signed_promise); + + tracing::trace!("consensus sign promise for transaction-hash={tx_hash}"); + Ok(self.into()) + } + + _ => DefaultProcessing::promise_for_signing(self, promise, announce_hash), } } @@ -233,7 +241,10 @@ impl Producer { self.state = State::WaitingAnnounceComputed(announce_hash); self.ctx .output(ConsensusEvent::PublishMessage(message.into())); - self.ctx.output(ConsensusEvent::ComputeAnnounce(announce)); + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce, + PromisePolicy::Enabled, + )); Ok(self.into()) } @@ -293,7 +304,7 @@ mod tests { .setup(&state.context().core.db); let state = state - .process_computed_announce(ComputedAnnounce::mock(announce_hash)) + .process_computed_announce(announce_hash) .unwrap() .wait_for_state(|state| state.is_initial()) .await @@ -340,7 +351,7 @@ mod tests { .setup(&state.context().core.db); let mut state = state - .process_computed_announce(ComputedAnnounce::mock(announce_hash)) + .process_computed_announce(announce_hash) .unwrap() .wait_for_state(|state| matches!(state, ValidatorState::Initial(_))) .await @@ -385,7 +396,7 @@ mod tests { .setup(&state.context().core.db); let (state, event) = state - .process_computed_announce(ComputedAnnounce::mock(announce_hash)) + .process_computed_announce(announce_hash) .unwrap() .wait_for_event() .await @@ -429,7 +440,7 @@ mod tests { .setup(&state.context().core.db); let mut state = state - .process_computed_announce(ComputedAnnounce::mock(announce_hash)) + .process_computed_announce(announce_hash) .unwrap() .wait_for_state(|state| matches!(state, ValidatorState::Initial(_))) .await @@ -481,7 +492,7 @@ mod tests { assert!(state.is_producer(), "Expected producer state, got {state}"); assert!(event.is_compute_announce()); - Ok((state, event.unwrap_compute_announce().to_hash())) + Ok((state, event.unwrap_compute_announce().0.to_hash())) } } } diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 3a5b7961ec8..2c2a550238c 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -28,7 +28,7 @@ use crate::{ use anyhow::Result; use derive_more::{Debug, Display}; use ethexe_common::{ - Address, Announce, ComputedAnnounce, HashOf, SimpleBlockData, + Address, Announce, HashOf, PromisePolicy, SimpleBlockData, consensus::{VerifiedAnnounce, VerifiedValidationRequest}, }; use std::mem; @@ -70,10 +70,13 @@ impl StateHandler for Subordinate { self.ctx } - fn process_computed_announce(self, computed_data: ComputedAnnounce) -> Result { + fn process_computed_announce( + self, + computed_announce_hash: HashOf, + ) -> Result { match &self.state { State::WaitingAnnounceComputed { announce_hash } - if *announce_hash == computed_data.announce_hash => + if *announce_hash == computed_announce_hash => { if self.is_validator { Participant::create(self.ctx, self.block, self.producer) @@ -81,7 +84,7 @@ impl StateHandler for Subordinate { Initial::create(self.ctx) } } - _ => DefaultProcessing::computed_announce(self, computed_data), + _ => DefaultProcessing::computed_announce(self, computed_announce_hash), } } @@ -170,7 +173,10 @@ impl Subordinate { AnnounceStatus::Accepted(announce_hash) => { self.ctx .output(ConsensusEvent::AnnounceAccepted(announce_hash)); - self.ctx.output(ConsensusEvent::ComputeAnnounce(announce)); + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce, + PromisePolicy::Disabled, + )); self.state = State::WaitingAnnounceComputed { announce_hash }; Ok(self.into()) @@ -192,7 +198,7 @@ impl Subordinate { mod tests { use super::*; use crate::{mock::*, validator::mock::*}; - use ethexe_common::{ComputedAnnounce, mock::*}; + use ethexe_common::mock::*; #[test] fn create_empty() { @@ -231,7 +237,7 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce1.data().to_hash()), - ConsensusEvent::ComputeAnnounce(announce1.data().clone()) + ConsensusEvent::ComputeAnnounce(announce1.data().clone(), PromisePolicy::Disabled) ] ); // announce2 must stay in pending events, because it's not from current producer. @@ -291,7 +297,7 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), - announce.data().clone().into() + ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); assert_eq!(s.context().pending_events.len(), MAX_PENDING_EVENTS); @@ -320,20 +326,20 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), - announce.data().clone().into() + ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); // After announce is computed, subordinate switches to participant state. let s = s - .process_computed_announce(ComputedAnnounce::mock(announce.data().to_hash())) + .process_computed_announce(announce.data().to_hash()) .unwrap(); assert!(s.is_participant(), "got {s:?}"); assert_eq!( s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), - ConsensusEvent::ComputeAnnounce(announce.data().clone()) + ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); } @@ -362,13 +368,13 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), - announce.data().clone().into() + ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); // After announce is computed, not-validator subordinate switches to initial state. let s = s - .process_computed_announce(ComputedAnnounce::mock(announce.data().to_hash())) + .process_computed_announce(announce.data().to_hash()) .unwrap(); assert!(s.is_initial(), "got {s:?}"); } @@ -397,7 +403,10 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(producer_announce.data().to_hash()), - producer_announce.data().clone().into() + ConsensusEvent::ComputeAnnounce( + producer_announce.data().clone(), + PromisePolicy::Disabled + ) ] ); assert_eq!(s.context().pending_events, vec![alice_announce.into()]); @@ -428,9 +437,7 @@ mod tests { let s = Subordinate::create(ctx, block, producer.to_address(), true).unwrap(); - let s = s - .process_computed_announce(ComputedAnnounce::mock(())) - .unwrap(); + let s = s.process_computed_announce(HashOf::random()).unwrap(); assert_eq!(s.context().output.len(), 1); assert!(matches!(s.context().output[0], ConsensusEvent::Warning(_))); } diff --git a/ethexe/processor/Cargo.toml b/ethexe/processor/Cargo.toml index 46f9af7eaf7..d924a0e1e88 100644 --- a/ethexe/processor/Cargo.toml +++ b/ethexe/processor/Cargo.toml @@ -31,6 +31,8 @@ itertools = { workspace = true, features = ["use_std"] } gear-workspace-hack.workspace = true anyhow.workspace = true derive_more.workspace = true +scopeguard.workspace = true +futures.workspace = true [dev-dependencies] anyhow.workspace = true @@ -41,3 +43,4 @@ demo-async = { workspace = true, features = ["debug", "ethexe"] } demo-panic-payload = { workspace = true, features = ["debug", "ethexe"] } wat.workspace = true ethexe-common = { workspace = true, features = ["mock"] } +gear-core-errors.workspace = true diff --git a/ethexe/processor/src/handling/overlaid.rs b/ethexe/processor/src/handling/overlaid.rs index e7fe15adedf..4e94bd56b97 100644 --- a/ethexe/processor/src/handling/overlaid.rs +++ b/ethexe/processor/src/handling/overlaid.rs @@ -25,7 +25,7 @@ use crate::{ host::InstanceCreator, }; use core_processor::common::JournalNote; -use ethexe_common::{BlockHeader, db::CodesStorageRO, gear::MessageType}; +use ethexe_common::{BlockHeader, db::CodesStorageRO, gear::MessageType, injected::Promise}; use ethexe_db::{CASDatabase, Database}; use ethexe_runtime_common::{InBlockTransitions, TransitionController}; use gear_core::{ @@ -35,6 +35,7 @@ use gear_core::{ }; use gprimitives::{ActorId, MessageId}; use std::collections::HashSet; +use tokio::sync::mpsc; /// Overlay execution context. /// @@ -85,6 +86,7 @@ impl OverlaidRunContext { gas_allowance, chunk_size, block_header, + None, ), base_program, nullified_queue_programs: [base_program].into_iter().collect(), @@ -174,6 +176,11 @@ impl RunContext for OverlaidRunContext { &self.inner.instance_creator } + fn promise_out_tx(&self) -> &Option> { + // OverlaidRunContext should never produce promises + &None + } + fn block_header(&self) -> BlockHeader { self.inner.block_header } diff --git a/ethexe/processor/src/handling/run.rs b/ethexe/processor/src/handling/run.rs index 5fdbcffc59d..0577e04e021 100644 --- a/ethexe/processor/src/handling/run.rs +++ b/ethexe/processor/src/handling/run.rs @@ -113,9 +113,10 @@ use chunk_execution_processing::ChunkJournalsProcessingOutput; use chunks_splitting::ActorStateHashWithQueueSize; use core_processor::common::JournalNote; use ethexe_common::{ - BlockHeader, StateHashWithQueueSize, + BlockHeader, PromisePolicy, StateHashWithQueueSize, db::CodesStorageRO, gear::{CHUNK_PROCESSING_GAS_LIMIT, MessageType}, + injected::Promise, }; use ethexe_db::{CASDatabase, Database}; use ethexe_runtime_common::{ @@ -127,6 +128,7 @@ use gear_core::{ }; use gprimitives::{ActorId, CodeId, H256}; use itertools::Itertools; +use tokio::sync::mpsc; // Process chosen queue type in chunks // Returns whether execution is NOT run out of gas (execution can be continued) @@ -188,6 +190,18 @@ pub(super) trait RunContext { /// Get reference to instance creator. fn instance_creator(&self) -> &InstanceCreator; + /// Returns the promises output channel if it set for current execution. + fn promise_out_tx(&self) -> &Option>; + + /// [`PromisePolicy`] tells processor should it emit promises or not. + /// By default if [`RunContext::promise_out_tx`] returns [`Some`] this function will return [`PromisePolicy::Enabled`]. + fn promise_policy(&self) -> PromisePolicy { + match self.promise_out_tx().is_some() { + true => PromisePolicy::Enabled, + false => PromisePolicy::Disabled, + } + } + /// Returns the header of the current block. fn block_header(&self) -> BlockHeader; @@ -263,6 +277,7 @@ pub(crate) struct CommonRunContext { pub(crate) gas_allowance_counter: GasAllowanceCounter, pub(crate) chunk_size: usize, pub(crate) block_header: BlockHeader, + pub(crate) promise_out_tx: Option>, } impl CommonRunContext { @@ -273,6 +288,7 @@ impl CommonRunContext { gas_allowance: u64, chunk_size: usize, block_header: BlockHeader, + promise_out_tx: Option>, ) -> Self { CommonRunContext { db, @@ -281,6 +297,13 @@ impl CommonRunContext { gas_allowance_counter: GasAllowanceCounter::new(gas_allowance), chunk_size, block_header, + promise_out_tx, + } + } + + fn disable_promises(&mut self) { + if self.promise_out_tx.take().is_some() { + log::trace!("dropping the promise sender"); } } @@ -288,8 +311,12 @@ impl CommonRunContext { // Start with injected queues processing. let can_continue = run_for_queue_type(&mut self, MessageType::Injected).await?; + // Disabling promises after running the injected queue. + self.disable_promises(); + if can_continue { // If gas is still left in block, process canonical (Ethereum) queues + log::trace!("running for canonical queue..."); let _ = run_for_queue_type(&mut self, MessageType::Canonical).await?; } @@ -302,6 +329,10 @@ impl RunContext for CommonRunContext { &self.instance_creator } + fn promise_out_tx(&self) -> &Option> { + &self.promise_out_tx + } + fn block_header(&self) -> BlockHeader { self.block_header } @@ -547,6 +578,9 @@ mod chunk_execution_spawn { }) .collect::>>()?; + let promise_policy = ctx.promise_policy(); + let promise_out_tx = ctx.promise_out_tx().clone(); + let block_header = ctx.block_header(); let block_info = BlockInfo { height: block_header.height, @@ -579,7 +613,9 @@ mod chunk_execution_spawn { gas_allowance_for_chunk, ), block_info, + promise_policy, }, + promise_out_tx.clone(), ) .expect("Some error occurs while running program in instance"); @@ -733,7 +769,7 @@ mod tests { .take(STATE_SIZE) .collect(); - let transitions = InBlockTransitions::new(0, states, Default::default(), vec![]); + let transitions = InBlockTransitions::new(0, states, Default::default()); let mut ctx = CommonRunContext { db: Database::memory(), @@ -742,6 +778,7 @@ mod tests { gas_allowance_counter: GasAllowanceCounter::new(1_000_000), chunk_size: CHUNK_PROCESSING_THREADS, block_header: BlockHeader::dummy(3), + promise_out_tx: None, }; let chunks = chunks_splitting::prepare_execution_chunks(&mut ctx, MessageType::Canonical); @@ -868,8 +905,7 @@ mod tests { (pid2, pid2_state_hash_with_queue_size), ]); - let mut in_block_transitions = - InBlockTransitions::new(3, states, Default::default(), vec![]); + let mut in_block_transitions = InBlockTransitions::new(3, states, Default::default()); let base_program = pid2; diff --git a/ethexe/processor/src/host/api/mod.rs b/ethexe/processor/src/host/api/mod.rs index 7361d65d35b..793f2e8b9e5 100644 --- a/ethexe/processor/src/host/api/mod.rs +++ b/ethexe/processor/src/host/api/mod.rs @@ -26,6 +26,7 @@ pub mod allocator; pub mod database; pub mod lazy_pages; pub mod logging; +pub mod promise; pub mod sandbox; pub struct MemoryWrap(Memory); diff --git a/ethexe/processor/src/host/api/promise.rs b/ethexe/processor/src/host/api/promise.rs new file mode 100644 index 00000000000..58578772a43 --- /dev/null +++ b/ethexe/processor/src/host/api/promise.rs @@ -0,0 +1,50 @@ +// This file is part of Gear. +// +// Copyright (C) 2024-2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use sp_wasm_interface::StoreData; +use wasmtime::{Caller, Linker}; + +use crate::host::{api::MemoryWrap, threads}; + +pub fn link(linker: &mut Linker) -> Result<(), wasmtime::Error> { + linker.func_wrap("env", "ext_publish_promise", publish_promise)?; + + Ok(()) +} + +fn publish_promise(caller: Caller<'_, StoreData>, promise_ptr_len: i64) { + threads::with_params(|params| { + if let Some(ref sender) = params.promise_out_tx { + let memory = MemoryWrap(caller.data().memory()); + let promise = memory.decode_by_val(&caller, promise_ptr_len); + + match sender.send(promise) { + Ok(()) => { + log::trace!( + "successfully send promise to outer service: promise_ptr_len={promise_ptr_len}" + ); + } + Err(err) => { + log::trace!( + "`publish_promise`: failed to send promise to receiver because of error={err}" + ); + } + } + } + }); +} diff --git a/ethexe/processor/src/host/mod.rs b/ethexe/processor/src/host/mod.rs index b22fb1e2bed..153d712cdd2 100644 --- a/ethexe/processor/src/host/mod.rs +++ b/ethexe/processor/src/host/mod.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use core_processor::common::JournalNote; -use ethexe_common::gear::MessageType; +use ethexe_common::{gear::MessageType, injected::Promise}; use ethexe_db::CASDatabase; use ethexe_runtime_common::{ProcessQueueContext, ProgramJournals, unpack_i64_to_u32}; use gear_core::code::{CodeMetadata, InstrumentedCode}; @@ -26,6 +26,7 @@ use parity_scale_codec::{Decode, Encode}; use sp_allocator::{AllocationStats, FreeingBumpHeapAllocator}; use sp_wasm_interface::{HostState, IntoValue, MemoryWrapper, StoreData}; use std::sync::Arc; +use tokio::sync::mpsc; pub mod api; pub mod runtime; @@ -110,6 +111,7 @@ impl InstanceCreator { api::lazy_pages::link(&mut linker)?; api::logging::link(&mut linker)?; api::sandbox::link(&mut linker)?; + api::promise::link(&mut linker)?; let instance_pre = linker.instantiate_pre(&module)?; let instance_pre = Arc::new(instance_pre); @@ -169,8 +171,14 @@ impl InstanceWrapper { &mut self, db: Box, ctx: ProcessQueueContext, + promise_out_tx: Option>, ) -> Result<(ProgramJournals, H256, u64)> { - threads::set(db, ctx.state_root); + threads::set(db, ctx.state_root, promise_out_tx.clone()); + + // Cleanup the `promise_out_tx` from thread-local to signal receiver that channel is closed. + let _cleanup = scopeguard::guard((), |()| { + threads::clear_promise_out_tx(); + }); // Pieces of resulting journal. Hack to avoid single allocation limit. let (ptr_lens, gas_spent): (Vec, i64) = self.call("run", ctx.encode())?; diff --git a/ethexe/processor/src/host/threads.rs b/ethexe/processor/src/host/threads.rs index b41bd615207..fdf70e9f68e 100644 --- a/ethexe/processor/src/host/threads.rs +++ b/ethexe/processor/src/host/threads.rs @@ -19,7 +19,7 @@ // TODO: for each panic here place log::error, otherwise it won't be printed. use core::fmt; -use ethexe_common::HashOf; +use ethexe_common::{HashOf, injected::Promise}; use ethexe_db::CASDatabase; use ethexe_runtime_common::state::{ ActiveProgram, MemoryPages, MemoryPagesRegionInner, Program, ProgramState, QueryableStorage, @@ -30,6 +30,7 @@ use gear_lazy_pages::LazyPagesStorage; use gprimitives::H256; use parity_scale_codec::{Decode, DecodeAll}; use std::{cell::RefCell, collections::BTreeMap}; +use tokio::sync::mpsc; const UNSET_PANIC: &str = "params should be set before query"; const UNKNOWN_STATE: &str = "state should always be valid (must exist)"; @@ -41,6 +42,7 @@ thread_local! { pub struct ThreadParams { pub db: Box, pub state_hash: H256, + pub promise_out_tx: Option>, pages_registry_cache: Option, pages_regions_cache: Option>, } @@ -102,10 +104,15 @@ impl PageKey { } } -pub fn set(db: Box, state_hash: H256) { +pub fn set( + db: Box, + state_hash: H256, + promise_out_tx: Option>, +) { PARAMS.set(Some(ThreadParams { db, state_hash, + promise_out_tx, pages_registry_cache: None, pages_regions_cache: None, })) @@ -137,6 +144,13 @@ pub fn with_params(f: impl FnOnce(&mut ThreadParams) -> T) -> T { }) } +pub fn clear_promise_out_tx() { + PARAMS.with_borrow_mut(|maybe_params| { + let params = maybe_params.as_mut().expect(UNSET_PANIC); + let _ = params.promise_out_tx.take(); + }) +} + #[derive(Debug)] pub struct EthexeHostLazyPages; diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index b6841ce842e..3a9396344a5 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -23,13 +23,14 @@ use ethexe_common::{ CodeAndIdUnchecked, ProgramStates, Schedule, SimpleBlockData, ecdsa::VerifiedData, events::{BlockRequestEvent, MirrorRequestEvent, mirror::MessageQueueingRequestedEvent}, - injected::InjectedTransaction, + injected::{InjectedTransaction, Promise}, }; use ethexe_db::Database; use ethexe_runtime_common::{ FinalizedBlockTransitions, InBlockTransitions, ScheduleHandler, TransitionController, state::Storage, }; +use futures::FutureExt; use gear_core::{ code::{CodeMetadata, InstrumentationStatus, InstrumentedCode}, ids::prelude::CodeIdExt, @@ -38,14 +39,17 @@ use gear_core::{ use gprimitives::{ActorId, CodeId, H256, MessageId}; use handling::{ProcessingHandler, overlaid::OverlaidRunContext, run::CommonRunContext}; use host::InstanceCreator; +use tokio::sync::mpsc; pub use host::InstanceError; +use crate::{event_stream::ProcessorEventStream, host::api::promise}; + mod handling; mod host; -#[cfg(test)] -mod tests; +// #[cfg(test)] +// mod tests; // Default amount of programs in one chunk to be processed in parallel. pub const DEFAULT_CHUNK_SIZE: NonZero = NonZero::new(16).unwrap(); @@ -187,36 +191,92 @@ impl Processor { events, } = executable; - let injected_messages = injected_transactions - .iter() - .map(|tx| tx.data().to_message_id()); + let mut transitions = + InBlockTransitions::new(block.header.height, program_states, schedule); - let mut transitions = InBlockTransitions::new( - block.header.height, - program_states, - schedule, - injected_messages, - ); + transitions = Self::handle_injected_and_events( + self.db.clone(), + transitions, + injected_transactions, + events, + )?; - transitions = - self.process_injected_and_events(transitions, injected_transactions, events)?; if let Some(gas_allowance) = gas_allowance { - transitions = self - .process_queues(transitions, block, gas_allowance) - .await?; + transitions = Self::process_queues( + self.db.clone(), + self.creator.clone(), + transitions, + block, + gas_allowance, + self.config.chunk_size, + None, + ) + .await?; } - transitions = self.process_tasks(transitions); + transitions = Self::process_tasks(transitions, &self.db); Ok(transitions.finalize()) } - fn process_injected_and_events( + pub fn process_programs_with_promises( &mut self, + executable: ExecutableData, + ) -> Result { + let (promise_out_tx, promise_receiver) = mpsc::unbounded_channel(); + + let ExecutableData { + block, + program_states, + schedule, + injected_transactions, + gas_allowance, + events, + } = executable; + let db = self.db.clone(); + let mut transitions = + InBlockTransitions::new(block.header.height, program_states, schedule); + + transitions = Self::handle_injected_and_events( + self.db.clone(), + transitions, + injected_transactions, + events, + )?; + + let db = self.db.clone(); + let creator = self.creator.clone(); + let chunk_size = self.config.chunk_size; + let queue_processing = async move { + if let Some(gas_allowance) = gas_allowance { + transitions = Self::process_queues( + db.clone(), + creator, + transitions, + block, + gas_allowance, + chunk_size, + Some(promise_out_tx), + ) + .await?; + } + transitions = Self::process_tasks(transitions, &db); + Ok(transitions.finalize()) + } + .boxed(); + + Ok(event_stream::ProcessorEventStream::new( + promise_receiver, + queue_processing, + )) + } + + fn handle_injected_and_events( + db: Database, transitions: InBlockTransitions, injected_transactions: Vec>, events: Vec, ) -> Result { - let mut handler = ProcessingHandler::new(self.db.clone(), transitions); + let mut handler = ProcessingHandler::new(db, transitions); for tx in injected_transactions { let source = tx.address().into(); @@ -239,24 +299,28 @@ impl Processor { } async fn process_queues( - &mut self, + db: Database, + creator: InstanceCreator, transitions: InBlockTransitions, block: SimpleBlockData, gas_allowance: u64, + chunk_size: usize, + promise_out_tx: Option>, ) -> Result { CommonRunContext::new( - self.db.clone(), - self.creator.clone(), + db, + creator, transitions, gas_allowance, - self.config.chunk_size, + chunk_size, block.header, + promise_out_tx, ) .run() .await } - fn process_tasks(&mut self, mut transitions: InBlockTransitions) -> InBlockTransitions { + fn process_tasks(mut transitions: InBlockTransitions, db: &Database) -> InBlockTransitions { let tasks = transitions.take_actual_tasks(); let block_height = transitions.block_height(); @@ -264,7 +328,7 @@ impl Processor { let mut handler = ScheduleHandler { controller: TransitionController { - storage: &self.db, + storage: db, transitions: &mut transitions, }, }; @@ -369,14 +433,11 @@ impl OverlaidProcessor { return Err(ExecuteForReplyError::ProgramNotInitialized(program_id)); } - let transitions = InBlockTransitions::new( - block.header.height, - program_states, - Schedule::default(), - vec![], - ); + let transitions = + InBlockTransitions::new(block.header.height, program_states, Schedule::default()); - let transitions = self.0.process_injected_and_events( + let transitions = Processor::handle_injected_and_events( + self.0.db.clone(), transitions, vec![], vec![BlockRequestEvent::Mirror { @@ -422,3 +483,75 @@ impl OverlaidProcessor { Ok(res) } } + +pub mod event_stream { + use super::*; + use futures::{FutureExt, Stream}; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + // TODO: think about name. + #[derive(Debug, derive_more::From)] + pub enum Event { + #[from] + Promise(Promise), + #[from] + BlockTransitions(Result), + } + + type ProcessQueueFuture = + futures::future::BoxFuture<'static, Result>; + + pub struct ProcessorEventStream { + receiver: Option>, + queue_processing: Option, + } + + impl ProcessorEventStream { + pub fn new( + receiver: mpsc::UnboundedReceiver, + queue_processing: ProcessQueueFuture, + ) -> Self { + Self { + receiver: Some(receiver), + queue_processing: Some(queue_processing), + } + } + } + + impl Stream for ProcessorEventStream { + type Item = Event; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.receiver.is_none() && self.queue_processing.is_none() { + return Poll::Ready(None); + } + + if let Some(ref mut receiver) = self.receiver + && let Poll::Ready(maybe_promise) = receiver.poll_recv(cx) + { + match maybe_promise { + Some(promise) => return Poll::Ready(Some(Event::from(promise))), + None => { + // TODO: add log message. + log::trace!(""); + self.receiver = None; + } + } + } + + if let Some(ref mut future) = self.queue_processing + && let Poll::Ready(result) = future.poll_unpin(cx) + { + self.queue_processing = None; + // TODO: think about this + self.receiver = None; + return Poll::Ready(Some(Event::from(result))); + } + + Poll::Pending + } + } +} diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index eb519921103..16c7d128437 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -29,9 +29,14 @@ use ethexe_common::{ mock::*, }; use ethexe_runtime_common::{RUNTIME_ID, state::MessageQueue}; -use gear_core::ids::prelude::CodeIdExt; +use gear_core::{ + ids::prelude::CodeIdExt, + message::{ErrorReplyReason, ReplyCode, SuccessReplyReason}, +}; +use gear_core_errors::SimpleExecutionError; use gprimitives::{ActorId, MessageId}; use parity_scale_codec::Encode; +use tokio::sync::mpsc; use utils::*; mod utils { @@ -108,12 +113,8 @@ mod utils { } pub fn setup_handler(db: Database, block: SimpleBlockData) -> ProcessingHandler { - let transitions = InBlockTransitions::new( - block.header.height, - Default::default(), - Default::default(), - vec![], - ); + let transitions = + InBlockTransitions::new(block.header.height, Default::default(), Default::default()); ProcessingHandler::new(db, transitions) } @@ -195,7 +196,7 @@ async fn ping_init() { program_creations, .. } = processor - .process_programs(executable) + .process_programs(executable, None) .await .expect("failed to process create program"); program_creations @@ -222,7 +223,7 @@ async fn ping_init() { ..Default::default() }; processor - .process_programs(executable) + .process_programs(executable, None) .await .expect("failed to process send message"); } @@ -328,7 +329,7 @@ async fn ping_pong() { .expect("failed to send message"); let to_users = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap() .current_messages(); @@ -442,7 +443,7 @@ async fn async_and_ping() { .expect("failed to send message"); let transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); @@ -538,7 +539,7 @@ async fn many_waits() { handler.transitions = processor.process_tasks(handler.transitions); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); assert_eq!( @@ -563,7 +564,7 @@ async fn many_waits() { .expect("failed to send message"); } handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); assert_eq!( @@ -582,10 +583,10 @@ async fn many_waits() { .for_each(|(pid, cid)| processor.db.set_program_code_id(pid, cid)); // Check all messages wake up and reply with "Hello, world!" - let transitions = InBlockTransitions::new(wake_block.header.height, states, schedule, vec![]); + let transitions = InBlockTransitions::new(wake_block.header.height, states, schedule); let transitions = processor.process_tasks(transitions); let transitions = processor - .process_queues(transitions, wake_block, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(transitions, wake_block, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); @@ -693,7 +694,7 @@ async fn overlay_execution() { program_creations, .. } = processor - .process_programs(executable_data) + .process_programs(executable_data, None) .await .expect("failed to process events"); program_creations.into_iter().for_each(|(pid, cid)| { @@ -718,7 +719,7 @@ async fn overlay_execution() { let block2 = chain.blocks[2].to_simple(); let mut handler = ProcessingHandler::new( processor.db.clone(), - InBlockTransitions::new(block2.header.height, states, schedule, vec![]), + InBlockTransitions::new(block2.header.height, states, schedule), ); // Manually add messages to programs queues @@ -849,6 +850,7 @@ async fn overlay_execution() { async fn injected_ping_pong() { init_logger(); + let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel(); let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); let block1 = chain.blocks[1].to_simple(); @@ -890,7 +892,7 @@ async fn injected_ping_pong() { .expect("failed to send message"); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); @@ -907,15 +909,35 @@ async fn injected_ping_pong() { ) .expect("failed to send message"); + let injected_tx = injected(actor_id, b"PING", 0); + log::error!("injected tx hash: {:?}", injected_tx.to_hash()); handler - .handle_injected_transaction(user_2, injected(actor_id, b"PING", 0)) + .handle_injected_transaction(user_2, injected_tx.clone()) .expect("failed to send message"); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues( + handler.transitions, + block1, + DEFAULT_BLOCK_GAS_LIMIT, + Some(promise_out_tx.clone()), + ) .await .unwrap(); + let promise = promise_receiver + .recv() + .await + .expect("promise must be sent after processing"); + + assert_eq!(promise.tx_hash, injected_tx.to_hash()); + assert_eq!(promise.reply.payload, b"PONG"); + assert_eq!(promise.reply.value, 0); + assert_eq!( + promise.reply.code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); + let to_users = handler.transitions.current_messages(); assert_eq!(to_users.len(), 3); @@ -939,6 +961,7 @@ async fn injected_prioritized_over_canonical() { init_logger(); + let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel(); let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_ping::WASM_BINARY]); let block1 = chain.blocks[1].to_simple(); @@ -980,7 +1003,7 @@ async fn injected_prioritized_over_canonical() { .expect("failed to send message"); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); @@ -1001,17 +1024,40 @@ async fn injected_prioritized_over_canonical() { } // Send injected messages + let mut tx_hashes = vec![]; for _ in 0..MSG_NUM { + let tx = injected(actor_id, b"PING", 0); + tx_hashes.push(tx.to_hash()); handler - .handle_injected_transaction(injected_user, injected(actor_id, b"PING", 0)) + .handle_injected_transaction(injected_user, tx) .expect("failed to send message"); } let transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues( + handler.transitions, + block1, + DEFAULT_BLOCK_GAS_LIMIT, + Some(promise_out_tx.clone()), + ) .await .unwrap(); + for tx_hash in tx_hashes { + let promise = promise_receiver + .recv() + .await + .expect("promise for injected transaction"); + + assert_eq!(promise.tx_hash, tx_hash); + assert_eq!(promise.reply.value, 0); + assert_eq!( + promise.reply.code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); + assert_eq!(promise.reply.payload, b"PONG"); + } + // Verify that injected messages were processed first // skip the first message which is INIT reply let mut is_canonical_found = false; @@ -1070,7 +1116,7 @@ async fn executable_balance_charged() { .expect("failed to send message"); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); @@ -1113,6 +1159,7 @@ async fn executable_balance_injected_panic_not_charged() { init_logger(); + let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel(); let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([demo_panic_payload::WASM_BINARY]); let block1 = chain.blocks[1].to_simple(); @@ -1157,21 +1204,45 @@ async fn executable_balance_injected_panic_not_charged() { ) .unwrap(); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues( + handler.transitions, + block1, + DEFAULT_BLOCK_GAS_LIMIT, + Some(promise_out_tx.clone()), + ) .await .unwrap(); let init_balance = handler.program_state(actor_id).executable_balance; // We know for sure handling this message is cost less than the threshold. // This message will cause panic in the program. + let panic_tx = injected(actor_id, b"", 0); handler - .handle_injected_transaction(user_id, injected(actor_id, b"", 0)) + .handle_injected_transaction(user_id, panic_tx.clone()) .unwrap(); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues( + handler.transitions, + block1, + DEFAULT_BLOCK_GAS_LIMIT, + Some(promise_out_tx.clone()), + ) .await .unwrap(); + let panic_promise = promise_receiver + .recv() + .await + .expect("promise for injected transaction"); + assert_eq!(panic_promise.tx_hash, panic_tx.to_hash()); + assert_eq!(panic_promise.reply.value, 0); + assert_eq!( + panic_promise.reply.code, + ReplyCode::Error(ErrorReplyReason::Execution( + SimpleExecutionError::UserspacePanic + )) + ); + let to_users = handler.transitions.current_messages(); assert_eq!(to_users.len(), 2); @@ -1198,7 +1269,12 @@ async fn executable_balance_injected_panic_not_charged() { ) .expect("failed to send message"); let transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues( + handler.transitions, + block1, + DEFAULT_BLOCK_GAS_LIMIT, + Some(promise_out_tx.clone()), + ) .await .unwrap(); @@ -1264,7 +1340,7 @@ async fn insufficient_executable_balance_still_charged() { .expect("failed to send message"); handler.transitions = processor - .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT) + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) .await .unwrap(); diff --git a/ethexe/rpc/src/apis/program.rs b/ethexe/rpc/src/apis/program.rs index 568cec5d32d..ab27dc48caf 100644 --- a/ethexe/rpc/src/apis/program.rs +++ b/ethexe/rpc/src/apis/program.rs @@ -22,7 +22,7 @@ use ethexe_common::{ db::{AnnounceStorageRO, CodesStorageRO, OnChainStorageRO}, }; use ethexe_db::Database; -use ethexe_processor::{ExecutableDataForReply, Processor}; +use ethexe_processor::{ExecutableDataForReply, OverlaidProcessor}; use ethexe_runtime_common::state::{ DispatchStash, Mailbox, MemoryPages, MessageQueue, Program, ProgramState, QueryableStorage, Storage, Waitlist, @@ -95,12 +95,12 @@ pub trait Program { pub struct ProgramApi { db: Database, - processor: Processor, + processor: OverlaidProcessor, gas_allowance: u64, } impl ProgramApi { - pub fn new(db: Database, processor: Processor, gas_allowance: u64) -> Self { + pub fn new(db: Database, processor: OverlaidProcessor, gas_allowance: u64) -> Self { Self { db, processor, @@ -167,7 +167,6 @@ impl ProgramServer for ProgramApi { self.processor .clone() - .overlaid() .execute_for_reply(executable) .await .map_err(errors::runtime) diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 1f08fe8da48..4a142649b76 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -107,7 +107,8 @@ impl RpcServer { chunk_size: self.config.chunk_size, }, self.db.clone(), - )?; + )? + .overlaid(); let server_apis = RpcServerApis { code: CodeApi::new(self.db.clone()), diff --git a/ethexe/runtime/common/src/journal.rs b/ethexe/runtime/common/src/journal.rs index 74aff63c908..c69e2565152 100644 --- a/ethexe/runtime/common/src/journal.rs +++ b/ethexe/runtime/common/src/journal.rs @@ -16,10 +16,9 @@ use gear_core::{ env::MessageWaitedType, gas::GasAllowanceCounter, memory::PageBuf, - message::{Dispatch as CoreDispatch, DispatchKind, StoredDispatch}, + message::{Dispatch as CoreDispatch, StoredDispatch}, pages::{GearPage, WasmPage, num_traits::Zero as _, numerated::tree::IntervalsTree}, reservation::GasReserver, - rpc::ReplyInfo, }; use gear_core_errors::SignalCode; use gprimitives::{ActorId, CodeId, H256, MessageId, ReservationId}; @@ -252,20 +251,6 @@ impl JournalHandler for NativeJournalHandler<'_, S> { let destination = dispatch.destination(); let dispatch = dispatch.into_stored(); - if self.message_type == MessageType::Injected && dispatch.kind() == DispatchKind::Reply { - let reply_info = ReplyInfo { - payload: dispatch.payload_bytes().to_vec(), - code: dispatch - .reply_code() - .expect("expect reply_code in dispatch with DispatchKind::Reply"), - value: dispatch.value(), - }; - - self.controller - .transitions - .maybe_store_injected_reply(message_id, reply_info); - } - if self.controller.transitions.is_program(&destination) { let dispatch = Dispatch::from_core_stored( self.controller.storage, @@ -672,7 +657,7 @@ where #[cfg(test)] mod tests { - use gear_core::message::{Message as CoreMessage, StoredMessage}; + use gear_core::message::{DispatchKind, Message as CoreMessage, StoredMessage}; use super::*; diff --git a/ethexe/runtime/common/src/lib.rs b/ethexe/runtime/common/src/lib.rs index 1be408d45bb..d83b7ab70fb 100644 --- a/ethexe/runtime/common/src/lib.rs +++ b/ethexe/runtime/common/src/lib.rs @@ -28,16 +28,21 @@ use core_processor::{ common::{ExecutableActorData, JournalNote}, configs::{BlockConfig, SyscallName}, }; -use ethexe_common::gear::{CHUNK_PROCESSING_GAS_LIMIT, MessageType}; +use ethexe_common::{ + HashOf, PromisePolicy, + gear::{CHUNK_PROCESSING_GAS_LIMIT, MessageType}, + injected::Promise, +}; use gear_core::{ code::{CodeMetadata, InstrumentedCode, MAX_WASM_PAGES_AMOUNT}, gas::GasAllowanceCounter, gas_metering::Schedule, ids::ActorId, message::{DispatchKind, IncomingDispatch, IncomingMessage}, + rpc::ReplyInfo, }; use gear_lazy_pages_common::LazyPagesInterface; -use gprimitives::H256; +use gprimitives::{H256, MessageId}; use gsys::{GasMultiplier, Percent}; use journal::RuntimeJournalHandler; use state::{Dispatch, ProgramState, Storage}; @@ -73,6 +78,7 @@ pub struct ProcessQueueContext { pub code_metadata: CodeMetadata, pub gas_allowance: GasAllowanceCounter, pub block_info: BlockInfo, + pub promise_policy: PromisePolicy, } pub trait RuntimeInterface: Storage { @@ -81,6 +87,9 @@ pub trait RuntimeInterface: Storage { fn init_lazy_pages(&self); fn random_data(&self) -> (Vec, u32); fn update_state_hash(&self, state_hash: &H256); + /// Publish a promise produced during execution to the compute service layer. + /// The implementation is expected to forward it to external subscribers. + fn publish_promise(&self, promise: &Promise); } /// A main low-level interface to perform state changes @@ -207,9 +216,11 @@ where ri.init_lazy_pages(); for dispatch in queue { - let origin = dispatch.message_type; + let dispatch_id = dispatch.id; + let message_type = dispatch.message_type; let call_reply = dispatch.call; let is_first_execution = dispatch.context.is_none(); + let is_promise_required = dispatch.kind.is_handle() && dispatch.message_type.is_injected(); let journal = process_dispatch(dispatch, &block_config, &program_state, &ctx, ri); let mut handler = RuntimeJournalHandler { @@ -221,8 +232,13 @@ where is_first_execution, stop_processing: false, }; + + if ctx.promise_policy.is_enabled() && is_promise_required { + process_journal_for_injected_dispatch(ri, &journal, dispatch_id); + } + let (unhandled_journal_notes, new_state_hash) = handler.handle_journal(journal); - mega_journal.push((unhandled_journal_notes, origin, call_reply)); + mega_journal.push((unhandled_journal_notes, message_type, call_reply)); // Update state hash if it was changed. if let Some(new_state_hash) = new_state_hash { @@ -242,6 +258,48 @@ where (mega_journal, gas_spent) } +/// Finds in [`process_dispatch`]'s the [`JournalNote::SendDispatch`] note and builds from it +/// a [`ReplyInfo`] and [`Promise`] for injected message. +fn process_journal_for_injected_dispatch( + ri: &RI, + journal: &[JournalNote], + dispatch_id: MessageId, +) where + RI: RuntimeInterface, +{ + for note in journal.iter() { + if let JournalNote::SendDispatch { + message_id, + dispatch, + .. + } = note + && *message_id == dispatch_id + && dispatch.kind().is_reply() + { + let Some(code) = dispatch.reply_details().map(|d| d.to_reply_code()) else { + log::error!( + "received reply dispatch without reply details; protocol invariant violated: \ + initial_dispatch_id={dispatch_id:?}, send_dispatch={dispatch:?}" + ); + continue; + }; + + let reply = ReplyInfo { + value: dispatch.value(), + code, + payload: dispatch.message().payload_bytes().to_vec(), + }; + + // SAFE: because of protocol logic - injected message id constructs from injected transaction hash. + let tx_hash = unsafe { HashOf::new(dispatch_id.into_bytes().into()) }; + let promise = Promise { reply, tx_hash }; + + ri.publish_promise(&promise); + break; + } + } +} + fn process_dispatch( dispatch: Dispatch, block_config: &BlockConfig, diff --git a/ethexe/runtime/common/src/transitions.rs b/ethexe/runtime/common/src/transitions.rs index 3d0774e704b..fb00898864d 100644 --- a/ethexe/runtime/common/src/transitions.rs +++ b/ethexe/runtime/common/src/transitions.rs @@ -23,12 +23,10 @@ use alloc::{ use anyhow::{Result, anyhow}; use core::num::NonZero; use ethexe_common::{ - HashOf, ProgramStates, Schedule, ScheduledTask, StateHashWithQueueSize, + ProgramStates, Schedule, ScheduledTask, StateHashWithQueueSize, gear::{Message, StateTransition, ValueClaim}, - injected::Promise, }; -use gear_core::rpc::ReplyInfo; -use gprimitives::{ActorId, CodeId, H256, MessageId}; +use gprimitives::{ActorId, CodeId, H256}; /// In-memory store for the state transitions /// that are going to be applied in the current block. @@ -46,11 +44,6 @@ pub struct InBlockTransitions { schedule: Schedule, modifications: BTreeMap, program_creations: BTreeMap, - - /// The set of injected messages to track replies for. - injected_messages: BTreeSet, - /// Replies for injected messages, in the order of processing. - injected_replies: Vec<(MessageId, ReplyInfo)>, } #[derive(Debug, Clone, Default)] @@ -58,22 +51,15 @@ pub struct FinalizedBlockTransitions { pub transitions: Vec, pub states: ProgramStates, pub schedule: Schedule, - pub promises: Vec, pub program_creations: Vec<(ActorId, CodeId)>, } impl InBlockTransitions { - pub fn new( - block_height: u32, - states: ProgramStates, - schedule: Schedule, - injected_messages: impl IntoIterator, - ) -> Self { + pub fn new(block_height: u32, states: ProgramStates, schedule: Schedule) -> Self { Self { block_height, states, schedule, - injected_messages: injected_messages.into_iter().collect(), ..Default::default() } } @@ -148,13 +134,6 @@ impl InBlockTransitions { &self.program_creations } - /// Handles new reply for injected transaction. - pub fn maybe_store_injected_reply(&mut self, message_id: MessageId, reply: ReplyInfo) { - if self.injected_messages.contains(&message_id) { - self.injected_replies.push((message_id, reply)); - } - } - pub fn modify_state( &mut self, actor_id: ActorId, @@ -216,20 +195,10 @@ impl InBlockTransitions { states, schedule, modifications, - injected_replies, program_creations, .. } = self; - let promises = injected_replies - .into_iter() - .map(|(message_id, reply)| { - // SAFETY: message_id for injected transaction is created from its hash bytes. - let tx_hash = unsafe { HashOf::new(message_id.into_bytes().into()) }; - Promise { tx_hash, reply } - }) - .collect(); - let mut transitions = Vec::with_capacity(modifications.len()); for (actor_id, modification) in modifications { @@ -256,7 +225,6 @@ impl InBlockTransitions { transitions, states, schedule, - promises, program_creations: program_creations.into_iter().collect(), } } diff --git a/ethexe/runtime/src/wasm/interface/mod.rs b/ethexe/runtime/src/wasm/interface/mod.rs index 8090a75b029..77d716341c6 100644 --- a/ethexe/runtime/src/wasm/interface/mod.rs +++ b/ethexe/runtime/src/wasm/interface/mod.rs @@ -25,6 +25,9 @@ pub(crate) mod database_ri; #[path = "logging.rs"] pub(crate) mod logging_ri; +#[path = "promise.rs"] +pub(crate) mod promise_ri; + pub(crate) mod utils { use ethexe_runtime_common::pack_u32_to_i64; diff --git a/ethexe/runtime/src/wasm/interface/promise.rs b/ethexe/runtime/src/wasm/interface/promise.rs new file mode 100644 index 00000000000..06b38acdd69 --- /dev/null +++ b/ethexe/runtime/src/wasm/interface/promise.rs @@ -0,0 +1,40 @@ +// This file is part of Gear. +// +// Copyright (C) 2024-2025 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::wasm::interface; +use ethexe_common::injected::Promise; +use ethexe_runtime_common::pack_u32_to_i64; +use parity_scale_codec::Encode; + +interface::declare!( + pub(super) fn ext_publish_promise(promise_ptr_len: i64); +); + +/// Encode and forward a promise to the host for publication. +pub fn publish_promise(promise: &Promise) { + unsafe { + // Important: the `Promise` struct contains the `ReplyInfo` which have the dynamic type. + // So we need to encode the promise and pass to host handler a pointer and size of encoded data. + + let encoded_promise = promise.encode(); + let promise_ptr_len = + pack_u32_to_i64(encoded_promise.as_ptr() as _, encoded_promise.len() as _); + + sys::ext_publish_promise(promise_ptr_len); + } +} diff --git a/ethexe/runtime/src/wasm/storage.rs b/ethexe/runtime/src/wasm/storage.rs index db800d57bd3..3ff6ca29d82 100644 --- a/ethexe/runtime/src/wasm/storage.rs +++ b/ethexe/runtime/src/wasm/storage.rs @@ -16,9 +16,11 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::wasm::interface::promise_ri; + use super::interface::database_ri; use alloc::vec::Vec; -use ethexe_common::HashOf; +use ethexe_common::{HashOf, injected::Promise}; use ethexe_runtime_common::{ RuntimeInterface, state::{ @@ -150,4 +152,8 @@ impl RuntimeInterface for NativeRuntimeInterface { fn update_state_hash(&self, hash: &H256) { database_ri::update_state_hash(hash); } + + fn publish_promise(&self, promise: &Promise) { + promise_ri::publish_promise(promise); + } } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index af8a959f98d..94f2d72356a 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -114,7 +114,7 @@ pub struct Service { rpc: Option, fast_sync: bool, - validator_address: Option

, + validator_pub_key: Option, #[cfg(test)] sender: tests::utils::TestingEventSender, @@ -279,24 +279,15 @@ impl Service { .with_context(|| "failed to query validators threshold")?; log::info!("🔒 Multisig threshold: {threshold} / {}", validators.len()); - let processor = Processor::with_config( - ProcessorConfig { - chunk_size: config.node.chunk_processing_threads, - }, - db.clone(), - ) - .with_context(|| "failed to create processor")?; - log::info!( "🔧 Amount of chunk processing threads for programs processing: {}", - processor.config().chunk_size + config.node.chunk_processing_threads ); let signer = Signer::fs(config.node.key_path.clone())?; let validator_pub_key = Self::get_config_public_key(config.node.validator, &signer) .with_context(|| "failed to get validator private key")?; - let validator_address = validator_pub_key.map(|key| key.to_address()); // TODO #4642: use validator session key let _validator_pub_key_session = @@ -379,6 +370,10 @@ impl Service { .map(|config| RpcServer::new(config.clone(), db.clone())); let compute_config = ComputeConfig::new(config.node.canonical_quarantine); + let processor_config = ProcessorConfig { + chunk_size: config.node.chunk_processing_threads, + }; + let processor = Processor::with_config(processor_config, db.clone())?; let compute = ComputeService::new(compute_config, db.clone(), processor); let fast_sync = config.node.fast_sync; @@ -395,7 +390,7 @@ impl Service { prometheus, rpc, fast_sync, - validator_address, + validator_pub_key, #[cfg(test)] sender: unreachable!(), }) @@ -423,7 +418,7 @@ impl Service { rpc: Option, sender: tests::utils::TestingEventSender, fast_sync: bool, - validator_address: Option
, + validator_pub_key: Option, ) -> Self { Self { db, @@ -437,7 +432,7 @@ impl Service { rpc, sender, fast_sync, - validator_address, + validator_pub_key, } } @@ -463,10 +458,11 @@ impl Service { mut prometheus, rpc, fast_sync: _, - validator_address, + validator_pub_key, #[cfg(test)] sender, } = self; + let validator_address = validator_pub_key.map(|key| key.to_address()); let (mut rpc_handle, mut rpc) = if let Some(rpc) = rpc { log::info!("🌐 Rpc server starting at: {}", rpc.port()); @@ -543,8 +539,8 @@ impl Service { ComputeEvent::RequestLoadCodes(codes) => { blob_loader.load_codes(codes)?; } - ComputeEvent::AnnounceComputed(computed_data) => { - consensus.receive_computed_announce(computed_data)? + ComputeEvent::AnnounceComputed(announce_hash) => { + consensus.receive_computed_announce(announce_hash)? } ComputeEvent::BlockPrepared(block_hash) => { consensus.receive_prepared_block(block_hash)? @@ -552,6 +548,9 @@ impl Service { ComputeEvent::CodeProcessed(_) => { // Nothing } + ComputeEvent::Promise(promise, announce_hash) => { + consensus.receive_promise_for_signing(promise, announce_hash)?; + } }, Event::Network(event) => { let Some(_) = network.as_mut() else { @@ -643,7 +642,22 @@ impl Service { } } Event::Consensus(event) => match event { - ConsensusEvent::ComputeAnnounce(announce) => compute.compute_announce(announce), + ConsensusEvent::ComputeAnnounce(announce, promise_policy) => { + compute.compute_announce(announce, promise_policy) + } + ConsensusEvent::SignedPromise(signed_promise) => { + if rpc.is_none() && network.is_none() { + panic!("Promise without network or rpc"); + } + + if let Some(rpc) = &rpc { + rpc.provide_promise(signed_promise.clone()); + } + + if let Some(network) = &mut network { + network.publish_promise(signed_promise); + } + } ConsensusEvent::PublishMessage(message) => { let Some(network) = network.as_mut() else { continue; @@ -667,21 +681,6 @@ impl Service { ConsensusEvent::AnnounceAccepted(_) | ConsensusEvent::AnnounceRejected(_) => { // TODO #4940: consider to publish network message } - ConsensusEvent::Promises(promises) => { - if rpc.is_none() && network.is_none() { - panic!("Promise without network or rpc"); - } - - if let Some(rpc) = &rpc { - rpc.provide_promises(promises.clone()); - } - - if let Some(network) = &mut network { - for promise in promises { - network.publish_promise(promise); - } - } - } }, Event::Prometheus(event) => match event { PrometheusEvent::CollectMetrics { libp2p_metrics } => { diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index ae69d4d7c05..f4d60d59265 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -49,7 +49,7 @@ use ethexe_common::{ mock::*, network::ValidatorMessage, }; -use ethexe_compute::ComputeConfig; +use ethexe_compute::{ComputeConfig, ComputeEvent}; use ethexe_consensus::{BatchCommitter, ConsensusEvent}; use ethexe_db::{Database, verifier::IntegrityVerifier}; use ethexe_ethereum::{TryGetReceipt, deploy::ContractsDeploymentParams, router::Router}; @@ -1545,13 +1545,14 @@ async fn send_injected_tx() { }; // Send request - log::info!("Sending tx pool request to node-1"); - let _r = node1 + log::info!("Sending transaction to node-1"); + let acceptance = node1 .rpc_http_client() .unwrap() .send_transaction(tx_for_node1.clone()) .await .expect("rpc server is set"); + assert_eq!(acceptance, InjectedTransactionAcceptance::Accept); // Tx executable validation takes time, so wait for event. node1 @@ -2403,7 +2404,10 @@ async fn injected_tx_fungible_token() { .validator(env.validators[0]), ); node.start_service().await; - let rpc_client = node.rpc_http_client().expect("RPC client provide by node"); + let rpc_client = node + .rpc_ws_client() + .await + .expect("RPC client provide by node"); // 1. Create Fungible token config let token_config = demo_fungible_token::InitConfig { @@ -2475,11 +2479,10 @@ async fn injected_tx_fungible_token() { .unwrap(), }; - let acceptance = rpc_client - .send_transaction(rpc_tx) + let mut subscription = rpc_client + .send_transaction_and_watch(rpc_tx) .await .expect("successfully send transaction to RPC"); - assert!(matches!(acceptance, InjectedTransactionAcceptance::Accept)); let expected_event = demo_fungible_token::FTEvent::Transfer { from: ActorId::new([0u8; 32]), @@ -2490,10 +2493,7 @@ async fn injected_tx_fungible_token() { // Listen for inclusion and check the expected payload. node.events() .find(|event| { - if let TestingEvent::Consensus(ConsensusEvent::Promises(promises)) = event - && !promises.is_empty() - { - let promise = promises.first().unwrap().data(); + if let TestingEvent::Compute(ComputeEvent::Promise(promise, _)) = event { assert_eq!(promise.reply.payload, expected_event.encode()); assert_eq!( promise.reply.code, @@ -2509,6 +2509,22 @@ async fn injected_tx_fungible_token() { .await; tracing::info!("✅ Tokens mint successfully"); + let subscription_promise = subscription + .next() + .await + .expect("subscription produce value") + .expect("no errors for correct injected transaction"); + assert_eq!(subscription_promise.data().tx_hash, mint_tx.to_hash()); + assert_eq!(subscription_promise.data().reply.value, 0); + assert_eq!( + subscription_promise.data().reply.code, + ReplyCode::Success(SuccessReplyReason::Manual) + ); + assert_eq!( + subscription_promise.into_data().reply.payload, + expected_event.encode() + ); + let db = node.db.clone(); node.events() .find(|event| { diff --git a/ethexe/service/src/tests/utils/env.rs b/ethexe/service/src/tests/utils/env.rs index 47dd4ed75ea..f9c411f3a15 100644 --- a/ethexe/service/src/tests/utils/env.rs +++ b/ethexe/service/src/tests/utils/env.rs @@ -79,7 +79,7 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; -use tokio::{task, task::JoinHandle}; +use tokio::task::{self, JoinHandle}; use tracing::Instrument; /// Max network services which can be created by one test environment. @@ -948,10 +948,7 @@ impl Node { } }; - let validator_address = self - .validator_config - .as_ref() - .map(|c| c.public_key.to_address()); + let validator_pub_key = self.validator_config.as_ref().map(|c| c.public_key); let (sender, receiver) = events::channel(self.db.clone()); @@ -993,7 +990,7 @@ impl Node { rpc, sender, self.fast_sync, - validator_address, + validator_pub_key, ); let name = self.name.clone(); diff --git a/ethexe/service/src/tests/utils/events.rs b/ethexe/service/src/tests/utils/events.rs index d84236ed779..818c09b4f9e 100644 --- a/ethexe/service/src/tests/utils/events.rs +++ b/ethexe/service/src/tests/utils/events.rs @@ -276,8 +276,8 @@ impl TestingEventReceiver { let id = id.into(); log::info!("📗 waiting for announce computed: {id:?}"); self.find_announce(id, |event| { - if let TestingEvent::Compute(ComputeEvent::AnnounceComputed(computed_data)) = event { - Some(computed_data.announce_hash) + if let TestingEvent::Compute(ComputeEvent::AnnounceComputed(announce_hash)) = event { + Some(announce_hash) } else { None }