Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ log/
weight-dumps/
gsdk/vara_runtime_prod.scale
*.meta.txt
.vscode

# cargo
target/
Expand Down
21 changes: 20 additions & 1 deletion ethexe/consensus/src/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,19 @@ impl ConnectService {
producer: Address,
) -> Result<()> {
if let Some(announce) = self.pending_announces.pop(&(producer, block.hash)) {
tracing::trace!(
block = %block.hash,
producer = %producer,
"Announce for current block has been already received, process it immediately"
);
self.process_announce_from_producer(announce, producer)?;
self.state = State::WaitingForBlock;
} else {
tracing::trace!(
block = %block.hash,
producer = %producer,
"Announce for current block has not been received yet, wait for it"
);
self.state = State::WaitingForAnnounce { block, producer };
}

Expand Down Expand Up @@ -276,10 +286,19 @@ impl ConsensusService for ConnectService {
&& sender == *producer
&& announce.block_hash == block.hash
{
tracing::trace!(
producer = %producer,
?announce,
"Received announce from producer, process it immediately"
);
self.process_announce_from_producer(announce, *producer)?;
self.state = State::WaitingForBlock;
} else {
tracing::warn!("Receive unexpected {announce:?}, save to pending announces");
tracing::warn!(
sender = %sender,
?announce,
"Receive unexpected announce, save to pending announces"
);
self.pending_announces
.push((sender, announce.block_hash), announce);
}
Expand Down
6 changes: 6 additions & 0 deletions ethexe/consensus/src/validator/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ impl ValidatorCore {
era_index: block_era + 1,
};

tracing::info!(
block = %block.hash,
?commitment,
"Aggregated validators commitment for next era"
);

Ok(Some(commitment))
}

Expand Down
127 changes: 127 additions & 0 deletions ethexe/ethereum/src/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use crate::{IntoBlockId, abi::IMirror::stateHashCall as StateHashCall};
use alloy::{
network::Network as AlloyNetwork,
primitives::{Address as AlloyAddress, Bytes, TxKind},
providers::{Provider as _, RootProvider},
rpc::{
client::{BatchRequest, Waiter},
types::{BlockId, TransactionInput, TransactionRequest},
},
sol_types::SolCall,
};
use anyhow::{Context as _, Result, ensure};
use futures::{StreamExt as _, stream::FuturesUnordered};
use gprimitives::{ActorId, H256};
use gsigner::Address;
use std::collections::BTreeMap;

const STATE_HASH_BATCH_SIZE: usize = 128;
const MAX_CONCURRENT_BATCHES: usize = 4;

#[allow(async_fn_in_trait)]
pub trait ProviderExt {
    /// Collects the state hashes of the given mirror actors at the specified block.
    ///
    /// Returns a mapping from each actor ID to its state hash.
    ///
    /// The JSON-RPC calls are batched to avoid making one request per mirror,
    /// which can be very slow when syncing many programs: at most
    /// [`STATE_HASH_BATCH_SIZE`] calls go into a single batch, and at most
    /// [`MAX_CONCURRENT_BATCHES`] batches are in flight at once.
    async fn collect_mirror_states(
        &self,
        at: impl IntoBlockId,
        mirrors: Vec<ActorId>,
    ) -> Result<BTreeMap<ActorId, H256>>;
}

impl<N: AlloyNetwork> ProviderExt for RootProvider<N> {
    /// Fans the mirror list out into fixed-size batches and drives at most
    /// [`MAX_CONCURRENT_BATCHES`] of them concurrently, merging every batch
    /// result into a single map.
    async fn collect_mirror_states(
        &self,
        at: impl IntoBlockId,
        mirrors: Vec<ActorId>,
    ) -> Result<BTreeMap<ActorId, H256>> {
        let block_id = at.into_block_id();

        log::trace!(
            "Collecting state hashes for {} mirror actors at block {block_id}",
            mirrors.len(),
        );

        // One lazy future per chunk of mirrors; nothing runs until polled.
        let mut pending: Vec<_> = mirrors
            .chunks(STATE_HASH_BATCH_SIZE)
            .map(|chunk| collect_mirror_states_batch(self.clone(), block_id, chunk))
            .collect();

        // Seed the in-flight set with the tail of `pending`, capping concurrency.
        let seed_from = pending.len().saturating_sub(MAX_CONCURRENT_BATCHES);
        let mut in_flight: FuturesUnordered<_> =
            pending.split_off(seed_from).into_iter().collect();

        let mut states = BTreeMap::new();
        while let Some(batch) = in_flight.next().await {
            log::trace!(
                "Received a batch of mirror states ({} states in this batch, {} batches remaining)",
                batch.as_ref().map(|m| m.len()).unwrap_or(0),
                in_flight.len() + pending.len(),
            );
            states.extend(batch?);
            // Backfill: start another queued batch as soon as one completes.
            if let Some(next) = pending.pop() {
                in_flight.push(next);
            }
        }

        Ok(states)
    }
}

/// Fetches `stateHash()` for every mirror in `mirrors` via a single JSON-RPC
/// batch of `eth_call`s executed at `block_id`.
///
/// Returns a mapping from actor ID to its 32-byte state hash. An empty input
/// short-circuits without touching the network.
///
/// # Errors
/// Fails if an actor ID is not a valid mirror address, if the batch cannot be
/// assembled or sent, if any individual call fails, or if a call returns a
/// payload that is not exactly 32 bytes.
async fn collect_mirror_states_batch<N: AlloyNetwork>(
    provider: RootProvider<N>,
    block_id: BlockId,
    mirrors: &[ActorId],
) -> Result<BTreeMap<ActorId, H256>> {
    if mirrors.is_empty() {
        return Ok(BTreeMap::new());
    }

    // All calls share the same calldata: the 4-byte `stateHash()` selector.
    let calldata = Bytes::from(StateHashCall {}.abi_encode());

    let mut batch = BatchRequest::new(provider.client());
    let mut waiters: Vec<(ActorId, Waiter<Bytes>)> = Vec::with_capacity(mirrors.len());

    for &actor_id in mirrors {
        // Name the offending actor so one bad ID in a batch is traceable.
        let mirror = Address::try_from(actor_id)
            .with_context(|| format!("Actor ID {actor_id} is not a valid mirror address"))?;

        let tx = TransactionRequest {
            to: Some(TxKind::Call(AlloyAddress::new(mirror.0))),
            input: TransactionInput::new(calldata.clone()),
            ..Default::default()
        };

        let waiter = batch
            .add_call::<_, Bytes>("eth_call", &(tx, block_id))
            .context("failed to add eth_call to JSON-RPC batch")?;
        waiters.push((actor_id, waiter));
    }

    log::trace!(
        "Sending JSON-RPC batch for eth_call(stateHash) of {} actors at block {block_id}",
        mirrors.len()
    );
    batch
        .send()
        .await
        .context("failed to send JSON-RPC batch")?;

    let mut program_states = BTreeMap::new();
    for (actor_id, waiter) in waiters {
        // Include the actor ID: a single failed call inside a 128-call batch
        // is otherwise impossible to attribute (the `ensure!` below already does).
        let bytes = waiter.await.with_context(|| {
            format!("Failed to get state hash of actor {actor_id} at block {block_id}")
        })?;
        ensure!(
            bytes.len() == 32,
            "Unexpected eth_call(stateHash) response length for actor {actor_id}: expected 32, got {}",
            bytes.len()
        );
        program_states.insert(actor_id, H256::from_slice(bytes.as_ref()));
    }

    Ok(program_states)
}
1 change: 1 addition & 0 deletions ethexe/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use std::time::Duration;

pub mod abi;
pub mod deploy;
pub mod ext;
pub mod middleware;
pub mod mirror;
pub mod router;
Expand Down
80 changes: 61 additions & 19 deletions ethexe/network/src/db_sync/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,26 @@ impl OngoingRequests {
request: OngoingRequest,
channel: oneshot::Sender<HandleResult>,
) {
let multiplier = if let InnerRequest::Hashes(HashesRequest(hashes)) = request
.response_handler
.as_ref()
.expect("response handler must be set at `inner_request` calling")
.inner_request()
{
1 + (hashes.len() / 100)
} else {
1
};

self.requests.insert(
request_id,
(
request
.request(
self.peer_score_handle.clone(),
self.external_data_provider.clone_boxed(),
self.request_timeout,
self.max_rounds_per_request,
self.request_timeout * multiplier as u32,
self.max_rounds_per_request * multiplier as u32,
)
.boxed(),
Some(channel),
Expand Down Expand Up @@ -236,7 +247,8 @@ impl OngoingRequests {
let event = match state {
OngoingRequestState::PendingState => Event::PendingStateRequest { request_id },
OngoingRequestState::SendRequest(peer, request, reason) => {
let outbound_request_id = behaviour.send_request(&peer, request);
let outbound_request_id = behaviour.send_request(&peer, request.clone());
log::trace!("sending request {outbound_request_id} to {peer} {request:?}, round reason: {reason:?}");
self.active_requests.insert(outbound_request_id, request_id);

Event::NewRequestRound {
Expand Down Expand Up @@ -715,18 +727,29 @@ impl OngoingRequest {
.difference(&self.tried_peers)
.choose_stable(&mut rand::thread_rng())
.copied();
self.tried_peers.extend(peer);

if let Some(peer) = peer {
self.tried_peers.insert(peer);
Poll::Ready(peer)
} else {
event_sent.get_or_insert_with(|| {
ctx.state
.set(OngoingRequestState::PendingState)
.expect("set only once");
});

Poll::Pending
if let Some(peer) = ctx
.peers
.iter()
.choose_stable(&mut rand::thread_rng())
.copied()
{
log::trace!("all peers have been tried, clear `tried_peers` and retry");
self.tried_peers.clear();
self.tried_peers.insert(peer);
Poll::Ready(peer)
} else {
event_sent.get_or_insert_with(|| {
ctx.state
.set(OngoingRequestState::PendingState)
.expect("set only once");
});
Poll::Pending
}
}
})
.await;
Expand All @@ -740,16 +763,26 @@ impl OngoingRequest {
peer: PeerId,
reason: NewRequestRoundReason,
) -> Result<InnerResponse, ()> {
let request = self
.response_handler
.as_ref()
.expect("always Some")
.inner_request();

let request = match request {
InnerRequest::Hashes(request) if request.0.len() > 100 => {
let r = InnerRequest::Hashes(HashesRequest(
request.0.iter().take(100).copied().collect(),
));
log::trace!("request is too big, send partial request: {r:?}");
r
}
other => other,
};

CONTEXT.with_mut(|ctx| {
ctx.state
.set(OngoingRequestState::SendRequest(
peer,
self.response_handler
.as_ref()
.expect("always Some")
.inner_request(),
reason,
))
.set(OngoingRequestState::SendRequest(peer, request, reason))
.expect("set only once");
});

Expand Down Expand Up @@ -809,6 +842,15 @@ impl OngoingRequest {
request_timeout: Duration,
max_rounds_per_request: u32,
) -> Result<Response, (RequestFailure, Self)> {
log::trace!(
"starting request {:?} with timeout {} secs and max rounds {max_rounds_per_request}",
self.response_handler
.as_ref()
.expect("response handler must be set at request moment")
.inner_request(),
request_timeout.as_secs(),
);

let request_loop = async {
let mut rounds = 0;
let mut reason = NewRequestRoundReason::FromQueue;
Expand Down
Loading
Loading