Skip to content

Commit 086d29c

Browse files
authored
feat: Subscriptions are Created in the Parent Thread (#161)
1 parent c2cdc32 commit 086d29c

File tree

2 files changed

+42
-60
lines changed

2 files changed

+42
-60
lines changed

src/block_range_scanner.rs

Lines changed: 41 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -350,18 +350,21 @@ impl<N: Network> Service<N> {
350350
sender: mpsc::Sender<Message>,
351351
) -> Result<(), ScannerError> {
352352
let max_block_range = self.max_block_range;
353-
let provider = self.provider.clone();
354353
let latest = self.provider.get_block_number().await?;
355354

356355
// the next block returned by the underlying subscription will always be `latest + 1`,
357356
// because `latest` was already mined and subscription by definition only streams after new
358357
// blocks have been mined
359358
let range_start = (latest + 1).saturating_sub(block_confirmations);
360359

360+
let subscription = self.provider.subscribe_blocks().await?;
361+
362+
info!("WebSocket connected for live blocks");
363+
361364
tokio::spawn(async move {
362365
Self::stream_live_blocks(
363366
range_start,
364-
provider,
367+
subscription,
365368
sender,
366369
block_confirmations,
367370
max_block_range,
@@ -437,6 +440,9 @@ impl<N: Network> Service<N> {
437440

438441
let confirmed_tip = latest_block.saturating_sub(block_confirmations);
439442

443+
let subscription = self.provider.subscribe_blocks().await?;
444+
info!("Buffering live blocks");
445+
440446
// If start is beyond confirmed tip, skip historical and go straight to live
441447
if start_block > confirmed_tip {
442448
info!(
@@ -448,7 +454,7 @@ impl<N: Network> Service<N> {
448454
tokio::spawn(async move {
449455
Self::stream_live_blocks(
450456
start_block,
451-
provider,
457+
subscription,
452458
sender,
453459
block_confirmations,
454460
max_block_range,
@@ -474,7 +480,7 @@ impl<N: Network> Service<N> {
474480
tokio::spawn(async move {
475481
Self::stream_live_blocks(
476482
cutoff + 1,
477-
provider,
483+
subscription,
478484
live_block_buffer_sender,
479485
block_confirmations,
480486
max_block_range,
@@ -664,62 +670,46 @@ impl<N: Network> Service<N> {
664670

665671
async fn stream_live_blocks(
666672
mut range_start: BlockNumber,
667-
provider: RobustProvider<N>,
673+
subscription: Subscription<N::HeaderResponse>,
668674
sender: mpsc::Sender<Message>,
669675
block_confirmations: u64,
670676
max_block_range: u64,
671677
) {
672-
match Self::get_block_subscription(&provider).await {
673-
Ok(ws_stream) => {
674-
info!("WebSocket connected for live blocks");
675-
676-
// ensure we start streaming only after the expected_next_block cutoff
677-
let cutoff = range_start;
678-
let mut stream =
679-
ws_stream.into_stream().skip_while(|header| header.number() < cutoff);
680-
681-
while let Some(incoming_block) = stream.next().await {
682-
let incoming_block_num = incoming_block.number();
683-
info!(block_number = incoming_block_num, "Received block header");
684-
685-
if incoming_block_num < range_start {
686-
warn!("Reorg detected: sending forked range");
687-
if !sender.try_stream(ScannerStatus::ReorgDetected).await {
688-
return;
689-
}
678+
// ensure we start streaming only after the expected_next_block cutoff
679+
let cutoff = range_start;
680+
let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);
690681

691-
// Calculate the confirmed block position for the incoming block
692-
let incoming_confirmed =
693-
incoming_block_num.saturating_sub(block_confirmations);
682+
while let Some(incoming_block) = stream.next().await {
683+
let incoming_block_num = incoming_block.number();
684+
info!(block_number = incoming_block_num, "Received block header");
694685

695-
// updated expected block to updated confirmed
696-
range_start = incoming_confirmed;
697-
}
686+
if incoming_block_num < range_start {
687+
warn!("Reorg detected: sending forked range");
688+
if !sender.try_stream(ScannerStatus::ReorgDetected).await {
689+
return;
690+
}
698691

699-
let confirmed = incoming_block_num.saturating_sub(block_confirmations);
700-
if confirmed >= range_start {
701-
// NOTE: Edge case when difference between range end and range start >= max
702-
// reads
703-
let range_end =
704-
confirmed.min(range_start.saturating_add(max_block_range - 1));
705-
706-
info!(
707-
range_start = range_start,
708-
range_end = range_end,
709-
"Sending live block range"
710-
);
711-
712-
if !sender.try_stream(range_start..=range_end).await {
713-
return;
714-
}
692+
// Calculate the confirmed block position for the incoming block
693+
let incoming_confirmed = incoming_block_num.saturating_sub(block_confirmations);
715694

716-
// Overflow can not realistically happen
717-
range_start = range_end + 1;
718-
}
719-
}
695+
// updated expected block to updated confirmed
696+
range_start = incoming_confirmed;
720697
}
721-
Err(e) => {
722-
_ = sender.try_stream(e).await;
698+
699+
let confirmed = incoming_block_num.saturating_sub(block_confirmations);
700+
if confirmed >= range_start {
701+
// NOTE: Edge case when difference between range end and range start >= max
702+
// reads
703+
let range_end = confirmed.min(range_start.saturating_add(max_block_range - 1));
704+
705+
info!(range_start = range_start, range_end = range_end, "Sending live block range");
706+
707+
if !sender.try_stream(range_start..=range_end).await {
708+
return;
709+
}
710+
711+
// Overflow can not realistically happen
712+
range_start = range_end + 1;
723713
}
724714
}
725715
}
@@ -765,13 +755,6 @@ impl<N: Network> Service<N> {
765755

766756
info!(processed = processed, discarded = discarded, "Processed buffered messages");
767757
}
768-
769-
async fn get_block_subscription(
770-
provider: &RobustProvider<N>,
771-
) -> Result<Subscription<N::HeaderResponse>, ScannerError> {
772-
let ws_stream = provider.subscribe_blocks().await?;
773-
Ok(ws_stream)
774-
}
775758
}
776759

777760
async fn reorg_detected<N: Network>(

src/event_scanner/filter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,8 @@ impl Display for EventFilter {
4949
if !self.events.is_empty() {
5050
content.push(format!("events: [{}]", self.events.join(", ")));
5151
}
52-
if !self.event_signatures.is_empty() {
52+
if let Some(value_or_array) = self.event_signatures.to_value_or_array() {
5353
// No guarantee the order of values returned by `Topic`
54-
let value_or_array = self.event_signatures.to_value_or_array().unwrap();
5554
let event_signatures = match value_or_array {
5655
ValueOrArray::Value(value) => format!("{value}"),
5756
ValueOrArray::Array(arr) => {

0 commit comments

Comments
 (0)