Skip to content

Commit 6bb48bf

Browse files
authored
Expose orderpool_sender on public api (#240)
Closes #225
1 parent 5d152e8 commit 6bb48bf

File tree

4 files changed

+37
-15
lines changed

4 files changed

+37
-15
lines changed

crates/rbuilder/src/bin/dummy-builder.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ use reth::tasks::pool::BlockingTaskPool;
4141
use reth_chainspec::MAINNET;
4242
use reth_db::{database::Database, DatabaseEnv};
4343
use reth_provider::{DatabaseProviderFactory, StateProviderFactory};
44-
use tokio::{signal::ctrl_c, sync::broadcast};
44+
use tokio::{
45+
signal::ctrl_c,
46+
sync::{broadcast, mpsc},
47+
};
4548
use tokio_util::sync::CancellationToken;
4649
use tracing::{info, level_filters::LevelFilter};
4750

@@ -71,6 +74,18 @@ async fn main() -> eyre::Result<()> {
7174
cancel.clone(),
7275
);
7376

77+
let order_input_config = OrderInputConfig::new(
78+
false,
79+
true,
80+
DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(),
81+
DEFAULT_INCOMING_BUNDLES_PORT,
82+
*DEFAULT_IP,
83+
DEFAULT_SERVE_MAX_CONNECTIONS,
84+
DEFAULT_RESULTS_CHANNEL_TIMEOUT,
85+
DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
86+
);
87+
let (orderpool_sender, orderpool_receiver) =
88+
mpsc::channel(order_input_config.input_channel_buffer_size);
7489
let builder = LiveBuilder::<
7590
ProviderFactoryReopener<Arc<DatabaseEnv>>,
7691
Arc<DatabaseEnv>,
@@ -80,16 +95,7 @@ async fn main() -> eyre::Result<()> {
8095
error_storage_path: None,
8196
simulation_threads: 1,
8297
blocks_source: payload_event,
83-
order_input_config: OrderInputConfig::new(
84-
false,
85-
true,
86-
DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(),
87-
DEFAULT_INCOMING_BUNDLES_PORT,
88-
*DEFAULT_IP,
89-
DEFAULT_SERVE_MAX_CONNECTIONS,
90-
DEFAULT_RESULTS_CHANNEL_TIMEOUT,
91-
DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
92-
),
98+
order_input_config,
9399
chain_chain_spec: chain_spec.clone(),
94100
provider: create_provider_factory(
95101
Some(&RETH_DB_PATH.parse::<PathBuf>().unwrap()),
@@ -105,6 +111,8 @@ async fn main() -> eyre::Result<()> {
105111
sink_factory: Box::new(TraceBlockSinkFactory {}),
106112
builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))],
107113
run_sparse_trie_prefetcher: false,
114+
orderpool_sender,
115+
orderpool_receiver,
108116
};
109117

110118
let ctrlc = tokio::spawn(async move {

crates/rbuilder/src/live_builder/base_config.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use std::{
3232
sync::Arc,
3333
time::Duration,
3434
};
35+
use tokio::sync::mpsc;
3536
use tracing::warn;
3637

3738
use super::SlotSource;
@@ -205,11 +206,14 @@ impl BaseConfig {
205206
P: DatabaseProviderFactory<DB> + StateProviderFactory + HeaderProvider + Clone,
206207
SlotSourceType: SlotSource,
207208
{
209+
let order_input_config = OrderInputConfig::from_config(self)?;
210+
let (orderpool_sender, orderpool_receiver) =
211+
mpsc::channel(order_input_config.input_channel_buffer_size);
208212
Ok(LiveBuilder::<P, DB, SlotSourceType> {
209213
watchdog_timeout: self.watchdog_timeout(),
210214
error_storage_path: self.error_storage_path.clone(),
211215
simulation_threads: self.simulation_threads,
212-
order_input_config: OrderInputConfig::from_config(self)?,
216+
order_input_config,
213217
blocks_source: slot_source,
214218
chain_chain_spec: self.chain_spec()?,
215219
provider,
@@ -225,6 +229,9 @@ impl BaseConfig {
225229
builders: Vec::new(),
226230

227231
run_sparse_trie_prefetcher: self.root_hash_use_sparse_trie,
232+
233+
orderpool_sender,
234+
orderpool_receiver,
228235
})
229236
}
230237

crates/rbuilder/src/live_builder/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use alloy_primitives::{Address, B256};
2626
use building::BlockBuildingPool;
2727
use eyre::Context;
2828
use jsonrpsee::RpcModule;
29+
use order_input::ReplaceableOrderPoolCommand;
2930
use payload_events::MevBoostSlotData;
3031
use reth::{primitives::Header, providers::HeaderProvider};
3132
use reth_chainspec::ChainSpec;
@@ -81,6 +82,10 @@ where
8182
pub sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
8283
pub builders: Vec<Arc<dyn BlockBuildingAlgorithm<P, DB>>>,
8384
pub extra_rpc: RpcModule<()>,
85+
86+
/// Notify rbuilder of new [`ReplaceableOrderPoolCommand`] flow via this channel.
87+
pub orderpool_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
88+
pub orderpool_receiver: mpsc::Receiver<ReplaceableOrderPoolCommand>,
8489
}
8590

8691
impl<P, DB, BlocksSourceType: SlotSource> LiveBuilder<P, DB, BlocksSourceType>
@@ -119,6 +124,8 @@ where
119124
self.provider.clone(),
120125
self.extra_rpc,
121126
self.global_cancellation.clone(),
127+
self.orderpool_sender,
128+
self.orderpool_receiver,
122129
)
123130
.await?;
124131
inner_jobs_handles.push(handle);

crates/rbuilder/src/live_builder/order_input/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pub struct OrderInputConfig {
9595
/// Timeout to wait when sending to that channel (after that the ReplaceableOrderPoolCommand is lost).
9696
results_channel_timeout: Duration,
9797
/// Size of the bounded channel.
98-
input_channel_buffer_size: usize,
98+
pub input_channel_buffer_size: usize,
9999
}
100100
pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096;
101101
pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50);
@@ -184,6 +184,8 @@ pub async fn start_orderpool_jobs<P>(
184184
provider_factory: P,
185185
extra_rpc: RpcModule<()>,
186186
global_cancel: CancellationToken,
187+
order_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
188+
order_receiver: mpsc::Receiver<ReplaceableOrderPoolCommand>,
187189
) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)>
188190
where
189191
P: StateProviderFactory + 'static,
@@ -200,8 +202,6 @@ where
200202
orderpool: orderpool.clone(),
201203
};
202204

203-
let (order_sender, order_receiver) = mpsc::channel(config.input_channel_buffer_size);
204-
205205
let clean_job = clean_orderpool::spawn_clean_orderpool_job(
206206
config.clone(),
207207
provider_factory,

0 commit comments

Comments
 (0)