Skip to content

Commit f7e83c1

Browse files
committed
feat(DataStore): Multiple models ReconcileAndLocalSave transaction (#1237)
* feat(DataStore): Multiple models ReconcileAndLocalSave transaction * feat(DataStore): address PR comments * fix(DataStore): address PR comments 2
1 parent 3723eb0 commit f7e83c1

28 files changed

+1547
-603
lines changed

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/SQLite/StorageEngineAdapter+SQLite.swift

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
1818
private var dbFilePath: URL?
1919
static let dbVersionKey = "com.amazonaws.DataStore.dbVersion"
2020

21+
// TODO benchmark whether a SELECT FROM FOO WHERE ID IN (1, 2, 3...) performs measurably
22+
// better than SELECT FROM FOO WHERE ID = 1 OR ID=2 OR ID=3
23+
//
24+
// SQLite supports up to 1000 expressions per SQLStatement. We have chosen to use 50 expressions
25+
// less (equaling 950) than the maximum because it is possible that our SQLStatement already has
26+
// some expressions. If we encounter performance problems in the future, we will want to profile
27+
// our system and find an optimal value.
28+
static var maxNumberOfPredicates: Int = 950
29+
2130
convenience init(version: String,
2231
databaseName: String = "database",
2332
userDefaults: UserDefaults = UserDefaults.standard) throws {
@@ -289,12 +298,28 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
289298
}
290299

291300
func queryMutationSyncMetadata(for modelId: Model.Identifier) throws -> MutationSyncMetadata? {
301+
let results = try queryMutationSyncMetadata(for: [modelId])
302+
return try results.unique()
303+
}
304+
305+
func queryMutationSyncMetadata(for modelIds: [Model.Identifier]) throws -> [MutationSyncMetadata] {
292306
let modelType = MutationSyncMetadata.self
293-
let statement = SelectStatement(from: modelType.schema, predicate: field("id").eq(modelId))
294-
let rows = try connection.prepare(statement.stringValue).run(statement.variables)
295-
let result = try rows.convert(to: modelType,
296-
using: statement)
297-
return try result.unique()
307+
let fields = MutationSyncMetadata.keys
308+
var results = [MutationSyncMetadata]()
309+
let chunkedModelIdsArr = modelIds.chunked(into: SQLiteStorageEngineAdapter.maxNumberOfPredicates)
310+
for chunkedModelIds in chunkedModelIdsArr {
311+
var queryPredicates: [QueryPredicateOperation] = []
312+
for id in chunkedModelIds {
313+
queryPredicates.append(QueryPredicateOperation(field: fields.id.stringValue, operator: .equals(id)))
314+
}
315+
let groupedQueryPredicates = QueryPredicateGroup(type: .or, predicates: queryPredicates)
316+
let statement = SelectStatement(from: modelType.schema, predicate: groupedQueryPredicates)
317+
let rows = try connection.prepare(statement.stringValue).run(statement.variables)
318+
let result = try rows.convert(to: modelType,
319+
using: statement)
320+
results.append(contentsOf: result)
321+
}
322+
return results
298323
}
299324

300325
func queryModelSyncMetadata(for modelSchema: ModelSchema) throws -> ModelSyncMetadata? {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/StorageEngine+DeleteTransaction.swift

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,8 @@ extension StorageEngine {
122122
return []
123123
}
124124

125-
// SQLite supports up to 1000 expressions per SQLStatement. We have chosen to use 50 expressions
126-
// less (equaling 950) than the maximum because it is possible that our SQLStatement already has
127-
// some expressions. If we encounter performance problems in the future, we will want to profile
128-
// our system and find an optimal value.
129-
let maxNumberOfPredicates = 950
130125
var queriedModels: [Model] = []
131-
let chunkedArrays = ids.chunked(into: maxNumberOfPredicates)
126+
let chunkedArrays = ids.chunked(into: SQLiteStorageEngineAdapter.maxNumberOfPredicates)
132127
for chunkedArray in chunkedArrays {
133128
// TODO: Add conveinence to queryPredicate where we have a list of items, to be all or'ed
134129
var queryPredicates: [QueryPredicateOperation] = []

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/StorageEngineAdapter.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import AWSPluginsCore
1111

1212
protocol StorageEngineAdapter: class, ModelStorageBehavior {
1313

14+
static var maxNumberOfPredicates: Int { get }
15+
1416
// MARK: - Async APIs
1517
func save(untypedModel: Model, completion: @escaping DataStoreCallback<Model>)
1618

@@ -47,6 +49,8 @@ protocol StorageEngineAdapter: class, ModelStorageBehavior {
4749

4850
func queryMutationSyncMetadata(for modelId: Model.Identifier) throws -> MutationSyncMetadata?
4951

52+
func queryMutationSyncMetadata(for modelIds: [Model.Identifier]) throws -> [MutationSyncMetadata]
53+
5054
func queryModelSyncMetadata(for modelSchema: ModelSchema) throws -> ModelSyncMetadata?
5155

5256
func transaction(_ basicClosure: BasicThrowableClosure) throws

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOperation.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ final class InitialSyncOperation: AsynchronousOperation {
168168
let items = syncQueryResult.items
169169
recordsReceived += UInt(items.count)
170170

171+
reconciliationQueue.offer(items, modelSchema: modelSchema)
171172
for item in items {
172-
reconciliationQueue.offer(item, modelSchema: modelSchema)
173173
initialSyncOperationTopic.send(.enqueued(item))
174174
}
175175

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
4949
}
5050

5151
MutationEvent.pendingMutationEvents(
52-
forModelId: mutationEvent.modelId,
52+
for: mutationEvent.modelId,
5353
storageAdapter: storageAdapter) { result in
5454
switch result {
5555
case .failure(let dataStoreError):

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,13 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
102102
eventReconciliationQueueTopic.send(.paused)
103103
}
104104

105-
func offer(_ remoteModel: MutationSync<AnyModel>, modelSchema: ModelSchema) {
105+
func offer(_ remoteModels: [MutationSync<AnyModel>], modelSchema: ModelSchema) {
106106
guard let queue = reconciliationQueues[modelSchema.name] else {
107107
// TODO: Error handling
108108
return
109109
}
110110

111-
queue.enqueue(remoteModel)
111+
queue.enqueue(remoteModels)
112112
}
113113

114114
private func onReceiveCompletion(completed: Subscribers.Completion<DataStoreError>) {

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/IncomingEventReconciliationQueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ enum IncomingEventReconciliationQueueEvent {
2525
protocol IncomingEventReconciliationQueue: class, AmplifyCancellable {
2626
func start()
2727
func pause()
28-
func offer(_ remoteModel: MutationSync<AnyModel>, modelSchema: ModelSchema)
28+
func offer(_ remoteModels: [MutationSync<AnyModel>], modelSchema: ModelSchema)
2929
var publisher: AnyPublisher<IncomingEventReconciliationQueueEvent, DataStoreError> { get }
3030
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,14 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
135135
incomingSubscriptionEventQueue.cancelAllOperations()
136136
}
137137

138-
func enqueue(_ remoteModel: MutationSync<AnyModel>) {
138+
func enqueue(_ remoteModels: [MutationSync<AnyModel>]) {
139+
guard let remoteModelName = remoteModels.first?.model.modelName else {
140+
log.debug("\(#function) skipping reconciliation, no models to enqueue.")
141+
return
142+
}
143+
139144
let reconcileOp = ReconcileAndLocalSaveOperation(modelSchema: modelSchema,
140-
remoteModel: remoteModel,
145+
remoteModels: remoteModels,
141146
storageAdapter: storageAdapter)
142147
var reconcileAndLocalSaveOperationSink: AnyCancellable?
143148
reconcileAndLocalSaveOperationSink = reconcileOp.publisher.sink(receiveCompletion: { completion in
@@ -154,7 +159,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
154159
}
155160
})
156161
reconcileAndLocalSaveOperationSinks.with { $0.insert(reconcileAndLocalSaveOperationSink) }
157-
reconcileAndSaveQueue.addOperation(reconcileOp, modelName: remoteModel.model.modelName)
162+
reconcileAndSaveQueue.addOperation(reconcileOp, modelName: remoteModelName)
158163
}
159164

160165
private func receive(_ receive: IncomingSubscriptionEventPublisherEvent) {
@@ -166,7 +171,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
166171
}
167172
}
168173
incomingSubscriptionEventQueue.addOperation(CancelAwareBlockOperation {
169-
self.enqueue(remoteModel)
174+
self.enqueue([remoteModel])
170175
})
171176
case .connectionConnected:
172177
modelReconciliationQueueSubject.send(.connected(modelName: modelSchema.name))

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ModelReconciliationQueue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ protocol ModelReconciliationQueue {
2828
func start()
2929
func pause()
3030
func cancel()
31-
func enqueue(_ remoteModel: MutationSync<AnyModel>)
31+
func enqueue(_ remoteModels: [MutationSync<AnyModel>])
3232
var publisher: AnyPublisher<ModelReconciliationQueueEvent, DataStoreError> { get }
3333
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation+Action.swift

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,10 @@ extension ReconcileAndLocalSaveOperation {
1313
/// Actions are declarative, they say what I just did
1414
enum Action {
1515
/// Operation has been started by the queue
16-
case started(RemoteModel)
16+
case started([RemoteModel])
1717

18-
/// Operation has retrieved RemoteModel's corresponding sync metadata from local database
19-
case queried(RemoteModel, LocalMetadata?)
20-
21-
/// Operation has reconciled the incoming remote model with local model and sync metadata
22-
case reconciled(RemoteSyncReconciler.Disposition)
23-
24-
/// Operation has applied the incoming RemoteModel to the local database per the reconciled disposition. This
25-
/// could result in either a save to the local database, or a delete from the local database.
26-
case applied(AppliedModel, mutationType: MutationEvent.MutationType)
27-
28-
/// Operation dropped the remote model per the reconciled disposition.
29-
case dropped(modelName: String)
30-
31-
/// Operation notified listeners and callbacks of completion
32-
case notified
18+
/// Operation completed reconcilliation
19+
case reconciled
3320

3421
/// Operation has been cancelled by the queue
3522
case cancelled

0 commit comments

Comments
 (0)