Add MessageBufferingConfig to allow custom back-pressure config for message.new events#6406
Add MessageBufferingConfig to allow custom back-pressure config for message.new events#6406VelikovPetar wants to merge 4 commits into
MessageBufferingConfig to allow custom back-pressure config for message.new events#6406Conversation
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
SDK Size Comparison 📏
|
|
This pull request has been automatically marked as stale because it has been inactive for 14 days. It will be closed in 7 days if no further activity occurs. |
WalkthroughThis PR extends message limit configuration with per-channel-type buffering control. A new ChangesMessage Buffering Configuration and Implementation
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~22 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt (1)
211-222: 💤 Low valuePotential division by zero in ratio calculation.
When
collectedCountis 0 (before any events are collected), the ratio calculationeCount.toDouble() / cCount.toDouble()will produceInfinity. While this won't crash (Kotlin/JVM handles it gracefully), the logged ratio value will be misleading.🔧 Suggested fix
private fun logEmitOutcome(event: ChatEvent, emitted: Boolean) { if (emitted) { val cCount = collectedCount.get() val eCount = emittedCount.incrementAndGet() - val ratio = eCount.toDouble() / cCount.toDouble() + val ratio = if (cCount > 0) eCount.toDouble() / cCount.toDouble() else 0.0 StreamLog.v(TAG_SOCKET) { "[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)" } } else { StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" } } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt` around lines 211 - 222, In logEmitOutcome, avoid dividing by zero when computing ratio using collectedCount; change the ratio calculation in the emitted branch of logEmitOutcome (which reads collectedCount.get() and emittedCount.incrementAndGet()) to compute ratio only when cCount > 0 (e.g., ratio = if (cCount > 0) eCount.toDouble() / cCount.toDouble() else 0.0) so the logged value is meaningful and not Infinity; keep the existing increments and logging call to StreamLog.v(TAG_SOCKET) but use the guarded ratio variable.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@stream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.kt`:
- Around line 211-222: In logEmitOutcome, avoid dividing by zero when computing
ratio using collectedCount; change the ratio calculation in the emitted branch
of logEmitOutcome (which reads collectedCount.get() and
emittedCount.incrementAndGet()) to compute ratio only when cCount > 0 (e.g.,
ratio = if (cCount > 0) eCount.toDouble() / cCount.toDouble() else 0.0) so the
logged value is meaningful and not Infinity; keep the existing increments and
logging call to StreamLog.v(TAG_SOCKET) but use the guarded ratio variable.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 7572b91e-7416-4e90-a452-8fd4d36ac732
📒 Files selected for processing (7)
stream-chat-android-state/api/stream-chat-android-state.apistream-chat-android-state/src/main/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequential.ktstream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/config/StatePluginConfig.ktstream-chat-android-state/src/main/java/io/getstream/chat/android/state/plugin/factory/StreamStatePluginFactory.ktstream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/TotalUnreadCountTest.ktstream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialTest.ktstream-chat-android-state/src/test/java/io/getstream/chat/android/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt
|


Goal
High-traffic channel types (e.g. livestreams) can produce a flood of
message.newevents that arrive faster than the sequential event-handling pipeline can process them. The current implementation funnels every socket event through a singleMutableSharedFlowwithextraBufferCapacity = Int.MAX_VALUE, which means there is no back-pressure: a burst of new-message events queues up unbounded memory and starves more important signals (reads, bans, member updates) of timely processing.This PR introduces a
MessageBufferConfigthat lets integrators opt specific channel types into a bounded buffer forNewMessageEvents, with a configurable overflow strategy (SUSPEND/DROP_OLDEST/DROP_LATEST). Signal-critical events and events for non-opted-in channel types are unaffected.Implementation
MessageBufferConfig(underMessageLimitConfig.messageBufferConfig) exposing:channelTypes: Set<String>— channel types whoseNewMessageEvents go through the bounded buffer (empty by default → feature is a no-op).capacity: Int— buffer capacity (defaults toInt.MAX_VALUE).overflow: BufferOverflow— overflow strategy (defaults toSUSPEND).EventHandlerSequentialnow allocates a secondaryMutableSharedFlow(bufferedNewMessageEvents) lazily, only when buffering is enabled, so the default configuration pays no cost for it.defaultSocketEventListener— the existing unbuffered path; used when no channel types are opted in.bufferedSocketEventListener— routesNewMessageEvents for opted-in channel types to the bounded flow, and everything else (including non-opted-inNewMessageEvents and all other event types) to the unbuffered flow.startListening()picks the listener based onbufferConfig.channelTypes.isNotEmpty()and only collects frombufferedNewMessageEventswhen buffering is enabled.StreamStatePluginFactorywires the config fromStatePluginConfig.messageLimitConfig.messageBufferConfigintoEventHandlerSequential.The bounded flow shares the same downstream pipeline (
socketEventCollector→handleBatchEvent) as the unbuffered flow, so ordering inside each flow is preserved and back-pressure is applied independently per flow.Testing
EventHandlerSequentialTestcovering:channelTypesis empty.NewMessageEvents for opted-in channel types are routed through the bounded buffer.NewMessageEvents for non-opted-in channel types and all non-NewMessageEventevents keep using the unbuffered path.DROP_OLDEST/DROP_LATEST/SUSPENDoverflow strategies behave as expected when the buffer is full.TotalUnreadCountTest,EventHandlerSequentialUserMessagesDeletedTest) updated to pass the newbufferConfigargument.Summary by CodeRabbit
Release Notes