Skip to content

Commit eea0aa9

Browse files
authored
Remove abstract mutex and channel types from runtime context (#592)
* Remove HasMutex trait and use futures::Mutex directly * Remove channel traits * Fix use of futures::UnboundedReceiver, which works differently from Tokio's version * Remove run_concurrent_task_stream method * Remove stream traits
1 parent fdaa818 commit eea0aa9

File tree

45 files changed

+148
-1583
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+148
-1583
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cosmos/cosmos-relayer/src/impls/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use cgp::core::error::{
1010
};
1111
use cgp::prelude::*;
1212
use eyre::Report;
13+
use futures::channel::mpsc::SendError;
14+
use futures::channel::oneshot::Canceled;
1315
use hermes_any_counterparty::impls::encoding::client_state::UnknownClientStateType;
1416
use hermes_any_counterparty::impls::encoding::consensus_state::UnknownConsensusStateType;
1517
use hermes_chain_type_components::traits::types::amount::HasAmountType;
@@ -132,6 +134,8 @@ delegate_components! {
132134
subtle_encoding::Error,
133135
reqwest::Error,
134136
InvalidUri,
137+
SendError,
138+
Canceled,
135139

136140
// TODO: make it retryable?
137141
TransportError,

crates/relayer/relayer-components-extra/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ hermes-relayer-components = { workspace = true }
1717
hermes-chain-type-components = { workspace = true }
1818
hermes-runtime-components = { workspace = true }
1919
hermes-logging-components = { workspace = true }
20+
21+
futures = { workspace = true }

crates/relayer/relayer-components-extra/src/batch/impls/message_sender.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,49 @@
11
use alloc::vec::Vec;
22
use core::marker::PhantomData;
33

4-
use cgp::core::error::ErrorOf;
54
use cgp::prelude::*;
5+
use futures::channel::mpsc::{SendError, TrySendError};
6+
use futures::channel::oneshot;
7+
use futures::channel::oneshot::Canceled;
68
use hermes_chain_type_components::traits::types::message_response::MessageResponseOf;
79
use hermes_relayer_components::chain::types::aliases::MessageOf;
810
use hermes_relayer_components::relay::traits::ibc_message_sender::{
911
CanSendIbcMessages, IbcMessageSender, IbcMessageSenderComponent,
1012
};
1113
use hermes_relayer_components::relay::traits::target::{HasTargetChainTypes, RelayTarget};
12-
use hermes_runtime_components::traits::channel::CanUseChannels;
13-
use hermes_runtime_components::traits::channel_once::{CanCreateChannelsOnce, CanUseChannelsOnce};
1414

1515
use crate::batch::traits::channel::HasMessageBatchSender;
1616
use crate::batch::traits::types::CanUseMessageBatchChannel;
1717
use crate::batch::types::sink::BatchWorkerSink;
1818

19-
pub struct SendMessagesToBatchWorker;
20-
21-
#[cgp_provider(IbcMessageSenderComponent<Sink>)]
19+
#[cgp_new_provider(IbcMessageSenderComponent<Sink>)]
2220
impl<Relay, Sink, Target> IbcMessageSender<Relay, Sink, Target> for SendMessagesToBatchWorker
2321
where
2422
Target: RelayTarget,
2523
Relay: HasTargetChainTypes<Target>
2624
+ CanSendIbcMessages<BatchWorkerSink, Target>
2725
+ CanUseMessageBatchChannel<Target::Chain>
2826
+ HasMessageBatchSender<Target::Chain>
29-
+ CanRaiseAsyncError<ErrorOf<Relay::Runtime>>,
30-
Relay::Runtime: CanCreateChannelsOnce + CanUseChannelsOnce + CanUseChannels,
27+
+ CanRaiseAsyncError<SendError>
28+
+ CanRaiseAsyncError<Canceled>,
3129
{
3230
async fn send_messages(
3331
relay: &Relay,
3432
_target: Target,
3533
messages: Vec<MessageOf<Relay::TargetChain>>,
3634
) -> Result<Vec<MessageResponseOf<Relay::TargetChain>>, Relay::Error> {
37-
let (result_sender, result_receiver) = Relay::Runtime::new_channel_once();
35+
let (result_sender, result_receiver) = oneshot::channel();
3836

3937
let message_sender = relay.get_batch_sender(PhantomData::<Target::Chain>);
4038

41-
Relay::Runtime::send(message_sender, (messages, result_sender))
39+
message_sender
40+
.lock()
4241
.await
42+
.unbounded_send((messages, result_sender))
43+
.map_err(TrySendError::into_send_error)
4344
.map_err(Relay::raise_error)?;
4445

45-
let events = Relay::Runtime::receive_once(result_receiver)
46-
.await
47-
.map_err(Relay::raise_error)??;
46+
let events = result_receiver.await.map_err(Relay::raise_error)??;
4847

4948
Ok(events)
5049
}
Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1+
use alloc::sync::Arc;
12
use alloc::vec::Vec;
23

34
use cgp::core::Async;
45
use cgp::prelude::HasAsyncErrorType;
6+
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
7+
use futures::channel::oneshot;
8+
use futures::lock::Mutex;
59
use hermes_chain_type_components::traits::types::message::HasMessageType;
610
use hermes_chain_type_components::traits::types::message_response::{
711
HasMessageResponseType, MessageResponseOf,
812
};
913
use hermes_relayer_components::chain::types::aliases::MessageOf;
1014
use hermes_relayer_components::multi::traits::chain_at::HasChainTypeAt;
11-
use hermes_runtime_components::traits::channel::{HasChannelTypes, ReceiverOf, SenderOf};
12-
use hermes_runtime_components::traits::channel_once::{HasChannelOnceTypes, SenderOnceOf};
13-
use hermes_runtime_components::traits::runtime::HasRuntimeType;
1415

1516
pub trait HasMessageBatchChannelTypes<Tag>: Async {
1617
type BatchSubmission: Async;
@@ -26,62 +27,49 @@ pub type MessageBatchSenderOf<Context, Tag> =
2627
pub type MessageBatchReceiverOf<Context, Tag> =
2728
<Context as HasMessageBatchChannelTypes<Tag>>::MessageBatchReceiver;
2829

29-
impl<Context, Tag, Chain, Runtime> HasMessageBatchChannelTypes<Tag> for Context
30+
impl<Context, Tag, Chain> HasMessageBatchChannelTypes<Tag> for Context
3031
where
31-
Context:
32-
HasChainTypeAt<Tag, Chain = Chain> + HasRuntimeType<Runtime = Runtime> + HasAsyncErrorType,
32+
Context: HasChainTypeAt<Tag, Chain = Chain> + HasAsyncErrorType,
3333
Chain: HasMessageType + HasMessageResponseType,
34-
Runtime: HasChannelTypes + HasChannelOnceTypes,
3534
{
3635
type BatchSubmission = (
3736
Vec<Chain::Message>,
38-
SenderOnceOf<Runtime, Result<Vec<Chain::MessageResponse>, Context::Error>>,
37+
oneshot::Sender<Result<Vec<Chain::MessageResponse>, Context::Error>>,
3938
);
4039

41-
type MessageBatchSender = SenderOf<Runtime, Self::BatchSubmission>;
40+
type MessageBatchSender = Arc<Mutex<UnboundedSender<Self::BatchSubmission>>>;
4241

43-
type MessageBatchReceiver = ReceiverOf<Runtime, Self::BatchSubmission>;
42+
type MessageBatchReceiver = UnboundedReceiver<Self::BatchSubmission>;
4443
}
4544

4645
pub trait CanUseMessageBatchChannel<Tag>:
4746
HasChainTypeAt<Tag, Chain: HasMessageType + HasMessageResponseType>
48-
+ HasRuntimeType<Runtime: HasChannelTypes + HasChannelOnceTypes>
4947
+ HasAsyncErrorType
5048
+ HasMessageBatchChannelTypes<
5149
Tag,
5250
BatchSubmission = (
5351
Vec<MessageOf<Self::Chain>>,
54-
SenderOnceOf<Self::Runtime, Result<Vec<MessageResponseOf<Self::Chain>>, Self::Error>>,
52+
oneshot::Sender<Result<Vec<MessageResponseOf<Self::Chain>>, Self::Error>>,
5553
),
56-
MessageBatchSender = SenderOf<
57-
Self::Runtime,
58-
(
59-
Vec<MessageOf<Self::Chain>>,
60-
SenderOnceOf<
61-
Self::Runtime,
62-
Result<Vec<MessageResponseOf<Self::Chain>>, Self::Error>,
63-
>,
64-
),
65-
>,
66-
MessageBatchReceiver = ReceiverOf<
67-
Self::Runtime,
68-
(
69-
Vec<MessageOf<Self::Chain>>,
70-
SenderOnceOf<
71-
Self::Runtime,
72-
Result<Vec<MessageResponseOf<Self::Chain>>, Self::Error>,
73-
>,
74-
),
54+
MessageBatchSender = Arc<
55+
Mutex<
56+
UnboundedSender<(
57+
Vec<MessageOf<Self::Chain>>,
58+
oneshot::Sender<Result<Vec<MessageResponseOf<Self::Chain>>, Self::Error>>,
59+
)>,
60+
>,
7561
>,
62+
MessageBatchReceiver = UnboundedReceiver<(
63+
Vec<MessageOf<Self::Chain>>,
64+
oneshot::Sender<Result<Vec<MessageResponseOf<Self::Chain>>, Self::Error>>,
65+
)>,
7666
>
7767
{
7868
}
7969

80-
impl<Context, Tag, Chain, Runtime> CanUseMessageBatchChannel<Tag> for Context
70+
impl<Context, Tag, Chain> CanUseMessageBatchChannel<Tag> for Context
8171
where
82-
Context:
83-
HasChainTypeAt<Tag, Chain = Chain> + HasRuntimeType<Runtime = Runtime> + HasAsyncErrorType,
72+
Context: HasChainTypeAt<Tag, Chain = Chain> + HasAsyncErrorType,
8473
Chain: HasMessageType + HasMessageResponseType,
85-
Runtime: HasChannelTypes + HasChannelOnceTypes,
8674
{
8775
}

crates/relayer/relayer-components-extra/src/batch/worker.rs

Lines changed: 51 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ use hermes_relayer_components::chain::traits::types::message::{
1313
use hermes_relayer_components::multi::traits::chain_at::HasChainAt;
1414
use hermes_relayer_components::relay::traits::ibc_message_sender::CanSendIbcMessages;
1515
use hermes_relayer_components::relay::traits::target::RelayTarget;
16-
use hermes_runtime_components::traits::channel::{CanUseChannels, HasChannelTypes};
17-
use hermes_runtime_components::traits::channel_once::{CanUseChannelsOnce, HasChannelOnceTypes};
18-
use hermes_runtime_components::traits::mutex::HasMutex;
1916
use hermes_runtime_components::traits::runtime::HasRuntime;
2017
use hermes_runtime_components::traits::sleep::CanSleep;
2118
use hermes_runtime_components::traits::spawn::CanSpawnTask;
@@ -100,7 +97,7 @@ where
10097
+ CanUseMessageBatchChannel<Target::Chain>
10198
+ CanProcessMessageBatches<Target>
10299
+ for<'a> CanLog<LogBatchWorker<'a, Target>>,
103-
Relay::Runtime: HasTime + HasMutex + CanSleep + CanUseChannels + HasChannelOnceTypes,
100+
Relay::Runtime: HasTime + CanSleep,
104101
{
105102
async fn run_loop(&self, config: &BatchConfig, mut receiver: Relay::MessageBatchReceiver) {
106103
let runtime = self.runtime();
@@ -110,54 +107,56 @@ where
110107
let mut last_sent_time = runtime.now();
111108

112109
loop {
113-
let payload = Relay::Runtime::try_receive(&mut receiver);
114-
115-
match payload {
116-
Ok(m_batch) => {
117-
if let Some(batch) = m_batch {
118-
let batch_size = batch.0.len();
119-
120-
self.log(
121-
"received message batch",
122-
&LogBatchWorker {
123-
details: &format!("batch_size = {batch_size}"),
124-
log_level: LogLevel::Trace,
125-
phantom: PhantomData,
126-
},
127-
)
128-
.await;
129-
130-
pending_batches.push_back(batch);
131-
}
110+
// Poll and see if there is additional payload from the receiver.
111+
// We do not block and wait for the next payload, as it may only arrive much later on.
112+
// If there is no payload, we will still need to continue and process any pending messages
113+
// that we have previously received.
114+
let payload = receiver.try_next();
132115

133-
let current_batch_size = pending_batches.len();
134-
let now = runtime.now();
116+
// futures::UnboundedReceiver uses `Ok(None)` to indicate that the channel has closed.
117+
// We ignore errors as it indicates the sender was not ready, which will be retried later on.
118+
//
119+
// https://docs.rs/futures/latest/futures/channel/mpsc/struct.UnboundedReceiver.html#method.try_next
120+
let channel_closed = matches!(payload, Ok(None));
135121

136-
self.process_message_batches(
137-
config,
138-
&mut pending_batches,
139-
now,
140-
&mut last_sent_time,
141-
)
142-
.await;
122+
if let Ok(Some(batch)) = payload {
123+
let batch_size = batch.0.len();
143124

144-
if pending_batches.len() == current_batch_size {
145-
runtime.sleep(config.sleep_time).await;
146-
}
147-
}
148-
Err(e) => {
149-
self.log(
150-
"error in try_receive, terminating worker",
151-
&LogBatchWorker {
152-
details: &format!("error = {:?}", e),
153-
log_level: LogLevel::Error,
154-
phantom: PhantomData,
155-
},
156-
)
157-
.await;
158-
159-
return;
160-
}
125+
self.log(
126+
"received message batch",
127+
&LogBatchWorker {
128+
details: &format!("batch_size = {batch_size}"),
129+
log_level: LogLevel::Trace,
130+
phantom: PhantomData,
131+
},
132+
)
133+
.await;
134+
135+
pending_batches.push_back(batch);
136+
}
137+
138+
let current_batch_size = pending_batches.len();
139+
let now = runtime.now();
140+
141+
self.process_message_batches(config, &mut pending_batches, now, &mut last_sent_time)
142+
.await;
143+
144+
if pending_batches.len() == current_batch_size {
145+
runtime.sleep(config.sleep_time).await;
146+
}
147+
148+
if channel_closed {
149+
self.log(
150+
"batch channel has closed, terminating worker",
151+
&LogBatchWorker {
152+
details: "",
153+
log_level: LogLevel::Info,
154+
phantom: PhantomData,
155+
},
156+
)
157+
.await;
158+
159+
return;
161160
}
162161
}
163162
}
@@ -184,8 +183,7 @@ where
184183
+ CanUseMessageBatchChannel<Target::Chain>
185184
+ CanPartitionMessageBatches<Target>
186185
+ for<'a> CanLog<LogBatchWorker<'a, Target>>,
187-
Relay::Runtime:
188-
HasTime + CanSpawnTask + HasChannelTypes + HasChannelOnceTypes + HasAsyncErrorType,
186+
Relay::Runtime: HasTime + CanSpawnTask + HasAsyncErrorType,
189187
SendReadyBatchTask<Relay, Target>: Task,
190188
{
191189
async fn process_message_batches(
@@ -325,7 +323,6 @@ where
325323
Relay: CanUseMessageBatchChannel<Target::Chain>
326324
+ CanSendIbcMessages<BatchWorkerSink, Target>
327325
+ for<'a> CanLog<LogBatchWorker<'a, Target>>,
328-
Relay::Runtime: CanUseChannelsOnce + CanUseChannels,
329326
Relay::Error: Clone,
330327
{
331328
async fn send_ready_batches(&self, ready_batches: VecDeque<Relay::BatchSubmission>) {
@@ -366,7 +363,7 @@ where
366363
.await;
367364

368365
for (_, sender) in senders.into_iter() {
369-
let _ = Relay::Runtime::send_once(sender, Err(e.clone()));
366+
let _ = sender.send(Err(e.clone()));
370367
}
371368
}
372369
Ok(all_events) => {
@@ -385,7 +382,7 @@ where
385382

386383
for (message_count, sender) in senders.into_iter() {
387384
let events = take(&mut all_events, message_count);
388-
let _ = Relay::Runtime::send_once(sender, Ok(events));
385+
let _ = sender.send(Ok(events));
389386
}
390387
}
391388
}

0 commit comments

Comments
 (0)