Skip to content

Commit 1f26c8c

Browse files
committed
[Enhancement]Implement Store coordinator
1 parent dade6d4 commit 1f26c8c

File tree

8 files changed

+200
-33
lines changed

8 files changed

+200
-33
lines changed

Sources/StreamVideo/Utils/Store/Store.swift

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
5959

6060
/// Executor that processes actions through the pipeline.
6161
private let executor: StoreExecutor<Namespace>
62-
62+
63+
/// Coordinator that can skip redundant actions before execution.
64+
private let coordinator: StoreCoordinator<Namespace>
65+
6366
/// Publisher that holds and emits the current state.
6467
private let stateSubject: CurrentValueSubject<Namespace.State, Never>
6568

@@ -81,20 +84,23 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
8184
/// - middleware: Array of middleware for side effects.
8285
/// - logger: Logger for recording store operations.
8386
/// - executor: Executor for processing the action pipeline.
87+
/// - coordinator: Coordinator that validates actions before execution.
8488
init(
8589
identifier: String,
8690
initialState: Namespace.State,
8791
reducers: [Reducer<Namespace>],
8892
middleware: [Middleware<Namespace>],
8993
logger: StoreLogger<Namespace>,
90-
executor: StoreExecutor<Namespace>
94+
executor: StoreExecutor<Namespace>,
95+
coordinator: StoreCoordinator<Namespace>
9196
) {
9297
self.identifier = identifier
9398
stateSubject = .init(initialState)
9499
self.reducers = reducers
95100
self.middleware = []
96101
self.logger = logger
97102
self.executor = executor
103+
self.coordinator = coordinator
98104

99105
middleware.forEach { add($0) }
100106
}
@@ -241,17 +247,17 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
241247
/// logger.error("Action failed: \(error)")
242248
/// }
243249
/// ```
244-
250+
///
251+
/// - Returns: A ``StoreTask`` that can be awaited or ignored for
252+
/// fire-and-forget semantics.
245253
@discardableResult
246-
/// - Returns: A ``StoreTask`` that can be awaited for completion
247-
/// or ignored for fire-and-forget semantics.
248254
func dispatch(
249255
_ actions: [StoreActionBox<Namespace.Action>],
250256
file: StaticString = #file,
251257
function: StaticString = #function,
252258
line: UInt = #line
253259
) -> StoreTask<Namespace> {
254-
let task = StoreTask(executor: executor)
260+
let task = StoreTask(executor: executor, coordinator: coordinator)
255261
processingQueue.addTaskOperation { [weak self] in
256262
guard let self else {
257263
return
@@ -272,9 +278,13 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
272278
return task
273279
}
274280

281+
/// Dispatches a single boxed action asynchronously.
282+
///
283+
/// Wraps the action in an array and forwards to
284+
/// ``dispatch(_:file:function:line:)``.
285+
///
286+
/// - Returns: A ``StoreTask`` that can be awaited or ignored.
275287
@discardableResult
276-
/// - Returns: A ``StoreTask`` that can be awaited for completion
277-
/// or ignored for fire-and-forget semantics.
278288
func dispatch(
279289
_ action: StoreActionBox<Namespace.Action>,
280290
file: StaticString = #file,
@@ -289,9 +299,13 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
289299
)
290300
}
291301

302+
/// Dispatches multiple unboxed actions asynchronously.
303+
///
304+
/// Actions are boxed automatically before being forwarded to
305+
/// ``dispatch(_:file:function:line:)``.
306+
///
307+
/// - Returns: A ``StoreTask`` that can be awaited or ignored.
292308
@discardableResult
293-
/// - Returns: A ``StoreTask`` that can be awaited for completion
294-
/// or ignored for fire-and-forget semantics.
295309
func dispatch(
296310
_ actions: [Namespace.Action],
297311
file: StaticString = #file,
@@ -306,9 +320,13 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
306320
)
307321
}
308322

323+
/// Dispatches a single unboxed action asynchronously.
324+
///
325+
/// The action is boxed automatically and forwarded to
326+
/// ``dispatch(_:file:function:line:)``.
327+
///
328+
/// - Returns: A ``StoreTask`` that can be awaited or ignored.
309329
@discardableResult
310-
/// - Returns: A ``StoreTask`` that can be awaited for completion
311-
/// or ignored for fire-and-forget semantics.
312330
func dispatch(
313331
_ action: Namespace.Action,
314332
file: StaticString = #file,
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
7+
/// Coordinates store actions to prevent redundant state transitions.
8+
///
9+
/// The coordinator evaluates an action against the current state before the
10+
/// store processes it.
11+
/// Implementations can override ``shouldExecute(action:state:)``
12+
/// to skip actions that would not yield a different state,
13+
/// reducing unnecessary work along the pipeline.
14+
class StoreCoordinator<Namespace: StoreNamespace>: @unchecked Sendable {
15+
16+
/// Determines whether an action should run for the provided state snapshot.
17+
///
18+
/// This default implementation always executes the action.
19+
/// Subclasses can override the method to run diffing logic or other
20+
/// heuristics that detect state changes and return `false` when the action
21+
/// can be safely skipped.
22+
///
23+
/// - Parameters:
24+
/// - action: The action that is about to be dispatched.
25+
/// - state: The current state before the action runs.
26+
/// - Returns: `true` to process the action; `false` to skip it.
27+
func shouldExecute(
28+
action: Namespace.Action,
29+
state: Namespace.State
30+
) -> Bool {
31+
true
32+
}
33+
}

Sources/StreamVideo/Utils/Store/StoreExecutor.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class StoreExecutor<Namespace: StoreNamespace>: @unchecked Sendable {
6363
file: StaticString,
6464
function: StaticString,
6565
line: UInt
66-
) async throws {
66+
) async throws -> Namespace.State {
6767
// Apply optional delay before processing action
6868
await action.applyDelayBeforeIfRequired()
6969

@@ -106,6 +106,8 @@ class StoreExecutor<Namespace: StoreNamespace>: @unchecked Sendable {
106106

107107
// Apply optional delay after successful processing
108108
await action.applyDelayAfterIfRequired()
109+
110+
return updatedState
109111
} catch {
110112
// Log failure and rethrow
111113
logger.didFail(

Sources/StreamVideo/Utils/Store/StoreLogger.swift

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ class StoreLogger<Namespace: StoreNamespace> {
4646
/// aggregation tools.
4747
let logSubsystem: LogSubsystem
4848

49+
/// Aggregated metrics recorded for dispatched actions.
50+
///
51+
/// Statistics are enabled in DEBUG builds to help monitor action
52+
/// throughput.
4953
let statistics: StoreStatistics<Namespace> = .init()
5054

5155
/// Initializes a new store logger.
@@ -56,7 +60,10 @@ class StoreLogger<Namespace: StoreNamespace> {
5660
self.logSubsystem = logSubsystem
5761

5862
#if DEBUG
59-
statistics.enable(interval: 60) { [weak self] in self?.report($0, interval: $1) }
63+
statistics.enable(interval: 60) {
64+
[weak self] numberOfActions, interval in
65+
self?.report(numberOfActions, interval: interval)
66+
}
6067
#endif
6168
}
6269

@@ -82,7 +89,38 @@ class StoreLogger<Namespace: StoreNamespace> {
8289
) {
8390
defer { statistics.record(action) }
8491
log.debug(
85-
"Store identifier:\(identifier) completed action:\(action) state:\(state).",
92+
"Store identifier:\(identifier) completed action:\(action) "
93+
+ "state:\(state).",
94+
subsystems: logSubsystem,
95+
functionName: function,
96+
fileName: file,
97+
lineNumber: line
98+
)
99+
}
100+
101+
/// Called when an action is skipped by the coordinator.
102+
///
103+
/// Override to customize logging or metrics for redundant actions
104+
/// that do not require processing.
105+
///
106+
/// - Parameters:
107+
/// - identifier: The store's unique identifier.
108+
/// - action: The action that was skipped.
109+
/// - state: The snapshot used when making the decision.
110+
/// - file: Source file where the action was dispatched.
111+
/// - function: Function where the action was dispatched.
112+
/// - line: Line number where the action was dispatched.
113+
func didSkip(
114+
identifier: String,
115+
action: Namespace.Action,
116+
state: Namespace.State,
117+
file: StaticString,
118+
function: StaticString,
119+
line: UInt
120+
) {
121+
defer { statistics.record(action) }
122+
log.debug(
123+
"Store identifier:\(identifier) skipped action:\(action).",
86124
subsystems: logSubsystem,
87125
functionName: function,
88126
fileName: file,
@@ -121,12 +159,21 @@ class StoreLogger<Namespace: StoreNamespace> {
121159
)
122160
}
123161

162+
/// Reports aggregated statistics for the store.
163+
///
164+
/// This hook is invoked on a timer when statistics tracking is
165+
/// enabled. Override to forward metrics or customize formatting.
166+
///
167+
/// - Parameters:
168+
/// - numberOfActions: Count of actions recorded in the interval.
169+
/// - interval: The time window for the reported statistics.
124170
func report(
125171
_ numberOfActions: Int,
126172
interval: TimeInterval
127173
) {
128174
log.debug(
129-
"Store identifier:\(Namespace.identifier) performs \(numberOfActions) per \(interval) seconds.",
175+
"Store identifier:\(Namespace.identifier) performs "
176+
+ "\(numberOfActions) per \(interval) seconds.",
130177
subsystems: logSubsystem
131178
)
132179
}

Sources/StreamVideo/Utils/Store/StoreNamespace.swift

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,20 +89,33 @@ protocol StoreNamespace: Sendable {
8989
/// - Returns: An executor instance for this store.
9090
static func executor() -> StoreExecutor<Self>
9191

92+
/// Creates the coordinator for evaluating actions before execution.
93+
///
94+
/// Override to provide custom logic that skips redundant actions.
95+
///
96+
/// - Returns: A coordinator instance for this store.
97+
static func coordinator() -> StoreCoordinator<Self>
98+
9299
/// Creates a configured store instance.
93100
///
94101
/// This method assembles all components into a functioning store.
95102
/// The default implementation should work for most cases.
96103
///
97-
/// - Parameter initialState: The initial state for the store.
98-
///
104+
/// - Parameters:
105+
/// - initialState: The initial state for the store.
106+
/// - reducers: Reducers used to transform state.
107+
/// - middleware: Middleware that handle side effects.
108+
/// - logger: Logger responsible for diagnostics.
109+
/// - executor: Executor that runs the action pipeline.
110+
/// - coordinator: Coordinator that can skip redundant actions.
99111
/// - Returns: A fully configured store instance.
100112
static func store(
101113
initialState: State,
102114
reducers: [Reducer<Self>],
103115
middleware: [Middleware<Self>],
104116
logger: StoreLogger<Self>,
105-
executor: StoreExecutor<Self>
117+
executor: StoreExecutor<Self>,
118+
coordinator: StoreCoordinator<Self>
106119
) -> Store<Self>
107120
}
108121

@@ -122,6 +135,9 @@ extension StoreNamespace {
122135
/// Default implementation returns basic executor.
123136
static func executor() -> StoreExecutor<Self> { .init() }
124137

138+
/// Default implementation returns a coordinator with no skip logic.
139+
static func coordinator() -> StoreCoordinator<Self> { .init() }
140+
125141
/// Default implementation creates a store with all components.
126142
///
127143
/// This implementation:
@@ -131,20 +147,23 @@ extension StoreNamespace {
131147
/// 4. Adds middleware from `middleware()`
132148
/// 5. Uses logger from `logger()`
133149
/// 6. Uses executor from `executor()`
150+
/// 7. Uses coordinator from `coordinator()`
134151
static func store(
135152
initialState: State,
136153
reducers: [Reducer<Self>] = Self.reducers(),
137154
middleware: [Middleware<Self>] = Self.middleware(),
138155
logger: StoreLogger<Self> = Self.logger(),
139-
executor: StoreExecutor<Self> = Self.executor()
156+
executor: StoreExecutor<Self> = Self.executor(),
157+
coordinator: StoreCoordinator<Self> = Self.coordinator()
140158
) -> Store<Self> {
141159
.init(
142160
identifier: Self.identifier,
143161
initialState: initialState,
144162
reducers: reducers,
145163
middleware: middleware,
146164
logger: logger,
147-
executor: executor
165+
executor: executor,
166+
coordinator: coordinator
148167
)
149168
}
150169
}

Sources/StreamVideo/Utils/Store/StoreTask.swift

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import Combine
66
import Foundation
77

8-
/// A lightweight handle for a single dispatched store action.
8+
/// A lightweight handle for dispatched store actions.
99
///
10-
/// `StoreTask` coordinates the execution of one action via
11-
/// ``StoreExecutor`` and exposes a way to await the result. Callers can
10+
/// `StoreTask` coordinates the execution of one or more actions via
11+
/// ``StoreExecutor`` and ``StoreCoordinator``. Callers can
1212
/// dispatch-and-forget using `run(...)` and optionally await completion
1313
/// or failure later with ``result()``.
1414
///
@@ -22,27 +22,30 @@ final class StoreTask<Namespace: StoreNamespace>: Sendable {
2222
private enum State { case idle, running, completed, failed(Error) }
2323

2424
private let executor: StoreExecutor<Namespace>
25+
private let coordinator: StoreCoordinator<Namespace>
2526
private let resultSubject: CurrentValueSubject<State, Never> = .init(.idle)
2627

2728
init(
28-
executor: StoreExecutor<Namespace>
29+
executor: StoreExecutor<Namespace>,
30+
coordinator: StoreCoordinator<Namespace>
2931
) {
3032
self.executor = executor
33+
self.coordinator = coordinator
3134
}
3235

3336
// MARK: - Execution
3437

35-
/// Executes the given action through the store pipeline.
38+
/// Executes the given actions through the store pipeline.
3639
///
3740
/// The task transitions to `.running`, delegates to the
38-
/// ``StoreExecutor`` and records completion or failure. Errors are
39-
/// captured and can be retrieved by awaiting ``result()``.
41+
/// ``StoreExecutor`` and ``StoreCoordinator``, and records completion
42+
/// or failure. Errors are captured and can be retrieved by awaiting
43+
/// ``result()``.
4044
///
4145
/// - Parameters:
4246
/// - identifier: Store identifier for logging context.
4347
/// - state: Current state snapshot before processing.
44-
/// - action: Action to execute.
45-
/// - delay: Optional before/after delays.
48+
/// - actions: Actions to execute, each optionally delayed.
4649
/// - reducers: Reducers to apply in order.
4750
/// - middleware: Middleware for side effects.
4851
/// - logger: Logger used for diagnostics.
@@ -64,10 +67,28 @@ final class StoreTask<Namespace: StoreNamespace>: Sendable {
6467
) async {
6568
resultSubject.send(.running)
6669
do {
70+
var updatedState = state
6771
for action in actions {
68-
try await executor.run(
72+
guard
73+
coordinator.shouldExecute(
74+
action: action.wrappedValue,
75+
state: updatedState
76+
)
77+
else {
78+
logger.didSkip(
79+
identifier: identifier,
80+
action: action.wrappedValue,
81+
state: updatedState,
82+
file: file,
83+
function: function,
84+
line: line
85+
)
86+
continue
87+
}
88+
89+
updatedState = try await executor.run(
6990
identifier: identifier,
70-
state: state,
91+
state: updatedState,
7192
action: action,
7293
reducers: reducers,
7394
middleware: middleware,

0 commit comments

Comments
 (0)