Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 42 additions & 38 deletions ethexe/compute/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{ComputeError, ProcessorExt, Result, service::SubService};
use crate::{ProcessorExt, Result, service::SubService};
use ethexe_common::{
CodeAndIdUnchecked,
db::{CodesStorageRO, CodesStorageRW},
};
use ethexe_db::Database;
use ethexe_processor::{ProcessedCodeInfo, ValidCodeInfo};
use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered};
use gprimitives::CodeId;
use metrics::Gauge;
use std::task::{Context, Poll};
use tokio::task::JoinSet;
use std::{
future,
task::{Context, Poll},
};

/// Metrics for the [`CodesSubService`].
#[derive(Clone, metrics_derive::Metrics)]
Expand All @@ -41,7 +44,7 @@ pub struct CodesSubService<P: ProcessorExt> {
processor: P,
metrics: Metrics,

processions: JoinSet<Result<CodeId>>,
processions: FuturesUnordered<BoxFuture<'static, Result<CodeId>>>,
}

impl<P: ProcessorExt> CodesSubService<P> {
Expand All @@ -50,7 +53,7 @@ impl<P: ProcessorExt> CodesSubService<P> {
db,
processor,
metrics: Metrics::default(),
processions: JoinSet::new(),
processions: FuturesUnordered::new(),
}
}

Expand All @@ -71,36 +74,37 @@ impl<P: ProcessorExt> CodesSubService<P> {
"Instrumented code {code_id:?} must exist in database"
);
}
self.processions.spawn(async move { Ok(code_id) });
self.processions.push(future::ready(Ok(code_id)).boxed());
} else {
let db = self.db.clone();
let mut processor = self.processor.clone();

self.processions.spawn_blocking(move || {
processor
.process_code(code_and_id)
.map(|ProcessedCodeInfo { code_id, valid }| {
if let Some(ValidCodeInfo {
code,
self.processions.push(
async move {
let ProcessedCodeInfo { code_id, valid } =
processor.process_code(code_and_id).await?;
if let Some(ValidCodeInfo {
code,
instrumented_code,
code_metadata,
}) = valid
{
db.set_original_code(&code);
db.set_instrumented_code(
ethexe_runtime_common::VERSION,
code_id,
instrumented_code,
code_metadata,
}) = valid
{
db.set_original_code(&code);
db.set_instrumented_code(
ethexe_runtime_common::VERSION,
code_id,
instrumented_code,
);
db.set_code_metadata(code_id, code_metadata);
db.set_code_valid(code_id, true);
} else {
db.set_code_valid(code_id, false);
}

code_id
})
});
);
db.set_code_metadata(code_id, code_metadata);
db.set_code_valid(code_id, true);
} else {
db.set_code_valid(code_id, false);
}

Ok(code_id)
}
.boxed(),
);
}

self.metrics
Expand All @@ -113,14 +117,14 @@ impl<P: ProcessorExt> SubService for CodesSubService<P> {
type Output = CodeId;

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
futures::ready!(self.processions.poll_join_next(cx))
.map(|res| {
self.metrics
.processing_codes
.set(self.processions.len() as f64);
res.map_err(ComputeError::CodeProcessJoin)?
})
.map_or(Poll::Pending, Poll::Ready)
if let Poll::Ready(Some(res)) = self.processions.poll_next_unpin(cx) {
self.metrics
.processing_codes
.set(self.processions.len() as f64);
return Poll::Ready(res);
}

Poll::Pending
}
}

Expand Down
9 changes: 6 additions & 3 deletions ethexe/compute/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {

const USER_ID: ActorId = ActorId::new([1u8; 32]);

pub fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId {
pub async fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId {
let code_id = CodeId::generate(code);

let ValidCodeInfo {
Expand All @@ -448,6 +448,7 @@ mod tests {
code: code.to_vec(),
code_id,
})
.await
.expect("failed to process code")
.valid
.expect("code is invalid");
Expand Down Expand Up @@ -579,7 +580,8 @@ mod tests {

let db = Database::memory();
let mut processor = Processor::new(db.clone()).unwrap();
let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db);
let ping_code_id =
test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await;
let ping_id = ActorId::from(0x10000);

let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db);
Expand Down Expand Up @@ -702,7 +704,8 @@ mod tests {
let db = Database::memory();
let mut processor = Processor::new(db.clone()).unwrap();

let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db);
let ping_code_id =
test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await;
let ping_id = ActorId::from(0x10000);

let blockchain = BlockChain::mock(3).setup(&db);
Expand Down
11 changes: 6 additions & 5 deletions ethexe/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ pub enum ComputeError {
BlockHeaderNotFound(H256),
#[error("block validators committed for era not found for block({0})")]
CommittedEraNotFound(H256),
#[error("process code join error")]
CodeProcessJoin(#[from] tokio::task::JoinError),
#[error("codes queue not found for computed block({0})")]
CodesQueueNotFound(H256),
#[error("last committed batch not found for computed block({0})")]
Expand Down Expand Up @@ -101,7 +99,10 @@ pub trait ProcessorExt: Sized + Unpin + Send + Clone + 'static {
executable: ExecutableData,
promise_out_tx: Option<mpsc::UnboundedSender<Promise>>,
) -> impl Future<Output = Result<FinalizedBlockTransitions>> + Send;
fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo>;
fn process_code(
&mut self,
code_and_id: CodeAndIdUnchecked,
) -> impl Future<Output = Result<ProcessedCodeInfo>> + Send;
}

impl ProcessorExt for Processor {
Expand All @@ -115,7 +116,7 @@ impl ProcessorExt for Processor {
.map_err(Into::into)
}

fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
self.process_code(code_and_id).map_err(Into::into)
async fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
self.process_code(code_and_id).await.map_err(Into::into)
}
}
2 changes: 1 addition & 1 deletion ethexe/compute/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl ProcessorExt for MockProcessor {
Ok(self.process_programs_result.take().unwrap_or_default())
}

fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
async fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
Ok(self
.process_codes_result
.take()
Expand Down
1 change: 0 additions & 1 deletion ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use gprimitives::ActorId;
pub(crate) mod events;
pub(crate) mod overlaid;
pub(crate) mod run;
mod thread_pool;

/// A high-level interface for executing ops,
/// which mutate states based on the current block request events.
Expand Down
97 changes: 26 additions & 71 deletions ethexe/processor/src/handling/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,9 @@ pub(super) mod chunks_splitting {

mod chunk_execution_spawn {
use super::*;
use crate::{handling::thread_pool::ThreadPool, host::InstanceWrapper};
use crate::thread_pool;
use ethexe_runtime_common::ProcessQueueContext;
use std::sync::LazyLock;
use futures::stream::FuturesOrdered;

/// An alias introduced for better readability of the chunks execution steps.
pub type ChunkItemOutput = (ActorId, H256, ProgramJournals, u64);
Expand All @@ -590,55 +590,6 @@ mod chunk_execution_spawn {
chunk: Vec<(ActorId, H256)>,
queue_type: MessageType,
) -> Result<Vec<ChunkItemOutput>> {
struct Executable {
queue_type: MessageType,
block_info: BlockInfo,
promise_policy: PromisePolicy,
program_id: ActorId,
state_hash: H256,
instrumented_code: InstrumentedCode,
code_metadata: CodeMetadata,
executor: InstanceWrapper,
db: Box<dyn CASDatabase>,
gas_allowance_for_chunk: u64,
promise_out_tx: Option<mpsc::UnboundedSender<Promise>>,
}

fn execute_chunk_item(executable: Executable) -> Result<ChunkItemOutput> {
let Executable {
queue_type,
block_info,
promise_policy,
program_id,
state_hash,
instrumented_code,
code_metadata,
mut executor,
db,
gas_allowance_for_chunk,
promise_out_tx,
} = executable;

let (jn, new_state_hash, gas_spent) = executor.run(
db,
ProcessQueueContext {
program_id,
state_root: state_hash,
queue_type,
instrumented_code,
code_metadata,
gas_allowance: GasAllowanceCounter::new(gas_allowance_for_chunk),
block_info,
promise_policy,
},
promise_out_tx,
)?;
Ok((program_id, new_state_hash, jn, gas_spent))
}

static THREAD_POOL: LazyLock<ThreadPool<Executable, Result<ChunkItemOutput>>> =
LazyLock::new(|| ThreadPool::new(execute_chunk_item));

let gas_allowance_for_chunk = ctx
.inner()
.gas_allowance_counter
Expand All @@ -653,30 +604,34 @@ mod chunk_execution_spawn {
timestamp: block_header.timestamp,
};

let executables = chunk
chunk
.into_iter()
.map(|(program_id, state_hash)| {
let (instrumented_code, code_metadata) = ctx.program_code(program_id)?;

let executor = ctx.inner().instance_creator.instantiate()?;

Ok(Executable {
queue_type,
block_info,
promise_policy,
program_id,
state_hash,
instrumented_code,
code_metadata,
executor,
db: ctx.inner().db.cas().clone_boxed(),
gas_allowance_for_chunk,
promise_out_tx: ctx.inner().promise_out_tx.clone(),
})
let mut executor = ctx.inner().instance_creator.instantiate()?;
let db = ctx.inner().db.cas().clone_boxed();
let promise_out_tx = ctx.inner().promise_out_tx.clone();
Ok(thread_pool::spawn(move || {
let (jn, new_state_hash, gas_spent) = executor.run(
db,
ProcessQueueContext {
program_id,
state_root: state_hash,
queue_type,
instrumented_code,
code_metadata,
gas_allowance: GasAllowanceCounter::new(gas_allowance_for_chunk),
block_info,
promise_policy,
},
promise_out_tx,
)?;
Ok((program_id, new_state_hash, jn, gas_spent))
}))
})
.collect::<Result<Vec<_>>>()?;

THREAD_POOL.spawn_many(executables).try_collect().await
.collect::<Result<FuturesOrdered<_>>>()?
.try_collect()
.await
}
}

Expand Down
Loading
Loading