Skip to content

Commit b880ae9

Browse files
committed
connect new route to fetch messages, refactor processed message state from prev commit and remove other old route caching
1 parent 9cb4d63 commit b880ae9

File tree

2 files changed

+421
-289
lines changed

2 files changed

+421
-289
lines changed
Lines changed: 199 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,32 @@
11
//! Schema for processed channel data
22
3+
use crate::CHANNEL_PROCESSED_STATE_CACHE_LIMIT;
34
use parity_scale_codec::{Decode, Encode};
45
use sc_client_api::backend::AuxStore;
5-
use sp_blockchain::{Error as ClientError, Result as ClientResult};
6-
use sp_core::H256;
6+
use sp_blockchain::{Error as ClientError, HeaderBackend, Result as ClientResult};
77
use sp_messenger::messages::{ChainId, ChannelId, Nonce};
8-
use subspace_runtime_primitives::BlockNumber;
8+
use sp_runtime::traits::{Block as BlockT, NumberFor};
9+
use sp_runtime::{SaturatedConversion, Saturating};
10+
use std::sync::Arc;
911

1012
const CHANNEL_PROCESSED_STATE: &[u8] = b"channel_processed_state";
13+
const OUTBOX_MESSAGES_PREFIX: &[u8] = b"outbox_messages";
14+
const INBOX_RESPONSE_MESSAGES_PREFIX: &[u8] = b"inbox_responses_messages";
1115

12-
fn channel_processed_state_key(dst_chain_id: ChainId, channel_id: ChannelId) -> Vec<u8> {
13-
(CHANNEL_PROCESSED_STATE, dst_chain_id, channel_id).encode()
16+
fn channel_processed_state_key(
17+
prefix: &[u8],
18+
src_chain_id: ChainId,
19+
dst_chain_id: ChainId,
20+
channel_id: ChannelId,
21+
) -> Vec<u8> {
22+
(
23+
CHANNEL_PROCESSED_STATE,
24+
prefix,
25+
src_chain_id,
26+
dst_chain_id,
27+
channel_id,
28+
)
29+
.encode()
1430
}
1531

1632
fn load_decode<Backend: AuxStore, T: Decode>(
@@ -29,48 +45,210 @@ fn load_decode<Backend: AuxStore, T: Decode>(
2945

3046
/// Channel processed state for given dst_chain and channel ID.
3147
#[derive(Debug, Encode, Decode, Clone)]
32-
pub struct ChannelProcessedState {
48+
pub struct ChannelProcessedState<Block: BlockT> {
3349
// Block number of chain at which the channel state is updated
34-
pub block_number: BlockNumber,
50+
pub block_number: NumberFor<Block>,
3551
/// Block hash of the chain at which the channel state is updated.
36-
pub block_hash: H256,
52+
pub block_hash: Block::Hash,
3753
/// Channel identifier.
3854
pub channel_id: ChannelId,
39-
/// Last processed channel outbox nonce.
40-
pub last_outbox_nonce: Nonce,
41-
/// Last processed channel inbox message response nonce.
42-
pub last_inbox_message_response_nonce: Nonce,
55+
/// Last processed channel nonce.
56+
pub nonce: Option<Nonce>,
57+
}
58+
59+
/// Load the channel outbox processed state
60+
fn get_channel_outbox_processed_state<Backend, Block: BlockT>(
61+
backend: &Backend,
62+
src_chain_id: ChainId,
63+
dst_chain_id: ChainId,
64+
channel_id: ChannelId,
65+
) -> ClientResult<Option<ChannelProcessedState<Block>>>
66+
where
67+
Backend: AuxStore,
68+
{
69+
load_decode::<_, ChannelProcessedState<Block>>(
70+
backend,
71+
channel_processed_state_key(
72+
OUTBOX_MESSAGES_PREFIX,
73+
src_chain_id,
74+
dst_chain_id,
75+
channel_id,
76+
)
77+
.as_slice(),
78+
)
4379
}
4480

45-
/// Load the channel processed state
46-
pub fn get_channel_processed_state<Backend>(
81+
/// Load the channel inbox response processed state
82+
fn get_channel_inbox_message_response_processed_state<Backend, Block: BlockT>(
4783
backend: &Backend,
84+
src_chain_id: ChainId,
4885
dst_chain_id: ChainId,
4986
channel_id: ChannelId,
50-
) -> ClientResult<Option<ChannelProcessedState>>
87+
) -> ClientResult<Option<ChannelProcessedState<Block>>>
5188
where
5289
Backend: AuxStore,
5390
{
54-
load_decode::<_, ChannelProcessedState>(
91+
load_decode::<_, ChannelProcessedState<Block>>(
5592
backend,
56-
channel_processed_state_key(dst_chain_id, channel_id).as_slice(),
93+
channel_processed_state_key(
94+
INBOX_RESPONSE_MESSAGES_PREFIX,
95+
src_chain_id,
96+
dst_chain_id,
97+
channel_id,
98+
)
99+
.as_slice(),
57100
)
58101
}
59102

60-
/// Set the channel processed state
61-
pub fn set_channel_state<Backend>(
103+
/// Set the channel outbox processed state
104+
pub(crate) fn set_channel_outbox_processed_state<Backend, Block: BlockT>(
62105
backend: &Backend,
106+
src_chain_id: ChainId,
63107
dst_chain_id: ChainId,
64-
channel_state: ChannelProcessedState,
108+
channel_state: ChannelProcessedState<Block>,
65109
) -> ClientResult<()>
66110
where
67111
Backend: AuxStore,
68112
{
69113
backend.insert_aux(
70114
&[(
71-
channel_processed_state_key(dst_chain_id, channel_state.channel_id).as_slice(),
115+
channel_processed_state_key(
116+
OUTBOX_MESSAGES_PREFIX,
117+
src_chain_id,
118+
dst_chain_id,
119+
channel_state.channel_id,
120+
)
121+
.as_slice(),
72122
channel_state.encode().as_slice(),
73123
)],
74124
vec![],
75125
)
76126
}
127+
128+
/// Set the channel inbox processed state
129+
pub(crate) fn set_channel_inbox_response_processed_state<Backend, Block: BlockT>(
130+
backend: &Backend,
131+
src_chain_id: ChainId,
132+
dst_chain_id: ChainId,
133+
channel_state: ChannelProcessedState<Block>,
134+
) -> ClientResult<()>
135+
where
136+
Backend: AuxStore,
137+
{
138+
backend.insert_aux(
139+
&[(
140+
channel_processed_state_key(
141+
INBOX_RESPONSE_MESSAGES_PREFIX,
142+
src_chain_id,
143+
dst_chain_id,
144+
channel_state.channel_id,
145+
)
146+
.as_slice(),
147+
channel_state.encode().as_slice(),
148+
)],
149+
vec![],
150+
)
151+
}
152+
153+
/// Last processed nonce data.
154+
#[derive(Debug, Clone)]
155+
pub(crate) struct LastProcessedNonces {
156+
pub outbox_nonce: Option<Nonce>,
157+
pub inbox_response_nonce: Option<Nonce>,
158+
}
159+
160+
/// Returns non expired last processed nonces.
161+
pub(crate) fn get_last_processed_nonces<Backend, Client, Block>(
162+
backend: &Backend,
163+
client: &Arc<Client>,
164+
latest_hash: Block::Hash,
165+
src_chain_id: ChainId,
166+
dst_chain_id: ChainId,
167+
channel_id: ChannelId,
168+
) -> ClientResult<LastProcessedNonces>
169+
where
170+
Backend: AuxStore,
171+
Block: BlockT,
172+
Client: HeaderBackend<Block>,
173+
{
174+
let last_processed_outbox_nonce = get_channel_outbox_processed_state::<_, Block>(
175+
backend,
176+
src_chain_id,
177+
dst_chain_id,
178+
channel_id,
179+
)?
180+
.and_then(|state| {
181+
is_last_processed_nonce_valid(
182+
client,
183+
latest_hash,
184+
state.block_number,
185+
state.block_hash,
186+
state.nonce,
187+
)
188+
.ok()
189+
.flatten()
190+
});
191+
192+
let last_processed_inbox_response_nonce = get_channel_inbox_message_response_processed_state::<
193+
_,
194+
Block,
195+
>(
196+
backend, src_chain_id, dst_chain_id, channel_id
197+
)?
198+
.and_then(|state| {
199+
is_last_processed_nonce_valid(
200+
client,
201+
latest_hash,
202+
state.block_number,
203+
state.block_hash,
204+
state.nonce,
205+
)
206+
.ok()
207+
.flatten()
208+
});
209+
210+
Ok(LastProcessedNonces {
211+
outbox_nonce: last_processed_outbox_nonce,
212+
inbox_response_nonce: last_processed_inbox_response_nonce,
213+
})
214+
}
215+
216+
fn is_last_processed_nonce_valid<Client, Block>(
217+
client: &Arc<Client>,
218+
latest_hash: Block::Hash,
219+
processed_block_number: NumberFor<Block>,
220+
processed_block_hash: Block::Hash,
221+
last_process_nonce: Option<Nonce>,
222+
) -> ClientResult<Option<Nonce>>
223+
where
224+
Block: BlockT,
225+
Client: HeaderBackend<Block>,
226+
{
227+
// short circuit if there is no last processed nonce
228+
if last_process_nonce.is_none() {
229+
return Ok(None);
230+
}
231+
232+
// there is no block at this number, could be due to re-org
233+
let Some(block_hash) = client.hash(processed_block_number)? else {
234+
return Ok(None);
235+
};
236+
237+
// hash mismatch could be due to re-org
238+
if block_hash != processed_block_hash {
239+
return Ok(None);
240+
}
241+
242+
let Some(latest_number) = client.number(latest_hash)? else {
243+
return Ok(None);
244+
};
245+
246+
// if the cache limit is reached, return none
247+
if latest_number.saturating_sub(processed_block_number)
248+
> CHANNEL_PROCESSED_STATE_CACHE_LIMIT.saturated_into()
249+
{
250+
Ok(None)
251+
} else {
252+
Ok(last_process_nonce)
253+
}
254+
}

0 commit comments

Comments
 (0)