Skip to content

Commit 9cb4d63

Browse files
committed
ability to fetch next set of queries for a given block
1 parent 91c28f3 commit 9cb4d63

File tree

4 files changed

+275
-3
lines changed

4 files changed

+275
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

domains/client/relayer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async-channel.workspace = true
1616
cross-domain-message-gossip.workspace = true
1717
futures.workspace = true
1818
parity-scale-codec = { workspace = true, features = ["derive"] }
19+
rand.workspace = true
1920
sc-client-api.workspace = true
2021
sc-state-db.workspace = true
2122
sc-utils.workspace = true
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//! Schema for processed channel data
2+
3+
use parity_scale_codec::{Decode, Encode};
4+
use sc_client_api::backend::AuxStore;
5+
use sp_blockchain::{Error as ClientError, Result as ClientResult};
6+
use sp_core::H256;
7+
use sp_messenger::messages::{ChainId, ChannelId, Nonce};
8+
use subspace_runtime_primitives::BlockNumber;
9+
10+
const CHANNEL_PROCESSED_STATE: &[u8] = b"channel_processed_state";
11+
12+
fn channel_processed_state_key(dst_chain_id: ChainId, channel_id: ChannelId) -> Vec<u8> {
13+
(CHANNEL_PROCESSED_STATE, dst_chain_id, channel_id).encode()
14+
}
15+
16+
fn load_decode<Backend: AuxStore, T: Decode>(
17+
backend: &Backend,
18+
key: &[u8],
19+
) -> ClientResult<Option<T>> {
20+
match backend.get_aux(key)? {
21+
None => Ok(None),
22+
Some(t) => T::decode(&mut &t[..])
23+
.map_err(|e| {
24+
ClientError::Backend(format!("Relayer DB is corrupted. Decode error: {e}"))
25+
})
26+
.map(Some),
27+
}
28+
}
29+
30+
/// Channel processed state for given dst_chain and channel ID.
31+
#[derive(Debug, Encode, Decode, Clone)]
32+
pub struct ChannelProcessedState {
33+
// Block number of chain at which the channel state is updated
34+
pub block_number: BlockNumber,
35+
/// Block hash of the chain at which the channel state is updated.
36+
pub block_hash: H256,
37+
/// Channel identifier.
38+
pub channel_id: ChannelId,
39+
/// Last processed channel outbox nonce.
40+
pub last_outbox_nonce: Nonce,
41+
/// Last processed channel inbox message response nonce.
42+
pub last_inbox_message_response_nonce: Nonce,
43+
}
44+
45+
/// Load the channel processed state
46+
pub fn get_channel_processed_state<Backend>(
47+
backend: &Backend,
48+
dst_chain_id: ChainId,
49+
channel_id: ChannelId,
50+
) -> ClientResult<Option<ChannelProcessedState>>
51+
where
52+
Backend: AuxStore,
53+
{
54+
load_decode::<_, ChannelProcessedState>(
55+
backend,
56+
channel_processed_state_key(dst_chain_id, channel_id).as_slice(),
57+
)
58+
}
59+
60+
/// Set the channel processed state
61+
pub fn set_channel_state<Backend>(
62+
backend: &Backend,
63+
dst_chain_id: ChainId,
64+
channel_state: ChannelProcessedState,
65+
) -> ClientResult<()>
66+
where
67+
Backend: AuxStore,
68+
{
69+
backend.insert_aux(
70+
&[(
71+
channel_processed_state_key(dst_chain_id, channel_state.channel_id).as_slice(),
72+
channel_state.encode().as_slice(),
73+
)],
74+
vec![],
75+
)
76+
}

domains/client/relayer/src/lib.rs

Lines changed: 197 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,40 @@
33
// TODO: Restore once https://github.com/rust-lang/rust/issues/122105 is resolved
44
// #![deny(unused_crate_dependencies)]
55

6+
mod aux_schema;
67
pub mod worker;
78

9+
use crate::aux_schema::get_channel_processed_state;
810
use async_channel::TrySendError;
911
use cross_domain_message_gossip::{
1012
can_allow_xdm_submission, get_channel_state, get_xdm_processed_block_number,
1113
set_xdm_message_processed_at, BlockId, Message as GossipMessage,
1214
MessageData as GossipMessageData, RELAYER_PREFIX,
1315
};
1416
use parity_scale_codec::{Codec, Encode};
17+
use rand::seq::SliceRandom;
1518
use sc_client_api::{AuxStore, HeaderBackend, ProofProvider, StorageProof};
1619
use sc_utils::mpsc::TracingUnboundedSender;
1720
use sp_api::{ApiRef, ProvideRuntimeApi};
1821
use sp_core::{H256, U256};
19-
use sp_domains::DomainsApi;
22+
use sp_domains::{ChannelId, DomainsApi};
2023
use sp_messenger::messages::{
21-
BlockMessageWithStorageKey, BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, Proof,
24+
BlockMessageWithStorageKey, BlockMessagesQuery, BlockMessagesWithStorageKey, ChainId,
25+
ChannelState, CrossDomainMessage, Nonce, Proof,
2226
};
2327
use sp_messenger::{MessengerApi, RelayerApi, XdmId, MAX_FUTURE_ALLOWED_NONCES};
2428
use sp_mmr_primitives::MmrApi;
2529
use sp_runtime::traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One};
26-
use sp_runtime::ArithmeticError;
30+
use sp_runtime::{ArithmeticError, SaturatedConversion, Saturating};
2731
use sp_subspace_mmr::ConsensusChainMmrLeafProof;
32+
use std::cmp::max;
2833
use std::marker::PhantomData;
2934
use std::sync::Arc;
3035
use subspace_runtime_primitives::BlockHashFor;
3136
use tracing::log;
3237

38+
const CHANNEL_PROCESSED_STATE_CACHE_LIMIT: u32 = 15;
39+
3340
/// The logging target.
3441
const LOG_TARGET: &str = "message::relayer";
3542

@@ -644,3 +651,190 @@ where
644651
Ok(proof)
645652
}
646653
}
654+
655+
// Fetch the unprocessed XDMs at a given block
656+
fn fetch_messages<Backend, Client, Block, CBlock>(
657+
backend: &Backend,
658+
client: &Arc<Client>,
659+
fetch_message_at: Block::Hash,
660+
self_chain_id: ChainId,
661+
) -> Result<Vec<BlockMessagesQuery>, Error>
662+
where
663+
Block: BlockT,
664+
Block::Hash: From<H256>,
665+
CBlock: BlockT,
666+
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
667+
Client::Api: RelayerApi<Block, NumberFor<Block>, NumberFor<CBlock>, CBlock::Hash>,
668+
Backend: AuxStore,
669+
{
670+
let runtime_api = client.runtime_api();
671+
let mut queries = runtime_api
672+
.channels_and_state(fetch_message_at)?
673+
.into_iter()
674+
.filter_map(|(dst_chain_id, channel_id, channel_state)| {
675+
get_channel_state_query(
676+
backend,
677+
client,
678+
fetch_message_at,
679+
self_chain_id,
680+
dst_chain_id,
681+
channel_id,
682+
channel_state,
683+
)
684+
.ok()
685+
.flatten()
686+
})
687+
.collect::<Vec<_>>();
688+
689+
// pick random 15 queries
690+
Ok(if queries.len() <= 15 {
691+
queries
692+
} else {
693+
let mut rng = rand::thread_rng();
694+
queries.shuffle(&mut rng);
695+
queries.truncate(15);
696+
queries
697+
})
698+
}
699+
700+
fn get_channel_state_query<Backend, Client, Block>(
701+
backend: &Backend,
702+
client: &Arc<Client>,
703+
fetch_message_at: Block::Hash,
704+
self_chain_id: ChainId,
705+
dst_chain_id: ChainId,
706+
channel_id: ChannelId,
707+
local_channel_state: ChannelState,
708+
) -> Result<Option<BlockMessagesQuery>, Error>
709+
where
710+
Backend: AuxStore,
711+
Block: BlockT,
712+
Block::Hash: From<H256>,
713+
Client: HeaderBackend<Block>,
714+
{
715+
let maybe_dst_channel_state =
716+
get_channel_state(backend, dst_chain_id, self_chain_id, channel_id)
717+
.ok()
718+
.flatten();
719+
720+
let maybe_channel_processed_state =
721+
if let Some(state) = get_channel_processed_state(backend, dst_chain_id, channel_id)? {
722+
match client.hash(state.block_number.into()).ok().flatten() {
723+
// there is no block at this number, could be due to re-org
724+
None => None,
725+
Some(hash) => {
726+
if hash != state.block_hash.into() {
727+
// client re-org'ed, allow xdm submission
728+
None
729+
} else {
730+
// check if the state is still valid from the current block
731+
match client.number(fetch_message_at)? {
732+
None => Some(state),
733+
Some(current_block_number) => {
734+
let block_limit: NumberFor<Block> =
735+
CHANNEL_PROCESSED_STATE_CACHE_LIMIT.saturated_into();
736+
737+
if state.block_number
738+
>= current_block_number
739+
.saturating_sub(block_limit)
740+
.saturated_into()
741+
{
742+
Some(state)
743+
} else {
744+
None
745+
}
746+
}
747+
}
748+
}
749+
}
750+
}
751+
} else {
752+
None
753+
};
754+
755+
Ok(
756+
match (maybe_dst_channel_state, maybe_channel_processed_state) {
757+
// don't have any info on channel, so assume from the beginning
758+
(None, None) => Some(BlockMessagesQuery {
759+
chain_id: dst_chain_id,
760+
channel_id,
761+
outbox_from: Nonce::zero(),
762+
inbox_responses_from: Nonce::zero(),
763+
}),
764+
// don't have channel processed state, so use the dst_channel state for query
765+
(Some(dst_channel_state), None) => Some(BlockMessagesQuery {
766+
chain_id: dst_chain_id,
767+
channel_id,
768+
outbox_from: dst_channel_state.next_inbox_nonce,
769+
inbox_responses_from: dst_channel_state
770+
.latest_response_received_message_nonce
771+
// pick the next inbox message response nonce or default to zero
772+
.map(|nonce| nonce.saturating_add(One::one()))
773+
.unwrap_or(Nonce::zero()),
774+
}),
775+
// don't have dst channel state, so use the last processed channel state
776+
(None, Some(channel_state)) => Some(BlockMessagesQuery {
777+
chain_id: dst_chain_id,
778+
channel_id,
779+
outbox_from: channel_state.last_outbox_nonce.saturating_add(One::one()),
780+
inbox_responses_from: channel_state
781+
.last_inbox_message_response_nonce
782+
.saturating_add(One::one()),
783+
}),
784+
(Some(dst_channel_state), Some(channel_state)) => {
785+
let next_outbox_nonce = max(
786+
dst_channel_state.next_inbox_nonce,
787+
channel_state.last_outbox_nonce.saturating_add(One::one()),
788+
);
789+
790+
let next_inbox_response_nonce = max(
791+
dst_channel_state
792+
.latest_response_received_message_nonce
793+
.map(|nonce| nonce.saturating_add(One::one()))
794+
.unwrap_or(Nonce::zero()),
795+
channel_state
796+
.last_inbox_message_response_nonce
797+
.saturating_add(One::one()),
798+
);
799+
800+
// if the local channel is closed, and
801+
// last outbox message is already included
802+
// and
803+
// if the dst_channel is closed, and
804+
// last inbox message response is already included
805+
// we can safely skip the channel as the there is nothing further to send.
806+
if local_channel_state == ChannelState::Closed
807+
&& dst_channel_state.state == ChannelState::Closed
808+
{
809+
let is_last_outbox_nonce = dst_channel_state
810+
.next_inbox_nonce
811+
.saturating_sub(One::one())
812+
== channel_state.last_outbox_nonce;
813+
814+
let is_last_inbox_message_response_nonce = dst_channel_state
815+
.latest_response_received_message_nonce
816+
.unwrap_or(Nonce::zero())
817+
== channel_state.last_inbox_message_response_nonce;
818+
819+
if is_last_outbox_nonce && is_last_inbox_message_response_nonce {
820+
None
821+
} else {
822+
Some(BlockMessagesQuery {
823+
chain_id: dst_chain_id,
824+
channel_id,
825+
outbox_from: next_outbox_nonce,
826+
inbox_responses_from: next_inbox_response_nonce,
827+
})
828+
}
829+
} else {
830+
Some(BlockMessagesQuery {
831+
chain_id: dst_chain_id,
832+
channel_id,
833+
outbox_from: next_outbox_nonce,
834+
inbox_responses_from: next_inbox_response_nonce,
835+
})
836+
}
837+
}
838+
},
839+
)
840+
}

0 commit comments

Comments
 (0)