Skip to content

Commit 3a619e6

Browse files
Spotless
1 parent f5ded39 commit 3a619e6

File tree

7 files changed

+448
-358
lines changed

7 files changed

+448
-358
lines changed

CLAUDE.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,65 @@ StreamClient (Main Interface)
162162
│ ├── StreamNetworkMonitor (ConnectivityManager callbacks)
163163
│ └── StreamLifecycleMonitor (ProcessLifecycleOwner observer)
164164
├── StreamConnectionRecoveryEvaluator (Reconnect heuristics)
165+
├── StreamCidWatcher (Watch registry & rewatch coordinator)
166+
│ ├── StreamSubscriptionManager<StreamCidRewatchListener> (Rewatch listener registry)
167+
│ └── StreamSubscriptionManager<StreamClientListener> (Connection state monitoring)
165168
└── StreamSubscriptionManager<StreamClientListener> (Event distribution)
166169
```
167170

171+
### Watch Management: StreamCidWatcher
172+
173+
**Purpose:** Manages a registry of watched resources (channels/conversations) and automatically triggers re-watching when the WebSocket connection state changes.
174+
175+
**Location:**
176+
- Interface: `stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidWatcher.kt`
177+
- Implementation: `stream-android-core/src/main/java/io/getstream/android/core/internal/watcher/StreamCidWatcherImpl.kt`
178+
- Listener: `stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidRewatchListener.kt`
179+
180+
**Key Concept:** When the WebSocket reconnects (network recovery, app resume), all active watches must be re-established on the server. `StreamCidWatcher` maintains which `StreamCid`s (Channel IDs) are currently watched and notifies listeners on every `Connected` state transition.
181+
182+
**Core Operations:**
183+
```kotlin
184+
// Add a CID to watch registry
185+
fun watch(cid: StreamCid): Result<StreamCid>
186+
187+
// Remove a CID from watch registry
188+
fun stopWatching(cid: StreamCid): Result<StreamCid>
189+
190+
// Clear all watched CIDs
191+
fun clear(): Result<Unit>
192+
193+
// Subscribe to rewatch notifications
194+
fun subscribe(
195+
listener: StreamCidRewatchListener,
196+
options: StreamSubscriptionManager.Options
197+
): Result<StreamSubscription>
198+
199+
// Lifecycle management (StreamStartableComponent)
200+
fun start(): Result<Unit>
201+
fun stop(): Result<Unit>
202+
```
203+
204+
**Usage Flow:**
205+
1. Product SDK watches a channel: `watcher.watch(StreamCid.parse("messaging:general"))`
206+
2. Watcher adds CID to internal `ConcurrentHashMap<StreamCid, Unit>` registry
207+
3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamCidRewatchListener { cids -> ... })`
208+
4. Call `watcher.start()` to begin monitoring connection state changes
209+
5. On `StreamConnectionState.Connected` event, watcher invokes all listeners with complete CID list
210+
6. Product SDK re-establishes server-side watches for each CID
211+
212+
**Implementation Details:**
213+
- Thread-safe: Uses `ConcurrentHashMap` for CID registry (line 36 in `StreamCidWatcherImpl.kt`)
214+
- Async execution: Rewatch callbacks invoked on internal coroutine scope with `SupervisorJob + Dispatchers.Default` (line 45)
215+
- Error handling: Exceptions from rewatch callbacks are caught, logged, and surfaced via `StreamClientListener.onError` (lines 61-67)
216+
- Idempotent: Multiple `watch()` calls with same CID only maintain one entry
217+
- Only triggers on `Connected` state when registry is non-empty (line 51)
218+
219+
**Test Coverage:**
220+
- Location: `stream-android-core/src/test/java/io/getstream/android/core/internal/watcher/StreamCidWatcherImplTest.kt`
221+
- 24 comprehensive test cases covering watch operations, lifecycle, state changes, error handling, and concurrency
222+
- 100% instruction/branch/line coverage (verified via Kover)
223+
168224
## Configuration Defaults
169225

170226
### StreamBatcher

stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,13 +290,15 @@ public fun StreamClient(
290290
logger = logProvider.taggedLogger("SCConnectionRecoveryEvaluator"),
291291
singleFlightProcessor = singleFlight,
292292
),
293-
cidWatcher: StreamCidWatcher = StreamCidWatcher(
294-
logProvider.taggedLogger("SCCidRewatcher"),
295-
streamRewatchSubscriptionManager = StreamSubscriptionManager(
296-
logger = logProvider.taggedLogger("SCRewatchSubscriptionManager")
293+
cidWatcher: StreamCidWatcher =
294+
StreamCidWatcher(
295+
logProvider.taggedLogger("SCCidRewatcher"),
296+
streamRewatchSubscriptionManager =
297+
StreamSubscriptionManager(
298+
logger = logProvider.taggedLogger("SCRewatchSubscriptionManager")
299+
),
300+
streamClientSubscriptionManager = clientSubscriptionManager,
297301
),
298-
streamClientSubscriptionManager = clientSubscriptionManager,
299-
)
300302
): StreamClient {
301303
val clientLogger = logProvider.taggedLogger(tag = "SCClient")
302304
val parent = scope.coroutineContext[Job]
Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,42 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package io.getstream.android.core.api.model
218

319
import io.getstream.android.core.annotations.StreamPublishedApi
420

521
@ConsistentCopyVisibility
622
@StreamPublishedApi
7-
public data class StreamCid private constructor(
8-
public val type: String,
9-
public val id: String,
10-
) {
23+
public data class StreamCid private constructor(public val type: String, public val id: String) {
1124

1225
public companion object {
1326

14-
public fun parse(cid: String) : StreamCid {
27+
public fun parse(cid: String): StreamCid {
1528
require(cid.isNotEmpty())
1629
val split = cid.split(":")
1730
require(split.size == 2)
1831
return StreamCid(split[0], split[1])
1932
}
2033

21-
public fun fromTypeAndId(type: String, id: String) : StreamCid {
34+
public fun fromTypeAndId(type: String, id: String): StreamCid {
2235
require(type.isNotEmpty())
2336
require(id.isNotEmpty())
2437
return StreamCid(type, id)
2538
}
2639
}
2740

2841
public fun formatted(): String = "$type:$id"
29-
}
42+
}

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidRewatchListener.kt

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package io.getstream.android.core.api.watcher
218

319
import io.getstream.android.core.annotations.StreamInternalApi
@@ -8,25 +24,23 @@ import io.getstream.android.core.api.model.StreamCid
824
* state change.
925
*
1026
* This functional interface is used with [StreamCidWatcher] to receive notifications when the
11-
* WebSocket connection state changes (especially during reconnections), allowing the product SDK
12-
* to re-establish server-side watches for all actively monitored resources.
27+
* WebSocket connection state changes (especially during reconnections), allowing the product SDK to
28+
* re-establish server-side watches for all actively monitored resources.
1329
*
1430
* ## When This Is Called
1531
*
16-
* The callback is triggered on every [StreamConnectionState.Connected] event when the watched
17-
* CID registry is non-empty. This ensures that all active subscriptions are restored after:
32+
* The callback is triggered on every [StreamConnectionState.Connected] event when the watched CID
33+
* registry is non-empty. This ensures that all active subscriptions are restored after:
1834
* - Initial connection establishment
1935
* - Network reconnection after temporary disconnection
2036
* - WebSocket recovery after connection loss
2137
*
2238
* ## Threading
23-
*
2439
* - The callback is invoked asynchronously on an internal coroutine scope
2540
* - Implementations should be thread-safe as concurrent invocations are possible
2641
* - Long-running operations should be dispatched to an appropriate dispatcher
2742
*
2843
* ## Error Handling
29-
*
3044
* - Exceptions thrown by this callback are caught and logged by the watcher
3145
* - Errors are also surfaced via [StreamClientListener.onError] for user-level handling
3246
* - A failing callback will not prevent other listeners from being notified
@@ -59,24 +73,23 @@ public fun interface StreamCidRewatchListener {
5973
/**
6074
* Called when watched resources need to be re-subscribed.
6175
*
62-
* This method is invoked with the complete list of [StreamCid]s currently in the watch
63-
* registry whenever the connection state changes to [StreamConnectionState.Connected] and
64-
* the registry is non-empty.
76+
* This method is invoked with the complete list of [StreamCid]s currently in the watch registry
77+
* whenever the connection state changes to [StreamConnectionState.Connected] and the registry
78+
* is non-empty.
6579
*
6680
* Implementations should iterate over the provided list and re-establish server-side watches
6781
* for each CID to maintain active subscriptions across reconnection cycles.
6882
*
6983
* ## Contract
70-
*
7184
* - The list is never empty when this method is called
7285
* - The list represents a snapshot of the watch registry at the time of invocation
7386
* - Modifications to the list do not affect the internal registry
7487
* - Multiple calls may occur for the same set of CIDs during reconnection scenarios
7588
*
7689
* ## Error Handling
7790
*
78-
* Exceptions thrown by this method are caught, logged, and surfaced via error callbacks.
79-
* The failure of one listener does not prevent other listeners from being notified.
91+
* Exceptions thrown by this method are caught, logged, and surfaced via error callbacks. The
92+
* failure of one listener does not prevent other listeners from being notified.
8093
*
8194
* @param list A non-empty list of [StreamCid]s that require re-watching
8295
*/

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidWatcher.kt

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-core-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package io.getstream.android.core.api.watcher
218

319
import io.getstream.android.core.annotations.StreamInternalApi
@@ -6,7 +22,6 @@ import io.getstream.android.core.api.model.StreamCid
622
import io.getstream.android.core.api.observers.StreamStartableComponent
723
import io.getstream.android.core.api.socket.listeners.StreamClientListener
824
import io.getstream.android.core.api.subscribe.StreamObservable
9-
import io.getstream.android.core.api.subscribe.StreamSubscription
1025
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
1126
import io.getstream.android.core.internal.watcher.StreamCidWatcherImpl
1227

@@ -15,22 +30,22 @@ import io.getstream.android.core.internal.watcher.StreamCidWatcherImpl
1530
* triggers re-watching when the connection state changes.
1631
*
1732
* This component is critical for maintaining active subscriptions across reconnection cycles. When
18-
* the WebSocket disconnects and reconnects, all active watches must be re-established on the server.
19-
* [StreamCidWatcher] tracks which [StreamCid]s (Channel IDs) are currently being watched and
20-
* invokes a callback on every connection state change, allowing the product SDK to re-subscribe.
33+
* the WebSocket disconnects and reconnects, all active watches must be re-established on the
34+
* server. [StreamCidWatcher] tracks which [StreamCid]s (Channel IDs) are currently being watched
35+
* and invokes a callback on every connection state change, allowing the product SDK to
36+
* re-subscribe.
2137
*
2238
* ## Typical Usage Flow
23-
*
2439
* 1. Product SDK watches a channel: `watcher.watch(StreamCid.parse("messaging:general"))`
2540
* 2. Watcher adds the CID to its internal registry
26-
* 3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamCidRewatchListener { cids -> resubscribe(cids) })`
41+
* 3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamCidRewatchListener { cids
42+
* -> resubscribe(cids) })`
2743
* 4. Call `watcher.start()` to begin monitoring connection state changes
2844
* 5. Connection state changes (e.g., reconnection after network loss)
2945
* 6. Watcher invokes listener with all watched CIDs: `["messaging:general", "livestream:sports"]`
3046
* 7. Product SDK re-establishes server-side watches for each CID
3147
*
3248
* ## Threading
33-
*
3449
* - All methods are thread-safe and can be called from any thread
3550
* - The rewatch callback is invoked asynchronously on an internal coroutine scope
3651
* - Callback invocation happens asynchronously on connection state changes
@@ -47,7 +62,8 @@ import io.getstream.android.core.internal.watcher.StreamCidWatcherImpl
4762
* @see StreamStartableComponent Lifecycle contract for start/stop operations
4863
*/
4964
@StreamInternalApi
50-
public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>, StreamStartableComponent {
65+
public interface StreamCidWatcher :
66+
StreamObservable<StreamCidRewatchListener>, StreamStartableComponent {
5167

5268
/**
5369
* Registers a channel or resource as actively watched.
@@ -56,9 +72,11 @@ public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>,
5672
* changes (especially during reconnection), the rewatch callback will include this CID in the
5773
* list of resources that need to be re-subscribed.
5874
*
59-
* Multiple calls with the same [cid] are idempotent—only one entry is maintained per unique CID.
75+
* Multiple calls with the same [cid] are idempotent—only one entry is maintained per unique
76+
* CID.
6077
*
6178
* ## Example
79+
*
6280
* ```kotlin
6381
* val cid = StreamCid.parse("messaging:general")
6482
* watcher.watch(cid)
@@ -70,7 +88,7 @@ public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>,
7088
* @return [Result.success] with the [cid] if registration succeeded, or [Result.failure] if an
7189
* error occurred (e.g., internal registry corruption)
7290
*/
73-
public fun watch(cid: StreamCid) : Result<StreamCid>
91+
public fun watch(cid: StreamCid): Result<StreamCid>
7492

7593
/**
7694
* Unregisters a channel or resource from the watch registry.
@@ -81,6 +99,7 @@ public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>,
8199
* Calling this method with a CID that is not being watched is a no-op (not an error).
82100
*
83101
* ## Example
102+
*
84103
* ```kotlin
85104
* val cid = StreamCid.parse("messaging:general")
86105
* watcher.stopWatching(cid)
@@ -92,17 +111,18 @@ public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>,
92111
* @return [Result.success] with the [cid] if removal succeeded, or [Result.failure] if an error
93112
* occurred (e.g., internal registry corruption)
94113
*/
95-
public fun stopWatching(cid: StreamCid) : Result<StreamCid>
114+
public fun stopWatching(cid: StreamCid): Result<StreamCid>
96115

97116
/**
98117
* Removes all entries from the watch registry.
99118
*
100-
* After calling this method, the rewatch callback will NOT be invoked on subsequent
101-
* connection state changes (since the registry is empty) until new resources are watched.
119+
* After calling this method, the rewatch callback will NOT be invoked on subsequent connection
120+
* state changes (since the registry is empty) until new resources are watched.
102121
*
103122
* This method is typically called during cleanup or logout to ensure no stale watches remain.
104123
*
105124
* ## Example
125+
*
106126
* ```kotlin
107127
* // During logout
108128
* watcher.clear()
@@ -120,10 +140,10 @@ public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>,
120140
public fun StreamCidWatcher(
121141
logger: StreamLogger,
122142
streamRewatchSubscriptionManager: StreamSubscriptionManager<StreamCidRewatchListener>,
123-
streamClientSubscriptionManager: StreamSubscriptionManager<StreamClientListener>
124-
) : StreamCidWatcher = StreamCidWatcherImpl(
125-
logger = logger,
126-
rewatchSubscriptions = streamRewatchSubscriptionManager,
127-
clientSubscriptions = streamClientSubscriptionManager
128-
)
129-
143+
streamClientSubscriptionManager: StreamSubscriptionManager<StreamClientListener>,
144+
): StreamCidWatcher =
145+
StreamCidWatcherImpl(
146+
logger = logger,
147+
rewatchSubscriptions = streamRewatchSubscriptionManager,
148+
clientSubscriptions = streamClientSubscriptionManager,
149+
)

0 commit comments

Comments
 (0)