Skip to content

Commit b0125b6

Browse files
Allow for external event serialization without depending or extending internal external types
1 parent 39cf304 commit b0125b6

File tree

8 files changed

+225
-27
lines changed

8 files changed

+225
-27
lines changed

app/src/main/java/io/getstream/android/core/sample/client/StreamClient.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import io.getstream.android.core.api.authentication.StreamTokenManager
2020
import io.getstream.android.core.api.authentication.StreamTokenProvider
2121
import io.getstream.android.core.api.log.StreamLogger
2222
import io.getstream.android.core.api.log.StreamLoggerProvider
23+
import io.getstream.android.core.api.model.config.StreamClientSerializationConfig
2324
import io.getstream.android.core.api.model.value.StreamApiKey
2425
import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader
2526
import io.getstream.android.core.api.model.value.StreamUserId
@@ -28,6 +29,7 @@ import io.getstream.android.core.api.processing.StreamBatcher
2829
import io.getstream.android.core.api.processing.StreamRetryProcessor
2930
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
3031
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
32+
import io.getstream.android.core.api.serialization.StreamProductEventSerialization
3133
import io.getstream.android.core.api.socket.StreamConnectionIdHolder
3234
import io.getstream.android.core.api.socket.StreamWebSocketFactory
3335
import io.getstream.android.core.api.socket.listeners.StreamClientListener
@@ -103,6 +105,11 @@ fun createStreamClient(
103105
connectionIdHolder = connectionIdHolder,
104106
socketFactory = socketFactory,
105107
healthMonitor = healthMonitor,
108+
serializationConfig = StreamClientSerializationConfig.default(object :
109+
StreamProductEventSerialization<Unit> {
110+
override fun serialize(data: Unit): Result<String> = Result.success("")
111+
override fun deserialize(raw: String): Result<Unit> = Result.success(Unit)
112+
}),
106113
batcher = batcher,
107114
)
108115
}

stream-android-core/src/main/java/io/getstream/android/core/api/StreamClient.kt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import io.getstream.android.core.api.processing.StreamRetryProcessor
3232
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
3333
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
3434
import io.getstream.android.core.api.serialization.StreamClientEventSerialization
35+
import io.getstream.android.core.api.serialization.StreamProductEventSerialization
3536
import io.getstream.android.core.api.socket.StreamConnectionIdHolder
3637
import io.getstream.android.core.api.socket.StreamWebSocket
3738
import io.getstream.android.core.api.socket.StreamWebSocketFactory
@@ -40,6 +41,7 @@ import io.getstream.android.core.api.socket.monitor.StreamHealthMonitor
4041
import io.getstream.android.core.api.subscribe.StreamSubscription
4142
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
4243
import io.getstream.android.core.internal.client.StreamClientImpl
44+
import io.getstream.android.core.internal.serialization.StreamCompositeEventSerializationImpl
4345
import io.getstream.android.core.internal.serialization.StreamCompositeMoshiJsonSerialization
4446
import io.getstream.android.core.internal.serialization.StreamMoshiJsonSerializationImpl
4547
import io.getstream.android.core.internal.serialization.moshi.StreamCoreMoshiProvider
@@ -218,8 +220,7 @@ fun StreamClient(
218220
healthMonitor: StreamHealthMonitor,
219221
batcher: StreamBatcher<String>,
220222
// Serialization
221-
serializationConfig: StreamClientSerializationConfig =
222-
StreamClientSerializationConfig.defaults(),
223+
serializationConfig: StreamClientSerializationConfig,
223224
// Logging
224225
logProvider: StreamLoggerProvider = StreamLoggerProvider.defaultAndroidLogger(),
225226
): StreamClient {
@@ -256,7 +257,6 @@ fun StreamClient(
256257
serialQueue = serialQueue,
257258
connectionIdHolder = connectionIdHolder,
258259
logger = clientLogger,
259-
retryProcessor = retryProcessor,
260260
mutableConnectionState = MutableStateFlow(StreamConnectionState.Idle),
261261
subscriptionManager = clientSubscriptionManager,
262262
socketSession =
@@ -271,8 +271,12 @@ fun StreamClient(
271271
),
272272
jsonSerialization = compositeSerialization,
273273
eventParser =
274-
serializationConfig.eventParser
275-
?: StreamClientEventSerialization(compositeSerialization),
274+
StreamCompositeEventSerializationImpl(
275+
internal = serializationConfig.eventParser
276+
?: StreamClientEventSerialization(compositeSerialization),
277+
external = serializationConfig.productEventSerializers,
278+
)
279+
,
276280
healthMonitor = healthMonitor,
277281
batcher = batcher,
278282
internalSocket = socket,

stream-android-core/src/main/java/io/getstream/android/core/api/model/config/StreamClientSerializationConfig.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package io.getstream.android.core.api.model.config
1818
import io.getstream.android.core.annotations.StreamCoreApi
1919
import io.getstream.android.core.api.serialization.StreamClientEventSerialization
2020
import io.getstream.android.core.api.serialization.StreamJsonSerialization
21+
import io.getstream.android.core.api.serialization.StreamProductEventSerialization
2122

2223
/**
2324
* Configuration for serialization and deserialization in the Stream client.
@@ -31,31 +32,41 @@ data class StreamClientSerializationConfig
3132
private constructor(
3233
val json: StreamJsonSerialization? = null,
3334
val eventParser: StreamClientEventSerialization? = null,
35+
val productEventSerializers: StreamProductEventSerialization<*>,
36+
val internalTypes: Set<String> = setOf("connection.ok", "connection.error", "health.check"),
37+
val alsoExternal: Set<String> = emptySet(),
3438
) {
3539
companion object {
3640
/**
3741
* Creates a default [StreamClientSerializationConfig]. Using the internal implementations.
3842
*
43+
* @param productEvents The product event serializers.
44+
* @param alsoExternal The event types to also parse as external.
3945
* @return A default [StreamClientSerializationConfig].
4046
*/
41-
fun defaults() = StreamClientSerializationConfig()
47+
fun <T> default(productEvents: StreamProductEventSerialization<T>, alsoExternal: Set<String> = emptySet()) =
48+
StreamClientSerializationConfig(productEventSerializers = productEvents, alsoExternal = alsoExternal)
4249

4350
/**
4451
* Creates a [StreamClientSerializationConfig] with the given JSON serialization.
4552
*
4653
* @param serialization The JSON serialization implementation.
54+
* @param productEvents The product event serializers.
55+
* @param alsoExternal The event types to also parse as external.
4756
* @return A [StreamClientSerializationConfig] with the given JSON serialization.
4857
*/
49-
fun json(serialization: StreamJsonSerialization) =
50-
StreamClientSerializationConfig(json = serialization)
58+
fun <T> json(serialization: StreamJsonSerialization, productEvents: StreamProductEventSerialization<T>, alsoExternal: Set<String> = emptySet()) =
59+
StreamClientSerializationConfig(json = serialization, productEventSerializers = productEvents, alsoExternal = alsoExternal)
5160

5261
/**
5362
* Creates a [StreamClientSerializationConfig] with the given event parsing.
5463
*
5564
* @param serialization The event parsing implementation.
65+
* @param productEvents The product event serializers.
66+
* @param alsoExternal The event types to also parse as external.
5667
* @return A [StreamClientSerializationConfig] with the given event parsing.
5768
*/
58-
fun event(serialization: StreamClientEventSerialization) =
59-
StreamClientSerializationConfig(eventParser = serialization)
69+
fun <T> event(serialization: StreamClientEventSerialization, productEvents: StreamProductEventSerialization<T>, alsoExternal: Set<String> = emptySet()) =
70+
StreamClientSerializationConfig(eventParser = serialization, productEventSerializers = productEvents, alsoExternal = alsoExternal)
6071
}
6172
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.getstream.android.core.api.serialization
2+
3+
import io.getstream.android.core.annotations.StreamCoreApi
4+
5+
/**
6+
* Interface for serializing and deserializing product event objects.
7+
*
8+
* All operations return a [Result] so callers can safely propagate failures (e.g. malformed input,
9+
* I/O errors, schema mismatches) without throwing.
10+
*
11+
* @param T The product event type handled by this interface.
12+
*/
13+
@StreamCoreApi
14+
interface StreamProductEventSerialization<T> {
15+
16+
/**
17+
* Encodes a product event into a [String] suitable for transport or storage.
18+
*
19+
* @param data The event to serialize.
20+
* @return `Result.success(String)` when encoding succeeds, or `Result.failure(Throwable)` when
21+
* the process fails.
22+
*/
23+
fun serialize(data: T): Result<String>
24+
25+
/**
26+
* Decodes a product event from a [String] representation.
27+
*
28+
* @param raw The string to deserialize.
29+
* @return `Result.success(T)` when decoding succeeds, or `Result.failure(Throwable)` when
30+
* the process fails.
31+
*/
32+
fun deserialize(raw: String): Result<T>
33+
}

stream-android-core/src/main/java/io/getstream/android/core/api/socket/listeners/StreamClientListener.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ interface StreamClientListener {
3939
*
4040
* @param event The event received from the WebSocket.
4141
*/
42-
fun onEvent(event: StreamClientWsEvent) {}
42+
fun onEvent(event: Any) {}
4343

4444
/**
4545
* Called when an error occurs on the client.

stream-android-core/src/main/java/io/getstream/android/core/internal/client/StreamClientImpl.kt

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ import io.getstream.android.core.api.log.StreamLogger
2121
import io.getstream.android.core.api.model.StreamTypedKey.Companion.randomExecutionKey
2222
import io.getstream.android.core.api.model.connection.StreamConnectedUser
2323
import io.getstream.android.core.api.model.connection.StreamConnectionState
24-
import io.getstream.android.core.api.model.event.StreamClientWsEvent
2524
import io.getstream.android.core.api.model.value.StreamUserId
26-
import io.getstream.android.core.api.processing.StreamRetryProcessor
2725
import io.getstream.android.core.api.processing.StreamSerialProcessingQueue
2826
import io.getstream.android.core.api.processing.StreamSingleFlightProcessor
2927
import io.getstream.android.core.api.socket.StreamConnectionIdHolder
@@ -39,14 +37,13 @@ import kotlinx.coroutines.flow.StateFlow
3937
import kotlinx.coroutines.flow.asStateFlow
4038
import kotlinx.coroutines.flow.update
4139

42-
internal class StreamClientImpl(
40+
internal class StreamClientImpl<T>(
4341
private val userId: StreamUserId,
4442
private val tokenManager: StreamTokenManager,
4543
private val singleFlight: StreamSingleFlightProcessor,
4644
private val serialQueue: StreamSerialProcessingQueue,
47-
private val retryProcessor: StreamRetryProcessor,
4845
private val connectionIdHolder: StreamConnectionIdHolder,
49-
private val socketSession: StreamSocketSession,
46+
private val socketSession: StreamSocketSession<T>,
5047
private val mutableConnectionState: MutableStateFlow<StreamConnectionState>,
5148
private val logger: StreamLogger,
5249
private val subscriptionManager: StreamSubscriptionManager<StreamClientListener>,
@@ -77,13 +74,14 @@ internal class StreamClientImpl(
7774
socketSession
7875
.subscribe(
7976
object : StreamClientListener {
77+
8078
override fun onState(state: StreamConnectionState) {
8179
logger.v { "[client#onState]: $state" }
8280
mutableConnectionState.update(state)
8381
subscriptionManager.forEach { it.onState(state) }
8482
}
8583

86-
override fun onEvent(event: StreamClientWsEvent) {
84+
override fun onEvent(event: Any) {
8785
logger.v { "[client#onEvent]: $event" }
8886
subscriptionManager.forEach { it.onEvent(event) }
8987
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package io.getstream.android.core.internal.serialization
2+
3+
import com.squareup.moshi.JsonReader
4+
import io.getstream.android.core.api.model.event.StreamClientWsEvent
5+
import io.getstream.android.core.api.serialization.StreamClientEventSerialization
6+
import io.getstream.android.core.api.serialization.StreamProductEventSerialization
7+
import io.getstream.android.core.api.utils.runCatchingCancellable
8+
import okio.Buffer
9+
10+
/**
11+
* Represents a composite event that can be either a [StreamClientWsEvent] or a product-specific event.
12+
*
13+
* @param T The type of the product-specific event.
14+
*/
15+
internal class StreamCompositeSerializationEvent<T> private constructor(
16+
val core: StreamClientWsEvent? = null,
17+
val product: T? = null,
18+
) {
19+
companion object {
20+
/**
21+
* Creates a new [StreamCompositeSerializationEvent] with the given [event].
22+
*
23+
* @param event The event to wrap.
24+
* @return A new [StreamCompositeSerializationEvent] with the given [event].
25+
*/
26+
fun <T> internal(event: StreamClientWsEvent) = StreamCompositeSerializationEvent<T>(core = event)
27+
28+
/**
29+
* Creates a new [StreamCompositeSerializationEvent] with the given [event].
30+
*
31+
* @param event The event to wrap.
32+
* @return A new [StreamCompositeSerializationEvent] with the given [event].
33+
*/
34+
fun <T> external(event: T) = StreamCompositeSerializationEvent(product = event)
35+
36+
/**
37+
* Creates a new [StreamCompositeSerializationEvent] with the given [event].
38+
*
39+
* @param event The event to wrap.
40+
* @return A new [StreamCompositeSerializationEvent] with the given [event].
41+
*/
42+
fun <T> both(event: StreamClientWsEvent, product: T) =
43+
StreamCompositeSerializationEvent(core = event, product = product)
44+
}
45+
}
46+
47+
/**
48+
* Serializes and deserializes [StreamCompositeSerializationEvent] objects.
49+
*
50+
* @param T The type of the product-specific event.
51+
*/
52+
internal class StreamCompositeEventSerializationImpl<T>(
53+
private val internal: StreamClientEventSerialization,
54+
private val external: StreamProductEventSerialization<T>,
55+
private val internalTypes: Set<String> = setOf("connection.ok", "connection.error", "health.check"),
56+
private val alsoExternal: Set<String> = emptySet(),
57+
) {
58+
/**
59+
* Serializes the given [data] into a [String].
60+
*
61+
* @param data The data to serialize.
62+
* @return `Result.success(String)` when encoding succeeds, or `Result.failure(Throwable)` when
63+
* the process fails.
64+
*/
65+
fun serialize(data: StreamCompositeSerializationEvent<T>): Result<String> {
66+
data.core?.let { return internal.serialize(it) }
67+
data.product?.let { return external.serialize(it) }
68+
return Result.failure(NullPointerException())
69+
}
70+
71+
/**
72+
* Deserializes the given [raw] string into a [StreamCompositeSerializationEvent].
73+
*
74+
* @param raw The string to deserialize.
75+
* @return `Result.success(StreamCompositeSerializationEvent)` when decoding succeeds, or
76+
* `Result.failure(Throwable)` when the process fails.
77+
*/
78+
fun deserialize(raw: String): Result<StreamCompositeSerializationEvent<T>> =
79+
runCatchingCancellable {
80+
val type = peekType(raw)
81+
return try {
82+
when (type) {
83+
null -> {
84+
val ext = external
85+
ext.deserialize(raw).map { StreamCompositeSerializationEvent.external(it) }
86+
}
87+
in alsoExternal -> {
88+
val coreSer = internal
89+
val extSer = external
90+
val core = coreSer.deserialize(raw).getOrThrow()
91+
val prod = extSer.deserialize(raw).getOrThrow()
92+
Result.success(StreamCompositeSerializationEvent.both(core, prod))
93+
}
94+
in internalTypes -> {
95+
val coreSer = internal
96+
coreSer.deserialize(raw).map { StreamCompositeSerializationEvent.internal(it) }
97+
}
98+
else -> {
99+
val ext = external
100+
ext.deserialize(raw).map { StreamCompositeSerializationEvent.external(it) }
101+
}
102+
}
103+
} catch (e: Throwable) {
104+
Result.failure(e)
105+
}
106+
}
107+
108+
private fun peekType(raw: String): String? {
109+
val reader = JsonReader.of(Buffer().writeUtf8(raw))
110+
reader.isLenient = true
111+
return try {
112+
if (reader.peek() != JsonReader.Token.BEGIN_OBJECT) return null
113+
reader.beginObject()
114+
var result: String? = null
115+
while (reader.hasNext()) {
116+
val name = reader.nextName()
117+
if (name == "type" && reader.peek() == JsonReader.Token.STRING) {
118+
result = reader.nextString()
119+
// consume the rest to keep reader state valid
120+
while (reader.hasNext()) reader.skipValue()
121+
break
122+
} else {
123+
reader.skipValue()
124+
}
125+
}
126+
if (reader.peek() == JsonReader.Token.END_OBJECT) reader.endObject()
127+
result
128+
} catch (_: Throwable) {
129+
null
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)