Skip to content

Commit ea20cea

Browse files
Rename and update tests
1 parent ecf0347 commit ea20cea

File tree

6 files changed

+187
-300
lines changed

6 files changed

+187
-300
lines changed

CLAUDE.md

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,8 @@ StreamClient (Main Interface)
163163
│ └── StreamLifecycleMonitor (ProcessLifecycleOwner observer)
164164
├── StreamConnectionRecoveryEvaluator (Reconnect heuristics)
165165
├── StreamWatcher<T> (Watch registry & rewatch coordinator)
166-
│ ├── StreamSubscriptionManager<StreamRewatchListener<T>> (Rewatch listener registry)
167-
│ └── StreamSubscriptionManager<StreamClientListener> (Connection state monitoring)
166+
│ ├── StateFlow<StreamConnectionState> (Observes connection state)
167+
│ └── StreamSubscriptionManager<StreamRewatchListener<T>> (Rewatch listener registry)
168168
└── StreamSubscriptionManager<StreamClientListener> (Event distribution)
169169
```
170170

@@ -201,26 +201,75 @@ fun start(): Result<Unit>
201201
fun stop(): Result<Unit>
202202
```
203203

204+
**Factory Function:**
205+
```kotlin
206+
fun <T> StreamWatcher(
207+
scope: CoroutineScope,
208+
logger: StreamLogger,
209+
connectionState: StateFlow<StreamConnectionState>
210+
): StreamWatcher<T>
211+
```
212+
213+
The factory creates the watcher with an internal `StreamSubscriptionManager<StreamRewatchListener<T>>` automatically - product SDKs don't need to manage this.
214+
204215
**Usage Flow:**
205-
1. Product SDK watches a channel: `watcher.watch("messaging:general")` (for `StreamWatcher<String>`)
206-
2. Watcher adds the identifier to an internal `ConcurrentHashMap<T, Unit>` registry
207-
3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamRewatchListener { ids, connectionId -> ... })`
208-
4. Call `watcher.start()` to begin monitoring connection state changes
216+
1. Create watcher with connection state flow: `val watcher = StreamWatcher<String>(scope, logger, streamClient.connectionState)`
217+
2. Product SDK registers a rewatch listener: `watcher.subscribe(StreamRewatchListener { ids, connectionId -> ... })`
218+
3. Call `watcher.start()` to begin monitoring connection state changes
219+
4. Product SDK watches channels: `watcher.watch("messaging:general")`
209220
5. On `StreamConnectionState.Connected` event, watcher invokes all listeners with complete identifier list AND the current connectionId
210221
6. Product SDK re-establishes server-side watches for each identifier using the provided connectionId
211222

223+
**Complete Example:**
224+
```kotlin
225+
// Create watcher for channel IDs
226+
val watcher = StreamWatcher<String>(
227+
scope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
228+
logger = logger,
229+
connectionState = streamClient.connectionState
230+
)
231+
232+
// Register rewatch listener
233+
watcher.subscribe(
234+
listener = StreamRewatchListener { channelIds, connectionId ->
235+
// Re-establish server-side watches after reconnection
236+
channelIds.forEach { channelId ->
237+
channelApi.watch(channelId, connectionId)
238+
}
239+
},
240+
options = StreamSubscriptionManager.Options()
241+
).getOrThrow()
242+
243+
// Start monitoring
244+
watcher.start().getOrThrow()
245+
246+
// Watch channels as users view them
247+
watcher.watch("messaging:general")
248+
watcher.watch("messaging:random")
249+
250+
// When done
251+
watcher.stop()
252+
```
253+
212254
**Implementation Details:**
213255
- **Generic type parameter**: Allows watching any type `T` - common usage is `String` for channel IDs, but can be custom data classes
214-
- Thread-safe: Uses `ConcurrentHashMap<T, Unit>` for the registry (line 52 in `StreamWatcherImpl.kt`)
215-
- Async execution: Rewatch callbacks invoked on internal coroutine scope with `SupervisorJob + Dispatchers.Default` (line 61)
216-
- Error handling: Exceptions from rewatch callbacks are caught, logged, and surfaced via `StreamClientListener.onError` (lines 77-82)
256+
- **StateFlow observation**: Watcher observes `StateFlow<StreamConnectionState>` directly, no intermediate subscription manager needed
257+
- Safer: Only receives connection state, no access to full StreamClient APIs
258+
- Simpler: Factory creates internal subscription manager automatically
259+
- Testable: Easy to test with `MutableStateFlow`
260+
- Standalone: Truly independent component with minimal dependencies
261+
- Thread-safe: Uses `ConcurrentHashMap<T, Unit>` for the registry (line 54 in `StreamWatcherImpl.kt`)
262+
- Async execution: Rewatch callbacks invoked on internal coroutine scope with `SupervisorJob + Dispatchers.Default` (line 78)
263+
- Error handling: Exceptions from rewatch callbacks are caught and logged (lines 88-92)
217264
- Idempotent: Multiple `watch()` calls with the same identifier only maintain one entry
218-
- Only triggers on `Connected` state when registry is non-empty (line 67)
219-
- Connection ID extracted from `Connected` state and passed to all listeners (line 70)
265+
- Only triggers on `Connected` state when registry is non-empty (line 77)
266+
- Connection ID extracted from `Connected` state and passed to all listeners (line 80)
267+
- Lifecycle: `start()` launches a coroutine to collect from StateFlow, `stop()` cancels the collection job (lines 61-72)
220268

221269
**Test Coverage:**
222270
- Location: `stream-android-core/src/test/java/io/getstream/android/core/internal/watcher/StreamWatcherImplTest.kt`
223-
- 29 comprehensive test cases covering watch operations, lifecycle, state changes, error handling, concurrency, and connectionId verification
271+
- 30 comprehensive test cases covering watch operations, lifecycle, state changes, error handling, concurrency, and connectionId verification
272+
- Tests use MutableStateFlow to directly emit connection state changes, verifying watcher responds correctly
224273
- 100% instruction/branch/line coverage (verified via Kover)
225274

226275
## Configuration Defaults

stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ import io.getstream.android.core.api.socket.listeners.StreamClientListener
5050
import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor
5151
import io.getstream.android.core.api.subscribe.StreamObservable
5252
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
53-
import io.getstream.android.core.api.watcher.StreamWatcher
5453
import io.getstream.android.core.internal.client.StreamClientImpl
5554
import io.getstream.android.core.internal.observers.StreamNetworkAndLifeCycleMonitor
5655
import io.getstream.android.core.internal.serialization.StreamCompositeEventSerializationImpl

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamWatcher.kt

Lines changed: 26 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import io.getstream.android.core.annotations.StreamInternalApi
2020
import io.getstream.android.core.api.log.StreamLogger
2121
import io.getstream.android.core.api.model.connection.StreamConnectionState
2222
import io.getstream.android.core.api.observers.StreamStartableComponent
23-
import io.getstream.android.core.api.socket.listeners.StreamClientListener
2423
import io.getstream.android.core.api.subscribe.StreamObservable
2524
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
2625
import io.getstream.android.core.internal.watcher.StreamWatcherImpl
@@ -39,8 +38,8 @@ import kotlinx.coroutines.flow.StateFlow
3938
* ## Typical Usage Flow
4039
* 1. Product SDK watches a channel: `watcher.watch("messaging:general")`
4140
* 2. Watcher adds the identifier to its internal registry
42-
* 3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamRewatchListener { ids, connectionId ->
43-
* resubscribe(ids, connectionId) })`
41+
* 3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamRewatchListener { ids,
42+
* connectionId -> resubscribe(ids, connectionId) })`
4443
* 4. Call `watcher.start()` to begin monitoring connection state changes
4544
* 5. Connection state changes (e.g., reconnection after network loss)
4645
* 6. Watcher invokes listener with all watched entries: `["messaging:general",
@@ -138,24 +137,22 @@ public interface StreamWatcher<T> :
138137
}
139138

140139
/**
141-
* Creates a new [StreamWatcher] instance with the provided dependencies.
140+
* Creates a new [StreamWatcher] instance that observes connection state changes.
142141
*
143-
* This factory method instantiates the default implementation ([StreamWatcherImpl]) and wires all
144-
* required dependencies for monitoring connection state changes and triggering rewatch callbacks.
142+
* This factory method instantiates the default implementation ([StreamWatcherImpl]) and creates the
143+
* necessary internal subscription manager. The watcher observes the provided connection state flow
144+
* and triggers rewatch callbacks when the connection state changes to Connected.
145145
*
146146
* ## Parameters
147-
* - **scope**: The [CoroutineScope] used for launching async operations (rewatch callbacks). This
148-
* scope should typically use `SupervisorJob + Dispatchers.Default` to ensure callback failures
149-
* don't cancel the scope and to avoid blocking the main thread. The scope is NOT cancelled when
150-
* [StreamWatcher.stop] is called, allowing the component to be restarted.
147+
* - **scope**: The [CoroutineScope] used for launching async operations (collecting state flow and
148+
* invoking rewatch callbacks). This scope should typically use `SupervisorJob + Dispatchers.
149+
* Default` to ensure callback failures don't cancel the scope and to avoid blocking the main
150+
* thread. The scope is NOT cancelled when [StreamWatcher.stop] is called, allowing the component
151+
* to be restarted.
151152
* - **logger**: A [StreamLogger] instance for diagnostic output, tagged appropriately (e.g.,
152-
* "SCWatcher"). Used for logging state changes, rewatch events, and errors.
153-
* - **streamRewatchSubscriptionManager**: Manages subscriptions to [StreamRewatchListener]. This
154-
* manager handles the list of listeners that will be invoked when connection state changes
155-
* require re-watching identifiers.
156-
* - **streamClientSubscriptionManager**: Manages subscriptions to [StreamClientListener]. This is
157-
* used to subscribe to connection state changes (via [StreamClientListener.onState]) and to
158-
* surface errors (via [StreamClientListener.onError]) when rewatch callbacks fail.
153+
* "Watcher"). Used for logging state changes, rewatch events, and errors.
154+
* - **connectionState**: A [StateFlow] providing connection state updates. Typically obtained from
155+
* `streamClient.connectionState`. The watcher will collect from this flow when started.
159156
*
160157
* ## Lifecycle
161158
*
@@ -169,18 +166,11 @@ public interface StreamWatcher<T> :
169166
* val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
170167
* val logger = StreamLogger.getLogger("MyApp.Watcher")
171168
*
172-
* val rewatchManager = StreamSubscriptionManager<StreamRewatchListener<String>>(
173-
* logger = logger.withTag("RewatchSubscriptions")
174-
* )
175-
* val clientManager = StreamSubscriptionManager<StreamClientListener>(
176-
* logger = logger.withTag("ClientSubscriptions")
177-
* )
178-
*
169+
* // Create watcher - only needs the connection state flow
179170
* val watcher = StreamWatcher<String>(
180171
* scope = scope,
181172
* logger = logger,
182-
* streamRewatchSubscriptionManager = rewatchManager,
183-
* streamClientSubscriptionManager = clientManager
173+
* connectionState = streamClient.connectionState
184174
* )
185175
*
186176
* // Register rewatch listener
@@ -197,10 +187,10 @@ public interface StreamWatcher<T> :
197187
* watcher.watch("livestream:sports")
198188
* ```
199189
*
200-
* @param scope Coroutine scope for async operations (rewatch callback invocations)
190+
* @param scope Coroutine scope for async operations (state collection and rewatch callback
191+
* invocations)
201192
* @param logger Logger for diagnostic output and error reporting
202-
* @param streamRewatchSubscriptionManager Subscription manager for rewatch listeners
203-
* @param streamClientSubscriptionManager Subscription manager for client state listeners
193+
* @param connectionState StateFlow providing connection state updates
204194
* @return A new [StreamWatcher] instance (implementation: [StreamWatcherImpl])
205195
* @see StreamWatcher The interface contract
206196
* @see StreamWatcherImpl The concrete implementation
@@ -210,12 +200,13 @@ public interface StreamWatcher<T> :
210200
public fun <T> StreamWatcher(
211201
scope: CoroutineScope,
212202
logger: StreamLogger,
213-
streamRewatchSubscriptionManager: StreamSubscriptionManager<StreamRewatchListener<T>>,
214-
streamClientSubscriptionManager: StreamSubscriptionManager<StreamClientListener>,
215-
): StreamWatcher<T> =
216-
StreamWatcherImpl(
203+
connectionState: StateFlow<StreamConnectionState>,
204+
): StreamWatcher<T> {
205+
val rewatchSubscriptions = StreamSubscriptionManager<StreamRewatchListener<T>>(logger)
206+
return StreamWatcherImpl(
217207
scope = scope,
208+
connectionState = connectionState,
209+
rewatchSubscriptions = rewatchSubscriptions,
218210
logger = logger,
219-
rewatchSubscriptions = streamRewatchSubscriptionManager,
220-
clientSubscriptions = streamClientSubscriptionManager,
221211
)
212+
}

stream-android-core/src/main/java/io/getstream/android/core/internal/watcher/StreamWatcherImpl.kt

Lines changed: 44 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,91 +19,86 @@ package io.getstream.android.core.internal.watcher
1919
import io.getstream.android.core.annotations.StreamInternalApi
2020
import io.getstream.android.core.api.log.StreamLogger
2121
import io.getstream.android.core.api.model.connection.StreamConnectionState
22-
import io.getstream.android.core.api.socket.listeners.StreamClientListener
2322
import io.getstream.android.core.api.subscribe.StreamSubscription
2423
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
2524
import io.getstream.android.core.api.watcher.StreamRewatchListener
2625
import io.getstream.android.core.api.watcher.StreamWatcher
2726
import java.util.concurrent.ConcurrentHashMap
2827
import java.util.concurrent.ConcurrentMap
2928
import kotlinx.coroutines.CoroutineScope
29+
import kotlinx.coroutines.Job
30+
import kotlinx.coroutines.flow.StateFlow
3031
import kotlinx.coroutines.launch
3132

3233
/**
33-
* Implementation of [StreamWatcher] that uses [StreamSubscriptionManager] to monitor connection
34-
* state changes and trigger rewatch callbacks.
34+
* Implementation of [StreamWatcher] that observes connection state changes via [StateFlow] and
35+
* triggers rewatch callbacks.
3536
*
36-
* This implementation subscribes to connection state changes via [StreamClientListener] and uses
37-
* the provided [CoroutineScope] for invoking rewatch callbacks asynchronously.
37+
* This implementation collects from the connection state flow and uses the provided
38+
* [CoroutineScope] for invoking rewatch callbacks asynchronously.
3839
*
39-
* Exceptions thrown by the rewatch callback are caught, logged, and surfaced via the error callback
40-
* to prevent crashes while still allowing error handling by the product SDK.
40+
* Exceptions thrown by rewatch callbacks are caught and logged to prevent crashes while still
41+
* allowing error handling by the product SDK.
4142
*
4243
* @param scope Coroutine scope for async operations (should use SupervisorJob to prevent callback
4344
* failures from cancelling the scope)
45+
* @param connectionState StateFlow providing connection state updates
4446
* @param watched Concurrent map storing watched entries (defaults to empty [ConcurrentHashMap])
4547
* @param rewatchSubscriptions Manager for rewatch listener subscriptions
46-
* @param clientSubscriptions Manager for subscribing to connection state change notifications
4748
* @param logger Logger for diagnostic output and error reporting
4849
*/
4950
@StreamInternalApi
5051
internal class StreamWatcherImpl<T>(
5152
private val scope: CoroutineScope,
53+
private val connectionState: StateFlow<StreamConnectionState>,
5254
private val watched: ConcurrentMap<T, Unit> = ConcurrentHashMap(),
5355
private val rewatchSubscriptions: StreamSubscriptionManager<StreamRewatchListener<T>>,
54-
private val clientSubscriptions: StreamSubscriptionManager<StreamClientListener>,
5556
private val logger: StreamLogger,
5657
) : StreamWatcher<T> {
5758

58-
private var subscription: StreamSubscription? = null
59+
private var collectionJob: Job? = null
5960

60-
private val listener =
61-
object : StreamClientListener {
62-
override fun onState(state: StreamConnectionState) {
63-
// Invoke rewatch callback on every connection state change
64-
if (state is StreamConnectionState.Connected && watched.isNotEmpty()) {
65-
scope.launch {
66-
val cids = watched.keys.toSet()
67-
val connectionId = state.connectionId
68-
logger.v {
69-
"[onState] Triggering rewatch for ${cids.size} items on connection $connectionId: ${cids.joinToString()}"
70-
}
61+
override fun start(): Result<Unit> {
62+
if (collectionJob != null) {
63+
return Result.success(Unit) // Already started
64+
}
7165

72-
if (cids.isNotEmpty()) {
73-
rewatchSubscriptions
74-
.forEach { it.onRewatch(cids, connectionId) }
75-
.onFailure { error ->
76-
logger.e(error) {
77-
"[onState] Rewatch callback failed for ${cids.size} items. Error: ${error.message}"
78-
}
79-
clientSubscriptions.forEach { it.onError(error) }
80-
}
81-
}
82-
}
83-
} else {
84-
logger.v { "[onState] State: $state, items count: ${watched.size}" }
66+
return runCatching {
67+
collectionJob =
68+
scope.launch {
69+
connectionState.collect { state -> handleConnectionStateChange(state) }
8570
}
86-
}
8771
}
72+
}
8873

89-
override fun start(): Result<Unit> {
90-
if (subscription != null) {
91-
return Result.success(Unit) // Already started
92-
}
74+
private fun handleConnectionStateChange(state: StreamConnectionState) {
75+
// Invoke rewatch callback when connected and have watched items
76+
if (state is StreamConnectionState.Connected && watched.isNotEmpty()) {
77+
scope.launch {
78+
val items = watched.keys.toSet()
79+
val connectionId = state.connectionId
80+
logger.v {
81+
"[handleConnectionStateChange] Triggering rewatch for ${items.size} items on connection $connectionId: ${items.joinToString()}"
82+
}
9383

94-
return clientSubscriptions
95-
.subscribe(
96-
listener,
97-
StreamSubscriptionManager.Options(
98-
retention = StreamSubscriptionManager.Options.Retention.KEEP_UNTIL_CANCELLED
99-
),
100-
)
101-
.map { subscription = it }
84+
if (items.isNotEmpty()) {
85+
rewatchSubscriptions
86+
.forEach { it.onRewatch(items, connectionId) }
87+
.onFailure { error ->
88+
logger.e(error) {
89+
"[handleConnectionStateChange] Rewatch callback failed for ${items.size} items. Error: ${error.message}"
90+
}
91+
}
92+
}
93+
}
94+
} else {
95+
logger.v { "[handleConnectionStateChange] State: $state, items count: ${watched.size}" }
96+
}
10297
}
10398

10499
override fun stop(): Result<Unit> = runCatching {
105-
subscription?.cancel()
106-
subscription = null
100+
collectionJob?.cancel()
101+
collectionJob = null
107102
// Don't cancel scope - allows restart like other StreamStartableComponent implementations
108103
}
109104

0 commit comments

Comments
 (0)