Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 7 additions & 56 deletions src/event_scanner/scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::{
task::JoinSet,
};
use tokio_stream::{Stream, wrappers::ReceiverStream};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, warn};

#[derive(Copy, Clone, Debug)]
pub(crate) enum ConsumerMode {
Expand Down Expand Up @@ -149,13 +149,8 @@ fn spawn_log_consumers_in_stream_mode<N: Network>(
Ok(message) => {
tx.send(message).await.expect("receiver dropped only if we exit this loop");
}
Err(RecvError::Closed) => {
debug!("No more block ranges to receive");
break;
}
Err(RecvError::Lagged(skipped)) => {
debug!("Channel lagged, skipped {skipped} messages");
}
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => {}
}
}

Expand Down Expand Up @@ -228,10 +223,6 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
.expect("pending blocks not supported");
// Check if in reorg recovery and past the reorg range
if reorg_ancestor.is_some_and(|a| last_log_block_num <= a) {
debug!(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im not sure this is useful since is expected internal state ?

ancestor = reorg_ancestor,
"Reorg recovery complete, resuming normal log collection"
);
reorg_ancestor = None;
}

Expand All @@ -244,10 +235,6 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
Ok(ScannerMessage::Notification(Notification::ReorgDetected {
common_ancestor,
})) => {
debug!(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common_ancestor = common_ancestor,
"Received ReorgDetected notification"
);
// Track reorg state for proper log ordering
reorg_ancestor = Some(common_ancestor);

Expand All @@ -258,7 +245,6 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
// since logs haven't been sent yet
}
Ok(ScannerMessage::Notification(notification)) => {
debug!(notification = ?notification, "Received notification");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notification gets streamed

if !sender.try_stream(notification).await {
return;
}
Expand All @@ -272,15 +258,11 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
}

if collected.is_empty() {
debug!("No logs found");
_ = sender.try_stream(Notification::NoPastLogsFound).await;
return;
}

trace!(count = collected.len(), "Logs found");
collected.reverse(); // restore chronological order

trace!("Sending collected logs to consumer");
_ = sender.try_stream(collected).await;
});

Expand All @@ -295,13 +277,8 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
break;
}
}
Err(RecvError::Closed) => {
debug!("No more block ranges to receive");
break;
}
Err(RecvError::Lagged(skipped)) => {
debug!("Channel lagged, skipped {skipped} messages");
}
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => {}
}
}

Expand Down Expand Up @@ -370,38 +347,12 @@ fn collect_logs<T>(collected: &mut Vec<T>, logs: Vec<T>, count: usize, prepend:

async fn get_logs<N: Network>(
range: RangeInclusive<u64>,
event_filter: &EventFilter,
_event_filter: &EventFilter,
log_filter: &Filter,
provider: &RobustProvider<N>,
) -> Result<Vec<Log>, RobustProviderError> {
let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end());

match provider.get_logs(&log_filter).await {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed these are these are both streamed to the consumer so not sure how helpful logs here are

Ok(logs) => {
if logs.is_empty() {
return Ok(logs);
}

info!(
filter = %event_filter,
log_count = logs.len(),
block_range = ?range,
"found logs for event in block range"
);

Ok(logs)
}
Err(e) => {
error!(
filter = %event_filter,
error = %e,
block_range = ?range,
"failed to get logs for block range"
);

Err(e)
}
}
provider.get_logs(&log_filter).await
}

#[cfg(test)]
Expand Down
3 changes: 1 addition & 2 deletions src/event_scanner/scanner/sync/from_latest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloy::{eips::BlockNumberOrTag, network::Network};

use tracing::{error, info};
use tracing::info;

use crate::{
EventScannerBuilder, ScannerError,
Expand Down Expand Up @@ -117,7 +117,6 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
match client.stream_from(latest_block + 1, self.config.block_confirmations).await {
Ok(stream) => stream,
Err(e) => {
error!(error = %e, "Error during sync mode setup");
for listener in listeners {
_ = listener.sender.try_stream(e.clone()).await;
}
Expand Down