Skip to content

Commit beb38ca

Browse files
Readme and better example in StreamClient
1 parent b21d763 commit beb38ca

File tree

2 files changed

+155
-91
lines changed

2 files changed

+155
-91
lines changed

README.md

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,122 @@
1-
# stream-android-core
2-
Core SDK for the Android products.
1+
# Stream Android Core
2+
3+
> **Internal Stream SDK**
4+
> This repository is **for Stream products only**. It is not intended for public consumption or direct integration by third-party apps.
5+
6+
## Overview
7+
**Stream Android Core** is the foundational library powering Stream’s Android SDKs (Chat, Video, Feeds, etc.). It provides shared primitives and infrastructure: authentication/token handling, connection & event lifecycle, retries/backoff, single-flight execution, serial processing queues, batching, and logging.
8+
9+
## Project structure
10+
- **app/** – Demo app used for local development and manual testing.
11+
- **stream-android-core/** – Core library: models, processors, lifecycle, queues, batching.
12+
- **stream-android-core-annotations/** – Internal annotations / processors supporting the core.
13+
- **stream-android-core-lint/** – Custom lint rules tailored for Stream codebases.
14+
- **config/** – Static analysis and style configs (ktlint, detekt, license headers).
15+
- **gradle/** – Gradle wrapper and version catalogs.
16+
17+
## Requirements
18+
- **minSdk**: 21+
19+
- **Kotlin**: 1.9+
20+
- **Coroutines**: 1.8+
21+
- **AGP**: 8.x+
22+
23+
24+
## What it offers
25+
- Shared models and value types
26+
- Token management and client/session lifecycle hooks
27+
- **Serial processing queue** for ordered, single-threaded coroutine work
28+
- **Single-flight processor** to dedupe concurrent identical tasks
29+
- **Retry processor** with linear/exponential backoff and give-up predicates
30+
- **Batcher** for efficient event aggregation
31+
- Internal annotations and custom lint rules
32+
- Demo app to validate changes end-to-end
33+
34+
## Instantiating a client
35+
36+
```kotlin
37+
val logProvider = StreamLoggerProvider.defaultAndroidLogger(
38+
minLevel = StreamLogger.LogLevel.Verbose,
39+
honorAndroidIsLoggable = true,
40+
)
41+
42+
val clientSubscriptionManager = StreamSubscriptionManager<StreamClientListener>(
43+
logger = logProvider.taggedLogger("SCClientSubscriptions"),
44+
maxStrongSubscriptions = 250,
45+
maxWeakSubscriptions = 250,
46+
)
47+
48+
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
49+
50+
val singleFlight = StreamSingleFlightProcessor(scope)
51+
val tokenManager = StreamTokenManager(userId, tokenProvider, singleFlight)
52+
53+
val serialQueue = StreamSerialProcessingQueue(
54+
logger = logProvider.taggedLogger("SCSerialProcessing"),
55+
scope = scope,
56+
)
57+
58+
val retryProcessor = StreamRetryProcessor(
59+
logger = logProvider.taggedLogger("SCRetryProcessor")
60+
)
61+
62+
val connectionIdHolder = StreamConnectionIdHolder()
63+
val socketFactory = StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory"))
64+
val healthMonitor = StreamHealthMonitor(logger = logProvider.taggedLogger("SCHealthMonitor"), scope = scope)
65+
66+
val batcher = StreamBatcher<String>(
67+
scope = scope,
68+
batchSize = 10,
69+
initialDelayMs = 100L,
70+
maxDelayMs = 1_000L,
71+
)
72+
73+
val client = StreamClient(
74+
scope = scope,
75+
apiKey = apiKey,
76+
userId = userId,
77+
wsUrl = wsUrl,
78+
products = listOf("feeds"),
79+
clientInfoHeader = clientInfoHeader,
80+
tokenProvider = tokenProvider,
81+
logProvider = logProvider,
82+
clientSubscriptionManager = clientSubscriptionManager,
83+
tokenManager = tokenManager,
84+
singleFlight = singleFlight,
85+
serialQueue = serialQueue,
86+
retryProcessor = retryProcessor,
87+
connectionIdHolder = connectionIdHolder,
88+
socketFactory = socketFactory,
89+
healthMonitor = healthMonitor,
90+
batcher = batcher,
91+
)
92+
```
93+
94+
> **Gotcha:** don’t pass the **same** subscription/queue manager instance to both the client and a nested session. Keep ownership boundaries clear to avoid event recursion.
95+
96+
## Processing mechanisms
97+
98+
- **Serial Processing Queue**
99+
Ordered, single-consumer coroutine pipeline. Backpressure is natural (FIFO).
100+
```kotlin
101+
serialQueue.start()
102+
serialQueue.enqueue { /* work in order */ }
103+
serialQueue.stop()
104+
```
105+
106+
- **Single-Flight Processor**
107+
Coalesces concurrent calls with the same key into one in-flight job; callers await the same result.
108+
109+
- **Retry Processor**
110+
Linear/exponential policy with `minRetries`, `maxRetries`, `initialDelayMillis`, optional `maxDelayMillis`, and a `giveUpFunction(attempt, Throwable)`.
111+
112+
- **Batcher**
113+
Collects items into batches based on size and/or debounce window, then flushes on the queue/scope.
114+
115+
## Factories & default implementations
116+
Public interfaces ship with convenience factory functions that return the default internal implementation (e.g., `StreamSerialProcessingQueue(...)``StreamSerialProcessingQueueImpl`). Prefer these factories in internal code; they keep call-sites stable while impls evolve. You can also provide custom implementations for testing or specialized behavior.
117+
118+
## License
119+
Copyright (c) 2014-2025 Stream.io Inc.
120+
121+
Licensed under the Stream License; see [LICENSE](https://github.com/GetStream/stream-android-base/blob/main/LICENSE).
122+
Unless required by applicable law or agreed to in writing, software distributed under the License is provided **“as is”**, without warranties or conditions of any kind.

app/src/main/java/io/getstream/android/core/sample/client/StreamClient.kt

Lines changed: 33 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import kotlinx.coroutines.CoroutineScope
4444
* @param wsUrl The WebSocket URL.
4545
* @param clientInfoHeader The client info header.
4646
* @param tokenProvider The token provider.
47-
* @param module The client module.
4847
* @return A new [createStreamClient] instance.
4948
*/
5049
fun createStreamClient(
@@ -54,100 +53,45 @@ fun createStreamClient(
5453
wsUrl: StreamWsUrl,
5554
clientInfoHeader: StreamHttpClientInfoHeader,
5655
tokenProvider: StreamTokenProvider,
57-
module: StreamClientModule = StreamClientModule.defaults(scope, userId, tokenProvider),
58-
): StreamClient =
59-
StreamClient(
56+
): StreamClient {
57+
val logProvider = StreamLoggerProvider.Companion.defaultAndroidLogger(
58+
minLevel = StreamLogger.LogLevel.Verbose,
59+
honorAndroidIsLoggable = false,
60+
)
61+
val clientSubscriptionManager = StreamSubscriptionManager<StreamClientListener>(
62+
logger = logProvider.taggedLogger("SCClientSubscriptions"),
63+
maxStrongSubscriptions = 250,
64+
maxWeakSubscriptions = 250,
65+
)
66+
val singleFlight = StreamSingleFlightProcessor(scope)
67+
val tokenManager = StreamTokenManager(userId, tokenProvider, singleFlight)
68+
val serialQueue = StreamSerialProcessingQueue(
69+
logger = logProvider.taggedLogger("SCSerialProcessing"),
70+
scope = scope,
71+
)
72+
val retryProcessor = StreamRetryProcessor(logger = logProvider.taggedLogger("SCRetryProcessor"))
73+
val connectionIdHolder = StreamConnectionIdHolder()
74+
val socketFactory = StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory"))
75+
val healthMonitor = StreamHealthMonitor(logger = logProvider.taggedLogger("SCHealthMonitor"), scope = scope)
76+
val batcher = StreamBatcher<String>(scope = scope, batchSize = 10, initialDelayMs = 100L, maxDelayMs = 1_000L)
77+
78+
return StreamClient(
6079
scope = scope,
6180
apiKey = apiKey,
6281
userId = userId,
6382
wsUrl = wsUrl,
6483
products = listOf("feeds"),
6584
clientInfoHeader = clientInfoHeader,
6685
tokenProvider = tokenProvider,
67-
logProvider = module.logProvider,
68-
clientSubscriptionManager = module.clientSubscriptionManager,
69-
tokenManager = module.tokenManager,
70-
singleFlight = module.singleFlight,
71-
serialQueue = module.serialQueue,
72-
retryProcessor = module.retryProcessor,
73-
connectionIdHolder = module.connectionIdHolder,
74-
socketFactory = module.socketFactory,
75-
healthMonitor = module.healthMonitor,
76-
batcher = module.batcher,
86+
logProvider = logProvider,
87+
clientSubscriptionManager = clientSubscriptionManager,
88+
tokenManager = tokenManager,
89+
singleFlight = singleFlight,
90+
serialQueue = serialQueue,
91+
retryProcessor = retryProcessor,
92+
connectionIdHolder = connectionIdHolder,
93+
socketFactory = socketFactory,
94+
healthMonitor = healthMonitor,
95+
batcher = batcher,
7796
)
78-
79-
/**
80-
* Holds configuration and dependencies for the Stream client, including logging, coroutine scope,
81-
* subscription managers, token management, processing queues, retry logic, connection handling,
82-
* socket factory, health monitoring, event debouncing, and JSON serialization.
83-
*
84-
* This module is intended to be used as a central place for client-wide resources and processors.
85-
*
86-
* @property logProvider Provides logging functionality for the client.
87-
* @property scope Coroutine scope used for async operations.
88-
* @property clientSubscriptionManager Manages subscriptions for client listeners.
89-
* @property tokenManager Handles authentication tokens.
90-
* @property singleFlight Ensures single execution of concurrent requests.
91-
* @property serialQueue Serializes processing of tasks.
92-
* @property retryProcessor Handles retry logic for failed operations.
93-
* @property connectionIdHolder Stores the current connection ID.
94-
* @property socketFactory Creates WebSocket connections.
95-
* @property healthMonitor Monitors connection health.
96-
* @property batcher Batches socket events for batch processing.
97-
*/
98-
@ConsistentCopyVisibility
99-
data class StreamClientModule
100-
private constructor(
101-
val logProvider: StreamLoggerProvider =
102-
StreamLoggerProvider.Companion.defaultAndroidLogger(
103-
minLevel = StreamLogger.LogLevel.Verbose,
104-
honorAndroidIsLoggable = false,
105-
),
106-
val scope: CoroutineScope,
107-
val clientSubscriptionManager: StreamSubscriptionManager<StreamClientListener> =
108-
StreamSubscriptionManager(
109-
logger = logProvider.taggedLogger("SCClientSubscriptions"),
110-
maxStrongSubscriptions = 250,
111-
maxWeakSubscriptions = 250,
112-
),
113-
val tokenManager: StreamTokenManager,
114-
val singleFlight: StreamSingleFlightProcessor = StreamSingleFlightProcessor(scope = scope),
115-
val serialQueue: StreamSerialProcessingQueue =
116-
StreamSerialProcessingQueue(
117-
logger = logProvider.taggedLogger("SCSerialProcessing"),
118-
scope = scope,
119-
),
120-
val retryProcessor: StreamRetryProcessor =
121-
StreamRetryProcessor(logger = logProvider.taggedLogger("SCRetryProcessor")),
122-
val connectionIdHolder: StreamConnectionIdHolder = StreamConnectionIdHolder(),
123-
val socketFactory: StreamWebSocketFactory =
124-
StreamWebSocketFactory(logger = logProvider.taggedLogger("SCWebSocketFactory")),
125-
val healthMonitor: StreamHealthMonitor =
126-
StreamHealthMonitor(logger = logProvider.taggedLogger("SCHealthMonitor"), scope = scope),
127-
val batcher: StreamBatcher<String> =
128-
StreamBatcher(scope = scope, batchSize = 10, initialDelayMs = 100L, maxDelayMs = 1_000L),
129-
) {
130-
companion object {
131-
/**
132-
* Creates a default [StreamClientModule] instance with recommended settings and
133-
* dependencies.
134-
*
135-
* @param scope Coroutine scope for async operations.
136-
* @param userId The user ID for authentication.
137-
* @param tokenProvider Provider for authentication tokens.
138-
* @return A configured [StreamClientModule] instance.
139-
*/
140-
fun defaults(
141-
scope: CoroutineScope,
142-
userId: StreamUserId,
143-
tokenProvider: StreamTokenProvider,
144-
): StreamClientModule {
145-
val singleFlight = StreamSingleFlightProcessor(scope)
146-
return StreamClientModule(
147-
scope = scope,
148-
singleFlight = singleFlight,
149-
tokenManager = StreamTokenManager(userId, tokenProvider, singleFlight),
150-
)
151-
}
152-
}
15397
}

0 commit comments

Comments
 (0)