Skip to content

Commit 9c8eff3

Browse files
authored
ref: introduce ChannelState enum for explicit channel closure handling (#223)
<!-- Append the issue number --> Resolves #222 ## Description This PR addresses the issue of ambiguous channel closure handling by introducing a `ChannelState` enum with explicit `Open` and `Closed` variants that makes the code intent clear: ```rust pub enum ChannelState { /// The channel is open and the message was successfully sent. Open, /// The channel is closed (receiver dropped), no further messages can be sent. Closed, } ``` ### Before vs After **Before:** ```rust if !sender.try_stream(Notification::ReorgDetected).await { return; // Channel closed } ``` **After:** ```rust if sender.try_stream(Notification::ReorgDetected).await.is_closed() { return; } ``` ### Testing - Added comprehensive tests for `ChannelState` enum - Added tests for `TryStream` trait behavior with open/closed channels ---
1 parent 8061f2b commit 9c8eff3

File tree

5 files changed

+147
-36
lines changed

5 files changed

+147
-36
lines changed

src/block_range_scanner/common.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio_stream::StreamExt;
77
use crate::{
88
ScannerError, ScannerMessage,
99
block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
10-
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
10+
types::{ChannelState, IntoScannerResult, Notification, ScannerResult, TryStream},
1111
};
1212
use alloy::{
1313
consensus::BlockHeader,
@@ -83,7 +83,9 @@ pub(crate) async fn stream_live_blocks<N: Network>(
8383
// only once the first relevant block is received from the subscription, and not before that;
8484
// otherwise callers might perform certain operations expecting the relevant blocks to start
8585
// coming, when in fact they are not.
86-
if notify_after_first_block && !sender.try_stream(Notification::SwitchingToLive).await {
86+
if notify_after_first_block &&
87+
sender.try_stream(Notification::SwitchingToLive).await.is_closed()
88+
{
8789
return;
8890
}
8991

@@ -258,7 +260,8 @@ async fn stream_blocks_continuously<
258260
};
259261

260262
if let Some(common_ancestor) = common_ancestor {
261-
if !handle_reorg_detected(common_ancestor, stream_start, state, sender).await {
263+
if handle_reorg_detected(common_ancestor, stream_start, state, sender).await.is_closed()
264+
{
262265
return; // Channel closed
263266
}
264267
} else {
@@ -268,7 +271,7 @@ async fn stream_blocks_continuously<
268271

269272
// Stream the next batch of confirmed blocks
270273
let batch_end_num = incoming_block.saturating_sub(block_confirmations);
271-
if !stream_next_batch(
274+
if stream_next_batch(
272275
batch_end_num,
273276
state,
274277
stream_start,
@@ -278,20 +281,21 @@ async fn stream_blocks_continuously<
278281
reorg_handler,
279282
)
280283
.await
284+
.is_closed()
281285
{
282286
return; // Channel closed
283287
}
284288
}
285289
}
286290

287-
/// Handles a detected reorg by notifying and adjusting the streaming state
288-
/// Returns false if the channel is closed
291+
/// Handles a detected reorg by notifying and adjusting the streaming state.
292+
/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
289293
async fn handle_reorg_detected<N: Network>(
290294
common_ancestor: N::BlockResponse,
291295
stream_start: BlockNumber,
292296
state: &mut LiveStreamingState<N>,
293297
sender: &mpsc::Sender<BlockScannerResult>,
294-
) -> bool {
298+
) -> ChannelState {
295299
let ancestor_num = common_ancestor.header().number();
296300

297301
info!(
@@ -300,8 +304,11 @@ async fn handle_reorg_detected<N: Network>(
300304
"Reorg detected during live streaming"
301305
);
302306

303-
if !sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await {
304-
return false;
307+
let channel_state =
308+
sender.try_stream(Notification::ReorgDetected { common_ancestor: ancestor_num }).await;
309+
310+
if channel_state.is_closed() {
311+
return ChannelState::Closed;
305312
}
306313

307314
// Reset streaming position based on common ancestor
@@ -325,11 +332,11 @@ async fn handle_reorg_detected<N: Network>(
325332
state.previous_batch_end = Some(common_ancestor);
326333
}
327334

328-
true
335+
ChannelState::Open
329336
}
330337

331338
/// Streams the next batch of blocks up to `batch_end_num`.
332-
/// Returns false if the channel is closed
339+
/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
333340
async fn stream_next_batch<N: Network>(
334341
batch_end_num: BlockNumber,
335342
state: &mut LiveStreamingState<N>,
@@ -338,10 +345,10 @@ async fn stream_next_batch<N: Network>(
338345
sender: &mpsc::Sender<BlockScannerResult>,
339346
provider: &RobustProvider<N>,
340347
reorg_handler: &mut ReorgHandler<N>,
341-
) -> bool {
348+
) -> ChannelState {
342349
if batch_end_num < state.batch_start {
343350
// No new confirmed blocks to stream yet
344-
return true;
351+
return ChannelState::Open;
345352
}
346353

347354
// The minimum common ancestor is the block before the stream start
@@ -360,13 +367,13 @@ async fn stream_next_batch<N: Network>(
360367

361368
if state.previous_batch_end.is_none() {
362369
// Channel closed
363-
return false;
370+
return ChannelState::Closed;
364371
}
365372

366373
// SAFETY: Overflow cannot realistically happen
367374
state.batch_start = batch_end_num + 1;
368375

369-
true
376+
ChannelState::Open
370377
}
371378

372379
/// Tracks the current state of live streaming
@@ -413,7 +420,7 @@ pub(crate) async fn stream_historical_range<N: Network>(
413420

414421
for range in RangeIterator::forward(start, finalized_batch_end, max_block_range) {
415422
trace!(range_start = *range.start(), range_end = *range.end(), "Streaming finalized range");
416-
if !sender.try_stream(range).await {
423+
if sender.try_stream(range).await.is_closed() {
417424
return None; // channel closed
418425
}
419426
}
@@ -484,7 +491,7 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
484491
}
485492
};
486493

487-
if !sender.try_stream(batch).await {
494+
if sender.try_stream(batch).await.is_closed() {
488495
return None; // channel closed
489496
}
490497

@@ -503,7 +510,8 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
503510
common_ancestor = common_ancestor,
504511
"Reorg detected during historical streaming, resetting range iterator"
505512
);
506-
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
513+
if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed()
514+
{
507515
return None;
508516
}
509517
let reset_to = (common_ancestor + 1).max(min_common_ancestor);

src/block_range_scanner/rewind_handler.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
common::BlockScannerResult, range_iterator::RangeIterator, reorg_handler::ReorgHandler,
1616
ring_buffer::RingBufferCapacity,
1717
},
18-
types::TryStream,
18+
types::{ChannelState, TryStream},
1919
};
2020

2121
pub(crate) struct RewindHandler<N: Network> {
@@ -126,7 +126,7 @@ impl<N: Network> RewindHandler<N> {
126126
let mut iter = RangeIterator::reverse(from, to, max_block_range);
127127
for range in &mut iter {
128128
// stream the range regularly, i.e. from smaller block number to greater
129-
if !sender.try_stream(range).await {
129+
if sender.try_stream(range).await.is_closed() {
130130
break;
131131
}
132132

@@ -146,14 +146,15 @@ impl<N: Network> RewindHandler<N> {
146146
tip = tip.header().number(),
147147
"Reorg detected during rewind, rescanning affected blocks"
148148
);
149-
if !Self::handle_reorg_rescan(
149+
if Self::handle_reorg_rescan(
150150
&mut tip,
151151
common_ancestor,
152152
max_block_range,
153153
sender,
154154
provider,
155155
)
156156
.await
157+
.is_closed()
157158
{
158159
return;
159160
}
@@ -164,14 +165,15 @@ impl<N: Network> RewindHandler<N> {
164165

165166
/// Handles re-scanning of reorged blocks.
166167
///
167-
/// Returns `true` on success, `false` if stream closed or terminal error occurred.
168+
/// Returns `ChannelState::Closed` if stream closed or terminal error occurred,
169+
/// `ChannelState::Open` on success.
168170
async fn handle_reorg_rescan(
169171
tip: &mut N::BlockResponse,
170172
common_ancestor: N::BlockResponse,
171173
max_block_range: u64,
172174
sender: &mpsc::Sender<BlockScannerResult>,
173175
provider: &RobustProvider<N>,
174-
) -> bool {
176+
) -> ChannelState {
175177
let tip_number = tip.header().number();
176178
let common_ancestor = common_ancestor.header().number();
177179

@@ -182,8 +184,8 @@ impl<N: Network> RewindHandler<N> {
182184
"Rescanning reorged blocks"
183185
);
184186

185-
if !sender.try_stream(Notification::ReorgDetected { common_ancestor }).await {
186-
return false;
187+
if sender.try_stream(Notification::ReorgDetected { common_ancestor }).await.is_closed() {
188+
return ChannelState::Closed;
187189
}
188190

189191
// Get the new tip block (same height as original tip, but new hash)
@@ -203,7 +205,7 @@ impl<N: Network> RewindHandler<N> {
203205
);
204206
}
205207
_ = sender.try_stream(e).await;
206-
return false;
208+
return ChannelState::Closed;
207209
}
208210
};
209211

@@ -212,11 +214,11 @@ impl<N: Network> RewindHandler<N> {
212214

213215
for batch in RangeIterator::forward(rescan_from, tip_number, max_block_range) {
214216
trace!(range_start = *batch.start(), range_end = *batch.end(), "Rescanning batch");
215-
if !sender.try_stream(batch).await {
216-
return false;
217+
if sender.try_stream(batch).await.is_closed() {
218+
return ChannelState::Closed;
217219
}
218220
}
219221

220-
true
222+
ChannelState::Open
221223
}
222224
}

src/block_range_scanner/sync_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ impl<N: Network> SyncHandler<N> {
198198
}
199199
};
200200

201-
if !sender.try_stream(Notification::SwitchingToLive).await {
201+
if sender.try_stream(Notification::SwitchingToLive).await.is_closed() {
202202
debug!("Channel closed before live streaming could start");
203203
return;
204204
}

src/event_scanner/block_range_handler.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl<N: Network> StreamHandler<N> {
123123
continue;
124124
}
125125

126-
if !listener.sender.try_stream(result).await {
126+
if listener.sender.try_stream(result).await.is_closed() {
127127
return;
128128
}
129129
}
@@ -306,12 +306,12 @@ impl<N: Network> LatestEventsHandler<N> {
306306
// since logs haven't been sent yet
307307
}
308308
Ok(ScannerMessage::Notification(notification)) => {
309-
if !listener.sender.try_stream(notification).await {
309+
if listener.sender.try_stream(notification).await.is_closed() {
310310
return;
311311
}
312312
}
313313
Err(e) => {
314-
if !listener.sender.try_stream(e).await {
314+
if listener.sender.try_stream(e).await.is_closed() {
315315
return;
316316
}
317317
}

src/types.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,26 @@ use tokio::sync::mpsc;
44

55
use crate::ScannerError;
66

7+
/// Represents the state of a channel after attempting to send a message.
8+
///
9+
/// This enum provides explicit semantics for channel operations, making it clear
10+
/// whether the downstream receiver is still listening or has been dropped.
11+
#[derive(Debug, Clone, Copy)]
12+
pub(crate) enum ChannelState {
13+
/// The channel is open and the message was successfully sent.
14+
Open,
15+
/// The channel is closed (receiver dropped), no further messages can be sent.
16+
Closed,
17+
}
18+
19+
impl ChannelState {
20+
/// Returns `true` if the channel is closed.
21+
#[must_use]
22+
pub(crate) fn is_closed(self) -> bool {
23+
matches!(self, ChannelState::Closed)
24+
}
25+
}
26+
727
/// Messages streamed by the scanner to subscribers.
828
///
929
/// Each message represents either data or a notification about the scanner's state or behavior.
@@ -109,12 +129,93 @@ impl<T: Clone> IntoScannerResult<T> for Notification {
109129

110130
/// Internal helper for attempting to forward a stream item through an `mpsc` channel.
111131
pub(crate) trait TryStream<T: Clone> {
112-
async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool;
132+
async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState;
113133
}
114134

115135
impl<T: Clone + Debug> TryStream<T> for mpsc::Sender<ScannerResult<T>> {
116-
async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> bool {
136+
async fn try_stream<M: IntoScannerResult<T>>(&self, msg: M) -> ChannelState {
117137
let item = msg.into_scanner_message_result();
118-
self.send(item).await.is_ok()
138+
if self.send(item).await.is_err() {
139+
return ChannelState::Closed;
140+
}
141+
ChannelState::Open
142+
}
143+
}
144+
145+
#[cfg(test)]
146+
mod tests {
147+
use super::*;
148+
use std::ops::RangeInclusive;
149+
150+
use crate::ScannerError;
151+
152+
/// Type alias for test results.
153+
type TestResult = Result<ScannerMessage<RangeInclusive<u64>>, ScannerError>;
154+
155+
mod channel_state_enum {
156+
use super::*;
157+
158+
#[test]
159+
fn is_closed_returns_false_for_open_state() {
160+
assert!(!ChannelState::Open.is_closed());
161+
}
162+
163+
#[test]
164+
fn is_closed_returns_true_for_closed_state() {
165+
assert!(ChannelState::Closed.is_closed());
166+
}
167+
168+
#[test]
169+
fn channel_state_is_copy() {
170+
let state = ChannelState::Open;
171+
let copied = state; // Copy, not move
172+
assert!(!state.is_closed()); // Both are still valid
173+
assert!(!copied.is_closed());
174+
}
175+
176+
#[test]
177+
fn channel_state_debug_format() {
178+
assert_eq!(format!("{:?}", ChannelState::Open), "Open");
179+
assert_eq!(format!("{:?}", ChannelState::Closed), "Closed");
180+
}
181+
}
182+
183+
mod try_stream {
184+
use super::*;
185+
186+
#[tokio::test]
187+
async fn try_stream_returns_open_when_receiver_exists() {
188+
let (tx, _rx) = mpsc::channel::<TestResult>(10);
189+
190+
let result = tx.try_stream(Notification::SwitchingToLive).await;
191+
192+
assert!(!result.is_closed());
193+
}
194+
195+
#[tokio::test]
196+
async fn try_stream_returns_closed_when_receiver_dropped() {
197+
let (tx, rx) = mpsc::channel::<TestResult>(10);
198+
drop(rx); // Drop the receiver to close the channel
199+
200+
let result = tx.try_stream(Notification::SwitchingToLive).await;
201+
202+
assert!(result.is_closed());
203+
}
204+
205+
#[tokio::test]
206+
async fn try_stream_sends_message_successfully() {
207+
let (tx, mut rx) = mpsc::channel::<TestResult>(10);
208+
209+
let result = tx.try_stream(Notification::SwitchingToLive).await;
210+
211+
assert!(!result.is_closed());
212+
213+
// Verify the message was actually sent
214+
let received = rx.recv().await.unwrap();
215+
assert!(matches!(
216+
received,
217+
Ok(ScannerMessage::Notification(Notification::SwitchingToLive))
218+
));
219+
}
119220
}
120221
}

0 commit comments

Comments
 (0)