Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions stream-chat-android-state/api/stream-chat-android-state.api
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,42 @@ public final class io/getstream/chat/android/state/plugin/config/ChannelMessageL
public fun toString ()Ljava/lang/String;
}

public final class io/getstream/chat/android/state/plugin/config/MessageBufferConfig {
public fun <init> ()V
public fun <init> (Ljava/util/Set;ILio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;)V
public synthetic fun <init> (Ljava/util/Set;ILio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/util/Set;
public final fun component2 ()I
public final fun component3 ()Lio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;
public final fun copy (Ljava/util/Set;ILio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;)Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;Ljava/util/Set;ILio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
public fun equals (Ljava/lang/Object;)Z
public final fun getCapacity ()I
public final fun getChannelTypes ()Ljava/util/Set;
public final fun getOverflow ()Lio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class io/getstream/chat/android/state/plugin/config/MessageBufferOverflow : java/lang/Enum {
public static final field DROP_LATEST Lio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;
public static final field DROP_OLDEST Lio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;
public static fun getEntries ()Lkotlin/enums/EnumEntries;
public static fun valueOf (Ljava/lang/String;)Lio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;
public static fun values ()[Lio/getstream/chat/android/state/plugin/config/MessageBufferOverflow;
}

public final class io/getstream/chat/android/state/plugin/config/MessageLimitConfig {
public fun <init> ()V
public fun <init> (Ljava/util/Set;)V
public synthetic fun <init> (Ljava/util/Set;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;)V
public synthetic fun <init> (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/util/Set;
public final fun copy (Ljava/util/Set;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;Ljava/util/Set;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
public final fun component2 ()Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
public final fun copy (Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
public static synthetic fun copy$default (Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;Ljava/util/Set;Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;ILjava/lang/Object;)Lio/getstream/chat/android/state/plugin/config/MessageLimitConfig;
public fun equals (Ljava/lang/Object;)Z
public final fun getChannelMessageLimits ()Ljava/util/Set;
public final fun getMessageBufferConfig ()Lio/getstream/chat/android/state/plugin/config/MessageBufferConfig;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ import io.getstream.chat.android.state.event.handler.internal.batch.BatchEvent
import io.getstream.chat.android.state.event.handler.internal.batch.SocketEventCollector
import io.getstream.chat.android.state.event.handler.internal.utils.realType
import io.getstream.chat.android.state.event.handler.internal.utils.toChannelUserRead
import io.getstream.chat.android.state.plugin.config.MessageBufferConfig
import io.getstream.chat.android.state.plugin.config.MessageBufferOverflow
import io.getstream.chat.android.state.plugin.logic.channel.internal.ChannelLogic
import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry
import io.getstream.chat.android.state.plugin.logic.querychannels.internal.QueryChannelsLogic
Expand All @@ -128,6 +130,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -157,6 +160,7 @@ internal class EventHandlerSequential(
private val repos: RepositoryFacade,
private val sideEffect: suspend () -> Unit,
private val syncedEvents: Flow<List<ChatEvent>>,
private val bufferConfig: MessageBufferConfig,
scope: CoroutineScope,
) : EventHandler {

Expand All @@ -167,12 +171,61 @@ internal class EventHandlerSequential(

private val mutex = Mutex()
private val socketEvents = MutableSharedFlow<ChatEvent>(extraBufferCapacity = Int.MAX_VALUE)

/**
* Secondary flow used only when [bufferConfig] opts specific channel types into a bounded buffer.
* Allocated lazily so the default configuration pays no cost for it.
*/
private val bufferedNewMessageEvents: MutableSharedFlow<ChatEvent> by lazy {
MutableSharedFlow(
extraBufferCapacity = bufferConfig.capacity,
onBufferOverflow = when (bufferConfig.overflow) {
MessageBufferOverflow.DROP_OLDEST -> BufferOverflow.DROP_OLDEST
MessageBufferOverflow.DROP_LATEST -> BufferOverflow.DROP_LATEST
},
)
}
private val socketEventCollector = SocketEventCollector(scope) { batchEvent ->
handleBatchEvent(batchEvent)
}

private var eventsDisposable: Disposable = EMPTY_DISPOSABLE

/**
* Default listener — emits every event into the unbuffered [socketEvents] flow without
* inspecting [bufferConfig]. Used whenever no channel types are opted in for buffering.
*/
private val defaultSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
logEmitOutcome(event, socketEvents.tryEmit(event))
}

/**
* Listener used only when [bufferConfig] opts specific channel types into a bounded buffer.
* Routes matching [NewMessageEvent]s to [bufferedNewMessageEvents] and everything else to
* [socketEvents].
*/
private val bufferedSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
val target = if (event is NewMessageEvent && event.channelType in bufferConfig.channelTypes) {
bufferedNewMessageEvents
} else {
socketEvents
}
logEmitOutcome(event, target.tryEmit(event))
}
Comment thread
gpunto marked this conversation as resolved.

private fun logEmitOutcome(event: ChatEvent, emitted: Boolean) {
if (emitted) {
val cCount = collectedCount.get()
val eCount = emittedCount.incrementAndGet()
val ratio = eCount.toDouble() / cCount.toDouble()
StreamLog.v(TAG_SOCKET) {
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
}
} else {
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
}
}

init {
logger.d { "<init> no args" }
}
Expand All @@ -199,26 +252,23 @@ internal class EventHandlerSequential(
)
}
}
scope.launch {
socketEvents.collect { event ->
collectedCount.incrementAndGet()
initJob.join()
sideEffect()
socketEventCollector.collect(event)
}
val collectSocketEvent: suspend (ChatEvent) -> Unit = { event ->
collectedCount.incrementAndGet()
initJob.join()
sideEffect()
socketEventCollector.collect(event)
}
eventsDisposable = subscribeForEvents { event ->
if (socketEvents.tryEmit(event)) {
val cCount = collectedCount.get()
val eCount = emittedCount.incrementAndGet()
val ratio = eCount.toDouble() / cCount.toDouble()
StreamLog.v(TAG_SOCKET) {
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
}
} else {
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
}
scope.launch { socketEvents.collect(collectSocketEvent) }
val isBufferingEnabled = bufferConfig.channelTypes.isNotEmpty()
if (isBufferingEnabled) {
scope.launch { bufferedNewMessageEvents.collect(collectSocketEvent) }
}
val activeListener = if (isBufferingEnabled) {
bufferedSocketEventListener
} else {
defaultSocketEventListener
}
eventsDisposable = subscribeForEvents(activeListener)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,14 @@ public data class StatePluginConfig @JvmOverloads constructor(
* @param channelMessageLimits A set of [ChannelMessageLimit] defining the maximum number of messages to keep in
* memory for different channel types. By default, this is an empty set, meaning no limits are applied and all
* messages are kept in memory. Each channel type can have its own limit configured independently.
*
* @param messageBufferConfig Configuration for bounding the inbound `NewMessageEvent` buffer on selected channel
* types. By default, no buffering is applied — events flow through the unbuffered path. See [MessageBufferConfig]
* for details and trade-offs.
*/
public data class MessageLimitConfig(
public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(),
public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(),
)

/**
Expand Down Expand Up @@ -161,3 +166,75 @@ public data class ChannelMessageLimit(
public val channelType: String,
public val baseLimit: Int,
)

/**
* Configuration for buffering inbound `NewMessageEvent`s for specific channel types before they
* are dispatched to the sequential event-handling pipeline.
*
* High-traffic channel types (e.g. livestreams) can produce a flood of new-message events that
* arrive faster than they can be processed sequentially. This configuration applies a bounded
* buffer with a configurable overflow strategy (e.g. drop oldest) for `NewMessageEvent`s on the
* configured channel types only. Events for other channel types — and all non-`NewMessageEvent`
* events — continue to flow through the default unbuffered path with `Int.MAX_VALUE` capacity,
* so signal-critical events such as reads, bans or member updates are never dropped.
*
* By default this is a no-op: no channel types are configured, so the buffered code path is not
* active and the SDK behaves exactly as if this configuration did not exist.
*
* **Event ordering caveat.** When buffering is active, `NewMessageEvent`s for opted-in channel
* types flow through a separate buffer from all other events. As a consequence, the relative
* ordering between buffered `NewMessageEvent`s and non-buffered events (e.g. `ReactionNewEvent`,
* `MessageUpdatedEvent`) for the same channel is **not guaranteed** — a reaction added to
* message X may be processed before the `NewMessageEvent` for X. Because this configuration
* already tolerates dropping events on overflow, callers opting in are expected to tolerate
* this consistency relaxation as well.
*
* Example — drop the oldest pending `NewMessageEvent` for `messaging` channels when more than
* 100 are queued:
* ```kotlin
* StatePluginConfig(
* messageLimitConfig = MessageLimitConfig(
* messageBufferConfig = MessageBufferConfig(
* channelTypes = setOf("messaging"),
* capacity = 100,
* overflow = MessageBufferOverflow.DROP_OLDEST,
* ),
* ),
* )
* ```
*
* @param channelTypes The set of channel types whose `NewMessageEvent`s should be routed through
* the bounded buffer. Channel types not in this set continue to use the unbuffered path. When
* this set is empty (the default), buffering is disabled entirely and the per-event channel-type
* check is skipped.
*
* @param capacity The maximum number of `NewMessageEvent`s that can be queued in the buffer
* while the consumer is busy. Once exceeded, [overflow] decides which event to drop. Defaults to
* `Int.MAX_VALUE`, which effectively disables overflow.
*
* @param overflow The strategy applied when the buffer is full:
* - [MessageBufferOverflow.DROP_OLDEST] (default): the oldest queued event is evicted to make
* room for the new one. Useful for live channels where freshness matters more than completeness.
* - [MessageBufferOverflow.DROP_LATEST]: the newest event is discarded and the queued events are
* kept.
*/
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
)

/**
* Strategy applied when the [MessageBufferConfig] buffer is full.
*
* Mirrors a subset of [kotlinx.coroutines.channels.BufferOverflow]: the suspending strategy is
* intentionally excluded because the SDK emits into the buffer via non-suspending `tryEmit`,
* which makes the suspending semantics unreachable.
*/
public enum class MessageBufferOverflow {
/** Evict the oldest queued event to make room for the new one. */
DROP_OLDEST,

/** Discard the newest event and keep the queued events. */
DROP_LATEST,
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import io.getstream.chat.android.models.User
import io.getstream.chat.android.state.errorhandler.StateErrorHandlerFactory
import io.getstream.chat.android.state.event.handler.internal.EventHandler
import io.getstream.chat.android.state.event.handler.internal.EventHandlerSequential
import io.getstream.chat.android.state.plugin.config.MessageBufferConfig
import io.getstream.chat.android.state.plugin.config.StatePluginConfig
import io.getstream.chat.android.state.plugin.internal.StatePlugin
import io.getstream.chat.android.state.plugin.logic.internal.LogicRegistry
Expand Down Expand Up @@ -151,6 +152,7 @@ public class StreamStatePluginFactory(
repos = repositoryFacade,
syncedEvents = syncManager.syncedEvents,
sideEffect = syncManager::awaitSyncing,
bufferConfig = config.messageLimitConfig.messageBufferConfig,
)

if (config.backgroundSyncEnabled) {
Expand Down Expand Up @@ -192,6 +194,7 @@ public class StreamStatePluginFactory(
repos: RepositoryFacade,
sideEffect: suspend () -> Unit,
syncedEvents: Flow<List<ChatEvent>>,
bufferConfig: MessageBufferConfig,
): EventHandler {
return EventHandlerSequential(
scope = scope,
Expand All @@ -204,6 +207,7 @@ public class StreamStatePluginFactory(
repos = repos,
syncedEvents = syncedEvents,
sideEffect = sideEffect,
bufferConfig = bufferConfig,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.getstream.chat.android.models.ChannelCapabilities
import io.getstream.chat.android.models.User
import io.getstream.chat.android.state.event.handler.internal.EventHandler
import io.getstream.chat.android.state.event.handler.internal.EventHandlerSequential
import io.getstream.chat.android.state.plugin.config.MessageBufferConfig
import io.getstream.chat.android.state.plugin.state.global.internal.MutableGlobalState
import io.getstream.chat.android.test.TestCoroutineExtension
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -146,6 +147,7 @@ internal class TotalUnreadCountTest {
repos = repos,
sideEffect = sideEffect,
syncedEvents = syncedEvents,
bufferConfig = MessageBufferConfig(),
)

fun givenMockedRepositories(): Fixture {
Expand Down
Loading
Loading