Skip to content

Commit f5ded39

Browse files
Add tests and update KDoc
1 parent b579caa commit f5ded39

File tree

4 files changed

+854
-6
lines changed

4 files changed

+854
-6
lines changed

stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public fun StreamClient(
293293
cidWatcher: StreamCidWatcher = StreamCidWatcher(
294294
logProvider.taggedLogger("SCCidRewatcher"),
295295
streamRewatchSubscriptionManager = StreamSubscriptionManager(
296-
logger = logProvider.taggedLogger("SCRewatchSubscritionManager")
296+
logger = logProvider.taggedLogger("SCRewatchSubscriptionManager")
297297
),
298298
streamClientSubscriptionManager = clientSubscriptionManager,
299299
)
@@ -354,7 +354,7 @@ public fun StreamClient(
354354

355355
if (watchListener != null) {
356356
// Auto-subscribe the re-watch listener if any
357-
cidWatcher.subscribe(watchListener)
357+
cidWatcher.subscribe(watchListener).getOrThrow()
358358
}
359359

360360
val mutableConnectionState = MutableStateFlow<StreamConnectionState>(StreamConnectionState.Idle)

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidRewatchListener.kt

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,82 @@ package io.getstream.android.core.api.watcher
33
import io.getstream.android.core.annotations.StreamInternalApi
44
import io.getstream.android.core.api.model.StreamCid
55

6+
/**
7+
* Callback interface invoked when watched resources need to be re-subscribed after a connection
8+
* state change.
9+
*
10+
* This functional interface is used with [StreamCidWatcher] to receive notifications when the
11+
* WebSocket connection state changes (especially during reconnections), allowing the product SDK
12+
* to re-establish server-side watches for all actively monitored resources.
13+
*
14+
* ## When This Is Called
15+
*
16+
* The callback is triggered on every [StreamConnectionState.Connected] event when the watched
17+
* CID registry is non-empty. This ensures that all active subscriptions are restored after:
18+
* - Initial connection establishment
19+
* - Network reconnection after temporary disconnection
20+
* - WebSocket recovery after connection loss
21+
*
22+
* ## Threading
23+
*
24+
* - The callback is invoked asynchronously on an internal coroutine scope
25+
* - Implementations should be thread-safe as concurrent invocations are possible
26+
* - Long-running operations should be dispatched to an appropriate dispatcher
27+
*
28+
* ## Error Handling
29+
*
30+
* - Exceptions thrown by this callback are caught and logged by the watcher
31+
* - Errors are also surfaced via [StreamClientListener.onError] for user-level handling
32+
* - A failing callback will not prevent other listeners from being notified
33+
*
34+
* ## Example Usage
35+
*
36+
* ```kotlin
37+
* val watcher = StreamCidWatcher(logger, rewatchManager, clientManager)
38+
*
39+
* // Register a rewatch listener
40+
* watcher.subscribe(
41+
* listener = StreamCidRewatchListener { cids ->
42+
* logger.i { "Re-watching ${cids.size} channels" }
43+
* cids.forEach { cid ->
44+
* // Re-establish server-side watch for each CID
45+
* channelClient.watch(cid)
46+
* }
47+
* }
48+
* )
49+
*
50+
* watcher.start()
51+
* ```
52+
*
53+
* @see StreamCidWatcher Main component that manages the watch registry and triggers callbacks
54+
* @see StreamCid Channel identifier in format "type:id" (e.g., "messaging:general")
55+
*/
656
@StreamInternalApi
757
public fun interface StreamCidRewatchListener {
858

59+
/**
60+
* Called when watched resources need to be re-subscribed.
61+
*
62+
* This method is invoked with the complete list of [StreamCid]s currently in the watch
63+
* registry whenever the connection state changes to [StreamConnectionState.Connected] and
64+
* the registry is non-empty.
65+
*
66+
* Implementations should iterate over the provided list and re-establish server-side watches
67+
* for each CID to maintain active subscriptions across reconnection cycles.
68+
*
69+
* ## Contract
70+
*
71+
* - The list is never empty when this method is called
72+
* - The list represents a snapshot of the watch registry at the time of invocation
73+
* - Modifications to the list do not affect the internal registry
74+
* - Multiple calls may occur for the same set of CIDs during reconnection scenarios
75+
*
76+
* ## Error Handling
77+
*
78+
* Exceptions thrown by this method are caught, logged, and surfaced via error callbacks.
79+
* The failure of one listener does not prevent other listeners from being notified.
80+
*
81+
* @param list A non-empty list of [StreamCid]s that require re-watching
82+
*/
983
public fun onRewatch(list: List<StreamCid>)
1084
}

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidWatcher.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import io.getstream.android.core.internal.watcher.StreamCidWatcherImpl
2323
*
2424
* 1. Product SDK watches a channel: `watcher.watch(StreamCid.parse("messaging:general"))`
2525
* 2. Watcher adds the CID to its internal registry
26-
* 3. Product SDK registers a rewatch callback: `watcher.onRewatch { cids -> resubscribe(cids) }`
26+
* 3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamCidRewatchListener { cids -> resubscribe(cids) })`
2727
* 4. Call `watcher.start()` to begin monitoring connection state changes
2828
* 5. Connection state changes (e.g., reconnection after network loss)
29-
* 6. Watcher invokes callback with all watched CIDs: `["messaging:general", "livestream:sports"]`
29+
* 6. Watcher invokes listener with all watched CIDs: `["messaging:general", "livestream:sports"]`
3030
* 7. Product SDK re-establishes server-side watches for each CID
3131
*
3232
* ## Threading
@@ -97,8 +97,8 @@ public interface StreamCidWatcher : StreamObservable<StreamCidRewatchListener>,
9797
/**
9898
* Removes all entries from the watch registry.
9999
*
100-
* After calling this method, the rewatch callback will be invoked with an empty list on
101-
* subsequent connection state changes until new resources are watched.
100+
* After calling this method, the rewatch callback will NOT be invoked on subsequent
101+
* connection state changes (since the registry is empty) until new resources are watched.
102102
*
103103
* This method is typically called during cleanup or logout to ensure no stale watches remain.
104104
*

0 commit comments

Comments
 (0)