diff --git a/packages/stream_core/CHANGELOG.md b/packages/stream_core/CHANGELOG.md index 66e131d..d419105 100644 --- a/packages/stream_core/CHANGELOG.md +++ b/packages/stream_core/CHANGELOG.md @@ -1,3 +1,17 @@ +## Upcoming + +### 💥 BREAKING CHANGES + +- `SharedEmitter` and `StateEmitter` now implement `Stream` directly instead of exposing a `stream` getter +- Removed `stream` getter from `SharedEmitter` and `StateEmitter` + +### ✨ Features + +- Added `hasListener` and `isClosed` properties to `SharedEmitter` +- Added `asSharedEmitter()` and `asStateEmitter()` extension methods for read-only views +- Added `update`, `getAndUpdate`, `updateAndGet` extension methods on `MutableStateEmitter` +- Added `StreamEvent` base interface and `EventResolver` for event transformation + ## 0.3.3 ### ✨ Features diff --git a/packages/stream_core/lib/src/utils.dart b/packages/stream_core/lib/src/utils.dart index 0428cbb..ada1676 100644 --- a/packages/stream_core/lib/src/utils.dart +++ b/packages/stream_core/lib/src/utils.dart @@ -1,6 +1,7 @@ export 'utils/comparable_extensions.dart'; export 'utils/comparable_field.dart'; export 'utils/disposable.dart'; +export 'utils/event_emitter.dart'; export 'utils/lifecycle_state_provider.dart'; export 'utils/list_extensions.dart'; export 'utils/network_state_provider.dart'; diff --git a/packages/stream_core/lib/src/utils/event_emitter.dart b/packages/stream_core/lib/src/utils/event_emitter.dart new file mode 100644 index 0000000..0bda35c --- /dev/null +++ b/packages/stream_core/lib/src/utils/event_emitter.dart @@ -0,0 +1,85 @@ +import 'shared_emitter.dart'; + +/// Base interface for events that can be emitted through an [EventEmitter]. +/// +/// Implement this interface to create custom event types: +/// +/// ```dart +/// class UserLoggedIn implements StreamEvent { +/// final String userId; +/// UserLoggedIn(this.userId); +/// } +/// ``` +abstract interface class StreamEvent {} + +/// A function that inspects an [event] and optionally transforms it. +/// +/// Returns a transformed event if the resolver handles it, or `null` to +/// pass the event to the next resolver in the chain. +/// +/// ```dart +/// final resolver = (event) { +/// if (event is GenericEvent) return SpecificEvent(event.data); +/// return null; // Let other resolvers handle it +/// }; +/// ``` +typedef EventResolver = T? Function(T event); + +/// A read-only event emitter constrained to [StreamEvent] subtypes. +/// +/// Type alias for [SharedEmitter] that enforces event type safety. +/// +/// See also: +/// - [MutableEventEmitter] for the mutable variant with resolver support. +typedef EventEmitter = SharedEmitter; + +/// A mutable event emitter with resolver support for event transformation. +/// +/// Extends [SharedEmitterImpl] to apply a chain of [EventResolver]s before +/// emitting events. Each resolver inspects the event and can transform it +/// into a more specific type. The first resolver to return a non-null result +/// determines the final emitted event. +/// +/// ```dart +/// final emitter = MutableEventEmitter( +/// resolvers: [ +/// (event) => event is RawEvent ? ParsedEvent(event.data) : null, +/// ], +/// ); +/// +/// emitter.on((event) { +/// print('Received parsed: ${event.data}'); +/// }); +/// +/// emitter.emit(RawEvent(data)); // Transformed and emitted as ParsedEvent +/// ``` +/// +/// See also: +/// - [EventEmitter] for the read-only interface. +/// - [EventResolver] for the resolver function signature. +final class MutableEventEmitter + extends SharedEmitterImpl implements MutableSharedEmitter { + /// Creates a [MutableEventEmitter] with optional event [resolvers]. + /// + /// Resolvers are applied in order to each emitted event until one returns + /// a non-null result. Supports synchronous or asynchronous emission via + /// [sync], and can replay the last [replay] events to new subscribers. + MutableEventEmitter({ + super.replay = 0, + super.sync = false, + Iterable>? resolvers, + }) : _resolvers = resolvers ?? const {}; + + final Iterable> _resolvers; + + @override + void emit(T value) { + for (final resolver in _resolvers) { + final result = resolver(value); + if (result != null) return super.emit(result); + } + + // No resolver matched — emit the event as-is. + return super.emit(value); + } +} diff --git a/packages/stream_core/lib/src/utils/shared_emitter.dart b/packages/stream_core/lib/src/utils/shared_emitter.dart index 2fd8f79..0c52b06 100644 --- a/packages/stream_core/lib/src/utils/shared_emitter.dart +++ b/packages/stream_core/lib/src/utils/shared_emitter.dart @@ -2,72 +2,31 @@ import 'dart:async'; import 'package:rxdart/rxdart.dart'; -/// A read-only emitter that allows listening for events of type [T]. +import 'result.dart'; + +/// A read-only emitter that broadcasts events to multiple listeners. /// -/// Listeners can subscribe to receive events, wait for specific event types, -/// and register handlers for certain event types. The emitter supports -/// type filtering, allowing listeners to only receive events of a specific -/// subtype of [T]. +/// Similar to Kotlin's `SharedFlow`, this emitter allows multiple subscribers +/// to receive events of type [T]. Implements [Stream] to provide stream +/// functionality directly, allowing this emitter to be used anywhere a stream +/// is expected. /// /// See also: -/// - [MutableSharedEmitter] for the mutable interface that allows emitting events. -abstract interface class SharedEmitter { - /// The stream of events emitted by this emitter. - /// - /// Returns a [Stream] that emits events of type [T]. - Stream get stream; - - /// Waits for an event of type [E] to be emitted within the specified [timeLimit]. - /// - /// If such an event is emitted, it is returned. If the time limit - /// is exceeded without receiving the event, a [TimeoutException] is thrown. - /// - /// Returns a [Future] that completes with the first event of type [E]. - Future waitFor({Duration? timeLimit}); - - /// Registers a handler [onEvent] that will be invoked whenever an event of type [E] is emitted. - /// - /// Returns a [StreamSubscription] that can be used to manage the subscription. - StreamSubscription on( - void Function(E event) onEvent, - ); - - /// Returns the first element that satisfies the given [test]. - /// - /// If no such element is found and [orElse] is provided, calls [orElse] and returns its result. - /// If no element is found and [orElse] is not provided, throws a [StateError]. - /// - /// Returns a [Future] that completes with the first matching element. - Future firstWhere( - bool Function(T element) test, { - T Function()? orElse, - }); - - /// Subscribes to the emitter to receive events of type [T]. +/// - [MutableSharedEmitter] for the mutable variant that allows emitting events. +/// - [SharedEmitterExtension] for convenience methods like [on] and [waitFor]. +abstract interface class SharedEmitter implements Stream { + /// Whether this emitter is closed. /// - /// The [onData] callback is invoked for each emitted value. - /// Optional callbacks for [onError], [onDone], and [cancelOnError] can also be provided. - /// - /// Returns a [StreamSubscription] that can be used to manage the subscription. - StreamSubscription listen( - void Function(T value)? onData, { - Function? onError, - void Function()? onDone, - bool? cancelOnError, - }); + /// A closed emitter cannot emit new values. + bool get isClosed; } -/// A mutable emitter that allows emitting events of type [T] to multiple listeners. -/// -/// Listeners can subscribe to receive events, wait for specific event types, -/// and register handlers for certain event types. The emitter supports -/// type filtering, allowing listeners to only receive events of a specific -/// subtype of [T]. +/// A mutable emitter that broadcasts events to multiple listeners. /// -/// The emitter can be closed to release resources, after which no further -/// events can be emitted or listened to. +/// Extends [SharedEmitter] with the ability to emit events and close +/// the emitter. Listeners can subscribe to receive events, wait for specific +/// event types, and register handlers for certain event types. /// -/// Example usage: /// ```dart /// final emitter = MutableSharedEmitter(); /// @@ -78,138 +37,155 @@ abstract interface class SharedEmitter { /// emitter.emit(MyEvent()); /// ``` /// -/// Make sure to call [close] when the emitter is no longer needed -/// to avoid memory leaks. +/// Call [close] when this emitter is no longer needed to release resources. /// /// See also: /// - [SharedEmitter] for the read-only interface. abstract interface class MutableSharedEmitter extends SharedEmitter { - /// Creates a new instance of [MutableSharedEmitter]. + /// Creates a [MutableSharedEmitter]. /// - /// When [replay] is greater than 0, the emitter will replay the last [replay] events - /// to new subscribers. When [sync] is `true`, events are emitted synchronously. + /// Supports synchronous or asynchronous event emission via [sync], and + /// can optionally replay the last [replay] events to new subscribers. factory MutableSharedEmitter({ int replay, bool sync, }) = SharedEmitterImpl; - /// Emits the [value] to the listeners. + /// Emits the given [value] to all active listeners. void emit(T value); - /// Attempts to emit the [value] to the listeners. + /// Attempts to emit the given [value] to all active listeners. /// - /// This method is similar to [emit], but does not throw exceptions - /// if the emitter is closed. - /// - /// Returns `true` if the value was successfully emitted, `false` otherwise. + /// Returns `true` if the value was successfully emitted, or `false` if + /// emission failed for any reason (e.g., emitter is closed or an error + /// occurred). Unlike [emit], this method never throws. bool tryEmit(T value); - /// Closes the emitter and releases all resources. - /// - /// No further events can be emitted or listened to after calling this method. - /// - /// Returns a [Future] that completes when the emitter is fully closed. + /// Whether this emitter has any active listeners. + bool get hasListener; + + /// Closes this emitter, preventing any further events from being emitted. Future close(); } -/// The default implementation of [MutableSharedEmitter] using RxDart subjects. +/// Default implementation of [MutableSharedEmitter] using RxDart subjects. /// -/// This implementation supports synchronous or asynchronous event emission -/// and can optionally replay recent events to new subscribers. Uses [PublishSubject] -/// for normal operation or [ReplaySubject] when replay functionality is needed. +/// Uses [PublishSubject] for normal operation or [ReplaySubject] when replay +/// functionality is needed. Extends [StreamView] to delegate all stream +/// operations to the underlying subject. /// -/// Example: /// ```dart /// final emitter = MutableSharedEmitter(); /// -/// emitter.on((value) { +/// // Can be used directly as a stream +/// emitter.where((value) => value > 10).listen((value) { /// print('Received: $value'); /// }); /// -/// emitter.emit(42); // Will emit 42 to all listeners -/// emitter.emit(10); // Will emit 10 to all listeners +/// emitter.emit(42); /// ``` /// /// For replay functionality: +/// /// ```dart /// final replayEmitter = MutableSharedEmitter(replay: 2); /// replayEmitter.emit(1); /// replayEmitter.emit(2); /// -/// // New subscribers will immediately receive the last 2 values (1, 2) -/// replayEmitter.listen((value) => print(value)); +/// // New subscribers immediately receive the last 2 values (1, 2) +/// replayEmitter.listen(print); /// ``` /// -/// Make sure to call [close] when done to avoid memory leaks. -/// /// See also: /// - [MutableSharedEmitter] for the interface. -/// - [PublishSubject] from `rxdart` for the underlying stream implementation. -class SharedEmitterImpl implements MutableSharedEmitter { - /// Creates a new instance of [SharedEmitterImpl]. +class SharedEmitterImpl extends StreamView + implements MutableSharedEmitter { + /// Creates a [SharedEmitterImpl]. + /// + /// Supports synchronous or asynchronous event emission via [sync], and + /// can optionally replay the last [replay] events to new subscribers. SharedEmitterImpl({ int replay = 0, bool sync = false, - }) : _shared = switch (replay) { - 0 => PublishSubject(sync: sync), - > 0 => ReplaySubject(maxSize: replay, sync: sync), - _ => throw ArgumentError('Replay count cannot be negative'), - }; + }) : this._(_createSubject(replay, sync)); + + SharedEmitterImpl._(this._shared) : super(_shared); + + static Subject _createSubject(int replay, bool sync) { + return switch (replay) { + 0 => PublishSubject(sync: sync), + > 0 => ReplaySubject(maxSize: replay, sync: sync), + _ => throw ArgumentError('Replay count cannot be negative'), + }; + } final Subject _shared; @override - Stream get stream => _shared.stream; + void emit(T value) => _shared.add(value); @override - void emit(T value) => _shared.add(value); + bool tryEmit(T value) => runSafelySync(() => emit(value)).isSuccess; @override - bool tryEmit(T value) { - if (_shared.isClosed) return false; + bool get hasListener => _shared.hasListener; - emit(value); - return true; - } + @override + bool get isClosed => _shared.isClosed; @override + Future close() => _shared.close(); +} + +/// Type conversion methods for [MutableSharedEmitter]. +extension MutableSharedEmitterExtension on MutableSharedEmitter { + /// Returns a read-only view of this emitter. + /// + /// Useful for exposing this emitter to consumers who should only be able + /// to listen, not emit. + /// + /// ```dart + /// class MyService { + /// final _events = MutableSharedEmitter(); + /// + /// SharedEmitter get events => _events.asSharedEmitter(); + /// } + /// ``` + SharedEmitter asSharedEmitter() => this; +} + +/// Convenience methods for [SharedEmitter] event handling. +extension SharedEmitterExtension on SharedEmitter { + /// Waits for and returns the first event of type [E] emitted by this emitter. + /// + /// Throws a [TimeoutException] if [timeLimit] is exceeded before receiving + /// an event of type [E]. + /// + /// Throws a [StateError] if this emitter closes before an event of type [E] + /// is received. + /// + /// ```dart + /// final event = await emitter.waitFor( + /// timeLimit: Duration(seconds: 5), + /// ); + /// ``` Future waitFor({Duration? timeLimit}) { - final future = _shared.whereType().first; + final future = whereType().first; if (timeLimit == null) return future; return future.timeout(timeLimit); } - @override + /// Listens for events of type [E] and invokes [onEvent] for each one. + /// + /// ```dart + /// emitter.on((event) { + /// print('User logged in: ${event.userId}'); + /// }); + /// ``` StreamSubscription on( void Function(E event) onEvent, ) { - return _shared.whereType().listen(onEvent); - } - - @override - Future firstWhere( - bool Function(T element) test, { - T Function()? orElse, - }) { - return _shared.firstWhere(test, orElse: orElse); + return whereType().listen(onEvent); } - - @override - StreamSubscription listen( - void Function(T value)? onData, { - Function? onError, - void Function()? onDone, - bool? cancelOnError, - }) { - return _shared.listen( - onData, - onError: onError, - onDone: onDone, - cancelOnError: cancelOnError, - ); - } - - @override - Future close() => _shared.close(); } diff --git a/packages/stream_core/lib/src/utils/state_emitter.dart b/packages/stream_core/lib/src/utils/state_emitter.dart index 9c2576a..5bb0799 100644 --- a/packages/stream_core/lib/src/utils/state_emitter.dart +++ b/packages/stream_core/lib/src/utils/state_emitter.dart @@ -2,112 +2,64 @@ import 'dart:async'; import 'package:rxdart/rxdart.dart'; +import 'result.dart'; import 'shared_emitter.dart'; -/// A state-aware emitter that maintains the current value and emits state changes. +/// A read-only emitter that maintains and broadcasts the current state. /// -/// Extends [SharedEmitter] with state management capabilities, allowing access to -/// the current value, error state, and providing a [ValueStream] for reactive programming. -/// Unlike regular emitters, state emitters always have a current value (after initialization) -/// and new subscribers immediately receive the current state. +/// Similar to Kotlin's `StateFlow`, this emitter always has a current value +/// available and new subscribers immediately receive the current state. +/// +/// See also: +/// - [MutableStateEmitter] for the mutable variant. +/// - [SharedEmitter] for event emission without state. abstract interface class StateEmitter implements SharedEmitter { - /// The stream of state changes as a [ValueStream]. - /// - /// A [ValueStream] is a special stream that always has a current value available - /// and provides immediate access to the latest emitted value. - /// - /// Returns a [ValueStream] of type [T]. - @override - ValueStream get stream; - - /// The current value of the state. - /// - /// Throws a [StateError] if no value has been emitted yet or if the state is in an error state. - /// - /// Returns the current state value of type [T]. + /// The current state value. T get value; - - /// The current value of the state, or `null` if no value is available. - /// - /// This is a safe alternative to [value] that returns `null` instead of throwing - /// when no value is available. - /// - /// Returns the current state value or `null`. - T? get valueOrNull; - - /// Whether the state emitter currently has a value. - /// - /// Returns `true` if a value has been emitted and is available, `false` otherwise. - bool get hasValue; - - /// The current error of the state. - /// - /// Throws a [StateError] if the state is not in an error state. - /// - /// Returns the current error object. - Object get error; - - /// The current error of the state, or `null` if no error is present. - /// - /// This is a safe alternative to [error] that returns `null` instead of throwing - /// when no error is present. - /// - /// Returns the current error object or `null`. - Object? get errorOrNull; - - /// Whether the state emitter currently has an error. - /// - /// Returns `true` if the state is in an error condition, `false` otherwise. - bool get hasError; } -/// A mutable state emitter that allows updating the current state value. +/// A mutable emitter that maintains and broadcasts the current state. /// -/// Combines the capabilities of [StateEmitter] and [MutableSharedEmitter] to provide -/// both state management and the ability to emit new values. The emitter maintains -/// the current state and notifies listeners when the state changes. +/// Extends [StateEmitter] with the ability to update the state value. +/// Listeners are notified only when the value changes (conflation). /// -/// Example usage: /// ```dart -/// final stateEmitter = MutableStateEmitter(0); +/// final counter = MutableStateEmitter(0); /// -/// stateEmitter.listen((value) { +/// counter.listen((value) { /// print('State changed to: $value'); /// }); /// -/// stateEmitter.value = 42; // Triggers listener with value 42 -/// print(stateEmitter.value); // Prints: 42 +/// counter.value = 42; // Prints: State changed to: 42 +/// counter.value = 42; // No output (same value) /// ``` -abstract interface class MutableStateEmitter - implements StateEmitter, MutableSharedEmitter { - /// Creates a new [MutableStateEmitter] with the given [initialValue]. +/// +/// See also: +/// - [StateEmitter] for the read-only interface. +/// - [MutableStateEmitterExtension] for convenience methods. +abstract interface class MutableStateEmitter extends StateEmitter + implements MutableSharedEmitter { + /// Creates a [MutableStateEmitter] with the given [initialValue]. /// - /// When [sync] is `true`, state changes are emitted synchronously. + /// Supports synchronous or asynchronous state emission via [sync]. factory MutableStateEmitter( T initialValue, { bool sync, }) = StateEmitterImpl; - /// Sets the current state value. + /// The current state value. /// - /// This is equivalent to calling [emit] with the new value. - /// Listeners will be notified of the state change if the new value - /// is different from the current value. + /// Setting a value equal to the current value does nothing (conflation). set value(T newValue); } -/// The default implementation of [MutableStateEmitter] using a [BehaviorSubject]. -/// -/// This implementation uses RxDart's [BehaviorSubject] to maintain state and emit -/// changes to subscribers. The behavior subject ensures that new subscribers -/// immediately receive the current state value. +/// Default implementation of [MutableStateEmitter] using a [BehaviorSubject]. /// -/// The emitter only emits new values when they differ from the current value, -/// preventing unnecessary notifications for identical state updates. +/// Uses RxDart's [BehaviorSubject] to maintain state. New subscribers +/// immediately receive the current value. Only emits when the value changes. /// -/// Example: /// ```dart -/// final emitter = StateEmitterImpl('initial'); +/// final emitter = MutableStateEmitter('initial'); /// /// emitter.listen((value) { /// print('State: $value'); @@ -116,91 +68,99 @@ abstract interface class MutableStateEmitter /// emitter.value = 'updated'; // Prints: State: updated /// emitter.value = 'updated'; // No output (same value) /// ``` -class StateEmitterImpl implements MutableStateEmitter { - /// Creates a new instance of [StateEmitterImpl]. +/// +/// See also: +/// - [MutableStateEmitter] for the interface. +class StateEmitterImpl extends StreamView + implements MutableStateEmitter { + /// Creates a [StateEmitterImpl] with the given [initialValue]. + /// + /// Supports synchronous or asynchronous state emission via [sync]. StateEmitterImpl( T initialValue, { bool sync = false, - }) : _state = BehaviorSubject.seeded(initialValue, sync: sync); + }) : this._(BehaviorSubject.seeded(initialValue, sync: sync)); - final BehaviorSubject _state; + StateEmitterImpl._(this._state) : super(_state); - @override - ValueStream get stream => _state.stream; + final BehaviorSubject _state; @override T get value => _state.value; @override - set value(T newValue) => emit(newValue); - - @override - bool get hasValue => _state.hasValue; - - @override - T? get valueOrNull => _state.valueOrNull; - - @override - Object get error => _state.error; + set value(T newValue) { + if (value == newValue) return; + _state.add(newValue); + } @override - Object? get errorOrNull => _state.errorOrNull; + void emit(T newValue) => value = newValue; @override - bool get hasError => _state.hasError; + bool tryEmit(T newValue) => runSafelySync(() => emit(newValue)).isSuccess; @override - void emit(T newValue) { - if (value == newValue) return; - _state.add(newValue); - } + bool get hasListener => _state.hasListener; @override - bool tryEmit(T value) { - if (_state.isClosed) return false; - - emit(value); - return true; - } + bool get isClosed => _state.isClosed; @override - Future waitFor({Duration? timeLimit}) { - final future = _state.whereType().first; - if (timeLimit == null) return future; + Future close() => _state.close(); +} - return future.timeout(timeLimit); - } +/// Convenience methods for [MutableStateEmitter]. +extension MutableStateEmitterExtension on MutableStateEmitter { + /// Returns a read-only view of this emitter. + /// + /// Useful for exposing state to consumers who should only be able to + /// read, not modify the value. + /// + /// ```dart + /// class CounterBloc { + /// final _count = MutableStateEmitter(0); + /// + /// StateEmitter get count => _count.asStateEmitter(); + /// + /// void increment() => _count.update((c) => c + 1); + /// } + /// ``` + StateEmitter asStateEmitter() => this; - @override - StreamSubscription on( - void Function(E event) onEvent, - ) { - return _state.whereType().listen(onEvent); + /// Applies the [updater] function to the current value and sets the result. + /// + /// ```dart + /// counter.update((current) => current + 1); + /// ``` + void update(T Function(T current) updater) { + value = updater(value); } - @override - Future firstWhere( - bool Function(T element) test, { - T Function()? orElse, - }) { - return _state.firstWhere(test, orElse: orElse); + /// Applies the [updater] function to the current value, sets the result, + /// and returns the previous value. + /// + /// ```dart + /// final previous = counter.getAndUpdate((current) => current * 2); + /// print(previous); // 5 + /// print(counter.value); // 10 + /// ``` + T getAndUpdate(T Function(T current) updater) { + final previous = value; + value = updater(value); + return previous; } - @override - StreamSubscription listen( - void Function(T value)? onData, { - Function? onError, - void Function()? onDone, - bool? cancelOnError, - }) { - return _state.listen( - onData, - onError: onError, - onDone: onDone, - cancelOnError: cancelOnError, - ); + /// Applies the [updater] function to the current value, sets the result, + /// and returns the new value. + /// + /// ```dart + /// final newValue = counter.updateAndGet((current) => current * 2); + /// print(newValue); // 10 + /// ``` + T updateAndGet(T Function(T current) updater) { + final newValue = updater(value); + value = newValue; + return newValue; } - - @override - Future close() => _state.close(); } diff --git a/packages/stream_core/lib/src/ws.dart b/packages/stream_core/lib/src/ws.dart index aa0c5ee..bae54dd 100644 --- a/packages/stream_core/lib/src/ws.dart +++ b/packages/stream_core/lib/src/ws.dart @@ -7,6 +7,5 @@ export 'ws/client/reconnect/retry_strategy.dart'; export 'ws/client/stream_web_socket_client.dart'; export 'ws/client/web_socket_connection_state.dart'; export 'ws/client/web_socket_health_monitor.dart'; -export 'ws/events/event_emitter.dart'; export 'ws/events/ws_event.dart'; export 'ws/events/ws_request.dart'; diff --git a/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart b/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart index 7c07cc5..5022433 100644 --- a/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart +++ b/packages/stream_core/lib/src/ws/client/stream_web_socket_client.dart @@ -1,7 +1,6 @@ import 'dart:async'; import '../../utils.dart'; -import '../events/event_emitter.dart'; import '../events/ws_event.dart'; import '../events/ws_request.dart'; import 'engine/stream_web_socket_engine.dart'; @@ -75,8 +74,8 @@ class StreamWebSocketClient /// The event emitter for WebSocket events. /// /// Use this to listen to incoming WebSocket events with type-safe event handling. - EventEmitter get events => _events; - late final MutableEventEmitter _events; + EventEmitter get events => _events; + late final MutableEventEmitter _events; /// The current connection state of the WebSocket. /// diff --git a/packages/stream_core/lib/src/ws/events/event_emitter.dart b/packages/stream_core/lib/src/ws/events/event_emitter.dart deleted file mode 100644 index 350e3ca..0000000 --- a/packages/stream_core/lib/src/ws/events/event_emitter.dart +++ /dev/null @@ -1,68 +0,0 @@ -import '../../utils.dart'; -import 'ws_event.dart'; - -/// A function that inspects an event and optionally resolves it into a -/// more specific or refined version of the same type. -/// -/// If the resolver does not recognize or handle the event, -/// it returns `null`, allowing other resolvers to attempt resolution. -typedef EventResolver = T? Function(T event); - -/// A read-only event emitter for WebSocket events. -/// -/// Provides the same functionality as [SharedEmitter] but with type constraints -/// to ensure only [WsEvent] subtypes can be emitted. This is the read-only -/// interface used by consumers to listen for WebSocket events. -typedef EventEmitter = SharedEmitter; - -/// A mutable event emitter for WebSocket events with resolver support. -/// -/// Extends [SharedEmitterImpl] to provide event resolution capabilities for WebSocket events. -/// Before emitting an event, the emitter applies a series of [EventResolver]s to inspect -/// and potentially transform the event. The first resolver that returns a non-null result -/// determines the event that will be emitted. -/// -/// This is particularly useful for: -/// - Converting generic events to more specific event types -/// - Adding metadata or context to events -/// - Filtering or transforming events before emission -/// -/// Example usage: -/// ```dart -/// final emitter = MutableEventEmitter( -/// resolvers: [ -/// (event) => event is GenericEvent ? SpecificEvent(event.data) : null, -/// ], -/// ); -/// -/// emitter.on((event) { -/// // Handle SpecificEvent -/// }); -/// -/// emitter.emit(GenericEvent(data)); // Will be resolved to SpecificEvent -/// ``` -class MutableEventEmitter extends SharedEmitterImpl { - /// Creates a new [MutableEventEmitter] with optional event resolvers. - /// - /// When [resolvers] are provided, they will be applied to each emitted event - /// in order until one returns a non-null result. The [replay] and [sync] - /// parameters are passed to the underlying [SharedEmitterImpl]. - MutableEventEmitter({ - super.replay = 0, - super.sync = false, - Iterable>? resolvers, - }) : _resolvers = resolvers ?? const {}; - - final Iterable> _resolvers; - - @override - void emit(T value) { - for (final resolver in _resolvers) { - final result = resolver(value); - if (result != null) return super.emit(result); - } - - // No resolver matched — emit the event as-is. - return super.emit(value); - } -} diff --git a/packages/stream_core/lib/src/ws/events/ws_event.dart b/packages/stream_core/lib/src/ws/events/ws_event.dart index c93a88b..3fc66ba 100644 --- a/packages/stream_core/lib/src/ws/events/ws_event.dart +++ b/packages/stream_core/lib/src/ws/events/ws_event.dart @@ -1,13 +1,12 @@ import 'package:equatable/equatable.dart'; -abstract class WsEvent extends Equatable { +import '../../utils.dart' show StreamEvent; + +abstract class WsEvent implements StreamEvent { const WsEvent(); Object? get error => null; HealthCheckInfo? get healthCheckInfo => null; - - @override - List get props => []; } final class HealthCheckInfo extends Equatable { diff --git a/packages/stream_core/test/utils/event_emitter_test.dart b/packages/stream_core/test/utils/event_emitter_test.dart new file mode 100644 index 0000000..d304a0b --- /dev/null +++ b/packages/stream_core/test/utils/event_emitter_test.dart @@ -0,0 +1,258 @@ +import 'package:stream_core/stream_core.dart'; +import 'package:test/test.dart'; + +// Test event classes +class BaseEvent implements StreamEvent { + const BaseEvent(this.data); + + final String data; +} + +class SpecificEvent extends BaseEvent { + const SpecificEvent(super.data); +} + +class TransformedEvent extends BaseEvent { + const TransformedEvent(super.data); +} + +class AnotherEvent extends BaseEvent { + const AnotherEvent(super.data, this.value); + + final int value; +} + +void main() { + group('MutableEventEmitter', () { + group('resolver functionality', () { + test('transforms event when resolver returns non-null', () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => TransformedEvent('transformed: ${event.data}'), + ], + ); + addTearDown(emitter.close); + + final values = []; + emitter.listen(values.add); + + emitter.emit(const BaseEvent('test')); + + await pumpEventQueue(); + + expect(values.length, 1); + expect(values.first, isA()); + expect(values.first.data, 'transformed: test'); + }); + + test('emits event as-is when no resolver matches', () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => null, // No resolver matches + ], + ); + addTearDown(emitter.close); + + final values = []; + emitter.listen(values.add); + + const originalEvent = BaseEvent('test'); + emitter.emit(originalEvent); + + await pumpEventQueue(); + + expect(values.length, 1); + expect(values.first, originalEvent); + expect(values.first.data, 'test'); + }); + + test('emits event as-is when no resolvers provided', () async { + final emitter = MutableEventEmitter(); + addTearDown(emitter.close); + + final values = []; + emitter.listen(values.add); + + const originalEvent = BaseEvent('test'); + emitter.emit(originalEvent); + + await pumpEventQueue(); + + expect(values.length, 1); + expect(values.first, originalEvent); + }); + + test('first resolver that returns non-null wins', () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => null, // First resolver doesn't match + // Second resolver matches + (event) => TransformedEvent('first-match: ${event.data}'), + // Should not be called + (event) => TransformedEvent('should-not-reach: ${event.data}'), + ], + ); + addTearDown(emitter.close); + + final values = []; + emitter.listen(values.add); + + emitter.emit(const BaseEvent('test')); + + await pumpEventQueue(); + + expect(values.length, 1); + expect(values.first, isA()); + expect(values.first.data, 'first-match: test'); + }); + + test('resolver can transform to different event type', () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => AnotherEvent(event.data, event.data.length), + ], + ); + addTearDown(emitter.close); + + final values = []; + emitter.listen(values.add); + + emitter.emit(const BaseEvent('hello')); + + await pumpEventQueue(); + + expect(values.length, 1); + expect(values.first, isA()); + expect((values.first as AnotherEvent).value, 5); + }); + + test('resolver can conditionally transform based on event properties', + () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => event.data.startsWith('transform:') + ? TransformedEvent(event.data.replaceFirst('transform:', '')) + : null, + ], + ); + addTearDown(emitter.close); + + final transformedValues = []; + final baseValues = []; + + emitter.on(transformedValues.add); + emitter.on(baseValues.add); + + emitter.emit(const BaseEvent('transform:hello')); + emitter.emit(const BaseEvent('keep-as-is')); + + await pumpEventQueue(); + + expect(transformedValues.length, 1); + expect(transformedValues.first.data, 'hello'); + // BaseEvent listeners will also receive TransformedEvent since it extends BaseEvent + expect(baseValues.length, 2); + }); + }); + + group('inherited SharedEmitter functionality', () { + test('supports multiple listeners', () async { + final emitter = MutableEventEmitter(); + addTearDown(emitter.close); + + final values1 = []; + final values2 = []; + + emitter.listen(values1.add); + emitter.listen(values2.add); + + emitter.emit(const BaseEvent('test')); + + await pumpEventQueue(); + + expect(values1.length, 1); + expect(values2.length, 1); + }); + + test('supports type filtering with on()', () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => event is BaseEvent ? SpecificEvent(event.data) : null, + ], + ); + addTearDown(emitter.close); + + final specificEvents = []; + emitter.on(specificEvents.add); + + emitter.emit(const BaseEvent('test')); + + await pumpEventQueue(); + + expect(specificEvents.length, 1); + expect(specificEvents.first.data, 'test'); + }); + + test('supports waitFor()', () async { + final emitter = MutableEventEmitter( + resolvers: [ + (event) => event is BaseEvent ? SpecificEvent(event.data) : null, + ], + ); + addTearDown(emitter.close); + + final future = emitter.waitFor(); + + emitter.emit(const BaseEvent('test')); + + final result = await future; + + expect(result, isA()); + expect(result.data, 'test'); + }); + + test('supports replay functionality', () async { + final emitter = MutableEventEmitter(replay: 2); + addTearDown(emitter.close); + + emitter.emit(const BaseEvent('first')); + emitter.emit(const BaseEvent('second')); + emitter.emit(const BaseEvent('third')); + + final values = []; + emitter.listen(values.add); + + await pumpEventQueue(); + + expect(values.length, 2); + expect(values[0].data, 'second'); + expect(values[1].data, 'third'); + }); + + test('supports sync mode', () { + final emitter = MutableEventEmitter(sync: true); + addTearDown(emitter.close); + + final values = []; + + emitter.listen(values.add); + + emitter.emit(const BaseEvent('test')); + + expect(values.length, 1); // Synchronous - value immediately available + }); + + test( + 'tryEmit catches errors and returns false instead of throwing', + () async { + final emitter = MutableEventEmitter(); + addTearDown(emitter.close); + + await emitter.close(); + + expect(emitter.tryEmit(const BaseEvent('test')), isFalse); + }, + ); + }); + }); +} diff --git a/packages/stream_core/test/utils/shared_emitter_test.dart b/packages/stream_core/test/utils/shared_emitter_test.dart new file mode 100644 index 0000000..c0013dc --- /dev/null +++ b/packages/stream_core/test/utils/shared_emitter_test.dart @@ -0,0 +1,530 @@ +import 'dart:async'; + +import 'package:rxdart/rxdart.dart'; +import 'package:stream_core/stream_core.dart'; +import 'package:test/test.dart'; + +void main() { + group('SharedEmitter', () { + group('basic functionality', () { + test('emits values to listeners', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values = []; + + emitter.listen(values.add); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + + await pumpEventQueue(); + + expect(values, [1, 2, 3]); + }); + + test('supports multiple listeners', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values1 = []; + final values2 = []; + + emitter.listen(values1.add); + emitter.listen(values2.add); + + emitter.emit(1); + emitter.emit(2); + + await pumpEventQueue(); + + expect(values1, [1, 2]); + expect(values2, [1, 2]); + }); + + test('close stops emissions', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values = []; + var isDone = false; + + emitter.listen( + values.add, + onDone: () => isDone = true, + ); + + emitter.emit(1); + await emitter.close(); + + await pumpEventQueue(); + + expect(values, [1]); + expect(isDone, isTrue); + }); + + test('tryEmit returns false after close', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + expect(emitter.tryEmit(1), isTrue); + + await emitter.close(); + + expect(emitter.tryEmit(2), isFalse); + }); + + test('emit throws when called after close', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + await emitter.close(); + + expect(() => emitter.emit(1), throwsStateError); + }); + + test('tryEmit catches errors and returns false instead of throwing', + () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + await emitter.close(); + + // emit would throw, but tryEmit should catch the error + expect(() => emitter.emit(1), throwsStateError); + expect(emitter.tryEmit(1), isFalse); // Does not throw + }); + }); + + group('type filtering', () { + test('on() filters by type', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final strings = []; + final ints = []; + + emitter.on(strings.add); + emitter.on(ints.add); + + emitter.emit('hello'); + emitter.emit(42); + emitter.emit('world'); + emitter.emit(100); + + await pumpEventQueue(); + + expect(strings, ['hello', 'world']); + expect(ints, [42, 100]); + }); + + test('waitFor() waits for specific type', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final futureString = emitter.waitFor(); + + emitter.emit(1); + emitter.emit(2); + emitter.emit('found'); + + final result = await futureString; + + expect(result, 'found'); + }); + + test('waitFor() times out when event not received', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + // Start waiting for a String that will never come + final future = emitter.waitFor( + timeLimit: const Duration(milliseconds: 50), + ); + + // Emit non-matching types + emitter.emit(1); + emitter.emit(2); + + // Should timeout since no String was emitted + await expectLater(future, throwsA(isA())); + }); + }); + + group('stream interface', () { + test('can be used with where()', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final evenValues = []; + + emitter.where((v) => v.isEven).listen(evenValues.add); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + emitter.emit(4); + + await pumpEventQueue(); + + expect(evenValues, [2, 4]); + }); + + test('can be used with map()', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final doubled = []; + + emitter.map((v) => v * 2).listen(doubled.add); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + + await pumpEventQueue(); + + expect(doubled, [2, 4, 6]); + }); + + test('can be used with take()', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values = []; + + emitter.take(2).listen(values.add); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + + await pumpEventQueue(); + + expect(values, [1, 2]); + }); + + test('can be used with skip()', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values = []; + + emitter.skip(2).listen(values.add); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + emitter.emit(4); + + await pumpEventQueue(); + + expect(values, [3, 4]); + }); + + test('can be used with distinct()', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values = []; + + emitter.distinct().listen(values.add); + + emitter.emit(1); + emitter.emit(1); + emitter.emit(2); + emitter.emit(2); + emitter.emit(1); + + await pumpEventQueue(); + + expect(values, [1, 2, 1]); + }); + + test('can be used with first', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final futureFirst = emitter.first; + + emitter.emit(42); + emitter.emit(100); + + final result = await futureFirst; + + expect(result, 42); + }); + + test('can be used with firstWhere()', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final futureFirst = emitter.firstWhere((v) => v > 10); + + emitter.emit(1); + emitter.emit(5); + emitter.emit(15); + emitter.emit(20); + + final result = await futureFirst; + + expect(result, 15); + }); + + test('isBroadcast returns true', () { + final emitter = MutableSharedEmitter(); + + expect(emitter.isBroadcast, isTrue); + + emitter.close(); + }); + + test('emitter can be used directly as a stream', () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + final values = []; + + // Emitter implements Stream, so it can be used directly + emitter.listen(values.add); + + emitter.emit(1); + emitter.emit(2); + + await pumpEventQueue(); + + expect(values, [1, 2]); + }); + }); + + group('replay functionality', () { + test('replay emitter replays last N values to new subscribers', () async { + final emitter = MutableSharedEmitter(replay: 2); + addTearDown(emitter.close); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + + final values = []; + emitter.listen(values.add); + + await pumpEventQueue(); + + // Should receive last 2 values (2, 3) + expect(values, [2, 3]); + }); + + test('replay 0 does not replay values', () async { + final emitter = MutableSharedEmitter(replay: 0); + addTearDown(emitter.close); + + emitter.emit(1); + emitter.emit(2); + + final values = []; + emitter.listen(values.add); + + emitter.emit(3); + + await pumpEventQueue(); + + // Should only receive value emitted after subscribing + expect(values, [3]); + }); + + test('negative replay throws ArgumentError', () { + expect( + () => MutableSharedEmitter(replay: -1), + throwsA(isA()), + ); + }); + }); + + group('sync mode', () { + test('sync mode emits values synchronously', () { + final emitter = MutableSharedEmitter(sync: true); + addTearDown(emitter.close); + + final values = []; + + emitter.listen(values.add); + + emitter.emit(1); + expect(values, [1]); // Synchronous - value immediately available + + emitter.emit(2); + expect(values, [1, 2]); + + emitter.close(); + }); + }); + + group('properties', () { + test('hasListener returns true when listeners exist', () async { + final emitter = MutableSharedEmitter(); + + expect(emitter.hasListener, isFalse); + + final subscription = emitter.listen((_) {}); + + expect(emitter.hasListener, isTrue); + + await subscription.cancel(); + expect(emitter.hasListener, isFalse); + }); + + test('isClosed returns true after close', () async { + final emitter = MutableSharedEmitter(); + + expect(emitter.isClosed, isFalse); + + await emitter.close(); + + expect(emitter.isClosed, isTrue); + }); + }); + + group('extensions', () { + test('asSharedEmitter returns SharedEmitter type', () async { + final mutableEmitter = MutableSharedEmitter(); + addTearDown(mutableEmitter.close); + + final readOnlyEmitter = mutableEmitter.asSharedEmitter(); + + // Should be able to use as SharedEmitter + expect(readOnlyEmitter, isA>()); + + // Should be able to listen + final values = []; + readOnlyEmitter.listen(values.add); + + // Can still emit through mutableEmitter + mutableEmitter.emit(42); + await pumpEventQueue(); + expect(values, [42]); + }); + }); + }); + + group('SharedEmitter vs StateEmitter behavior', () { + test('SharedEmitter does not emit to late subscribers by default', + () async { + final emitter = MutableSharedEmitter(); + addTearDown(emitter.close); + + emitter.emit(1); + emitter.emit(2); + + final values = []; + emitter.listen(values.add); + + emitter.emit(3); + + await pumpEventQueue(); + + // Only receives value emitted after subscribing + expect(values, [3]); + }); + + test('StateEmitter always emits current value to new subscribers', + () async { + final emitter = MutableStateEmitter(0); + addTearDown(emitter.close); + + emitter.emit(1); + emitter.emit(2); + + final values = []; + emitter.listen(values.add); + + emitter.emit(3); + + await pumpEventQueue(); + + // Receives current value (2) + new value (3) + expect(values, [2, 3]); + }); + }); + + group('Emitter stream operations', () { + test('can merge two SharedEmitters', () async { + final emitter1 = MutableSharedEmitter(); + final emitter2 = MutableSharedEmitter(); + + final values = []; + final merged = Rx.merge([emitter1, emitter2]); + merged.listen(values.add); + + emitter1.emit(1); + emitter2.emit(2); + emitter1.emit(3); + emitter2.emit(4); + + await pumpEventQueue(); + + expect(values, [1, 2, 3, 4]); + + await emitter1.close(); + await emitter2.close(); + }); + + test('can merge two StateEmitters', () async { + final emitter1 = MutableStateEmitter(0); + final emitter2 = MutableStateEmitter(100); + + final values = []; + final merged = Rx.merge([emitter1, emitter2]); + merged.listen(values.add); + + await pumpEventQueue(); + + // Both initial values are emitted + expect(values, contains(0)); + expect(values, contains(100)); + + values.clear(); + + emitter1.value = 1; + emitter2.value = 101; + + await pumpEventQueue(); + + // Order may vary when merging different streams + expect(values, containsAll([1, 101])); + expect(values.length, 2); + + await emitter1.close(); + await emitter2.close(); + }); + + test('can merge SharedEmitter and StateEmitter', () async { + final sharedEmitter = MutableSharedEmitter(); + final stateEmitter = MutableStateEmitter('initial'); + + final values = []; + final merged = Rx.merge([sharedEmitter, stateEmitter]); + merged.listen(values.add); + + await pumpEventQueue(); + + // StateEmitter emits initial value immediately + expect(values, ['initial']); + + values.clear(); + + sharedEmitter.emit('from shared'); + stateEmitter.value = 'from state'; + + await pumpEventQueue(); + + // Order may vary when merging different stream types + expect(values, containsAll(['from shared', 'from state'])); + expect(values.length, 2); + + await sharedEmitter.close(); + await stateEmitter.close(); + }); + }); +} diff --git a/packages/stream_core/test/utils/state_emitter_test.dart b/packages/stream_core/test/utils/state_emitter_test.dart new file mode 100644 index 0000000..c5803f5 --- /dev/null +++ b/packages/stream_core/test/utils/state_emitter_test.dart @@ -0,0 +1,218 @@ +import 'package:stream_core/stream_core.dart'; +import 'package:test/test.dart'; + +void main() { + group('StateEmitter', () { + group('state-specific functionality', () { + test('has initial value', () { + final emitter = MutableStateEmitter(42); + + expect(emitter.value, 42); + + emitter.close(); + }); + + test('emits initial value to new subscribers', () async { + final emitter = MutableStateEmitter(42); + addTearDown(emitter.close); + + final values = []; + + emitter.listen(values.add); + + await pumpEventQueue(); + + expect(values, [42]); + }); + + test('updates value on emit', () async { + final emitter = MutableStateEmitter(0); + addTearDown(emitter.close); + + emitter.emit(42); + + expect(emitter.value, 42); + }); + + test('value setter works like emit', () async { + final emitter = MutableStateEmitter(0); + addTearDown(emitter.close); + + emitter.value = 42; + + expect(emitter.value, 42); + }); + + test('skips duplicate values (conflation)', () async { + final emitter = MutableStateEmitter(0); + addTearDown(emitter.close); + + final values = []; + + emitter.listen(values.add); + + emitter.emit(1); + emitter.emit(1); // duplicate - should be skipped + emitter.emit(2); + emitter.emit(2); // duplicate - should be skipped + + await pumpEventQueue(); + + expect(values, [0, 1, 2]); + }); + + test('value setter throws when called after close', () async { + final emitter = MutableStateEmitter(0); + + await emitter.close(); + + expect(() => emitter.value = 1, throwsStateError); + }); + + test('tryEmit returns false after close', () async { + final emitter = MutableStateEmitter(0); + + expect(emitter.tryEmit(1), isTrue); + + await emitter.close(); + + expect(emitter.tryEmit(2), isFalse); + }); + + test( + 'tryEmit catches errors and returns false instead of throwing', + () async { + final emitter = MutableStateEmitter(0); + + await emitter.close(); + + // emit would throw, but tryEmit should catch the error + expect(() => emitter.emit(1), throwsStateError); + expect(emitter.tryEmit(1), isFalse); // Does not throw + }, + ); + }); + + group('late subscribers', () { + test('late subscribers receive current value', () async { + final emitter = MutableStateEmitter(0); + addTearDown(emitter.close); + + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + + // Subscribe after emissions + final values = []; + emitter.listen(values.add); + + await pumpEventQueue(); + + // Should receive current value (3) + expect(values, [3]); + expect(emitter.value, 3); + }); + }); + + group('atomic update methods', () { + test('update applies function to current value', () async { + final emitter = MutableStateEmitter(10); + addTearDown(emitter.close); + + emitter.update((current) => current * 2); + + expect(emitter.value, 20); + }); + + test('getAndUpdate returns previous value and updates', () async { + final emitter = MutableStateEmitter(10); + addTearDown(emitter.close); + + final previous = emitter.getAndUpdate((current) => current + 5); + + expect(previous, 10); + expect(emitter.value, 15); + }); + + test('updateAndGet returns new value after update', () async { + final emitter = MutableStateEmitter(10); + addTearDown(emitter.close); + + final newValue = emitter.updateAndGet((current) => current + 5); + + expect(newValue, 15); + expect(emitter.value, 15); + }); + }); + + group('sync mode', () { + test('sync mode emits new values synchronously', () async { + final emitter = MutableStateEmitter(0, sync: true); + addTearDown(emitter.close); + + final values = []; + + emitter.listen(values.add); + + // Wait for initial value to be emitted + await pumpEventQueue(); + expect(values, [0]); + + // New emissions are synchronous + emitter.emit(1); + expect(values, [0, 1]); // Synchronous - value immediately available + + emitter.emit(2); + expect(values, [0, 1, 2]); + }); + }); + + group('properties', () { + test('hasListener returns true when listeners exist', () async { + final emitter = MutableStateEmitter(0); + addTearDown(emitter.close); + + expect(emitter.hasListener, isFalse); + + final subscription = emitter.listen((_) {}); + + expect(emitter.hasListener, isTrue); + + await subscription.cancel(); + expect(emitter.hasListener, isFalse); + }); + + test('isClosed returns true after close', () async { + final emitter = MutableStateEmitter(0); + + expect(emitter.isClosed, isFalse); + + await emitter.close(); + + expect(emitter.isClosed, isTrue); + }); + }); + + group('extensions', () { + test('asStateEmitter returns StateEmitter type', () { + final mutableEmitter = MutableStateEmitter(0); + addTearDown(mutableEmitter.close); + + final readOnlyEmitter = mutableEmitter.asStateEmitter(); + + // Should be able to use as StateEmitter + expect(readOnlyEmitter, isA>()); + + // Should be able to read the latest value + expect(readOnlyEmitter.value, 0); + + // Updates through mutableEmitter are reflected in read-only view + mutableEmitter.value = 42; + expect(readOnlyEmitter.value, 42); + + mutableEmitter.value = 100; + expect(readOnlyEmitter.value, 100); + }); + }); + }); +}