Skip to content

Commit f2bc851

Browse files
authored
fix(DataStore): serialize IncomingAsyncSubscriptionEventPublisher events (#3489)
* fix(DataStore): serialize IncomingAsyncSubscriptionEventPublisher events * address PR comments
1 parent 3cc17d2 commit f2bc851

File tree

2 files changed

+129
-5
lines changed

2 files changed

+129
-5
lines changed

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
4343
private let awsAuthService: AWSAuthServiceBehavior
4444

4545
private let consistencyQueue: DispatchQueue
46-
46+
private let taskQueue: TaskQueue<Void>
4747
private let modelName: ModelName
4848

4949
init(modelSchema: ModelSchema,
@@ -58,6 +58,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
5858
self.consistencyQueue = DispatchQueue(
5959
label: "com.amazonaws.Amplify.RemoteSyncEngine.\(modelSchema.name)"
6060
)
61+
self.taskQueue = TaskQueue()
6162
self.modelName = modelSchema.name
6263

6364
self.connectionStatusQueue = OperationQueue()
@@ -170,26 +171,26 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
170171

171172
func sendConnectionEventIfConnected(event: Event) {
172173
if combinedConnectionStatusIsConnected {
173-
incomingSubscriptionEvents.send(event)
174+
send(event)
174175
}
175176
}
176177

177178
func genericValueListenerHandler(event: Event, cancelAwareBlock: CancelAwareBlockOperation) {
178179
if case .connection = event {
179180
connectionStatusQueue.addOperation(cancelAwareBlock)
180181
} else {
181-
incomingSubscriptionEvents.send(event)
182+
send(event)
182183
}
183184
}
184185

185186
func genericCompletionListenerHandler(result: Result<Void, APIError>) {
186187
switch result {
187188
case .success:
188-
incomingSubscriptionEvents.send(completion: .finished)
189+
send(completion: .finished)
189190
case .failure(let apiError):
190191
log.verbose("[InitializeSubscription.1] API.subscribe failed for `\(modelName)` error: \(apiError.errorDescription)")
191192
let dataStoreError = DataStoreError(error: apiError)
192-
incomingSubscriptionEvents.send(completion: .failure(dataStoreError))
193+
send(completion: .failure(dataStoreError))
193194
}
194195
}
195196

@@ -237,6 +238,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
237238
incomingSubscriptionEvents.subscribe(subscriber)
238239
}
239240

241+
func send(_ event: Event) {
242+
taskQueue.async { [weak self] in
243+
guard let self else { return }
244+
self.incomingSubscriptionEvents.send(event)
245+
}
246+
}
247+
248+
func send(completion: Subscribers.Completion<DataStoreError>) {
249+
taskQueue.async { [weak self] in
250+
guard let self else { return }
251+
self.incomingSubscriptionEvents.send(completion: completion)
252+
}
253+
}
254+
240255
func cancel() {
241256
consistencyQueue.sync {
242257
genericCompletionListenerHandler(result: .successfulVoid)
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import XCTest
9+
@testable import Amplify
10+
@testable import AmplifyTestCommon
11+
@testable import AWSPluginsCore
12+
@testable import AWSDataStorePlugin
13+
14+
final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase {
15+
var apiPlugin: MockAPICategoryPlugin!
16+
override func setUp() {
17+
apiPlugin = MockAPICategoryPlugin()
18+
ModelRegistry.register(modelType: Post.self)
19+
}
20+
21+
/// This test was written to to reproduce a bug where the subscribe would miss events emitted by the publisher.
22+
/// The pattern in this test using the publisher (`IncomingAsyncSubscriptionEventPublisher`) and subscriber
23+
/// (`IncomingAsyncSubscriptionEventToAnyModelMapper`) are identical to the usage in `AWSModelReconciliationQueue.init()`.
24+
///
25+
/// See the changes in this PR: https://github.com/aws-amplify/amplify-swift/pull/3489
26+
///
27+
/// Before the PR changes, the publisher would emit events concurrently which caused some of them to be missed
28+
/// by the subscriber even though the subscriber applied back pressure to process one event at a time (demand
29+
/// of `max(1)`). For more details regarding back-pressure, see
30+
/// https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers
31+
///
32+
/// The change, to publish the events though the same TaskQueue ensures that the events are properly buffered
33+
/// and sent only when the subscriber demands for it.
34+
func testSubscriberRecievedEvents() async throws {
35+
let expectedEvents = expectation(description: "Expected number of ")
36+
let numberOfEvents = 50
37+
expectedEvents.expectedFulfillmentCount = numberOfEvents
38+
let asyncEvents = await IncomingAsyncSubscriptionEventPublisher(
39+
modelSchema: Post.schema,
40+
api: apiPlugin,
41+
modelPredicate: nil,
42+
auth: nil,
43+
authModeStrategy: AWSDefaultAuthModeStrategy(),
44+
awsAuthService: nil)
45+
let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper()
46+
asyncEvents.subscribe(subscriber: mapper)
47+
let sink = mapper
48+
.publisher
49+
.sink(
50+
receiveCompletion: { _ in },
51+
receiveValue: { _ in
52+
expectedEvents.fulfill()
53+
}
54+
)
55+
DispatchQueue.concurrentPerform(iterations: numberOfEvents) { index in
56+
asyncEvents.send(.connection(.connected))
57+
}
58+
59+
await fulfillment(of: [expectedEvents], timeout: 2)
60+
sink.cancel()
61+
}
62+
63+
/// Ensure that the publisher-subscriber with back pressure is receiving all the events in the order in which they were sent.
64+
func testSubscriberRecievedEventsInOrder() async throws {
65+
let expectedEvents = expectation(description: "Expected number of ")
66+
let expectedOrder = AtomicValue<[String]>(initialValue: [])
67+
let actualOrder = AtomicValue<[String]>(initialValue: [])
68+
let numberOfEvents = 50
69+
expectedEvents.expectedFulfillmentCount = numberOfEvents
70+
let asyncEvents = await IncomingAsyncSubscriptionEventPublisher(
71+
modelSchema: Post.schema,
72+
api: apiPlugin,
73+
modelPredicate: nil,
74+
auth: nil,
75+
authModeStrategy: AWSDefaultAuthModeStrategy(),
76+
awsAuthService: nil)
77+
let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper()
78+
asyncEvents.subscribe(subscriber: mapper)
79+
let sink = mapper
80+
.publisher
81+
.sink(
82+
receiveCompletion: { _ in },
83+
receiveValue: { event in
84+
switch event {
85+
case .payload(let mutationSync):
86+
actualOrder.append(mutationSync.syncMetadata.modelId)
87+
default:
88+
break
89+
}
90+
expectedEvents.fulfill()
91+
}
92+
)
93+
94+
for index in 0..<numberOfEvents {
95+
let post = Post(id: "\(index)", title: "title", content: "content", createdAt: .now())
96+
expectedOrder.append(post.id)
97+
asyncEvents.send(.data(.success(.init(model: AnyModel(post),
98+
syncMetadata: .init(modelId: post.id,
99+
modelName: "Post",
100+
deleted: false,
101+
lastChangedAt: 0,
102+
version: 0)))))
103+
}
104+
105+
await fulfillment(of: [expectedEvents], timeout: 2)
106+
XCTAssertEqual(expectedOrder.get(), actualOrder.get())
107+
sink.cancel()
108+
}
109+
}

0 commit comments

Comments
 (0)