Skip to content

Commit 64fcd53

Browse files
authored
feat(datastore): Dispatch outboxStatus, subscriptionsEstablished, syncQueriesStarted events (#721)
* networkStatus not implemented * Network Status isn't implemented yet and need to check payload of modelSynced * revert some changes * push one unsaved change * removed one commented out block * removed one commented out block * addressed some comments * removed 3 hub events implementation, 4 to be implemented * did some clean up * switching branch * removed outboxMutation*Event, now only three implemented 3,4,5 * did some clean up * fixed some comments * switching position of dispatchSyncQueriesStarted * fixed comments for subscriptionEstablished * fixed some comments for outboxStatus and subscriptionsEstablished * fixed PR comments and added integration tests * improved hub events integration test and outgoingMutaitonQueue.swift logic * spelling and format correction * updated some unit tests to cover syncQueriesStarted, outboxStatus, but need to work on testing outboxStatus Received when MutationEvent finish processing * fix a bug that causes unit test to fail * updated a test to cover subscriptionsEstablished * addressed tim's re-review's comments
1 parent 62c103a commit 64fcd53

File tree

15 files changed

+332
-14
lines changed

15 files changed

+332
-14
lines changed

Amplify.xcodeproj/project.pbxproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,13 +353,13 @@
353353
B9FAA180238FBB5D009414B4 /* Model+Array.swift in Sources */ = {isa = PBXBuildFile; fileRef = B9FAA17F238FBB5D009414B4 /* Model+Array.swift */; };
354354
B9FB05F82383740D00DE1FD4 /* DataStoreStatement.swift in Sources */ = {isa = PBXBuildFile; fileRef = B9FB05F72383740D00DE1FD4 /* DataStoreStatement.swift */; };
355355
D83C5160248964780091548E /* ModelGraphQLTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D83C515F248964780091548E /* ModelGraphQLTests.swift */; };
356+
D8DD7A1D24A1CCCD001C49FD /* QuerySortInput.swift in Sources */ = {isa = PBXBuildFile; fileRef = D8DD7A1C24A1CCCD001C49FD /* QuerySortInput.swift */; };
356357
FA00F68824DA37EE003E8A71 /* AuthCategoryBehavior+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA00F68724DA37EE003E8A71 /* AuthCategoryBehavior+Combine.swift */; };
357358
FA00F68A24DA3A43003E8A71 /* AuthCategoryDeviceBehavior+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA00F68924DA3A43003E8A71 /* AuthCategoryDeviceBehavior+Combine.swift */; };
358359
FA00F68C24DA3A8F003E8A71 /* AuthCategoryUserBehavior+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA00F68B24DA3A8F003E8A71 /* AuthCategoryUserBehavior+Combine.swift */; };
359360
FA00F68E24DA3DFF003E8A71 /* HubCategoryBehavior+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA00F68D24DA3DFE003E8A71 /* HubCategoryBehavior+Combine.swift */; };
360361
FA00F69024DA3F95003E8A71 /* HubCombineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA00F68F24DA3F95003E8A71 /* HubCombineTests.swift */; };
361362
FA00F69224DA4087003E8A71 /* PredictionsCategoryBehavior+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA00F69124DA4087003E8A71 /* PredictionsCategoryBehavior+Combine.swift */; };
362-
D8DD7A1D24A1CCCD001C49FD /* QuerySortInput.swift in Sources */ = {isa = PBXBuildFile; fileRef = D8DD7A1C24A1CCCD001C49FD /* QuerySortInput.swift */; };
363363
FA0173352375F8A5005DDDFC /* LoggingError.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0173342375F8A5005DDDFC /* LoggingError.swift */; };
364364
FA0173372375FAA5005DDDFC /* HubError.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0173362375FAA5005DDDFC /* HubError.swift */; };
365365
FA05B83424CE265E0026180B /* StorageCategory+ClientBehavior+Combine.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA05B83324CE265D0026180B /* StorageCategory+ClientBehavior+Combine.swift */; };

Amplify/Categories/DataStore/DataStoreCategory+HubPayloadEventName.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,18 @@ public extension HubPayload.EventName.DataStore {
2323
/// Dispatched when DataStore receives a sync response from the remote API via the API category. The Hub Payload
2424
/// will be a `MutationEvent` instance that caused the conditional save failed.
2525
static let conditionalSaveFailed = "DataStore.conditionalSaveFailed"
26+
27+
/// Dispatched when:
28+
/// - the DataStore starts
29+
/// - each time a local mutation is enqueued into the outbox
30+
/// - each time a local mutation is finished processing
31+
/// HubPayload `OutboxStatusEvent` contains a boolean value `isEmpty` to notify if there are mutations in the outbox
32+
static let outboxStatus = "DataStore.outboxStatus"
33+
34+
/// Dispatched when DataStore has finished establishing its subscriptions to all syncable models
35+
static let subscriptionsEstablished = "DataStore.subscriptionEstablished"
36+
37+
/// Dispatched when DataStore is about to start sync queries
38+
/// HubPayload `syncQueriesStartedEvent` contains an array of each model's `name`
39+
static let syncQueriesStarted = "DataStore.syncQueriesStarted"
2640
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
/// Used as HubPayload for the `OutboxStatus`
9+
public struct OutboxStatusEvent {
10+
/// status of outbox: true if there are no events in the outbox at the time the event was dispatched
11+
public let isEmpty: Bool
12+
13+
public init(isEmpty: Bool) {
14+
self.isEmpty = isEmpty
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
/// Used as HubPayload for the `SyncQueriesStarted`
9+
public struct SyncQueriesStartedEvent {
10+
/// A list of all model names for which DataStore has started establishing subscriptions
11+
public let models: [String]
12+
13+
public init(models: [String]) {
14+
self.models = models
15+
}
16+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/InitialSync/InitialSyncOrchestrator.swift

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,21 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
6262

6363
log.info("Beginning initial sync")
6464

65-
enqueueSyncableModels()
65+
let syncableModels = ModelRegistry.models.filter { $0.schema.isSyncable }
66+
enqueueSyncableModels(syncableModels)
6667

6768
// This operation is intentionally not cancel-aware; we always want resolveCompletion to execute
6869
// as the last item
6970
syncOperationQueue.addOperation {
7071
self.resolveCompletion()
7172
}
7273

74+
let modelNames = syncableModels.map { $0.modelName }
75+
dispatchSyncQueriesStarted(for: modelNames)
7376
syncOperationQueue.isSuspended = false
7477
}
7578

76-
private func enqueueSyncableModels() {
77-
let syncableModels = ModelRegistry.models.filter { $0.schema.isSyncable }
79+
private func enqueueSyncableModels(_ syncableModels: [Model.Type]) {
7880
let sortedModels = syncableModels.sortByDependencyOrder()
7981
for model in sortedModels {
8082
enqueueSyncOperation(for: model)
@@ -118,6 +120,13 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator {
118120
completion?(.successfulVoid)
119121
}
120122

123+
private func dispatchSyncQueriesStarted(for modelNames: [String]) {
124+
let syncQueriesStartedEvent = SyncQueriesStartedEvent(models: modelNames)
125+
let syncQueriesStartedEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.syncQueriesStarted,
126+
data: syncQueriesStartedEvent)
127+
Amplify.Hub.dispatch(to: .dataStore, payload: syncQueriesStartedEventPayload)
128+
}
129+
121130
}
122131

123132
@available(iOS 13.0, *)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,12 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
138138
mutationEventPublisher: MutationEventPublisher) {
139139
log.verbose(#function)
140140
self.api = api
141-
operationQueue.isSuspended = false
142141

143-
// State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)`
144-
mutationEventPublisher.publisher.subscribe(self)
142+
queryMutationEventsFromStorage(onComplete: {
143+
self.operationQueue.isSuspended = false
144+
// State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)`
145+
mutationEventPublisher.publisher.subscribe(self)
146+
})
145147
}
146148

147149
// MARK: - Event loop processing
@@ -187,6 +189,8 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
187189
"[SyncMutationToCloudOperation] mutationEvent finished: \(mutationEvent.id); result: \(result)")
188190
self.processSyncMutationToCloudResult(result, mutationEvent: mutationEvent, api: api)
189191
}
192+
193+
dispatchOutboxStatusEvent(isEmpty: false)
190194
operationQueue.addOperation(syncMutationToCloudOperation)
191195
stateMachine.notify(action: .enqueuedEvent)
192196
}
@@ -241,10 +245,37 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
241245
self.log.verbose("mutationEvent deleted successfully")
242246
}
243247

244-
self.stateMachine.notify(action: .processedEvent)
248+
self.queryMutationEventsFromStorage {
249+
self.stateMachine.notify(action: .processedEvent)
250+
}
245251
}
246252
}
247253

254+
private func queryMutationEventsFromStorage(onComplete: @escaping (() -> Void)) {
255+
let fields = MutationEvent.keys
256+
let predicate = fields.inProcess == false || fields.inProcess == nil
257+
258+
storageAdapter.query(MutationEvent.self,
259+
predicate: predicate,
260+
sort: nil,
261+
paginationInput: nil) { result in
262+
switch result {
263+
case .success(let events):
264+
self.dispatchOutboxStatusEvent(isEmpty: events.isEmpty)
265+
case .failure(let error):
266+
log.error("Error querying mutation events: \(error)")
267+
}
268+
onComplete()
269+
}
270+
}
271+
272+
private func dispatchOutboxStatusEvent(isEmpty: Bool) {
273+
let outboxStatusEvent = OutboxStatusEvent(isEmpty: isEmpty)
274+
let outboxStatusEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.outboxStatus,
275+
data: outboxStatusEvent)
276+
Amplify.Hub.dispatch(to: .dataStore, payload: outboxStatusEventPayload)
277+
}
278+
248279
}
249280

250281
@available(iOS 13.0, *)

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/RemoteSyncEngine+IncomingEventReconciliationQueueEvent.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ extension RemoteSyncEngine {
3434
func onReceive(receiveValue: IncomingEventReconciliationQueueEvent) {
3535
switch receiveValue {
3636
case .initialized:
37+
let payload = HubPayload(eventName: HubPayload.EventName.DataStore.subscriptionsEstablished)
38+
Amplify.Hub.dispatch(to: .dataStore, payload: payload)
3739
remoteSyncTopicPublisher.send(.subscriptionsInitialized)
3840
stateMachine.notify(action: .initializedSubscriptions)
3941
case .started:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
157157
modelReconciliationQueueSubject.send(.connected(modelName))
158158
}
159159
}
160+
160161
private func receiveCompletion(_ completion: Subscribers.Completion<DataStoreError>) {
161162
switch completion {
162163
case .finished:

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreEndToEndTests.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,16 @@ class DataStoreEndToEndTests: SyncEngineIntegrationTestBase {
173173
createdAt: date)
174174

175175
var updatedPost = newPost
176-
updatedPost.content = "UPDATED CONTENT from DataStoreEndToEndTests at \(Date())"
176+
updatedPost.content = "UPDATED CONTENT from DataStoreEndToEndTests at \(Date())"
177177

178178
let createReceived = expectation(description: "Create notification received")
179179
let updateLocalSuccess = expectation(description: "Update local successful")
180180
let conditionalReceived = expectation(description: "Conditional save failed received")
181181

182-
let hubListener = Amplify.Hub.listen(to: .dataStore) { payload in
182+
let syncReceivedFilter = HubFilters.forEventName(HubPayload.EventName.DataStore.syncReceived)
183+
let conditionalSaveFailedFilter = HubFilters.forEventName(HubPayload.EventName.DataStore.conditionalSaveFailed)
184+
let filters = HubFilters.any(filters: syncReceivedFilter, conditionalSaveFailedFilter)
185+
let hubListener = Amplify.Hub.listen(to: .dataStore, isIncluded: filters) { payload in
183186
guard let mutationEvent = payload.data as? MutationEvent
184187
else {
185188
XCTFail("Can't cast payload as mutation event")
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import XCTest
9+
10+
import AmplifyPlugins
11+
import AWSPluginsCore
12+
13+
@testable import Amplify
14+
@testable import AmplifyTestCommon
15+
@testable import AWSDataStoreCategoryPlugin
16+
17+
@available(iOS 13.0, *)
18+
class DataStoreHubEventTests: HubEventsIntegrationTestBase {
19+
20+
/// - Given:
21+
/// - registered two models from `TestModelRegistration`
22+
/// - no pending MutationEvents in MutationEvent database
23+
/// - When:
24+
/// - DataStore's remote sync engine is initialized
25+
/// - Then:
26+
/// - subscriptionEstablished received, payload should be nil
27+
/// - syncQueriesStarted received, payload should be: {models: ["Post", "Comment"]}
28+
/// - outboxStatus received, payload should be {isEmpty: true}
29+
func testDataStoreConfiguredDispatchesHubEvents() throws {
30+
31+
let subscriptionsEstablishedReceived = expectation(description: "subscriptionsEstablished received")
32+
let syncQueriesStartedReceived = expectation(description: "syncQueriesStarted received")
33+
let outboxStatusReceived = expectation(description: "outboxStatus received")
34+
35+
let hubListener = Amplify.Hub.listen(to: .dataStore) { payload in
36+
if payload.eventName == HubPayload.EventName.DataStore.subscriptionsEstablished {
37+
XCTAssertNil(payload.data)
38+
subscriptionsEstablishedReceived.fulfill()
39+
}
40+
41+
if payload.eventName == HubPayload.EventName.DataStore.syncQueriesStarted {
42+
guard let syncQueriesStartedEvent = payload.data as? SyncQueriesStartedEvent else {
43+
XCTFail("Failed to cast payload data as SyncQueriesStartedEvent")
44+
return
45+
}
46+
XCTAssertEqual(syncQueriesStartedEvent.models.count, 2)
47+
syncQueriesStartedReceived.fulfill()
48+
}
49+
50+
if payload.eventName == HubPayload.EventName.DataStore.outboxStatus {
51+
guard let outboxStatusEvent = payload.data as? OutboxStatusEvent else {
52+
XCTFail("Failed to cast payload data as OutboxStatusEvent")
53+
return
54+
}
55+
XCTAssertTrue(outboxStatusEvent.isEmpty)
56+
outboxStatusReceived.fulfill()
57+
}
58+
}
59+
60+
guard try HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else {
61+
XCTFail("Listener not registered for hub")
62+
return
63+
}
64+
65+
waitForExpectations(timeout: networkTimeout, handler: nil)
66+
}
67+
}

0 commit comments

Comments
 (0)