Skip to content

Commit bc244e3

Browse files
Add the connection ID to the stream watcher onRewatch, extend tests
1 parent 3a619e6 commit bc244e3

File tree

10 files changed

+987
-59
lines changed

10 files changed

+987
-59
lines changed

CLAUDE.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,21 +204,22 @@ fun stop(): Result<Unit>
204204
**Usage Flow:**
205205
1. Product SDK watches a channel: `watcher.watch(StreamCid.parse("messaging:general"))`
206206
2. Watcher adds CID to internal `ConcurrentHashMap<StreamCid, Unit>` registry
207-
3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamCidRewatchListener { cids -> ... })`
207+
3. Product SDK registers a rewatch listener: `watcher.subscribe(StreamCidRewatchListener { cids, connectionId -> ... })`
208208
4. Call `watcher.start()` to begin monitoring connection state changes
209-
5. On `StreamConnectionState.Connected` event, watcher invokes all listeners with complete CID list
210-
6. Product SDK re-establishes server-side watches for each CID
209+
5. On `StreamConnectionState.Connected` event, watcher invokes all listeners with complete CID list AND the current connectionId
210+
6. Product SDK re-establishes server-side watches for each CID using the provided connectionId
211211

212212
**Implementation Details:**
213-
- Thread-safe: Uses `ConcurrentHashMap` for CID registry (line 36 in `StreamCidWatcherImpl.kt`)
214-
- Async execution: Rewatch callbacks invoked on internal coroutine scope with `SupervisorJob + Dispatchers.Default` (line 45)
215-
- Error handling: Exceptions from rewatch callbacks are caught, logged, and surfaced via `StreamClientListener.onError` (lines 61-67)
213+
- Thread-safe: Uses `ConcurrentHashMap` for CID registry (line 52 in `StreamCidWatcherImpl.kt`)
214+
- Async execution: Rewatch callbacks invoked on internal coroutine scope with `SupervisorJob + Dispatchers.Default` (line 61)
215+
- Error handling: Exceptions from rewatch callbacks are caught, logged, and surfaced via `StreamClientListener.onError` (lines 77-82)
216216
- Idempotent: Multiple `watch()` calls with same CID only maintain one entry
217-
- Only triggers on `Connected` state when registry is non-empty (line 51)
217+
- Only triggers on `Connected` state when registry is non-empty (line 67)
218+
- Connection ID extracted from `Connected` state and passed to all listeners (line 70)
218219

219220
**Test Coverage:**
220221
- Location: `stream-android-core/src/test/java/io/getstream/android/core/internal/watcher/StreamCidWatcherImplTest.kt`
221-
- 24 comprehensive test cases covering watch operations, lifecycle, state changes, error handling, and concurrency
222+
- 29 comprehensive test cases covering watch operations, lifecycle, state changes, error handling, concurrency, and connectionId verification
222223
- 100% instruction/branch/line coverage (verified via Kover)
223224

224225
## Configuration Defaults

stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,8 @@ public fun StreamClient(
292292
),
293293
cidWatcher: StreamCidWatcher =
294294
StreamCidWatcher(
295-
logProvider.taggedLogger("SCCidRewatcher"),
295+
scope = scope,
296+
logger = logProvider.taggedLogger("SCCidRewatcher"),
296297
streamRewatchSubscriptionManager =
297298
StreamSubscriptionManager(
298299
logger = logProvider.taggedLogger("SCRewatchSubscriptionManager")
@@ -366,6 +367,7 @@ public fun StreamClient(
366367
tokenManager = tokenManager,
367368
singleFlight = singleFlight,
368369
serialQueue = serialQueue,
370+
cidWatcher = cidWatcher,
369371
connectionIdHolder = connectionIdHolder,
370372
logger = clientLogger,
371373
mutableConnectionState = mutableConnectionState,

stream-android-core/src/main/java/io/getstream/android/core/api/model/StreamCid.kt

Lines changed: 187 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,205 @@ package io.getstream.android.core.api.model
1818

1919
import io.getstream.android.core.annotations.StreamPublishedApi
2020

21+
/**
22+
* Represents a Configuration Identifier (CID) in the Stream platform.
23+
*
24+
* A CID uniquely identifies a resource by combining a configuration type and a resource ID in the
25+
* format `"configuration:identifier"`. This is the primary identifier used throughout the Stream
26+
* SDK for referencing various resources such as channels, calls, feeds, and other configuration
27+
* entities.
28+
*
29+
* ## Format
30+
*
31+
* CIDs follow the pattern: `"<configuration>:<identifier>"`
32+
* - **configuration**: The resource configuration type (e.g., "messaging", "livestream", "call",
33+
* "feed")
34+
* - **identifier**: The unique identifier for the resource within that configuration (e.g.,
35+
* "general", "support", "user-123")
36+
*
37+
* ## Examples
38+
*
39+
* ```kotlin
40+
* // Parse from formatted string
41+
* val cid1 = StreamCid.parse("messaging:general")
42+
* println(cid1.configuration) // "messaging"
43+
* println(cid1.id) // "general"
44+
* println(cid1.formatted()) // "messaging:general"
45+
*
46+
* // Create from configuration and id
47+
* val cid2 = StreamCid.fromTypeAndId("call", "video-room-1")
48+
* println(cid2.formatted()) // "call:video-room-1"
49+
*
50+
* // Different configuration types
51+
* val channelCid = StreamCid.parse("messaging:support")
52+
* val callCid = StreamCid.parse("call:team-meeting")
53+
* val feedCid = StreamCid.parse("feed:user-timeline")
54+
*
55+
* // Use in equality comparisons
56+
* val cid3 = StreamCid.parse("messaging:general")
57+
* println(cid1 == cid3) // true (data class equality)
58+
* ```
59+
*
60+
* ## Thread Safety
61+
*
62+
* This class is immutable and thread-safe. All instances are created via factory methods in the
63+
* companion object, and the constructor is private to enforce validation.
64+
*
65+
* ## Validation
66+
*
67+
* Both [configuration] and [id] must be non-empty strings. Attempting to create a CID with empty
68+
* values will throw [IllegalArgumentException].
69+
*
70+
* @property configuration The configuration type (non-empty, e.g., "messaging", "call", "feed")
71+
* @property id The resource identifier within the configuration (non-empty, e.g., "general",
72+
* "support")
73+
* @see parse Factory method for creating a CID from a formatted string
74+
* @see fromTypeAndId Factory method for creating a CID from separate configuration and id
75+
* components
76+
* @see formatted Returns the string representation in "configuration:id" format
77+
*/
2178
@ConsistentCopyVisibility
2279
@StreamPublishedApi
23-
public data class StreamCid private constructor(public val type: String, public val id: String) {
80+
public data class StreamCid
81+
private constructor(
82+
/**
83+
* The configuration type component of the CID.
84+
*
85+
* Common configuration types include:
86+
* - `"messaging"` - Messaging channels
87+
* - `"livestream"` - Live streaming channels
88+
* - `"call"` - Audio/video calls
89+
* - `"feed"` - Activity feeds
90+
* - `"team"` - Team collaboration
91+
*
92+
* This value is guaranteed to be non-empty.
93+
*
94+
* @see id The resource identifier component
95+
*/
96+
public val configuration: String,
97+
/**
98+
* The unique identifier for the resource within its [configuration].
99+
*
100+
* This can be any non-empty string that uniquely identifies the resource within its
101+
* configuration type. Common patterns include:
102+
* - User IDs: `"user-123"`
103+
* - Resource names: `"general"`, `"support"`, `"random"`
104+
* - Generated IDs: `"ch-abc123"`, `UUID.randomUUID().toString()`
105+
*
106+
* This value is guaranteed to be non-empty.
107+
*
108+
* @see configuration The configuration type component
109+
*/
110+
public val id: String,
111+
) {
24112

25113
public companion object {
26114

115+
/**
116+
* Parses a formatted CID string into a [StreamCid] instance.
117+
*
118+
* This method parses a CID from its standard string representation `"configuration:id"` and
119+
* creates a [StreamCid] instance with the extracted configuration and id components.
120+
*
121+
* ## Format Requirements
122+
*
123+
* The input string must:
124+
* - Be non-empty
125+
* - Contain exactly one colon (`:`) separator
126+
* - Have non-empty configuration and id components on both sides of the colon
127+
*
128+
* ## Examples
129+
*
130+
* ```kotlin
131+
* // Valid CIDs
132+
* val cid1 = StreamCid.parse("messaging:general")
133+
* val cid2 = StreamCid.parse("livestream:sports-2023")
134+
* val cid3 = StreamCid.parse("call:team-meeting")
135+
*
136+
* // Invalid CIDs (will throw IllegalArgumentException)
137+
* StreamCid.parse("") // Empty string
138+
* StreamCid.parse("messaging") // Missing colon separator
139+
* StreamCid.parse("messaging:") // Empty id
140+
* StreamCid.parse(":general") // Empty configuration
141+
* StreamCid.parse("messaging:general:extra") // Too many colons
142+
* ```
143+
*
144+
* @param cid The formatted CID string in "configuration:id" format
145+
* @return A new [StreamCid] instance with the parsed configuration and id
146+
* @throws IllegalArgumentException if the input is empty, doesn't contain exactly one
147+
* colon, or has empty configuration/id components
148+
* @see fromTypeAndId Alternative factory method for creating from separate components
149+
* @see formatted Returns the inverse transformation (CID → String)
150+
*/
27151
public fun parse(cid: String): StreamCid {
28-
require(cid.isNotEmpty())
152+
require(cid.isNotEmpty()) { "CID string cannot be empty" }
29153
val split = cid.split(":")
30-
require(split.size == 2)
31-
return StreamCid(split[0], split[1])
154+
require(split.size == 2) {
155+
"CID must be in format 'configuration:id' with exactly one colon separator, got: '$cid'"
156+
}
157+
return fromTypeAndId(split[0], split[1])
32158
}
33159

160+
/**
161+
* Creates a [StreamCid] from separate configuration and id components.
162+
*
163+
* This method constructs a CID directly from its constituent parts without requiring a
164+
* pre-formatted string. This is useful when you already have the configuration and id as
165+
* separate values.
166+
*
167+
* ## Examples
168+
*
169+
* ```kotlin
170+
* // Create CID from components
171+
* val cid = StreamCid.fromTypeAndId(type = "messaging", id = "general")
172+
* println(cid.formatted()) // "messaging:general"
173+
*
174+
* // Useful when building CIDs dynamically
175+
* val configurationType = "call"
176+
* val resourceId = UUID.randomUUID().toString()
177+
* val dynamicCid = StreamCid.fromTypeAndId(configurationType, resourceId)
178+
* ```
179+
*
180+
* @param type The configuration type (must be non-empty)
181+
* @param id The resource identifier (must be non-empty)
182+
* @return A new [StreamCid] instance with the given configuration and id
183+
* @throws IllegalArgumentException if either [type] or [id] is empty
184+
* @see parse Alternative factory method for parsing from formatted strings
185+
*/
34186
public fun fromTypeAndId(type: String, id: String): StreamCid {
35-
require(type.isNotEmpty())
36-
require(id.isNotEmpty())
187+
require(type.isNotEmpty()) { "Configuration type cannot be empty" }
188+
require(id.isNotEmpty()) { "Resource id cannot be empty" }
37189
return StreamCid(type, id)
38190
}
39191
}
40192

41-
public fun formatted(): String = "$type:$id"
193+
/**
194+
* Returns the formatted string representation of this CID.
195+
*
196+
* Converts the CID back to its canonical string format `"configuration:id"`. This is the
197+
* inverse operation of [parse].
198+
*
199+
* ## Examples
200+
*
201+
* ```kotlin
202+
* val cid = StreamCid.fromTypeAndId("messaging", "general")
203+
* println(cid.formatted()) // "messaging:general"
204+
*
205+
* // Round-trip parsing
206+
* val original = "livestream:sports"
207+
* val parsed = StreamCid.parse(original)
208+
* val formatted = parsed.formatted()
209+
* println(original == formatted) // true
210+
* ```
211+
*
212+
* ## Use Cases
213+
* - Logging and debugging: Display CIDs in human-readable format
214+
* - API requests: Send CID as string parameter to backend
215+
* - Serialization: Convert CID to JSON/XML string representation
216+
* - UI display: Show resource identifiers to users
217+
*
218+
* @return The CID in "configuration:id" format
219+
* @see parse The inverse operation that creates a CID from this string format
220+
*/
221+
public fun formatted(): String = "$configuration:$id"
42222
}

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidRewatchListener.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ import io.getstream.android.core.api.model.StreamCid
5252
*
5353
* // Register a rewatch listener
5454
* watcher.subscribe(
55-
* listener = StreamCidRewatchListener { cids ->
56-
* logger.i { "Re-watching ${cids.size} channels" }
55+
* listener = StreamCidRewatchListener { cids, connectionId ->
56+
* logger.i { "Re-watching ${cids.size} channels on connection $connectionId" }
5757
* cids.forEach { cid ->
5858
* // Re-establish server-side watch for each CID
59-
* channelClient.watch(cid)
59+
* channelClient.watch(cid, connectionId)
6060
* }
6161
* }
6262
* )
@@ -85,13 +85,15 @@ public fun interface StreamCidRewatchListener {
8585
* - The list represents a snapshot of the watch registry at the time of invocation
8686
* - Modifications to the list do not affect the internal registry
8787
* - Multiple calls may occur for the same set of CIDs during reconnection scenarios
88+
* - The connectionId reflects the current active connection at the moment of invocation
8889
*
8990
* ## Error Handling
9091
*
9192
* Exceptions thrown by this method are caught, logged, and surfaced via error callbacks. The
9293
* failure of one listener does not prevent other listeners from being notified.
9394
*
9495
* @param list A non-empty list of [StreamCid]s that require re-watching
96+
* @param connectionId The connection ID of the current active WebSocket connection
9597
*/
96-
public fun onRewatch(list: List<StreamCid>)
98+
public fun onRewatch(list: List<StreamCid>, connectionId: String)
9799
}

stream-android-core/src/main/java/io/getstream/android/core/api/watcher/StreamCidWatcher.kt

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import io.getstream.android.core.api.socket.listeners.StreamClientListener
2424
import io.getstream.android.core.api.subscribe.StreamObservable
2525
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
2626
import io.getstream.android.core.internal.watcher.StreamCidWatcherImpl
27+
import kotlinx.coroutines.CoroutineScope
2728

2829
/**
2930
* Manages a registry of watched resources (channels, conversations, streams) and automatically
@@ -136,13 +137,85 @@ public interface StreamCidWatcher :
136137
public fun clear(): Result<Unit>
137138
}
138139

140+
/**
141+
* Creates a new [StreamCidWatcher] instance with the provided dependencies.
142+
*
143+
* This factory method instantiates the default implementation ([StreamCidWatcherImpl]) and wires
144+
* all required dependencies for monitoring connection state changes and triggering rewatch
145+
* callbacks.
146+
*
147+
* ## Parameters
148+
* - **scope**: The [CoroutineScope] used for launching async operations (rewatch callbacks). This
149+
* scope should typically use `SupervisorJob + Dispatchers.Default` to ensure callback failures
150+
* don't cancel the scope and to avoid blocking the main thread. The scope is NOT cancelled when
151+
* [StreamCidWatcher.stop] is called, allowing the component to be restarted.
152+
* - **logger**: A [StreamLogger] instance for diagnostic output, tagged appropriately (e.g.,
153+
* "SCCidWatcher"). Used for logging state changes, rewatch events, and errors.
154+
* - **streamRewatchSubscriptionManager**: Manages subscriptions to [StreamCidRewatchListener]. This
155+
* manager handles the list of listeners that will be invoked when connection state changes
156+
* require re-watching CIDs.
157+
* - **streamClientSubscriptionManager**: Manages subscriptions to [StreamClientListener]. This is
158+
* used to subscribe to connection state changes (via [StreamClientListener.onState]) and to
159+
* surface errors (via [StreamClientListener.onError]) when rewatch callbacks fail.
160+
*
161+
* ## Lifecycle
162+
*
163+
* The returned watcher is created in a stopped state. You must call [StreamCidWatcher.start] to
164+
* begin monitoring connection state changes. The component can be started and stopped multiple
165+
* times throughout its lifetime.
166+
*
167+
* ## Usage Example
168+
*
169+
* ```kotlin
170+
* val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
171+
* val logger = StreamLogger.getLogger("MyApp.CidWatcher")
172+
*
173+
* val rewatchManager = StreamSubscriptionManager<StreamCidRewatchListener>(
174+
* logger = logger.withTag("RewatchSubscriptions")
175+
* )
176+
* val clientManager = StreamSubscriptionManager<StreamClientListener>(
177+
* logger = logger.withTag("ClientSubscriptions")
178+
* )
179+
*
180+
* val watcher = StreamCidWatcher(
181+
* scope = scope,
182+
* logger = logger,
183+
* streamRewatchSubscriptionManager = rewatchManager,
184+
* streamClientSubscriptionManager = clientManager
185+
* )
186+
*
187+
* // Register rewatch listener
188+
* watcher.subscribe(StreamCidRewatchListener { cids, connectionId ->
189+
* println("Re-watching ${cids.size} channels on connection $connectionId")
190+
* // Re-establish server-side watches...
191+
* })
192+
*
193+
* // Start monitoring
194+
* watcher.start()
195+
*
196+
* // Watch channels
197+
* watcher.watch(StreamCid.parse("messaging:general"))
198+
* watcher.watch(StreamCid.parse("livestream:sports"))
199+
* ```
200+
*
201+
* @param scope Coroutine scope for async operations (rewatch callback invocations)
202+
* @param logger Logger for diagnostic output and error reporting
203+
* @param streamRewatchSubscriptionManager Subscription manager for rewatch listeners
204+
* @param streamClientSubscriptionManager Subscription manager for client state listeners
205+
* @return A new [StreamCidWatcher] instance (implementation: [StreamCidWatcherImpl])
206+
* @see StreamCidWatcher The interface contract
207+
* @see StreamCidWatcherImpl The concrete implementation
208+
* @see StreamCidRewatchListener Callback interface for rewatch notifications
209+
*/
139210
@StreamInternalApi
140211
public fun StreamCidWatcher(
212+
scope: CoroutineScope,
141213
logger: StreamLogger,
142214
streamRewatchSubscriptionManager: StreamSubscriptionManager<StreamCidRewatchListener>,
143215
streamClientSubscriptionManager: StreamSubscriptionManager<StreamClientListener>,
144216
): StreamCidWatcher =
145217
StreamCidWatcherImpl(
218+
scope = scope,
146219
logger = logger,
147220
rewatchSubscriptions = streamRewatchSubscriptionManager,
148221
clientSubscriptions = streamClientSubscriptionManager,

0 commit comments

Comments
 (0)