Skip to content

Commit e191ece

Browse files
committed
feat: make ProtocolStreamBuilder more flexible
Currently most of the TychoStreamBuilder configuration is leaked on ProtocolStreamBuilder. While this makes the interface slightly cleaner, it also makes the ProtocolStreamBuilder very strict regarding the data feed. This PR aims to improve this by allowing to use a raw receiver instead of TychoStreamBuilder. This clearly separate these two structs and make the whole system more flexible, for example we can test with a custom feed. This was made in a completely backward compatible way, if we like the idea we can start deprecating the old interfaces.
1 parent 68aa68d commit e191ece

File tree

2 files changed

+165
-56
lines changed

2 files changed

+165
-56
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ evm = [
145145
"dep:foundry-block-explorers", "dep:revm", "dep:revm-inspectors", "dep:alloy", "dep:reqwest"
146146
]
147147
rfq = ["dep:reqwest", "dep:async-trait", "dep:tokio-tungstenite", "dep:async-stream", "dep:http", "dep:prost"]
148+
test-utils = []
148149

149150
[profile.bench]
150151
debug = true

src/evm/stream.rs

Lines changed: 164 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,13 @@
101101
use std::{collections::HashMap, sync::Arc, time};
102102

103103
use futures::{Stream, StreamExt};
104+
use tokio::sync::mpsc::Receiver;
104105
use tokio_stream::wrappers::ReceiverStream;
105106
use tracing::{debug, error, warn};
106107
use tycho_client::{
107108
feed::{
108109
component_tracker::ComponentFilter, synchronizer::ComponentWithState, BlockHeader,
109-
SynchronizerState,
110+
BlockSynchronizerError, FeedMessage, SynchronizerState,
110111
},
111112
stream::{RetryConfiguration, StreamError, TychoStreamBuilder},
112113
};
@@ -156,13 +157,21 @@ impl StreamEndPolicy {
156157
}
157158
}
158159

160+
/// The source of feed messages for the protocol stream.
161+
enum StreamSource {
162+
/// Live connection to Tycho server
163+
Tycho(TychoStreamBuilder),
164+
/// Pre-built receiver (for testing or custom sources)
165+
Receiver(Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>),
166+
}
167+
159168
/// Builds and configures the multi protocol stream described in the [module-level docs](self).
160169
///
161170
/// See the module documentation for details on protocols, configuration options, and
162171
/// stream behavior.
163172
pub struct ProtocolStreamBuilder {
164173
decoder: TychoStreamDecoder<BlockHeader>,
165-
stream_builder: TychoStreamBuilder,
174+
source: StreamSource,
166175
stream_end_policy: StreamEndPolicy,
167176
}
168177

@@ -173,11 +182,53 @@ impl ProtocolStreamBuilder {
173182
pub fn new(tycho_url: &str, chain: Chain) -> Self {
174183
Self {
175184
decoder: TychoStreamDecoder::new(),
176-
stream_builder: TychoStreamBuilder::new(tycho_url, chain.into()),
185+
source: StreamSource::Tycho(TychoStreamBuilder::new(tycho_url, chain.into())),
177186
stream_end_policy: StreamEndPolicy::default(),
178187
}
179188
}
180189

190+
/// Creates a new builder from a pre-built receiver.
191+
///
192+
/// This is useful for testing or custom feed sources. Tycho-client specific configuration
193+
/// methods (like `block_time`, `auth_key`, etc.) will have no effect when using this
194+
/// constructor.
195+
///
196+
/// # Example
197+
/// ```ignore
198+
/// use tokio::sync::mpsc;
199+
/// use tycho_simulation::evm::stream::ProtocolStreamBuilder;
200+
/// use tycho_simulation::evm::protocol::uniswap_v2::state::UniswapV2State;
201+
/// use tycho_client::feed::component_tracker::ComponentFilter;
202+
///
203+
/// let (tx, rx) = mpsc::channel(100);
204+
/// // Send test messages via tx...
205+
///
206+
/// let stream = ProtocolStreamBuilder::from_receiver(rx)
207+
/// .exchange::<UniswapV2State>("uniswap_v2", ComponentFilter::default(), None)
208+
/// .build()
209+
/// .await?;
210+
/// ```
211+
pub fn from_receiver(
212+
rx: Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>,
213+
) -> Self {
214+
Self {
215+
decoder: TychoStreamDecoder::new(),
216+
source: StreamSource::Receiver(rx),
217+
stream_end_policy: StreamEndPolicy::default(),
218+
}
219+
}
220+
221+
/// Helper to apply configuration only when source is Tycho.
222+
/// Returns self unchanged if source is a Receiver.
223+
fn with_tycho(mut self, f: impl FnOnce(TychoStreamBuilder) -> TychoStreamBuilder) -> Self {
224+
if let StreamSource::Tycho(builder) = self.source {
225+
self.source = StreamSource::Tycho(f(builder));
226+
} else {
227+
warn!("with_tycho() called but source is not Tycho. Ignoring configuration.");
228+
}
229+
self
230+
}
231+
181232
/// Adds a specific exchange to the stream.
182233
///
183234
/// This configures the builder to include a new protocol synchronizer for `name`,
@@ -189,7 +240,8 @@ impl ProtocolStreamBuilder {
189240
/// # Parameters
190241
///
191242
/// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
192-
/// - `filter`: Defines the set of components to include in the stream.
243+
/// - `filter`: Defines the set of components to include in the stream. Ignored when using
244+
/// `from_receiver()`.
193245
/// - `filter_fn`: Optional custom filter function for client-side filtering of components not
194246
/// expressible in `filter`.
195247
///
@@ -210,9 +262,12 @@ impl ProtocolStreamBuilder {
210262
+ Send
211263
+ 'static,
212264
{
213-
self.stream_builder = self
214-
.stream_builder
215-
.exchange(name, filter);
265+
// Register with TychoStreamBuilder only for live connections
266+
if let StreamSource::Tycho(builder) = self.source {
267+
self.source = StreamSource::Tycho(builder.exchange(name, filter));
268+
}
269+
270+
// Decoder registration happens regardless of source
216271
self.decoder.register_decoder::<T>(name);
217272
if let Some(predicate) = filter_fn {
218273
self.decoder
@@ -238,7 +293,8 @@ impl ProtocolStreamBuilder {
238293
/// # Parameters
239294
///
240295
/// - `name`: The protocol or exchange name (e.g., `"uniswap_v4"`, `"vm:balancer_v2"`).
241-
/// - `filter`: Defines the set of components to include in the stream.
296+
/// - `filter`: Defines the set of components to include in the stream. Ignored when using
297+
/// `from_receiver()`.
242298
/// - `filter_fn`: Optional custom filter function for client-side filtering of components not
243299
/// expressible in `filter`.
244300
/// - `decoder_context`: The decoder context for this exchange
@@ -261,9 +317,12 @@ impl ProtocolStreamBuilder {
261317
+ Send
262318
+ 'static,
263319
{
264-
self.stream_builder = self
265-
.stream_builder
266-
.exchange(name, filter);
320+
// Register with TychoStreamBuilder only for live connections
321+
if let StreamSource::Tycho(builder) = self.source {
322+
self.source = StreamSource::Tycho(builder.exchange(name, filter));
323+
}
324+
325+
// Decoder registration happens regardless of source
267326
self.decoder
268327
.register_decoder_with_context::<T>(name, decoder_context);
269328
if let Some(predicate) = filter_fn {
@@ -281,77 +340,80 @@ impl ProtocolStreamBuilder {
281340
/// Sets the block time interval for the stream.
282341
///
283342
/// This controls how often the stream produces updates.
284-
pub fn block_time(mut self, block_time: u64) -> Self {
285-
self.stream_builder = self
286-
.stream_builder
287-
.block_time(block_time);
288-
self
343+
///
344+
/// Note: Has no effect when using `from_receiver()`.
345+
pub fn block_time(self, block_time: u64) -> Self {
346+
self.with_tycho(|b| b.block_time(block_time))
289347
}
290348

291349
/// Sets the network operation timeout (deprecated).
292350
///
293351
/// Use [`latency_buffer`] instead for controlling latency.
294352
/// This method is retained for backwards compatibility.
353+
///
354+
/// Note: Has no effect when using `from_receiver()`.
295355
#[deprecated = "Use latency_buffer instead"]
296-
pub fn timeout(mut self, timeout: u64) -> Self {
297-
self.stream_builder = self.stream_builder.timeout(timeout);
298-
self
356+
pub fn timeout(self, timeout: u64) -> Self {
357+
self.with_tycho(|b| b.timeout(timeout))
299358
}
300359

301360
/// Sets the latency buffer to aggregate same-block messages.
302361
///
303362
/// This allows the supervisor to wait a short interval for all synchronizers to emit
304363
/// before aggregating.
305-
pub fn latency_buffer(mut self, timeout: u64) -> Self {
306-
self.stream_builder = self.stream_builder.timeout(timeout);
307-
self
364+
///
365+
/// Note: Has no effect when using `from_receiver()`.
366+
pub fn latency_buffer(self, timeout: u64) -> Self {
367+
self.with_tycho(|b| b.timeout(timeout))
308368
}
309369

310370
/// Sets the maximum number of blocks a synchronizer may miss before being marked as `Stale`.
311-
pub fn max_missed_blocks(mut self, n: u64) -> Self {
312-
self.stream_builder = self.stream_builder.max_missed_blocks(n);
313-
self
371+
///
372+
/// Note: Has no effect when using `from_receiver()`.
373+
pub fn max_missed_blocks(self, n: u64) -> Self {
374+
self.with_tycho(|b| b.max_missed_blocks(n))
314375
}
315376

316377
/// Sets how long a synchronizer may take to process the initial message.
317378
///
318379
/// Useful for data-intensive protocols where startup decoding takes longer.
319-
pub fn startup_timeout(mut self, timeout: time::Duration) -> Self {
320-
self.stream_builder = self
321-
.stream_builder
322-
.startup_timeout(timeout);
323-
self
380+
///
381+
/// Note: Has no effect when using `from_receiver()`.
382+
pub fn startup_timeout(self, timeout: time::Duration) -> Self {
383+
self.with_tycho(|b| b.startup_timeout(timeout))
324384
}
325385

326386
/// Configures the stream to exclude state updates.
327387
///
328388
/// This reduces bandwidth and decoding workload if protocol state is not of
329389
/// interest (e.g. only process new tokens).
330-
pub fn no_state(mut self, no_state: bool) -> Self {
331-
self.stream_builder = self.stream_builder.no_state(no_state);
332-
self
390+
///
391+
/// Note: Has no effect when using `from_receiver()`.
392+
pub fn no_state(self, no_state: bool) -> Self {
393+
self.with_tycho(|b| b.no_state(no_state))
333394
}
334395

335396
/// Sets the API key for authenticating with the Tycho server.
336-
pub fn auth_key(mut self, auth_key: Option<String>) -> Self {
337-
self.stream_builder = self.stream_builder.auth_key(auth_key);
338-
self
397+
///
398+
/// Note: Has no effect when using `from_receiver()`.
399+
pub fn auth_key(self, auth_key: Option<String>) -> Self {
400+
self.with_tycho(|b| b.auth_key(auth_key))
339401
}
340402

341403
/// Disables TLS/ SSL for the connection, using http and ws protocols.
342404
///
343405
/// This is not recommended for production use.
344-
pub fn no_tls(mut self, no_tls: bool) -> Self {
345-
self.stream_builder = self.stream_builder.no_tls(no_tls);
346-
self
406+
///
407+
/// Note: Has no effect when using `from_receiver()`.
408+
pub fn no_tls(self, no_tls: bool) -> Self {
409+
self.with_tycho(|b| b.no_tls(no_tls))
347410
}
348411

349412
/// Disable compression for the connection.
350-
pub fn disable_compression(mut self) -> Self {
351-
self.stream_builder = self
352-
.stream_builder
353-
.disable_compression();
354-
self
413+
///
414+
/// Note: Has no effect when using `from_receiver()`.
415+
pub fn disable_compression(self) -> Self {
416+
self.with_tycho(|b| b.disable_compression())
355417
}
356418

357419
/// Sets the stream end policy.
@@ -388,19 +450,17 @@ impl ProtocolStreamBuilder {
388450
}
389451

390452
/// Configures the retry policy for websocket reconnects.
391-
pub fn websocket_retry_config(mut self, config: &RetryConfiguration) -> Self {
392-
self.stream_builder = self
393-
.stream_builder
394-
.websockets_retry_config(config);
395-
self
453+
///
454+
/// Note: Has no effect when using `from_receiver()`.
455+
pub fn websocket_retry_config(self, config: &RetryConfiguration) -> Self {
456+
self.with_tycho(|b| b.websockets_retry_config(config))
396457
}
397458

398459
/// Configures the retry policy for state synchronization.
399-
pub fn state_synchronizer_retry_config(mut self, config: &RetryConfiguration) -> Self {
400-
self.stream_builder = self
401-
.stream_builder
402-
.state_synchronizer_retry_config(config);
403-
self
460+
///
461+
/// Note: Has no effect when using `from_receiver()`.
462+
pub fn state_synchronizer_retry_config(self, config: &RetryConfiguration) -> Self {
463+
self.with_tycho(|b| b.state_synchronizer_retry_config(config))
404464
}
405465

406466
pub fn get_decoder(&self) -> &TychoStreamDecoder<BlockHeader> {
@@ -417,7 +477,15 @@ impl ProtocolStreamBuilder {
417477
initialize_hook_handlers().map_err(|e| {
418478
StreamError::SetUpError(format!("Error initializing hook handlers: {e:?}"))
419479
})?;
420-
let (_, rx) = self.stream_builder.build().await?;
480+
481+
let rx = match self.source {
482+
StreamSource::Tycho(builder) => {
483+
let (_, rx) = builder.build().await?;
484+
rx
485+
}
486+
StreamSource::Receiver(rx) => rx,
487+
};
488+
421489
let decoder = Arc::new(self.decoder);
422490

423491
let stream = Box::pin(
@@ -448,7 +516,7 @@ impl ProtocolStreamBuilder {
448516
move |msg| {
449517
let decoder = decoder.clone(); // Clone again for the async block
450518
async move {
451-
let msg = msg.expect("Save since stream ends if we receive an error");
519+
let msg = msg.expect("Safe since stream ends if we receive an error");
452520
decoder.decode(&msg).await.map_err(|e| {
453521
debug!(msg=?msg, "Decode error: {}", e);
454522
e
@@ -460,3 +528,43 @@ impl ProtocolStreamBuilder {
460528
Ok(stream)
461529
}
462530
}
531+
532+
#[cfg(any(test, feature = "test-utils"))]
533+
pub mod test_utils {
534+
use std::{path::Path, time::Duration};
535+
536+
use tokio::sync::mpsc;
537+
538+
use super::*;
539+
540+
/// Creates a mock receiver that emits messages at a fixed interval.
541+
pub fn mock_receiver(
542+
messages: Vec<FeedMessage<BlockHeader>>,
543+
interval: Duration,
544+
) -> Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>> {
545+
let (tx, rx) = mpsc::channel(messages.len().max(1));
546+
547+
tokio::spawn(async move {
548+
for msg in messages {
549+
tokio::time::sleep(interval).await;
550+
if tx.send(Ok(msg)).await.is_err() {
551+
break;
552+
}
553+
}
554+
});
555+
556+
rx
557+
}
558+
559+
/// Creates a mock receiver from a JSON fixture file.
560+
pub fn mock_receiver_from_file(
561+
path: impl AsRef<Path>,
562+
interval: Duration,
563+
) -> Result<Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>, std::io::Error>
564+
{
565+
let data = std::fs::read(path)?;
566+
let messages: Vec<FeedMessage<BlockHeader>> = serde_json::from_slice(&data)
567+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
568+
Ok(mock_receiver(messages, interval))
569+
}
570+
}

0 commit comments

Comments
 (0)