Skip to content

Commit 51e09b2

Browse files
authored
fix(relayer): Accept http requests in gear-eth (#806)
1 parent 9fd11ec commit 51e09b2

File tree

8 files changed

+318
-256
lines changed

8 files changed

+318
-256
lines changed

Cargo.lock

Lines changed: 51 additions & 51 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ members = [
4444
resolver = "2"
4545

4646
[workspace.package]
47-
version = "1.0.0"
47+
version = "1.0.1"
4848
edition = "2021"
4949

5050
[workspace.dependencies]

relayer/src/message_relayer/common/gear/block_listener.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl BlockListener {
4848
}
4949

5050
pub async fn run<const RECEIVER_COUNT: usize>(
51-
self,
51+
mut self,
5252
) -> [broadcast::Receiver<GearBlock>; RECEIVER_COUNT] {
5353
// Capacity for the channel. At the moment merkle-root relayer might lag behind
5454
// during proof generation or era sync, so we need to have enough capacity
@@ -57,14 +57,13 @@ impl BlockListener {
5757
let (tx, _) = broadcast::channel(CAPACITY);
5858
let tx2 = tx.clone();
5959
tokio::task::spawn(async move {
60-
let api = self.api_provider.client();
6160
let UnprocessedBlocks {
6261
last_block,
6362
first_block,
6463
blocks: _,
6564
} = self.block_storage.unprocessed_blocks().await;
6665

67-
let api_back = api.clone();
66+
let api_back = self.api_provider.client();
6867
let tx_back = tx2.clone();
6968
let storage_back = self.block_storage.clone();
7069

@@ -118,9 +117,10 @@ impl BlockListener {
118117
Ok(())
119118
});
120119

120+
let mut last_finalized_block_number = None;
121121
loop {
122-
let res = self.run_inner(&tx2).await;
123-
match res {
122+
let res = self.run_inner(&tx2, &mut last_finalized_block_number).await;
123+
let e = match res {
124124
Ok(false) => {
125125
log::info!("Gear block listener stopped due to no active receivers");
126126
return;
@@ -131,12 +131,17 @@ impl BlockListener {
131131
continue;
132132
}
133133

134-
Err(err) => {
135-
log::error!("Gear block listener failed: {err}");
134+
Err(e) => e,
135+
};
136136

137-
return;
138-
}
137+
log::error!(r#"Gear block listener failed: "{e:?}""#);
138+
139+
if let Err(e) = self.api_provider.reconnect().await {
140+
log::error!(r#"API provider unable to reconnect: "{e}""#);
141+
return;
139142
}
143+
144+
log::debug!("API provider reconnected");
140145
}
141146
});
142147

@@ -147,20 +152,22 @@ impl BlockListener {
147152
.expect("expected Vec of correct length")
148153
}
149154

150-
async fn run_inner(&self, tx: &broadcast::Sender<GearBlock>) -> anyhow::Result<bool> {
155+
async fn run_inner(
156+
&self,
157+
tx: &broadcast::Sender<GearBlock>,
158+
last_finalized_block_number: &mut Option<u32>,
159+
) -> anyhow::Result<bool> {
151160
let gear_api = self.api_provider.client();
152161

153-
let mut last_finalized_block_number = None;
154162
let mut subscription = gear_api.subscribe_grandpa_justifications().await?;
155-
156163
while let Some(justification) = subscription.next().await {
157164
let justification = justification?;
158165

159166
let block_hash = justification.commit.target_hash;
160167
let block_number = justification.commit.target_number;
161168

162169
// Check if there are missing blocks and fetch them
163-
if let Some(last_finalized) = last_finalized_block_number {
170+
if let Some(last_finalized) = *last_finalized_block_number {
164171
if last_finalized + 1 != block_number {
165172
log::info!("Detected gap: last finalized block was #{last_finalized}, current block is #{block_number}");
166173

@@ -211,7 +218,7 @@ impl BlockListener {
211218
}
212219

213220
// Update the last finalized block number
214-
last_finalized_block_number = Some(block_number);
221+
*last_finalized_block_number = Some(block_number);
215222
self.metrics.latest_block.set(block_number as i64);
216223
}
217224

relayer/src/message_relayer/common/gear/message_data_extractor.rs

Lines changed: 74 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,27 @@ use anyhow::Result as AnyResult;
88
use ethereum_common::U256;
99
use std::{cmp::Ordering, ops::Deref};
1010
use tokio::{
11-
sync::mpsc::{UnboundedReceiver, UnboundedSender},
11+
sync::mpsc::{UnboundedReceiver, WeakUnboundedSender},
1212
task,
1313
};
1414

1515
pub type BlockData = (GearBlock, AuthoritySetId);
1616

17+
/// The purpose of the entity is to receive requests to relay message(s) from an external source,
18+
/// than get required data for relaying the message(s) and send the data further.
19+
///
20+
/// An example of an external requester is HTTP server.
1721
pub struct MessageDataExtractor {
1822
api_provider: ApiProviderConnection,
19-
sender: UnboundedSender<MessageInBlock>,
23+
sender: WeakUnboundedSender<MessageInBlock>,
2024
receiver: UnboundedReceiver<Message>,
2125
blocks: BlockDataList,
2226
}
2327

2428
impl MessageDataExtractor {
2529
pub fn new(
2630
api_provider: ApiProviderConnection,
27-
sender: UnboundedSender<MessageInBlock>,
31+
sender: WeakUnboundedSender<MessageInBlock>,
2832
receiver: UnboundedReceiver<Message>,
2933
) -> Self {
3034
Self {
@@ -35,33 +39,10 @@ impl MessageDataExtractor {
3539
}
3640
}
3741

38-
pub fn sender(&self) -> &UnboundedSender<MessageInBlock> {
39-
&self.sender
40-
}
41-
4242
pub fn spawn(self) {
4343
task::spawn(self::task(self));
4444
}
4545

46-
async fn run_inner(&mut self) -> anyhow::Result<()> {
47-
loop {
48-
let Some(message) = self.receiver.recv().await else {
49-
return Ok(());
50-
};
51-
52-
log::trace!(r#"Processing message: "{message:?}""#);
53-
54-
let block_data = match self.find_block_data(message.block) {
55-
Some(block_data) => block_data,
56-
None => self.retreive_block_data(message.block).await?,
57-
};
58-
59-
log::trace!(r#"Found data for the message block: "{block_data:?}""#);
60-
61-
self.process_message_block(message, block_data).await?;
62-
}
63-
}
64-
6546
fn find_block_data(&self, block_number: u32) -> Option<BlockData> {
6647
self.blocks.find_by_block_number(block_number).cloned()
6748
}
@@ -86,56 +67,87 @@ impl MessageDataExtractor {
8667

8768
Ok(block_data)
8869
}
89-
90-
async fn process_message_block(
91-
&self,
92-
message: Message,
93-
block_data: BlockData,
94-
) -> anyhow::Result<()> {
95-
let (block, authority_set_id) = block_data;
96-
let messages = common::message_queued_events_of(&block);
97-
let block_hash = block.hash();
98-
for message_queued in messages {
99-
if U256::from_big_endian(&message_queued.nonce_be).0 != message.nonce.0 {
100-
log::info!("Message nonce mismatch, skipping {message_queued:?}");
101-
continue;
102-
}
103-
104-
log::trace!("Processing message in block: {message_queued:?}");
105-
106-
self.sender.send(MessageInBlock {
107-
message: message_queued,
108-
block: GearBlockNumber(block.number()),
109-
block_hash: block_hash.0.into(),
110-
authority_set_id,
111-
})?;
112-
113-
break;
114-
}
115-
116-
Ok(())
117-
}
11870
}
11971

12072
async fn task(mut this: MessageDataExtractor) {
73+
let mut message_last = None;
12174
loop {
122-
let result = this.run_inner().await;
75+
let result = run_inner(&mut this, &mut message_last).await;
12376
let Err(e) = result else {
12477
log::trace!("Message data extractor exiting...");
12578
return;
12679
};
12780

12881
log::error!("Message data extractor failed: {e}");
82+
if let Err(e) = this.api_provider.reconnect().await {
83+
log::error!(r#"Unable to reconnect: "{e}""#);
84+
return;
85+
}
12986

130-
match this.api_provider.reconnect().await {
131-
Ok(_) => {
132-
log::info!("Message queued extractor reconnected");
87+
log::debug!("API provider reconnected");
88+
}
89+
}
90+
91+
async fn run_inner(
92+
this: &mut MessageDataExtractor,
93+
message_last: &mut Option<Message>,
94+
) -> anyhow::Result<()> {
95+
loop {
96+
let message = match message_last.clone() {
97+
Some(message) => message,
98+
None => {
99+
let Some(message) = this.receiver.recv().await else {
100+
return Ok(());
101+
};
102+
103+
*message_last = Some(message.clone());
104+
105+
message
133106
}
107+
};
108+
109+
log::trace!(r#"Processing message: "{message:?}""#);
134110

135-
Err(e) => {
136-
log::error!("Message queued extractor unable to reconnect: {e:?}");
137-
return;
111+
let block_data = match this.find_block_data(message.block) {
112+
Some(block_data) => block_data,
113+
None => this.retreive_block_data(message.block).await?,
114+
};
115+
116+
*message_last = None;
117+
118+
log::trace!(r#"Found data for the message block: "{block_data:?}""#);
119+
120+
let (block, authority_set_id) = block_data;
121+
let messages = common::message_queued_events_of(&block);
122+
let block_hash = block.hash();
123+
let Some(sender) = this.sender.upgrade() else {
124+
log::info!("Unable to upgrade sender channel.");
125+
126+
return Ok(());
127+
};
128+
129+
for message_queued in messages {
130+
if U256::from_big_endian(&message_queued.nonce_be).0 != message.nonce.0 {
131+
log::info!("Message nonce mismatch, skipping {message_queued:?}");
132+
continue;
138133
}
134+
135+
log::trace!("Processing message in block: {message_queued:?}");
136+
137+
if sender
138+
.send(MessageInBlock {
139+
message: message_queued,
140+
block: GearBlockNumber(block.number()),
141+
block_hash: block_hash.0.into(),
142+
authority_set_id,
143+
})
144+
.is_err()
145+
{
146+
log::info!("Sender channel closed.");
147+
return Ok(());
148+
}
149+
150+
break;
139151
}
140152
}
141153
}

relayer/src/message_relayer/common/paid_messages_filter.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub struct PaidMessagesFilter {
1313
pending_messages: HashMap<[u8; 32], MessageInBlock>,
1414
pending_nonces: Vec<[u8; 32]>,
1515
excluded_from_fees: HashSet<AccountId32>,
16+
sender: UnboundedSender<MessageInBlock>,
1617

1718
metrics: Metrics,
1819
}
@@ -33,11 +34,15 @@ impl_metered_service! {
3334
}
3435

3536
impl PaidMessagesFilter {
36-
pub fn new(excluded_from_fees: HashSet<AccountId32>) -> Self {
37+
pub fn new(
38+
excluded_from_fees: HashSet<AccountId32>,
39+
sender: UnboundedSender<MessageInBlock>,
40+
) -> Self {
3741
Self {
3842
pending_messages: HashMap::default(),
3943
pending_nonces: vec![],
4044
excluded_from_fees,
45+
sender,
4146
metrics: Metrics::new(),
4247
}
4348
}
@@ -46,10 +51,9 @@ impl PaidMessagesFilter {
4651
mut self,
4752
mut messages: UnboundedReceiver<MessageInBlock>,
4853
mut paid_messages: UnboundedReceiver<PaidMessage>,
49-
sender: UnboundedSender<MessageInBlock>,
5054
) {
5155
tokio::spawn(async move {
52-
match run_inner(&mut self, &sender, &mut messages, &mut paid_messages).await {
56+
match run_inner(&mut self, &mut messages, &mut paid_messages).await {
5357
Ok(_) => {}
5458
Err(e) => log::error!("Paid messages filter failed: {e}"),
5559
}
@@ -59,7 +63,6 @@ impl PaidMessagesFilter {
5963

6064
async fn run_inner(
6165
self_: &mut PaidMessagesFilter,
62-
sender: &UnboundedSender<MessageInBlock>,
6366
messages: &mut UnboundedReceiver<MessageInBlock>,
6467
paid_messages: &mut UnboundedReceiver<PaidMessage>,
6568
) -> anyhow::Result<()> {
@@ -91,7 +94,7 @@ async fn run_inner(
9194
AccountId32::from(message.message.source),
9295
hex::encode(message.message.nonce_be)
9396
);
94-
sender.send(message)?;
97+
self_.sender.send(message)?;
9598

9699
continue;
97100
}
@@ -100,7 +103,7 @@ async fn run_inner(
100103
.pending_messages
101104
.insert(message.message.nonce_be, message)
102105
{
103-
panic!(
106+
log::error!(
104107
"Received 2 messages with the same nonce: {}",
105108
hex::encode(msg.message.nonce_be)
106109
);
@@ -112,7 +115,7 @@ async fn run_inner(
112115

113116
for i in (0..self_.pending_nonces.len()).rev() {
114117
if let Some(message) = self_.pending_messages.remove(&self_.pending_nonces[i]) {
115-
sender.send(message)?;
118+
self_.sender.send(message)?;
116119
self_.pending_nonces.remove(i);
117120
}
118121
}
@@ -140,7 +143,8 @@ mod tests {
140143

141144
set.insert(account0.into());
142145

143-
let filter = PaidMessagesFilter::new(set);
146+
let (filter_msg_sender, mut msg_receiver) = mpsc::unbounded_channel();
147+
let filter = PaidMessagesFilter::new(set, filter_msg_sender);
144148

145149
let message0 = MessageInBlock {
146150
message: Message {
@@ -168,8 +172,7 @@ mod tests {
168172

169173
let (msg_sender, filter_msg_receiver) = mpsc::unbounded_channel();
170174
let (paid_sender, paid_receiver) = mpsc::unbounded_channel();
171-
let (filter_msg_sender, mut msg_receiver) = mpsc::unbounded_channel();
172-
filter.spawn(filter_msg_receiver, paid_receiver, filter_msg_sender);
175+
filter.spawn(filter_msg_receiver, paid_receiver);
173176

174177
msg_sender.send(message0).unwrap();
175178
let res = msg_receiver.recv().await.unwrap();

0 commit comments

Comments
 (0)