Skip to content

Commit d79f61c

Browse files
committed
fix: increase sync concurrency count based on associations (#1267)
1 parent f7e83c1 commit d79f61c

File tree

4 files changed

+103
-18
lines changed

4 files changed

+103
-18
lines changed

Amplify/Categories/DataStore/Model/Internal/Schema/ModelSchema+Attributes.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@ public extension ModelSchema {
1919
var hasAuthenticationRules: Bool {
2020
return !authRules.isEmpty
2121
}
22+
23+
var hasAssociations: Bool {
24+
fields.values.contains { modelField in
25+
modelField.hasAssociation
26+
}
27+
}
2228
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/SQLite/Model+SQLite.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,9 @@ extension Array where Element == ModelSchema {
182182
return sortedKeys.map { sortMap[$0]! }
183183
}
184184

185+
func hasAssociations() -> Bool {
186+
contains { modelSchema in
187+
modelSchema.hasAssociations
188+
}
189+
}
185190
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOrchestrator.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
4242

4343
// Future optimization: can perform sync on each root in parallel, since we know they won't have any
4444
// interdependencies
45-
private let syncOperationQueue: OperationQueue
45+
let syncOperationQueue: OperationQueue
4646
private let concurrencyQueue = DispatchQueue(label: "com.amazonaws.InitialSyncOrchestrator.concurrencyQueue",
4747
target: DispatchQueue.global())
4848

@@ -85,6 +85,9 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
8585

8686
let modelNames = syncableModelSchemas.map { $0.name }
8787
self.dispatchSyncQueriesStarted(for: modelNames)
88+
if !syncableModelSchemas.hasAssociations() {
89+
self.syncOperationQueue.maxConcurrentOperationCount = syncableModelSchemas.count
90+
}
8891
self.syncOperationQueue.isSuspended = false
8992
}
9093
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/InitialSync/InitialSyncOrchestratorTests.swift

Lines changed: 88 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@ class InitialSyncOrchestratorTests: XCTestCase {
1616

1717
override class func setUp() {
1818
Amplify.Logging.logLevel = .info
19-
ModelRegistry.reset()
20-
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
2119
}
2220

21+
2322
/// - Given: An InitialSyncOrchestrator with a model dependency graph
2423
/// - When:
2524
/// - The orchestrator starts up
2625
/// - Then:
2726
/// - It performs a sync query for each registered model
2827
func testInvokesCompletionCallback() throws {
28+
ModelRegistry.reset()
29+
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
2930
let responder = QueryRequestListenerResponder<PaginatedList<AnyModel>> { _, listener in
3031
let startedAt = Int(Date().timeIntervalSince1970)
3132
let list = PaginatedList<AnyModel>(items: [], nextToken: nil, startedAt: startedAt)
@@ -42,11 +43,10 @@ class InitialSyncOrchestratorTests: XCTestCase {
4243

4344
let reconciliationQueue = MockReconciliationQueue()
4445

45-
let orchestrator: InitialSyncOrchestrator =
46-
AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
47-
api: apiPlugin,
48-
reconciliationQueue: reconciliationQueue,
49-
storageAdapter: storageAdapter)
46+
let orchestrator = AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
47+
api: apiPlugin,
48+
reconciliationQueue: reconciliationQueue,
49+
storageAdapter: storageAdapter)
5050

5151
let syncCallbackReceived = expectation(description: "Sync callback received, sync operation is complete")
5252
let syncQueriesStartedReceived = expectation(description: "syncQueriesStarted received")
@@ -89,16 +89,85 @@ class InitialSyncOrchestratorTests: XCTestCase {
8989
}
9090

9191
waitForExpectations(timeout: 1)
92+
XCTAssertEqual(orchestrator.syncOperationQueue.maxConcurrentOperationCount, 1)
9293
Amplify.Hub.removeListener(hubListener)
9394
sink.cancel()
9495
}
9596

97+
/// - Given: An InitialSyncOrchestrator with a model dependency graph containing no associations
98+
/// - When:
99+
/// - The orchestrator starts up
100+
/// - Then:
101+
/// - It performs a sync query for each registered model with concurrency set to count of models
102+
func testInvokesCompletionCallback_ModelWithNoAssociations() throws {
103+
ModelRegistry.reset()
104+
struct TestModelsWithNoAssociations: AmplifyModelRegistration {
105+
func registerModels(registry: ModelRegistry.Type) {
106+
// Models without no associations
107+
registry.register(modelType: MockSynced.self)
108+
registry.register(modelType: ExampleWithEveryType.self)
109+
}
110+
111+
let version: String = "1"
112+
}
113+
TestModelsWithNoAssociations().registerModels(registry: ModelRegistry.self)
114+
115+
let responder = QueryRequestListenerResponder<PaginatedList<AnyModel>> { _, listener in
116+
let startedAt = Int(Date().timeIntervalSince1970)
117+
let list = PaginatedList<AnyModel>(items: [], nextToken: nil, startedAt: startedAt)
118+
let event: GraphQLOperation<PaginatedList<AnyModel>>.OperationResult = .success(.success(list))
119+
listener?(event)
120+
return nil
121+
}
122+
123+
let apiPlugin = MockAPICategoryPlugin()
124+
apiPlugin.responders[.queryRequestListener] = responder
125+
126+
let storageAdapter = MockSQLiteStorageEngineAdapter()
127+
storageAdapter.returnOnQueryModelSyncMetadata(nil)
128+
129+
let reconciliationQueue = MockReconciliationQueue()
130+
131+
let orchestrator = AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
132+
api: apiPlugin,
133+
reconciliationQueue: reconciliationQueue,
134+
storageAdapter: storageAdapter)
135+
let syncCallbackReceived = expectation(description: "Sync callback received, sync operation is complete")
136+
let syncStartedReceived = expectation(description: "Sync started received, sync operation started")
137+
syncStartedReceived.expectedFulfillmentCount = 2
138+
let finishedReceived = expectation(description: "InitialSyncOperation finished paginating and offering")
139+
finishedReceived.expectedFulfillmentCount = 2
140+
let sink = orchestrator
141+
.publisher
142+
.sink(receiveCompletion: { _ in },
143+
receiveValue: { value in
144+
switch value {
145+
case .started:
146+
syncStartedReceived.fulfill()
147+
case .finished:
148+
finishedReceived.fulfill()
149+
default:
150+
break
151+
}
152+
})
153+
154+
orchestrator.sync { _ in
155+
syncCallbackReceived.fulfill()
156+
}
157+
158+
waitForExpectations(timeout: 1)
159+
XCTAssertEqual(orchestrator.syncOperationQueue.maxConcurrentOperationCount, 2)
160+
sink.cancel()
161+
}
162+
96163
/// - Given: An InitialSyncOrchestrator with a model dependency graph
97164
/// - When:
98165
/// - The orchestrator starts up
99166
/// - Then:
100167
/// - It queries models in dependency order, from "parent" to "child"
101168
func testShouldQueryModelsInDependencyOrder() {
169+
ModelRegistry.reset()
170+
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
102171
let postWasQueried = expectation(description: "Post was queried")
103172
let commentWasQueried = expectation(description: "Comment was queried")
104173
let responder = QueryRequestListenerResponder<PaginatedList<AnyModel>> { request, listener in
@@ -125,11 +194,10 @@ class InitialSyncOrchestratorTests: XCTestCase {
125194

126195
let reconciliationQueue = MockReconciliationQueue()
127196

128-
let orchestrator: InitialSyncOrchestrator =
129-
AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
130-
api: apiPlugin,
131-
reconciliationQueue: reconciliationQueue,
132-
storageAdapter: storageAdapter)
197+
let orchestrator = AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
198+
api: apiPlugin,
199+
reconciliationQueue: reconciliationQueue,
200+
storageAdapter: storageAdapter)
133201

134202
let syncStartedReceived = expectation(description: "Sync started received, sync operation started")
135203
syncStartedReceived.expectedFulfillmentCount = 2
@@ -152,6 +220,7 @@ class InitialSyncOrchestratorTests: XCTestCase {
152220
orchestrator.sync { _ in }
153221

154222
waitForExpectations(timeout: 1)
223+
XCTAssertEqual(orchestrator.syncOperationQueue.maxConcurrentOperationCount, 1)
155224
sink.cancel()
156225
}
157226

@@ -162,6 +231,8 @@ class InitialSyncOrchestratorTests: XCTestCase {
162231
/// - It queries models in dependency order, from "parent" to "child", even if parent data is returned in
163232
/// multiple pages
164233
func testShouldQueryModelsInDependencyOrderWithPaginatedResults() {
234+
ModelRegistry.reset()
235+
PostCommentModelRegistration().registerModels(registry: ModelRegistry.self)
165236
let pageCount = 50
166237

167238
let postWasQueried = expectation(description: "Post was queried")
@@ -196,11 +267,10 @@ class InitialSyncOrchestratorTests: XCTestCase {
196267

197268
let reconciliationQueue = MockReconciliationQueue()
198269

199-
let orchestrator: InitialSyncOrchestrator =
200-
AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
201-
api: apiPlugin,
202-
reconciliationQueue: reconciliationQueue,
203-
storageAdapter: storageAdapter)
270+
let orchestrator = AWSInitialSyncOrchestrator(dataStoreConfiguration: .default,
271+
api: apiPlugin,
272+
reconciliationQueue: reconciliationQueue,
273+
storageAdapter: storageAdapter)
204274

205275
let syncStartedReceived = expectation(description: "Sync started received, sync operation started")
206276
syncStartedReceived.expectedFulfillmentCount = 2
@@ -223,6 +293,7 @@ class InitialSyncOrchestratorTests: XCTestCase {
223293
orchestrator.sync { _ in }
224294

225295
waitForExpectations(timeout: 1)
296+
XCTAssertEqual(orchestrator.syncOperationQueue.maxConcurrentOperationCount, 1)
226297
sink.cancel()
227298
}
228299

0 commit comments

Comments
 (0)