Skip to content

Commit e030a35

Browse files
committed
refactor: use new cache and quiet errors
1 parent 74eaa72 commit e030a35

File tree

2 files changed

+34
-39
lines changed

2 files changed

+34
-39
lines changed

src/tasks/cache/bundle.rs

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
//! Bundler service responsible for fetching bundles and sending them to the simulator.
22
use crate::config::BuilderConfig;
3-
use init4_bin_base::perms::SharedToken;
4-
use reqwest::{Client, Url};
5-
use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse};
3+
use init4_bin_base::perms::tx_cache::BuilderTxCache;
4+
use signet_tx_cache::{TxCacheError, types::TxCacheBundle};
65
use tokio::{
76
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
87
task::JoinHandle,
98
time::{self, Duration},
109
};
11-
use tracing::{Instrument, debug, error, trace, trace_span};
10+
use tracing::{Instrument, error, trace, trace_span};
1211

1312
/// Poll interval for the bundle poller in milliseconds.
1413
const POLL_INTERVAL_MS: u64 = 1000;
@@ -18,10 +17,10 @@ const POLL_INTERVAL_MS: u64 = 1000;
1817
pub struct BundlePoller {
1918
/// The builder configuration values.
2019
config: &'static BuilderConfig,
21-
/// Authentication module that periodically fetches and stores auth tokens.
22-
token: SharedToken,
23-
/// Holds a Reqwest client
24-
client: Client,
20+
21+
/// Client for the tx cache.
22+
tx_cache: BuilderTxCache,
23+
2524
/// Defines the interval at which the bundler polls the tx-pool for bundles.
2625
poll_interval_ms: u64,
2726
}
@@ -42,34 +41,37 @@ impl BundlePoller {
4241
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
4342
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
4443
let config = crate::config();
45-
let token = config.oauth_token();
46-
Self { config, token, client: Client::new(), poll_interval_ms }
47-
}
48-
49-
/// Fetches bundles from the transaction cache and returns them.
50-
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<TxCacheBundle>> {
51-
let bundle_url: Url = self.config.tx_pool_url.join("bundles")?;
52-
let token =
53-
self.token.secret().await.map_err(|e| eyre::eyre!("Failed to read token: {e}"))?;
54-
55-
self.client
56-
.get(bundle_url)
57-
.bearer_auth(token)
58-
.send()
59-
.await?
60-
.error_for_status()?
61-
.json()
62-
.await
63-
.map(|resp: TxCacheBundlesResponse| resp.bundles)
64-
.map_err(Into::into)
44+
let cache = signet_tx_cache::TxCache::new(config.tx_pool_url.clone());
45+
let tx_cache = BuilderTxCache::new(cache, config.oauth_token());
46+
Self { config, tx_cache, poll_interval_ms }
6547
}
6648

6749
/// Returns the poll duration as a [`Duration`].
6850
const fn poll_duration(&self) -> Duration {
6951
Duration::from_millis(self.poll_interval_ms)
7052
}
7153

72-
async fn task_future(mut self, outbound: UnboundedSender<TxCacheBundle>) {
54+
/// Checks the bundle cache for new bundles.
55+
pub async fn check_bundle_cache(&self) -> Result<Vec<TxCacheBundle>, TxCacheError> {
56+
let res = self.tx_cache.get_bundles().await;
57+
58+
match res {
59+
Ok(bundles) => {
60+
trace!(count = ?bundles.len(), "found bundles");
61+
Ok(bundles)
62+
}
63+
Err(TxCacheError::NotOurSlot) => {
64+
trace!("Not our slot to fetch bundles");
65+
Err(TxCacheError::NotOurSlot)
66+
}
67+
Err(err) => {
68+
error!(?err, "Failed to fetch bundles from tx-cache");
69+
Err(err)
70+
}
71+
}
72+
}
73+
74+
async fn task_future(self, outbound: UnboundedSender<TxCacheBundle>) {
7375
loop {
7476
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
7577

@@ -85,17 +87,10 @@ impl BundlePoller {
8587
// exit the span after the check.
8688
drop(_guard);
8789

88-
if let Ok(bundles) = self
89-
.check_bundle_cache()
90-
.instrument(span.clone())
91-
.await
92-
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
93-
{
94-
let _guard = span.entered();
95-
trace!(count = ?bundles.len(), "found bundles");
90+
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
9691
for bundle in bundles.into_iter() {
9792
if let Err(err) = outbound.send(bundle) {
98-
error!(err = ?err, "Failed to send bundle - channel is dropped");
93+
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
9994
break;
10095
}
10196
}

tests/bundle_poller_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() -> Result<()> {
77
setup_logging();
88
setup_test_config();
99

10-
let mut bundle_poller = builder::tasks::cache::BundlePoller::new();
10+
let bundle_poller = builder::tasks::cache::BundlePoller::new();
1111

1212
let _ = bundle_poller.check_bundle_cache().await?;
1313

0 commit comments

Comments
 (0)