Skip to content

Commit 0f2a296

Browse files
committed
ref: rename EventListener's sender -> subscriber
1 parent c42e49d commit 0f2a296

File tree

6 files changed

+15
-15
lines changed

6 files changed

+15
-15
lines changed

src/event_scanner/listener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ use tokio::sync::mpsc::Sender;
44
#[derive(Clone)]
55
pub(crate) struct EventListener {
66
pub filter: EventFilter,
7-
pub sender: Sender<Message>,
7+
pub subscriber: Sender<Message>,
88
}

src/event_scanner/modes/common.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub fn spawn_log_consumers<N: Network>(
4848
mode: ConsumerMode,
4949
) {
5050
for listener in listeners.iter().cloned() {
51-
let EventListener { filter, sender } = listener;
51+
let EventListener { filter, subscriber } = listener;
5252

5353
let provider = provider.clone();
5454
let base_filter = Filter::from(&filter);
@@ -72,7 +72,7 @@ pub fn spawn_log_consumers<N: Network>(
7272

7373
match mode {
7474
ConsumerMode::Stream => {
75-
if !sender.try_stream(logs).await {
75+
if !subscriber.try_stream(logs).await {
7676
break;
7777
}
7878
}
@@ -90,19 +90,19 @@ pub fn spawn_log_consumers<N: Network>(
9090
}
9191
}
9292
Err(e) => {
93-
if !sender.try_stream(e).await {
93+
if !subscriber.try_stream(e).await {
9494
break;
9595
}
9696
}
9797
}
9898
}
9999
Ok(BlockRangeMessage::Error(e)) => {
100-
if !sender.try_stream(e).await {
100+
if !subscriber.try_stream(e).await {
101101
break;
102102
}
103103
}
104104
Ok(BlockRangeMessage::Status(status)) => {
105-
if !sender.try_stream(status).await {
105+
if !subscriber.try_stream(status).await {
106106
break;
107107
}
108108
}
@@ -119,7 +119,7 @@ pub fn spawn_log_consumers<N: Network>(
119119
collected.reverse(); // restore chronological order
120120
}
121121

122-
_ = sender.try_stream(collected).await;
122+
_ = subscriber.try_stream(collected).await;
123123
}
124124
});
125125
}

src/event_scanner/modes/historic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl<N: Network> HistoricEventScanner<N> {
107107
#[must_use]
108108
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
109109
let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
110-
self.listeners.push(EventListener { filter, sender });
110+
self.listeners.push(EventListener { filter, subscriber: sender });
111111
ReceiverStream::new(receiver)
112112
}
113113

@@ -199,7 +199,7 @@ mod tests {
199199
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
200200
let mut scanner = HistoricScannerBuilder::new().connect::<Ethereum>(provider);
201201
let _stream = scanner.subscribe(EventFilter::new());
202-
let sender = &scanner.listeners[0].sender;
202+
let sender = &scanner.listeners[0].subscriber;
203203
assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES);
204204
}
205205
}

src/event_scanner/modes/latest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl<N: Network> LatestEventScanner<N> {
126126
#[must_use]
127127
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
128128
let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
129-
self.listeners.push(EventListener { filter, sender });
129+
self.listeners.push(EventListener { filter, subscriber: sender });
130130
ReceiverStream::new(receiver)
131131
}
132132

@@ -243,7 +243,7 @@ mod tests {
243243
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
244244
let mut scanner = LatestScannerBuilder::new().connect::<Ethereum>(provider);
245245
let _stream = scanner.subscribe(EventFilter::new());
246-
let sender = &scanner.listeners[0].sender;
246+
let sender = &scanner.listeners[0].subscriber;
247247
assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES);
248248
}
249249
}

src/event_scanner/modes/live.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl<N: Network> LiveEventScanner<N> {
9898
#[must_use]
9999
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
100100
let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
101-
self.listeners.push(EventListener { filter, sender });
101+
self.listeners.push(EventListener { filter, subscriber: sender });
102102
ReceiverStream::new(receiver)
103103
}
104104

@@ -188,7 +188,7 @@ mod tests {
188188
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
189189
let mut scanner = LiveScannerBuilder::new().connect::<Ethereum>(provider);
190190
let _stream = scanner.subscribe(EventFilter::new());
191-
let sender = &scanner.listeners[0].sender;
191+
let sender = &scanner.listeners[0].subscriber;
192192
assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES);
193193
}
194194
}

src/event_scanner/modes/sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl<N: Network> SyncEventScanner<N> {
107107
#[must_use]
108108
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<Message> {
109109
let (sender, receiver) = mpsc::channel::<Message>(MAX_BUFFERED_MESSAGES);
110-
self.listeners.push(EventListener { filter, sender });
110+
self.listeners.push(EventListener { filter, subscriber: sender });
111111
ReceiverStream::new(receiver)
112112
}
113113

@@ -219,7 +219,7 @@ mod tests {
219219
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
220220
let mut scanner = SyncScannerBuilder::new().connect::<Ethereum>(provider);
221221
let _stream = scanner.subscribe(EventFilter::new());
222-
let sender = &scanner.listeners[0].sender;
222+
let sender = &scanner.listeners[0].subscriber;
223223
assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES);
224224
}
225225
}

0 commit comments

Comments
 (0)