Skip to content

Commit 62946da

Browse files
ruiguoamzpalpatim
andauthored
feat(datastore): DataStore Hub events (#766) (#795)
* feat(datastore): Dispatch networkStatus event (#766) * initialize networkStatus PR * fixed an indentation * fixed unit test * added ready but not yet implemented * ready for review * removed one publisher * fixed a typo * updated passthroughSubject to CurrentValueSubject in NetworkReachabilityNotifier * updated ReachabilityNotifierTests * plural * removed one log info * Update DataStoreCategory+HubPayloadEventName.swift * updated integration test * updated integration test Co-authored-by: Guo <[email protected]> * feat(datastore): Dispatch networkStatus event (#766) * initialize networkStatus PR * fixed an indentation * fixed unit test * added ready but not yet implemented * ready for review * removed one publisher * fixed a typo * updated passthroughSubject to CurrentValueSubject in NetworkReachabilityNotifier * updated ReachabilityNotifierTests * plural * removed one log info * Update DataStoreCategory+HubPayloadEventName.swift * updated integration test * updated integration test Co-authored-by: Guo <[email protected]> * feat(datastore): Dispatch modelSynced, syncQueriesReady events (#758) * switching branch * switching branch * initial modelsynced && syncQueriesReady PR * updated some mock* files * self.modelSyncedEvent.build() * updated ReconcileAndLocalSave: query to check local store before deciding create or update mutationType * added one integration test for modelsynced event * update count to atomic operation * update count to atomic operation * did some clean up * ready for review * ready for review 2 * updated part of comments * updated logic * fixed one unit test * modified reconcileAndLocalSave * did some clean up * reimplement logic: Added SyncEventEmitter & ModelSyncEventEmitter * did some clean up and fixed a bug for causing a unit test to fail * ready for review * addressed some comments * fixed some mock class * removed testing part for modelsynced * addressed some PR comments * updated several unit tests and added ready event * DispatchQueue.main -> DispatchQueue.global() in RemoteSyncEngine * updated two unit tests * removed ready event from this PR * Added SyncEventEmitterTests * added one unit test in ReconcileAndLocalSave test * updated two unit tests class * updated two unit test classes: InitialSyncOrchestratorTests and SyncEventEmitterTests * 10/08/2020 comments addressed * fixed some comments part1 * fixed one build error * try to fix a cicd error * addressed lots of comments * fix underlying queue for InitialSyncOrchestrator * ready for re-review * abc * remove one reset from setup() * error handling in initialSyncOrchestrator * updated some comments Co-authored-by: Guo <[email protected]> Co-authored-by: Tim Schmelter <[email protected]> * feat(datastore): dispatch ready event (#812) * switching branch * switching branch * initial modelsynced && syncQueriesReady PR * updated some mock* files * self.modelSyncedEvent.build() * updated ReconcileAndLocalSave: query to check local store before deciding create or update mutationType * added one integration test for modelsynced event * update count to atomic operation * update count to atomic operation * did some clean up * ready for review * ready for review 2 * updated part of comments * updated logic * fixed one unit test * modified reconcileAndLocalSave * did some clean up * reimplement logic: Added SyncEventEmitter & ModelSyncEventEmitter * did some clean up and fixed a bug for causing a unit test to fail * ready for review * addressed some comments * fixed some mock class * removed testing part for modelsynced * addressed some PR comments * updated several unit tests and added ready event * DispatchQueue.main -> DispatchQueue.global() in RemoteSyncEngine * updated two unit tests * removed ready event from this PR * Added SyncEventEmitterTests * Initialize Ready event PR * remove dispatchGroup, use AtomicValue * fixed a typo * added one unit test in ReconcileAndLocalSave test * updated two unit tests class * Initialize Ready event PR * remove dispatchGroup, use AtomicValue * fixed a typo * Revert "fixed a typo" This reverts commit b7871af. * fjk * updated two unit test classes: InitialSyncOrchestratorTests and SyncEventEmitterTests * 10/08/2020 comments addressed * fixed some comments part1 * fixed one build error * try to fix a cicd error * addressed lots of comments * fix underlying queue for InitialSyncOrchestrator * ready for re-review * abc * remove one reset from setup() * error handling in initialSyncOrchestrator * addressed comments * updated some comments Co-authored-by: Guo <[email protected]> Co-authored-by: Tim Schmelter <[email protected]> Co-authored-by: Guo <[email protected]> Co-authored-by: Tim Schmelter <[email protected]>
1 parent 166e9b7 commit 62946da

File tree

39 files changed

+1409
-187
lines changed

39 files changed

+1409
-187
lines changed

Amplify/Categories/DataStore/DataStoreCategory+HubPayloadEventName.swift

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,23 @@ public extension HubPayload.EventName.DataStore {
3838
/// HubPayload `syncQueriesStartedEvent` contains an array of each model's `name`
3939
static let syncQueriesStarted = "DataStore.syncQueriesStarted"
4040

41+
/// Dispatched once for each model after the model instances have been synced from the cloud.
42+
/// HubPayload `modelSyncedEvent` contains: name of model, sync type (full/delta), count of instances' mutation type
43+
static let modelSynced = "DataStore.modelSynced"
44+
45+
/// Dispatched when all models have been synced
46+
static let syncQueriesReady = "DataStore.syncQueriesReady"
47+
48+
/// Dispatched when:
49+
/// - local store has loaded outgoing mutations from local storage
50+
/// - if online, all data has finished syncing with cloud
51+
/// When this event is emitted, DataStore is ready to sync changes between the local device and the cloud
52+
static let ready = "DataStore.ready"
53+
54+
/// Dispatched when DataStore starts and everytime network status changes
55+
/// HubPayload `NetworkStatusEvent` contains a boolean value `active` to notify network status
56+
static let networkStatus = "DataStore.networkStatus"
57+
4158
/// Dispatched when a local mutation is enqueued into the outgoing mutation queue `outbox`
4259
/// HubPayload `outboxMutationEvent` contains the name and instance of the model
4360
static let outboxMutationEnqueued = "DataStore.outboxMutationEnqueued"

AmplifyPlugins/API/AWSAPICategoryPlugin/Reachability/NetworkReachabilityNotifier.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class NetworkReachabilityNotifier {
1515
private var reachability: NetworkReachabilityProviding?
1616
private var allowsCellularAccess = true
1717

18-
let reachabilityPublisher = PassthroughSubject<ReachabilityUpdate, Never>()
18+
let reachabilityPublisher = CurrentValueSubject<ReachabilityUpdate, Never>(ReachabilityUpdate(isOnline: false))
1919
var publisher: AnyPublisher<ReachabilityUpdate, Never> {
2020
return reachabilityPublisher.eraseToAnyPublisher()
2121
}

AmplifyPlugins/API/AWSAPICategoryPluginTests/Reachability/NetworkReachabilityNotifierTests.swift

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,45 +31,60 @@ class NetworkReachabilityNotifierTests: XCTestCase {
3131

3232
func testWifiConnectivity() {
3333
MockReachability.iConnection = .wifi
34-
let expect = expectation(description: ".sink receives value")
34+
let expect = expectation(description: ".sink receives values")
35+
var values = [Bool]()
3536
let cancellable = notifier.publisher.sink(receiveCompletion: { _ in
3637
XCTFail("Not expecting any error")
3738
}, receiveValue: { value in
38-
XCTAssert(value.isOnline)
39-
expect.fulfill()
39+
values.append(value.isOnline)
40+
if values.count == 2 {
41+
XCTAssertFalse(values[0])
42+
XCTAssertTrue(values[1])
43+
expect.fulfill()
44+
}
4045
})
4146
notification = Notification.init(name: .reachabilityChanged)
4247
NotificationCenter.default.post(notification)
4348

4449
waitForExpectations(timeout: 1.0)
4550
cancellable.cancel()
4651
}
52+
4753
func testCellularConnectivity() {
4854
MockReachability.iConnection = .wifi
49-
let expect = expectation(description: ".sink receives value")
55+
let expect = expectation(description: ".sink receives values")
56+
var values = [Bool]()
5057
let cancellable = notifier.publisher.sink(receiveCompletion: { _ in
5158
XCTFail("Not expecting any error")
5259
}, receiveValue: { value in
53-
XCTAssert(value.isOnline)
54-
expect.fulfill()
60+
values.append(value.isOnline)
61+
if values.count == 2 {
62+
XCTAssertFalse(values[0])
63+
XCTAssertTrue(values[1])
64+
expect.fulfill()
65+
}
5566
})
5667

5768
notification = Notification.init(name: .reachabilityChanged)
5869
NotificationCenter.default.post(notification)
5970

6071
waitForExpectations(timeout: 1.0)
6172
cancellable.cancel()
62-
6373
}
6474

6575
func testNoConnectivity() {
6676
MockReachability.iConnection = .unavailable
67-
let expect = expectation(description: ".sink receives value")
77+
let expect = expectation(description: ".sink receives values")
78+
var values = [Bool]()
6879
let cancellable = notifier.publisher.sink(receiveCompletion: { _ in
6980
XCTFail("Not expecting any error")
7081
}, receiveValue: { value in
71-
XCTAssertFalse(value.isOnline)
72-
expect.fulfill()
82+
values.append(value.isOnline)
83+
if values.count == 2 {
84+
XCTAssertFalse(values[0])
85+
XCTAssertFalse(values[1])
86+
expect.fulfill()
87+
}
7388
})
7489

7590
notification = Notification.init(name: .reachabilityChanged)
@@ -81,18 +96,21 @@ class NetworkReachabilityNotifierTests: XCTestCase {
8196

8297
func testWifiConnectivity_publisherGoesOutOfScope() {
8398
MockReachability.iConnection = .wifi
84-
let expect = expectation(description: ".sink receives value")
99+
let defaultValueExpect = expectation(description: ".sink receives default value")
100+
let completeExpect = expectation(description: ".sink receives completion")
85101
let cancellable = notifier.publisher.sink(receiveCompletion: { _ in
86-
expect.fulfill()
87-
}, receiveValue: { _ in
88-
XCTAssertFalse(true)
102+
completeExpect.fulfill()
103+
}, receiveValue: { value in
104+
XCTAssertFalse(value.isOnline)
105+
defaultValueExpect.fulfill()
89106
})
90107

108+
wait(for: [defaultValueExpect], timeout: 1.0)
91109
notifier = nil
92110
notification = Notification.init(name: .reachabilityChanged)
93111
NotificationCenter.default.post(notification)
94112

95-
waitForExpectations(timeout: 1.0)
113+
wait(for: [completeExpect], timeout: 1.0)
96114
cancellable.cancel()
97115
}
98116
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Storage/StorageEngine.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ final class StorageEngine: StorageEngineBehavior {
2222

2323
var iSyncEngineSink: Any?
2424
@available(iOS 13.0, *)
25-
var sinkEngineSink: AnyCancellable? {
25+
var syncEngineSink: AnyCancellable? {
2626
get {
2727
if let iSyncEngineSink = iSyncEngineSink as? AnyCancellable {
2828
return iSyncEngineSink
@@ -98,7 +98,7 @@ final class StorageEngine: StorageEngineBehavior {
9898
validAPIPluginKey: validAPIPluginKey,
9999
validAuthPluginKey: validAuthPluginKey)
100100
self.storageEnginePublisher = PassthroughSubject<StorageEngineEvent, DataStoreError>()
101-
sinkEngineSink = syncEngine?.publisher.sink(receiveCompletion: onReceiveCompletion(receiveCompletion:),
101+
syncEngineSink = syncEngine?.publisher.sink(receiveCompletion: onReceiveCompletion(receiveCompletion:),
102102
receiveValue: onReceive(receiveValue:))
103103
} else {
104104
self.init(storageAdapter: storageAdapter,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
10+
/// Hub payload for the `ModelSynced` event.
11+
public struct ModelSyncedEvent {
12+
/// Name of the model that was synced
13+
public let modelName: String
14+
/// True when a full sync query was performed for this event.
15+
public let isFullSync: Bool
16+
/// True when a delta sync query was performed for this event.
17+
public let isDeltaSync: Bool
18+
/// Number of model instances added to the local store
19+
public let added: Int
20+
/// Number of existing model instances updated in the local store
21+
public let updated: Int
22+
/// Number of existing model instances deleted from the local store
23+
public let deleted: Int
24+
25+
public init(modelName: String,
26+
isFullSync: Bool,
27+
isDeltaSync: Bool,
28+
added: Int,
29+
updated: Int,
30+
deleted: Int) {
31+
self.modelName = modelName
32+
self.isFullSync = isFullSync
33+
self.isDeltaSync = isDeltaSync
34+
self.added = added
35+
self.updated = updated
36+
self.deleted = deleted
37+
}
38+
}
39+
40+
extension ModelSyncedEvent {
41+
struct Builder {
42+
var modelName: String
43+
var isFullSync: Bool
44+
var isDeltaSync: Bool
45+
var added: Int
46+
var updated: Int
47+
var deleted: Int
48+
49+
init() {
50+
self.modelName = ""
51+
self.isFullSync = false
52+
self.isDeltaSync = false
53+
self.added = 0
54+
self.updated = 0
55+
self.deleted = 0
56+
}
57+
58+
func build() -> ModelSyncedEvent {
59+
ModelSyncedEvent(
60+
modelName: modelName,
61+
isFullSync: isFullSync,
62+
isDeltaSync: isDeltaSync,
63+
added: added,
64+
updated: updated,
65+
deleted: deleted
66+
)
67+
}
68+
}
69+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
/// Used as HubPayload for the `NetworkStatus`
9+
public struct NetworkStatusEvent {
10+
/// status of network: true if network is active
11+
public let active: Bool
12+
13+
public init(active: Bool) {
14+
self.active = active
15+
}
16+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOperation.swift

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import Amplify
99
import AWSPluginsCore
10+
import Combine
1011

1112
@available(iOS 13.0, *)
1213
final class InitialSyncOperation: AsynchronousOperation {
@@ -18,7 +19,6 @@ final class InitialSyncOperation: AsynchronousOperation {
1819
private let dataStoreConfiguration: DataStoreConfiguration
1920

2021
private let modelType: Model.Type
21-
private let completion: AWSInitialSyncOrchestrator.SyncOperationResultHandler
2222

2323
private var recordsReceived: UInt
2424

@@ -29,35 +29,41 @@ final class InitialSyncOperation: AsynchronousOperation {
2929
return dataStoreConfiguration.syncPageSize
3030
}
3131

32+
private let initialSyncOperationTopic: PassthroughSubject<InitialSyncOperationEvent, DataStoreError>
33+
var publisher: AnyPublisher<InitialSyncOperationEvent, DataStoreError> {
34+
return initialSyncOperationTopic.eraseToAnyPublisher()
35+
}
36+
3237
init(modelType: Model.Type,
3338
api: APICategoryGraphQLBehavior?,
3439
reconciliationQueue: IncomingEventReconciliationQueue?,
3540
storageAdapter: StorageEngineAdapter?,
36-
dataStoreConfiguration: DataStoreConfiguration,
37-
completion: @escaping AWSInitialSyncOrchestrator.SyncOperationResultHandler) {
41+
dataStoreConfiguration: DataStoreConfiguration) {
3842
self.modelType = modelType
3943
self.api = api
4044
self.reconciliationQueue = reconciliationQueue
4145
self.storageAdapter = storageAdapter
4246
self.dataStoreConfiguration = dataStoreConfiguration
43-
self.completion = completion
4447
self.recordsReceived = 0
48+
self.initialSyncOperationTopic = PassthroughSubject<InitialSyncOperationEvent, DataStoreError>()
4549
}
4650

4751
override func main() {
4852
guard !isCancelled else {
49-
super.finish()
53+
finish(result: .successfulVoid)
5054
return
5155
}
5256

5357
log.info("Beginning sync for \(modelType.modelName)")
5458
let lastSyncTime = getLastSyncTime()
59+
let syncType: SyncType = lastSyncTime == nil ? .fullSync : .deltaSync
60+
initialSyncOperationTopic.send(.started(modelType: modelType, syncType: syncType))
5561
query(lastSyncTime: lastSyncTime)
5662
}
5763

5864
private func getLastSyncTime() -> Int? {
5965
guard !isCancelled else {
60-
super.finish()
66+
finish(result: .successfulVoid)
6167
return nil
6268
}
6369

@@ -81,7 +87,7 @@ final class InitialSyncOperation: AsynchronousOperation {
8187

8288
private func getLastSyncMetadata() -> ModelSyncMetadata? {
8389
guard !isCancelled else {
84-
super.finish()
90+
finish(result: .successfulVoid)
8591
return nil
8692
}
8793

@@ -97,12 +103,11 @@ final class InitialSyncOperation: AsynchronousOperation {
97103
log.error(error: error)
98104
return nil
99105
}
100-
101106
}
102107

103108
private func query(lastSyncTime: Int?, nextToken: String? = nil) {
104109
guard !isCancelled else {
105-
super.finish()
110+
finish(result: .successfulVoid)
106111
return
107112
}
108113

@@ -136,7 +141,7 @@ final class InitialSyncOperation: AsynchronousOperation {
136141
private func handleQueryResults(lastSyncTime: Int?,
137142
graphQLResult: Result<SyncQueryResult, GraphQLResponseError<SyncQueryResult>>) {
138143
guard !isCancelled else {
139-
super.finish()
144+
finish(result: .successfulVoid)
140145
return
141146
}
142147

@@ -159,21 +164,22 @@ final class InitialSyncOperation: AsynchronousOperation {
159164

160165
for item in items {
161166
reconciliationQueue.offer(item)
167+
initialSyncOperationTopic.send(.enqueued(item))
162168
}
163169

164170
if let nextToken = syncQueryResult.nextToken, recordsReceived < syncMaxRecords {
165171
DispatchQueue.global().async {
166172
self.query(lastSyncTime: lastSyncTime, nextToken: nextToken)
167173
}
168174
} else {
175+
initialSyncOperationTopic.send(.finished(modelType: modelType))
169176
updateModelSyncMetadata(lastSyncTime: syncQueryResult.startedAt)
170177
}
171-
172178
}
173179

174180
private func updateModelSyncMetadata(lastSyncTime: Int?) {
175181
guard !isCancelled else {
176-
super.finish()
182+
finish(result: .successfulVoid)
177183
return
178184
}
179185

@@ -204,7 +210,12 @@ final class InitialSyncOperation: AsynchronousOperation {
204210
}
205211

206212
private func finish(result: AWSInitialSyncOrchestrator.SyncOperationResult) {
207-
completion(result)
213+
switch result {
214+
case .failure(let error):
215+
initialSyncOperationTopic.send(completion: .failure(error))
216+
case .success:
217+
initialSyncOperationTopic.send(completion: .finished)
218+
}
208219
super.finish()
209220
}
210221

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import AWSPluginsCore
10+
11+
enum InitialSyncOperationEvent {
12+
/// Published at the start of sync query (full or delta) for a particular Model
13+
/// Used by `SyncEventEmitter` and `ModelSyncedEmitted`
14+
case started(modelType: Model.Type, syncType: SyncType)
15+
/// Published when a remote model is enqueued for local store reconcillation.
16+
/// Used by `ModelSyncedEventEmitter` for record counting.
17+
case enqueued(MutationSync<AnyModel>)
18+
/// Published when the sync operation has completed and all remote models have been enqueued for reconcillation.
19+
/// Used by `ModelSyncedEventEmitter` to determine when to send `ModelSyncedEvent`
20+
case finished(modelType: Model.Type)
21+
}
22+
23+
enum SyncType {
24+
case fullSync
25+
case deltaSync
26+
}

0 commit comments

Comments
 (0)