From f26e2245853e402118f9bbc6ccc8dfd75fcad9a5 Mon Sep 17 00:00:00 2001 From: Ryan O'Neill Date: Sun, 1 Mar 2026 19:44:56 -0800 Subject: [PATCH] Split subscription/mod.rs into logical submodules Extract the 1000-line monolithic subscription module into 6 focused submodules while preserving all public API paths and test compatibility: - core.rs: Subscription trait, BoxedSubscription, Tick/Timer/Channel/Stream - combinators.rs: Mapped/Filter/Take/Debounce/Throttle subscriptions - ext.rs: SubscriptionExt trait and blanket impl - batch.rs: BatchSubscription and batch() function - interval.rs: IntervalImmediateSubscription and interval_immediate() - terminal.rs: TerminalEventSubscription and terminal_events() Co-Authored-By: Claude Opus 4.6 --- src/app/subscription/batch.rs | 44 ++ src/app/subscription/combinators.rs | 366 ++++++++++ src/app/subscription/core.rs | 258 +++++++ src/app/subscription/ext.rs | 128 ++++ src/app/subscription/interval.rs | 107 +++ src/app/subscription/mod.rs | 1004 +-------------------------- src/app/subscription/terminal.rs | 112 +++ 7 files changed, 1038 insertions(+), 981 deletions(-) create mode 100644 src/app/subscription/batch.rs create mode 100644 src/app/subscription/combinators.rs create mode 100644 src/app/subscription/core.rs create mode 100644 src/app/subscription/ext.rs create mode 100644 src/app/subscription/interval.rs create mode 100644 src/app/subscription/terminal.rs diff --git a/src/app/subscription/batch.rs b/src/app/subscription/batch.rs new file mode 100644 index 0000000..638df9c --- /dev/null +++ b/src/app/subscription/batch.rs @@ -0,0 +1,44 @@ +use std::pin::Pin; + +use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; + +use super::{BoxedSubscription, Subscription}; + +/// A batch of subscriptions combined into one. +pub struct BatchSubscription { + pub(crate) subscriptions: Vec>, +} + +impl BatchSubscription { + /// Creates a batch of subscriptions. + pub fn new(subscriptions: Vec>) -> Self { + Self { subscriptions } + } +} + +impl Subscription for BatchSubscription { + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use futures_util::stream::SelectAll; + use tokio_stream::StreamExt; + + let mut select_all = SelectAll::new(); + for sub in self.subscriptions { + select_all.push(sub.into_stream(cancel.clone())); + } + + Box::pin(async_stream::stream! { + while let Some(msg) = select_all.next().await { + yield msg; + } + }) + } +} + +/// Combines multiple subscriptions into one. +pub fn batch(subscriptions: Vec>) -> BatchSubscription { + BatchSubscription::new(subscriptions) +} diff --git a/src/app/subscription/combinators.rs b/src/app/subscription/combinators.rs new file mode 100644 index 0000000..4122d52 --- /dev/null +++ b/src/app/subscription/combinators.rs @@ -0,0 +1,366 @@ +use std::pin::Pin; +use std::time::Duration; + +use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; + +use super::Subscription; + +/// A subscription that maps the messages of an inner subscription. +pub struct MappedSubscription +where + S: Subscription, + F: Fn(M) -> N + Send + 'static, +{ + inner: Box, + map_fn: F, + _phantom: std::marker::PhantomData<(M, N)>, +} + +impl MappedSubscription +where + S: Subscription, + F: Fn(M) -> N + Send + 'static, +{ + /// Creates a mapped subscription. + pub fn new(inner: S, map_fn: F) -> Self { + Self { + inner: Box::new(inner), + map_fn, + _phantom: std::marker::PhantomData, + } + } +} + +impl Subscription for MappedSubscription +where + M: Send + 'static, + N: Send + 'static, + F: Fn(M) -> N + Send + 'static, + S: Subscription, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use tokio_stream::StreamExt; + let mut inner_stream = self.inner.into_stream(cancel); + let map_fn = self.map_fn; + + Box::pin(async_stream::stream! { + while let Some(m) = inner_stream.next().await { + yield (map_fn)(m); + } + }) + } +} + +/// A subscription that filters messages from an inner subscription. +/// +/// Only messages for which the predicate returns `true` are emitted. +/// +/// # Example +/// +/// ```rust +/// use envision::app::{SubscriptionExt, tick}; +/// use std::time::Duration; +/// +/// let sub = tick(Duration::from_secs(1)) +/// .with_message(|| 42i32) +/// .filter(|n| *n > 0); +/// ``` +pub struct FilterSubscription +where + S: Subscription, + P: Fn(&M) -> bool + Send + 'static, +{ + inner: Box, + predicate: P, + _phantom: std::marker::PhantomData, +} + +impl FilterSubscription +where + S: Subscription, + P: Fn(&M) -> bool + Send + 'static, +{ + /// Creates a filtered subscription. + pub fn new(inner: S, predicate: P) -> Self { + Self { + inner: Box::new(inner), + predicate, + _phantom: std::marker::PhantomData, + } + } +} + +impl Subscription for FilterSubscription +where + M: Send + 'static, + S: Subscription, + P: Fn(&M) -> bool + Send + 'static, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use tokio_stream::StreamExt; + + let predicate = self.predicate; + let mut inner = self.inner.into_stream(cancel); + + Box::pin(async_stream::stream! { + while let Some(msg) = inner.next().await { + if (predicate)(&msg) { + yield msg; + } + } + }) + } +} + +/// A subscription that takes only the first N messages from an inner subscription. +/// +/// After N messages, the subscription ends. +/// +/// # Example +/// +/// ```rust +/// use envision::app::{SubscriptionExt, tick}; +/// use std::time::Duration; +/// +/// let sub = tick(Duration::from_secs(1)) +/// .with_message(|| "tick") +/// .take(5); +/// ``` +pub struct TakeSubscription +where + S: Subscription, +{ + inner: Box, + pub(crate) count: usize, + _phantom: std::marker::PhantomData, +} + +impl TakeSubscription +where + S: Subscription, +{ + /// Creates a take subscription. + pub fn new(inner: S, count: usize) -> Self { + Self { + inner: Box::new(inner), + count, + _phantom: std::marker::PhantomData, + } + } +} + +impl Subscription for TakeSubscription +where + M: Send + 'static, + S: Subscription, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use tokio_stream::StreamExt; + + let count = self.count; + let mut inner = self.inner.into_stream(cancel); + + Box::pin(async_stream::stream! { + let mut taken = 0; + while taken < count { + match inner.next().await { + Some(msg) => { + taken += 1; + yield msg; + } + None => break, + } + } + }) + } +} + +/// A subscription that debounces messages from an inner subscription. +/// +/// Debouncing delays message emission until a quiet period has passed. +/// If a new message arrives before the quiet period expires, the timer resets. +/// Only the most recent message is emitted after the quiet period. +/// +/// This is useful for scenarios like search-as-you-type where you want to +/// wait until the user stops typing before triggering a search. +/// +/// # Example +/// +/// ```rust +/// use envision::app::{SubscriptionExt, tick}; +/// use std::time::Duration; +/// +/// // Only emit after 300ms of no new messages +/// let sub = tick(Duration::from_millis(100)) +/// .with_message(|| "tick") +/// .debounce(Duration::from_millis(300)); +/// ``` +pub struct DebounceSubscription +where + S: Subscription, +{ + inner: Box, + pub(crate) duration: Duration, + _phantom: std::marker::PhantomData, +} + +impl DebounceSubscription +where + S: Subscription, +{ + /// Creates a debounced subscription. + pub fn new(inner: S, duration: Duration) -> Self { + Self { + inner: Box::new(inner), + duration, + _phantom: std::marker::PhantomData, + } + } +} + +impl Subscription for DebounceSubscription +where + M: Send + 'static, + S: Subscription, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use tokio_stream::StreamExt; + + let duration = self.duration; + let mut inner = self.inner.into_stream(cancel.clone()); + + Box::pin(async_stream::stream! { + let mut pending: Option = None; + let mut deadline: Option = None; + + loop { + tokio::select! { + biased; + + // Check for cancellation first + _ = cancel.cancelled() => { + break; + } + + // Check if deadline has passed + _ = async { + match deadline { + Some(d) => tokio::time::sleep_until(d).await, + None => std::future::pending::<()>().await, + } + } => { + if let Some(m) = pending.take() { + deadline = None; + yield m; + } + } + + // Check for new messages + msg = inner.next() => { + match msg { + Some(m) => { + pending = Some(m); + deadline = Some(tokio::time::Instant::now() + duration); + } + None => { + // Stream ended, emit any pending message + if let Some(m) = pending.take() { + yield m; + } + break; + } + } + } + } + } + }) + } +} + +/// A subscription that throttles messages from an inner subscription. +/// +/// Throttling limits the rate of message emission. At most one message +/// is emitted per duration. The first message is emitted immediately, +/// and subsequent messages are dropped until the duration has passed. +/// +/// This is useful for limiting API calls or expensive operations. +/// +/// # Example +/// +/// ```rust +/// use envision::app::{SubscriptionExt, tick}; +/// use std::time::Duration; +/// +/// // Emit at most once every 100ms +/// let sub = tick(Duration::from_millis(50)) +/// .with_message(|| "tick") +/// .throttle(Duration::from_millis(100)); +/// ``` +pub struct ThrottleSubscription +where + S: Subscription, +{ + inner: Box, + pub(crate) duration: Duration, + _phantom: std::marker::PhantomData, +} + +impl ThrottleSubscription +where + S: Subscription, +{ + /// Creates a throttled subscription. + pub fn new(inner: S, duration: Duration) -> Self { + Self { + inner: Box::new(inner), + duration, + _phantom: std::marker::PhantomData, + } + } +} + +impl Subscription for ThrottleSubscription +where + M: Send + 'static, + S: Subscription, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use tokio_stream::StreamExt; + + let duration = self.duration; + let mut inner = self.inner.into_stream(cancel); + + Box::pin(async_stream::stream! { + let mut last_emit: Option = None; + + while let Some(msg) = inner.next().await { + let now = tokio::time::Instant::now(); + let should_emit = match last_emit { + None => true, + Some(last) => now.duration_since(last) >= duration, + }; + + if should_emit { + last_emit = Some(now); + yield msg; + } + } + }) + } +} diff --git a/src/app/subscription/core.rs b/src/app/subscription/core.rs new file mode 100644 index 0000000..76f9f58 --- /dev/null +++ b/src/app/subscription/core.rs @@ -0,0 +1,258 @@ +use std::pin::Pin; +use std::time::Duration; + +use tokio::sync::mpsc; +use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; + +/// A subscription that produces messages over time. +/// +/// Subscriptions are long-running async streams that emit messages. They're +/// typically used for timers, events from external sources, or any ongoing +/// async operation that produces multiple messages. +pub trait Subscription: Send + 'static { + /// Converts this subscription into a stream of messages. + /// + /// The stream runs until it naturally ends or the cancellation token is triggered. + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>>; +} + +/// A boxed subscription. +pub type BoxedSubscription = Box>; + +/// A subscription that fires at regular intervals. +/// +/// Each tick produces a message using the provided function. +/// +/// # Example +/// +/// ```rust +/// use envision::app::TickSubscription; +/// use std::time::Duration; +/// +/// let tick = TickSubscription::new(Duration::from_secs(1), || "tick"); +/// ``` +pub struct TickSubscription +where + F: Fn() -> M + Send + 'static, +{ + pub(crate) interval: Duration, + message_fn: F, +} + +impl TickSubscription +where + F: Fn() -> M + Send + 'static, +{ + /// Creates a new tick subscription with the given interval and message function. + pub fn new(interval: Duration, message_fn: F) -> Self { + Self { + interval, + message_fn, + } + } +} + +impl M + Send + 'static> Subscription for TickSubscription { + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + let interval_duration = self.interval; + let message_fn = self.message_fn; + + Box::pin(async_stream::stream! { + let mut interval = tokio::time::interval(interval_duration); + loop { + tokio::select! { + _ = interval.tick() => { + yield (message_fn)(); + } + _ = cancel.cancelled() => { + break; + } + } + } + }) + } +} + +/// Builder for tick subscriptions with a fluent API. +pub struct TickSubscriptionBuilder { + interval: Duration, +} + +impl TickSubscriptionBuilder { + /// Creates a tick subscription builder with the given interval. + pub fn every(interval: Duration) -> Self { + Self { interval } + } + + /// Sets the message to produce on each tick. + pub fn with_message(self, message_fn: F) -> TickSubscription + where + F: Fn() -> M + Send + 'static, + { + TickSubscription::new(self.interval, message_fn) + } +} + +/// Creates a tick subscription builder. +/// +/// # Example +/// +/// ```rust +/// use envision::app::tick; +/// use std::time::Duration; +/// +/// let sub = tick(Duration::from_secs(1)).with_message(|| "tick"); +/// ``` +pub fn tick(interval: Duration) -> TickSubscriptionBuilder { + TickSubscriptionBuilder::every(interval) +} + +/// A subscription that fires once after a delay. +/// +/// # Example +/// +/// ```rust +/// use envision::app::TimerSubscription; +/// use std::time::Duration; +/// +/// let timer = TimerSubscription::after(Duration::from_secs(5), "timeout"); +/// ``` +pub struct TimerSubscription { + pub(crate) delay: Duration, + pub(crate) message: M, +} + +impl TimerSubscription { + /// Creates a timer that fires the given message after the delay. + pub fn after(delay: Duration, message: M) -> Self { + Self { delay, message } + } +} + +impl Subscription for TimerSubscription { + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + let delay = self.delay; + let message = self.message; + + Box::pin(async_stream::stream! { + tokio::select! { + _ = tokio::time::sleep(delay) => { + yield message; + } + _ = cancel.cancelled() => {} + } + }) + } +} + +/// A subscription that receives messages from a channel. +/// +/// This is useful for receiving events from external sources like +/// websockets, file watchers, or other async operations. +/// +/// # Example +/// +/// ```rust +/// use envision::app::ChannelSubscription; +/// +/// let (tx, rx) = tokio::sync::mpsc::channel::(100); +/// let subscription = ChannelSubscription::new(rx); +/// ``` +pub struct ChannelSubscription { + receiver: mpsc::Receiver, +} + +impl ChannelSubscription { + /// Creates a subscription from a channel receiver. + pub fn new(receiver: mpsc::Receiver) -> Self { + Self { receiver } + } +} + +impl Subscription for ChannelSubscription { + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + let mut receiver = self.receiver; + + Box::pin(async_stream::stream! { + loop { + tokio::select! { + msg = receiver.recv() => { + match msg { + Some(m) => yield m, + None => break, // Channel closed + } + } + _ = cancel.cancelled() => { + break; + } + } + } + }) + } +} + +/// A subscription that wraps a stream directly. +/// +/// This allows using any async stream as a subscription. +/// +/// # Example +/// +/// ```rust +/// use envision::app::StreamSubscription; +/// +/// let stream = tokio_stream::pending::(); +/// let subscription = StreamSubscription::new(stream); +/// ``` +pub struct StreamSubscription { + stream: S, +} + +impl StreamSubscription { + /// Creates a subscription from any stream. + pub fn new(stream: S) -> Self { + Self { stream } + } +} + +impl Subscription for StreamSubscription +where + M: Send + 'static, + S: Stream + Send + Unpin + 'static, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use tokio_stream::StreamExt; + let mut inner = self.stream; + + Box::pin(async_stream::stream! { + loop { + tokio::select! { + item = inner.next() => { + match item { + Some(m) => yield m, + None => break, // Stream ended + } + } + _ = cancel.cancelled() => { + break; + } + } + } + }) + } +} diff --git a/src/app/subscription/ext.rs b/src/app/subscription/ext.rs new file mode 100644 index 0000000..722d617 --- /dev/null +++ b/src/app/subscription/ext.rs @@ -0,0 +1,128 @@ +use std::time::Duration; + +use super::combinators::{ + DebounceSubscription, FilterSubscription, MappedSubscription, TakeSubscription, + ThrottleSubscription, +}; +use super::Subscription; + +/// Extension trait for subscriptions. +/// +/// Provides fluent methods for composing and transforming subscriptions. +/// +/// # Example +/// +/// ```rust +/// use envision::app::{SubscriptionExt, tick}; +/// use std::time::Duration; +/// +/// // Create a tick subscription with filtering and limiting +/// let sub = tick(Duration::from_millis(100)) +/// .with_message(|| 42i32) +/// .filter(|n| *n > 0) +/// .take(10) +/// .throttle(Duration::from_millis(200)); +/// ``` +pub trait SubscriptionExt: Subscription + Sized { + /// Maps the messages of this subscription. + /// + /// # Example + /// + /// ```rust + /// use envision::app::{SubscriptionExt, tick}; + /// use std::time::Duration; + /// + /// let sub = tick(Duration::from_secs(1)) + /// .with_message(|| 42) + /// .map(|n| format!("value: {}", n)); + /// ``` + fn map(self, f: F) -> MappedSubscription + where + F: Fn(M) -> N + Send + 'static, + { + MappedSubscription::new(self, f) + } + + /// Filters messages from this subscription. + /// + /// Only messages for which the predicate returns `true` are emitted. + /// + /// # Example + /// + /// ```rust + /// use envision::app::{SubscriptionExt, tick}; + /// use std::time::Duration; + /// + /// let sub = tick(Duration::from_secs(1)) + /// .with_message(|| 42i32) + /// .filter(|n| *n > 0); + /// ``` + fn filter

(self, predicate: P) -> FilterSubscription + where + P: Fn(&M) -> bool + Send + 'static, + { + FilterSubscription::new(self, predicate) + } + + /// Takes only the first N messages from this subscription. + /// + /// After N messages, the subscription ends. + /// + /// # Example + /// + /// ```rust + /// use envision::app::{SubscriptionExt, tick}; + /// use std::time::Duration; + /// + /// let sub = tick(Duration::from_secs(1)) + /// .with_message(|| "tick") + /// .take(5); + /// ``` + fn take(self, count: usize) -> TakeSubscription { + TakeSubscription::new(self, count) + } + + /// Debounces messages from this subscription. + /// + /// Only emits a message after a quiet period has passed. If a new message + /// arrives before the quiet period expires, the timer resets. Only the most + /// recent message is emitted. + /// + /// # Example + /// + /// ```rust + /// use envision::app::{SubscriptionExt, tick}; + /// use std::time::Duration; + /// + /// // Only emit after 300ms of no new messages + /// let sub = tick(Duration::from_millis(100)) + /// .with_message(|| "tick") + /// .debounce(Duration::from_millis(300)); + /// ``` + fn debounce(self, duration: Duration) -> DebounceSubscription { + DebounceSubscription::new(self, duration) + } + + /// Throttles messages from this subscription. + /// + /// Limits the rate of message emission. At most one message is emitted + /// per duration. The first message passes immediately, subsequent messages + /// are dropped until the duration has passed. + /// + /// # Example + /// + /// ```rust + /// use envision::app::{SubscriptionExt, tick}; + /// use std::time::Duration; + /// + /// // Emit at most once every 100ms + /// let sub = tick(Duration::from_millis(50)) + /// .with_message(|| "tick") + /// .throttle(Duration::from_millis(100)); + /// ``` + fn throttle(self, duration: Duration) -> ThrottleSubscription { + ThrottleSubscription::new(self, duration) + } +} + +impl> SubscriptionExt for S {} diff --git a/src/app/subscription/interval.rs b/src/app/subscription/interval.rs new file mode 100644 index 0000000..732267f --- /dev/null +++ b/src/app/subscription/interval.rs @@ -0,0 +1,107 @@ +use std::pin::Pin; +use std::time::Duration; + +use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; + +use super::Subscription; + +/// A subscription that fires immediately, then at regular intervals. +/// +/// Unlike [`TickSubscription`](super::TickSubscription), this fires the first message immediately +/// without waiting for the interval. +/// +/// # Example +/// +/// ```rust +/// use envision::app::IntervalImmediateSubscription; +/// use std::time::Duration; +/// +/// let sub = IntervalImmediateSubscription::new(Duration::from_secs(1), || "tick"); +/// ``` +pub struct IntervalImmediateSubscription +where + F: Fn() -> M + Send + 'static, +{ + pub(crate) interval: Duration, + message_fn: F, +} + +impl IntervalImmediateSubscription +where + F: Fn() -> M + Send + 'static, +{ + /// Creates a new interval immediate subscription. + pub fn new(interval: Duration, message_fn: F) -> Self { + Self { + interval, + message_fn, + } + } +} + +impl M + Send + 'static> Subscription + for IntervalImmediateSubscription +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + let interval_duration = self.interval; + let message_fn = self.message_fn; + + Box::pin(async_stream::stream! { + // Fire immediately + yield (message_fn)(); + + let mut interval = tokio::time::interval(interval_duration); + // Skip the first tick since we already fired + interval.tick().await; + + loop { + tokio::select! { + _ = interval.tick() => { + yield (message_fn)(); + } + _ = cancel.cancelled() => { + break; + } + } + } + }) + } +} + +/// Builder for interval immediate subscriptions with a fluent API. +pub struct IntervalImmediateBuilder { + interval: Duration, +} + +impl IntervalImmediateBuilder { + /// Creates an interval immediate subscription builder. + pub fn every(interval: Duration) -> Self { + Self { interval } + } + + /// Sets the message to produce on each tick. + pub fn with_message(self, message_fn: F) -> IntervalImmediateSubscription + where + F: Fn() -> M + Send + 'static, + { + IntervalImmediateSubscription::new(self.interval, message_fn) + } +} + +/// Creates an interval immediate subscription builder that fires immediately. +/// +/// # Example +/// +/// ```rust +/// use envision::app::interval_immediate; +/// use std::time::Duration; +/// +/// let sub = interval_immediate(Duration::from_secs(1)).with_message(|| "tick"); +/// ``` +pub fn interval_immediate(interval: Duration) -> IntervalImmediateBuilder { + IntervalImmediateBuilder::every(interval) +} diff --git a/src/app/subscription/mod.rs b/src/app/subscription/mod.rs index b0e0001..bf63d9d 100644 --- a/src/app/subscription/mod.rs +++ b/src/app/subscription/mod.rs @@ -13,988 +13,30 @@ //! let tick = TickSubscription::new(Duration::from_secs(1), || "tick"); //! ``` -use std::pin::Pin; -use std::time::Duration; +mod batch; +mod combinators; +mod core; +mod ext; +mod interval; +mod terminal; + +pub use batch::{batch, BatchSubscription}; +pub use combinators::{ + DebounceSubscription, FilterSubscription, MappedSubscription, TakeSubscription, + ThrottleSubscription, +}; +pub use core::{ + tick, BoxedSubscription, ChannelSubscription, StreamSubscription, Subscription, + TickSubscription, TickSubscriptionBuilder, TimerSubscription, +}; +pub use ext::SubscriptionExt; +pub use interval::{interval_immediate, IntervalImmediateBuilder, IntervalImmediateSubscription}; +pub use terminal::{terminal_events, TerminalEventSubscription}; -use tokio::sync::mpsc; -use tokio_stream::Stream; -use tokio_util::sync::CancellationToken; - -/// A subscription that produces messages over time. -/// -/// Subscriptions are long-running async streams that emit messages. They're -/// typically used for timers, events from external sources, or any ongoing -/// async operation that produces multiple messages. -pub trait Subscription: Send + 'static { - /// Converts this subscription into a stream of messages. - /// - /// The stream runs until it naturally ends or the cancellation token is triggered. - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>>; -} - -/// A boxed subscription. -pub type BoxedSubscription = Box>; - -/// A subscription that fires at regular intervals. -/// -/// Each tick produces a message using the provided function. -/// -/// # Example -/// -/// ```rust -/// use envision::app::TickSubscription; -/// use std::time::Duration; -/// -/// let tick = TickSubscription::new(Duration::from_secs(1), || "tick"); -/// ``` -pub struct TickSubscription -where - F: Fn() -> M + Send + 'static, -{ - interval: Duration, - message_fn: F, -} - -impl TickSubscription -where - F: Fn() -> M + Send + 'static, -{ - /// Creates a new tick subscription with the given interval and message function. - pub fn new(interval: Duration, message_fn: F) -> Self { - Self { - interval, - message_fn, - } - } -} - -impl M + Send + 'static> Subscription for TickSubscription { - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - let interval_duration = self.interval; - let message_fn = self.message_fn; - - Box::pin(async_stream::stream! { - let mut interval = tokio::time::interval(interval_duration); - loop { - tokio::select! { - _ = interval.tick() => { - yield (message_fn)(); - } - _ = cancel.cancelled() => { - break; - } - } - } - }) - } -} - -/// Builder for tick subscriptions with a fluent API. -pub struct TickSubscriptionBuilder { - interval: Duration, -} - -impl TickSubscriptionBuilder { - /// Creates a tick subscription builder with the given interval. - pub fn every(interval: Duration) -> Self { - Self { interval } - } - - /// Sets the message to produce on each tick. - pub fn with_message(self, message_fn: F) -> TickSubscription - where - F: Fn() -> M + Send + 'static, - { - TickSubscription::new(self.interval, message_fn) - } -} - -/// Creates a tick subscription builder. -/// -/// # Example -/// -/// ```rust -/// use envision::app::tick; -/// use std::time::Duration; -/// -/// let sub = tick(Duration::from_secs(1)).with_message(|| "tick"); -/// ``` -pub fn tick(interval: Duration) -> TickSubscriptionBuilder { - TickSubscriptionBuilder::every(interval) -} - -/// A subscription that fires once after a delay. -/// -/// # Example -/// -/// ```rust -/// use envision::app::TimerSubscription; -/// use std::time::Duration; -/// -/// let timer = TimerSubscription::after(Duration::from_secs(5), "timeout"); -/// ``` -pub struct TimerSubscription { - delay: Duration, - message: M, -} - -impl TimerSubscription { - /// Creates a timer that fires the given message after the delay. - pub fn after(delay: Duration, message: M) -> Self { - Self { delay, message } - } -} - -impl Subscription for TimerSubscription { - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - let delay = self.delay; - let message = self.message; - - Box::pin(async_stream::stream! { - tokio::select! { - _ = tokio::time::sleep(delay) => { - yield message; - } - _ = cancel.cancelled() => {} - } - }) - } -} - -/// A subscription that receives messages from a channel. -/// -/// This is useful for receiving events from external sources like -/// websockets, file watchers, or other async operations. -/// -/// # Example -/// -/// ```rust -/// use envision::app::ChannelSubscription; -/// -/// let (tx, rx) = tokio::sync::mpsc::channel::(100); -/// let subscription = ChannelSubscription::new(rx); -/// ``` -pub struct ChannelSubscription { - receiver: mpsc::Receiver, -} - -impl ChannelSubscription { - /// Creates a subscription from a channel receiver. - pub fn new(receiver: mpsc::Receiver) -> Self { - Self { receiver } - } -} - -impl Subscription for ChannelSubscription { - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - let mut receiver = self.receiver; - - Box::pin(async_stream::stream! { - loop { - tokio::select! { - msg = receiver.recv() => { - match msg { - Some(m) => yield m, - None => break, // Channel closed - } - } - _ = cancel.cancelled() => { - break; - } - } - } - }) - } -} - -/// A subscription that wraps a stream directly. -/// -/// This allows using any async stream as a subscription. -/// -/// # Example -/// -/// ```rust -/// use envision::app::StreamSubscription; -/// -/// let stream = tokio_stream::pending::(); -/// let subscription = StreamSubscription::new(stream); -/// ``` -pub struct StreamSubscription { - stream: S, -} - -impl StreamSubscription { - /// Creates a subscription from any stream. - pub fn new(stream: S) -> Self { - Self { stream } - } -} - -impl Subscription for StreamSubscription -where - M: Send + 'static, - S: Stream + Send + Unpin + 'static, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use tokio_stream::StreamExt; - let mut inner = self.stream; - - Box::pin(async_stream::stream! { - loop { - tokio::select! { - item = inner.next() => { - match item { - Some(m) => yield m, - None => break, // Stream ended - } - } - _ = cancel.cancelled() => { - break; - } - } - } - }) - } -} - -/// A subscription that maps the messages of an inner subscription. -pub struct MappedSubscription -where - S: Subscription, - F: Fn(M) -> N + Send + 'static, -{ - inner: Box, - map_fn: F, - _phantom: std::marker::PhantomData<(M, N)>, -} - -impl MappedSubscription -where - S: Subscription, - F: Fn(M) -> N + Send + 'static, -{ - /// Creates a mapped subscription. - pub fn new(inner: S, map_fn: F) -> Self { - Self { - inner: Box::new(inner), - map_fn, - _phantom: std::marker::PhantomData, - } - } -} - -impl Subscription for MappedSubscription -where - M: Send + 'static, - N: Send + 'static, - F: Fn(M) -> N + Send + 'static, - S: Subscription, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use tokio_stream::StreamExt; - let mut inner_stream = self.inner.into_stream(cancel); - let map_fn = self.map_fn; - - Box::pin(async_stream::stream! { - while let Some(m) = inner_stream.next().await { - yield (map_fn)(m); - } - }) - } -} - -/// Extension trait for subscriptions. -/// -/// Provides fluent methods for composing and transforming subscriptions. -/// -/// # Example -/// -/// ```rust -/// use envision::app::{SubscriptionExt, tick}; -/// use std::time::Duration; -/// -/// // Create a tick subscription with filtering and limiting -/// let sub = tick(Duration::from_millis(100)) -/// .with_message(|| 42i32) -/// .filter(|n| *n > 0) -/// .take(10) -/// .throttle(Duration::from_millis(200)); -/// ``` -pub trait SubscriptionExt: Subscription + Sized { - /// Maps the messages of this subscription. - /// - /// # Example - /// - /// ```rust - /// use envision::app::{SubscriptionExt, tick}; - /// use std::time::Duration; - /// - /// let sub = tick(Duration::from_secs(1)) - /// .with_message(|| 42) - /// .map(|n| format!("value: {}", n)); - /// ``` - fn map(self, f: F) -> MappedSubscription - where - F: Fn(M) -> N + Send + 'static, - { - MappedSubscription::new(self, f) - } - - /// Filters messages from this subscription. - /// - /// Only messages for which the predicate returns `true` are emitted. - /// - /// # Example - /// - /// ```rust - /// use envision::app::{SubscriptionExt, tick}; - /// use std::time::Duration; - /// - /// let sub = tick(Duration::from_secs(1)) - /// .with_message(|| 42i32) - /// .filter(|n| *n > 0); - /// ``` - fn filter

(self, predicate: P) -> FilterSubscription - where - P: Fn(&M) -> bool + Send + 'static, - { - FilterSubscription::new(self, predicate) - } - - /// Takes only the first N messages from this subscription. - /// - /// After N messages, the subscription ends. - /// - /// # Example - /// - /// ```rust - /// use envision::app::{SubscriptionExt, tick}; - /// use std::time::Duration; - /// - /// let sub = tick(Duration::from_secs(1)) - /// .with_message(|| "tick") - /// .take(5); - /// ``` - fn take(self, count: usize) -> TakeSubscription { - TakeSubscription::new(self, count) - } - - /// Debounces messages from this subscription. - /// - /// Only emits a message after a quiet period has passed. If a new message - /// arrives before the quiet period expires, the timer resets. Only the most - /// recent message is emitted. - /// - /// # Example - /// - /// ```rust - /// use envision::app::{SubscriptionExt, tick}; - /// use std::time::Duration; - /// - /// // Only emit after 300ms of no new messages - /// let sub = tick(Duration::from_millis(100)) - /// .with_message(|| "tick") - /// .debounce(Duration::from_millis(300)); - /// ``` - fn debounce(self, duration: Duration) -> DebounceSubscription { - DebounceSubscription::new(self, duration) - } - - /// Throttles messages from this subscription. - /// - /// Limits the rate of message emission. At most one message is emitted - /// per duration. The first message passes immediately, subsequent messages - /// are dropped until the duration has passed. - /// - /// # Example - /// - /// ```rust - /// use envision::app::{SubscriptionExt, tick}; - /// use std::time::Duration; - /// - /// // Emit at most once every 100ms - /// let sub = tick(Duration::from_millis(50)) - /// .with_message(|| "tick") - /// .throttle(Duration::from_millis(100)); - /// ``` - fn throttle(self, duration: Duration) -> ThrottleSubscription { - ThrottleSubscription::new(self, duration) - } -} - -impl> SubscriptionExt for S {} - -/// A batch of subscriptions combined into one. -pub struct BatchSubscription { - subscriptions: Vec>, -} - -impl BatchSubscription { - /// Creates a batch of subscriptions. - pub fn new(subscriptions: Vec>) -> Self { - Self { subscriptions } - } -} - -impl Subscription for BatchSubscription { - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use futures_util::stream::SelectAll; - use tokio_stream::StreamExt; - - let mut select_all = SelectAll::new(); - for sub in self.subscriptions { - select_all.push(sub.into_stream(cancel.clone())); - } - - Box::pin(async_stream::stream! { - while let Some(msg) = select_all.next().await { - yield msg; - } - }) - } -} - -/// Combines multiple subscriptions into one. -pub fn batch(subscriptions: Vec>) -> BatchSubscription { - BatchSubscription::new(subscriptions) -} - -/// A subscription that fires immediately, then at regular intervals. -/// -/// Unlike [`TickSubscription`], this fires the first message immediately -/// without waiting for the interval. -/// -/// # Example -/// -/// ```rust -/// use envision::app::IntervalImmediateSubscription; -/// use std::time::Duration; -/// -/// let sub = IntervalImmediateSubscription::new(Duration::from_secs(1), || "tick"); -/// ``` -pub struct IntervalImmediateSubscription -where - F: Fn() -> M + Send + 'static, -{ - interval: Duration, - message_fn: F, -} - -impl IntervalImmediateSubscription -where - F: Fn() -> M + Send + 'static, -{ - /// Creates a new interval immediate subscription. - pub fn new(interval: Duration, message_fn: F) -> Self { - Self { - interval, - message_fn, - } - } -} - -impl M + Send + 'static> Subscription - for IntervalImmediateSubscription -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - let interval_duration = self.interval; - let message_fn = self.message_fn; - - Box::pin(async_stream::stream! { - // Fire immediately - yield (message_fn)(); - - let mut interval = tokio::time::interval(interval_duration); - // Skip the first tick since we already fired - interval.tick().await; - - loop { - tokio::select! { - _ = interval.tick() => { - yield (message_fn)(); - } - _ = cancel.cancelled() => { - break; - } - } - } - }) - } -} - -/// Builder for interval immediate subscriptions with a fluent API. -pub struct IntervalImmediateBuilder { - interval: Duration, -} - -impl IntervalImmediateBuilder { - /// Creates an interval immediate subscription builder. - pub fn every(interval: Duration) -> Self { - Self { interval } - } - - /// Sets the message to produce on each tick. - pub fn with_message(self, message_fn: F) -> IntervalImmediateSubscription - where - F: Fn() -> M + Send + 'static, - { - IntervalImmediateSubscription::new(self.interval, message_fn) - } -} - -/// Creates an interval immediate subscription builder that fires immediately. -/// -/// # Example -/// -/// ```rust -/// use envision::app::interval_immediate; -/// use std::time::Duration; -/// -/// let sub = interval_immediate(Duration::from_secs(1)).with_message(|| "tick"); -/// ``` -pub fn interval_immediate(interval: Duration) -> IntervalImmediateBuilder { - IntervalImmediateBuilder::every(interval) -} - -/// A subscription that filters messages from an inner subscription. -/// -/// Only messages for which the predicate returns `true` are emitted. -/// -/// # Example -/// -/// ```rust -/// use envision::app::{SubscriptionExt, tick}; -/// use std::time::Duration; -/// -/// let sub = tick(Duration::from_secs(1)) -/// .with_message(|| 42i32) -/// .filter(|n| *n > 0); -/// ``` -pub struct FilterSubscription -where - S: Subscription, - P: Fn(&M) -> bool + Send + 'static, -{ - inner: Box, - predicate: P, - _phantom: std::marker::PhantomData, -} - -impl FilterSubscription -where - S: Subscription, - P: Fn(&M) -> bool + Send + 'static, -{ - /// Creates a filtered subscription. - pub fn new(inner: S, predicate: P) -> Self { - Self { - inner: Box::new(inner), - predicate, - _phantom: std::marker::PhantomData, - } - } -} - -impl Subscription for FilterSubscription -where - M: Send + 'static, - S: Subscription, - P: Fn(&M) -> bool + Send + 'static, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use tokio_stream::StreamExt; - - let predicate = self.predicate; - let mut inner = self.inner.into_stream(cancel); - - Box::pin(async_stream::stream! { - while let Some(msg) = inner.next().await { - if (predicate)(&msg) { - yield msg; - } - } - }) - } -} - -/// A subscription that takes only the first N messages from an inner subscription. -/// -/// After N messages, the subscription ends. -/// -/// # Example -/// -/// ```rust -/// use envision::app::{SubscriptionExt, tick}; -/// use std::time::Duration; -/// -/// let sub = tick(Duration::from_secs(1)) -/// .with_message(|| "tick") -/// .take(5); -/// ``` -pub struct TakeSubscription -where - S: Subscription, -{ - inner: Box, - count: usize, - _phantom: std::marker::PhantomData, -} - -impl TakeSubscription -where - S: Subscription, -{ - /// Creates a take subscription. - pub fn new(inner: S, count: usize) -> Self { - Self { - inner: Box::new(inner), - count, - _phantom: std::marker::PhantomData, - } - } -} - -impl Subscription for TakeSubscription -where - M: Send + 'static, - S: Subscription, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use tokio_stream::StreamExt; - - let count = self.count; - let mut inner = self.inner.into_stream(cancel); - - Box::pin(async_stream::stream! { - let mut taken = 0; - while taken < count { - match inner.next().await { - Some(msg) => { - taken += 1; - yield msg; - } - None => break, - } - } - }) - } -} - -/// A subscription that debounces messages from an inner subscription. -/// -/// Debouncing delays message emission until a quiet period has passed. -/// If a new message arrives before the quiet period expires, the timer resets. -/// Only the most recent message is emitted after the quiet period. -/// -/// This is useful for scenarios like search-as-you-type where you want to -/// wait until the user stops typing before triggering a search. -/// -/// # Example -/// -/// ```rust -/// use envision::app::{SubscriptionExt, tick}; -/// use std::time::Duration; -/// -/// // Only emit after 300ms of no new messages -/// let sub = tick(Duration::from_millis(100)) -/// .with_message(|| "tick") -/// .debounce(Duration::from_millis(300)); -/// ``` -pub struct DebounceSubscription -where - S: Subscription, -{ - inner: Box, - duration: Duration, - _phantom: std::marker::PhantomData, -} - -impl DebounceSubscription -where - S: Subscription, -{ - /// Creates a debounced subscription. - pub fn new(inner: S, duration: Duration) -> Self { - Self { - inner: Box::new(inner), - duration, - _phantom: std::marker::PhantomData, - } - } -} - -impl Subscription for DebounceSubscription -where - M: Send + 'static, - S: Subscription, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use tokio_stream::StreamExt; - - let duration = self.duration; - let mut inner = self.inner.into_stream(cancel.clone()); - - Box::pin(async_stream::stream! { - let mut pending: Option = None; - let mut deadline: Option = None; - - loop { - tokio::select! { - biased; - - // Check for cancellation first - _ = cancel.cancelled() => { - break; - } - - // Check if deadline has passed - _ = async { - match deadline { - Some(d) => tokio::time::sleep_until(d).await, - None => std::future::pending::<()>().await, - } - } => { - if let Some(m) = pending.take() { - deadline = None; - yield m; - } - } - - // Check for new messages - msg = inner.next() => { - match msg { - Some(m) => { - pending = Some(m); - deadline = Some(tokio::time::Instant::now() + duration); - } - None => { - // Stream ended, emit any pending message - if let Some(m) = pending.take() { - yield m; - } - break; - } - } - } - } - } - }) - } -} - -/// A subscription that throttles messages from an inner subscription. -/// -/// Throttling limits the rate of message emission. At most one message -/// is emitted per duration. The first message is emitted immediately, -/// and subsequent messages are dropped until the duration has passed. -/// -/// This is useful for limiting API calls or expensive operations. -/// -/// # Example -/// -/// ```rust -/// use envision::app::{SubscriptionExt, tick}; -/// use std::time::Duration; -/// -/// // Emit at most once every 100ms -/// let sub = tick(Duration::from_millis(50)) -/// .with_message(|| "tick") -/// .throttle(Duration::from_millis(100)); -/// ``` -pub struct ThrottleSubscription -where - S: Subscription, -{ - inner: Box, - duration: Duration, - _phantom: std::marker::PhantomData, -} - -impl ThrottleSubscription -where - S: Subscription, -{ - /// Creates a throttled subscription. - pub fn new(inner: S, duration: Duration) -> Self { - Self { - inner: Box::new(inner), - duration, - _phantom: std::marker::PhantomData, - } - } -} - -impl Subscription for ThrottleSubscription -where - M: Send + 'static, - S: Subscription, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use tokio_stream::StreamExt; - - let duration = self.duration; - let mut inner = self.inner.into_stream(cancel); - - Box::pin(async_stream::stream! { - let mut last_emit: Option = None; - - while let Some(msg) = inner.next().await { - let now = tokio::time::Instant::now(); - let should_emit = match last_emit { - None => true, - Some(last) => now.duration_since(last) >= duration, - }; - - if should_emit { - last_emit = Some(now); - yield msg; - } - } - }) - } -} - -/// A subscription that reads terminal input events from crossterm. -/// -/// This subscription uses crossterm's async event stream to read keyboard, -/// mouse, paste, focus, and resize events. Each event is passed through -/// a handler function that can optionally produce a message. -/// -/// # Example -/// -/// ```rust -/// use envision::app::TerminalEventSubscription; -/// use crossterm::event::{Event, KeyCode, KeyEvent}; -/// -/// let sub = TerminalEventSubscription::new(|event| { -/// match event { -/// Event::Key(KeyEvent { code: KeyCode::Char('q'), .. }) => { -/// Some("quit".to_string()) -/// } -/// Event::Key(KeyEvent { code: KeyCode::Up, .. }) => { -/// Some("up".to_string()) -/// } -/// _ => None, -/// } -/// }); -/// ``` -pub struct TerminalEventSubscription -where - F: Fn(crossterm::event::Event) -> Option + Send + 'static, -{ - event_handler: F, - _phantom: std::marker::PhantomData, -} - -impl TerminalEventSubscription -where - F: Fn(crossterm::event::Event) -> Option + Send + 'static, -{ - /// Creates a new terminal event subscription. - pub fn new(event_handler: F) -> Self { - Self { - event_handler, - _phantom: std::marker::PhantomData, - } - } -} - -impl Subscription for TerminalEventSubscription -where - M: Send + 'static, - F: Fn(crossterm::event::Event) -> Option + Send + 'static, -{ - fn into_stream( - self: Box, - cancel: CancellationToken, - ) -> Pin + Send>> { - use crossterm::event::EventStream; - use tokio_stream::StreamExt; - - let handler = self.event_handler; - - Box::pin(async_stream::stream! { - let mut reader = EventStream::new(); - loop { - tokio::select! { - maybe_event = reader.next() => { - match maybe_event { - Some(Ok(event)) => { - if let Some(msg) = (handler)(event) { - yield msg; - } - } - Some(Err(_)) => break, - None => break, - } - } - _ = cancel.cancelled() => break, - } - } - }) - } -} - -/// Creates a terminal event subscription. -/// -/// This is a convenience function for creating a [`TerminalEventSubscription`]. -/// -/// # Example -/// -/// ```rust -/// use envision::app::terminal_events; -/// use crossterm::event::{Event, KeyCode, KeyEvent}; -/// -/// let sub = terminal_events(|event| { -/// if let Event::Key(KeyEvent { code: KeyCode::Char('q'), .. }) = event { -/// Some("quit".to_string()) -/// } else { -/// None -/// } -/// }); -/// ``` -pub fn terminal_events(handler: F) -> TerminalEventSubscription -where - F: Fn(crossterm::event::Event) -> Option + Send + 'static, -{ - TerminalEventSubscription::new(handler) -} +#[cfg(test)] +pub(crate) use tokio::sync::mpsc; +#[cfg(test)] +pub(crate) use tokio_util::sync::CancellationToken; #[cfg(test)] mod tests; diff --git a/src/app/subscription/terminal.rs b/src/app/subscription/terminal.rs new file mode 100644 index 0000000..b9df1a2 --- /dev/null +++ b/src/app/subscription/terminal.rs @@ -0,0 +1,112 @@ +use std::pin::Pin; + +use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; + +use super::Subscription; + +/// A subscription that reads terminal input events from crossterm. +/// +/// This subscription uses crossterm's async event stream to read keyboard, +/// mouse, paste, focus, and resize events. Each event is passed through +/// a handler function that can optionally produce a message. +/// +/// # Example +/// +/// ```rust +/// use envision::app::TerminalEventSubscription; +/// use crossterm::event::{Event, KeyCode, KeyEvent}; +/// +/// let sub = TerminalEventSubscription::new(|event| { +/// match event { +/// Event::Key(KeyEvent { code: KeyCode::Char('q'), .. }) => { +/// Some("quit".to_string()) +/// } +/// Event::Key(KeyEvent { code: KeyCode::Up, .. }) => { +/// Some("up".to_string()) +/// } +/// _ => None, +/// } +/// }); +/// ``` +pub struct TerminalEventSubscription +where + F: Fn(crossterm::event::Event) -> Option + Send + 'static, +{ + pub(crate) event_handler: F, + _phantom: std::marker::PhantomData, +} + +impl TerminalEventSubscription +where + F: Fn(crossterm::event::Event) -> Option + Send + 'static, +{ + /// Creates a new terminal event subscription. + pub fn new(event_handler: F) -> Self { + Self { + event_handler, + _phantom: std::marker::PhantomData, + } + } +} + +impl Subscription for TerminalEventSubscription +where + M: Send + 'static, + F: Fn(crossterm::event::Event) -> Option + Send + 'static, +{ + fn into_stream( + self: Box, + cancel: CancellationToken, + ) -> Pin + Send>> { + use crossterm::event::EventStream; + use tokio_stream::StreamExt; + + let handler = self.event_handler; + + Box::pin(async_stream::stream! { + let mut reader = EventStream::new(); + loop { + tokio::select! { + maybe_event = reader.next() => { + match maybe_event { + Some(Ok(event)) => { + if let Some(msg) = (handler)(event) { + yield msg; + } + } + Some(Err(_)) => break, + None => break, + } + } + _ = cancel.cancelled() => break, + } + } + }) + } +} + +/// Creates a terminal event subscription. +/// +/// This is a convenience function for creating a [`TerminalEventSubscription`]. +/// +/// # Example +/// +/// ```rust +/// use envision::app::terminal_events; +/// use crossterm::event::{Event, KeyCode, KeyEvent}; +/// +/// let sub = terminal_events(|event| { +/// if let Event::Key(KeyEvent { code: KeyCode::Char('q'), .. }) = event { +/// Some("quit".to_string()) +/// } else { +/// None +/// } +/// }); +/// ``` +pub fn terminal_events(handler: F) -> TerminalEventSubscription +where + F: Fn(crossterm::event::Event) -> Option + Send + 'static, +{ + TerminalEventSubscription::new(handler) +}