Skip to content

Commit 2c76122

Browse files
authored
feat(datastore): Dispatch outboxMutationEnqueued, outboxMutationProcessed events (#759)
* initial outboxMutation PR * renamed OutboxMutationEventMetadata as OutboxMutationEventElement * fixed one unit test per my PR * Added one integration test * Updated some comments * added one comment * outboxMutationProcessed is emitted only when there is a successful response * updated the logic * did some clean up * about to merge
1 parent 0120302 commit 2c76122

File tree

9 files changed

+219
-26
lines changed

9 files changed

+219
-26
lines changed

Amplify/Categories/DataStore/DataStoreCategory+HubPayloadEventName.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,12 @@ public extension HubPayload.EventName.DataStore {
3737
/// Dispatched when DataStore is about to start sync queries
3838
/// HubPayload `syncQueriesStartedEvent` contains an array of each model's `name`
3939
static let syncQueriesStarted = "DataStore.syncQueriesStarted"
40+
41+
/// Dispatched when a local mutation is enqueued into the outgoing mutation queue `outbox`
42+
/// HubPayload `outboxMutationEvent` contains the name and instance of the model
43+
static let outboxMutationEnqueued = "DataStore.outboxMutationEnqueued"
44+
45+
/// Dispatched when a mutation from outgoing mutation queue `outbox` is sent to backend and updated locally.
46+
/// HubPayload `outboxMutationEvent` contains the name and instance of the model
47+
static let outboxMutationProcessed = "DataStore.outboxMutationProcessed"
4048
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//
2+
// Copyright 2018-2020 Amazon.com,
3+
// Inc. or its affiliates. All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
import Amplify
9+
import AWSPluginsCore
10+
11+
/// Used as HubPayload for `OutboxMutationEnqueued` and `OutboxMutationProcessed`
12+
public struct OutboxMutationEvent {
13+
public let modelName: String
14+
public let element: OutboxMutationEventElement
15+
16+
public init(modelName: String, element: OutboxMutationEventElement) {
17+
self.modelName = modelName
18+
self.element = element
19+
}
20+
public static func fromModelWithMetadata(modelName: String,
21+
model: Model,
22+
mutationSync: MutationSync<AnyModel>) -> OutboxMutationEvent {
23+
let element = OutboxMutationEventElement(model: model,
24+
version: mutationSync.syncMetadata.version,
25+
lastChangedAt: mutationSync.syncMetadata.lastChangedAt,
26+
deleted: mutationSync.syncMetadata.deleted)
27+
return OutboxMutationEvent(modelName: modelName, element: element)
28+
}
29+
30+
public static func fromModelWithoutMetadata(modelName: String,
31+
model: Model) -> OutboxMutationEvent {
32+
let element = OutboxMutationEventElement(model: model)
33+
return OutboxMutationEvent(modelName: modelName, element: element)
34+
}
35+
36+
public struct OutboxMutationEventElement {
37+
public let model: Model
38+
public var version: Int?
39+
public var lastChangedAt: Int?
40+
public var deleted: Bool?
41+
}
42+
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
190190
self.processSyncMutationToCloudResult(result, mutationEvent: mutationEvent, api: api)
191191
}
192192

193+
dispatchOutboxMutationEnqueuedEvent(mutationEvent: mutationEvent)
193194
dispatchOutboxStatusEvent(isEmpty: false)
194195
operationQueue.addOperation(syncMutationToCloudOperation)
195196
stateMachine.notify(action: .enqueuedEvent)
@@ -198,18 +199,21 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
198199
private func processSyncMutationToCloudResult(_ result: GraphQLOperation<MutationSync<AnyModel>>.OperationResult,
199200
mutationEvent: MutationEvent,
200201
api: APICategoryGraphQLBehavior) {
201-
if case let .success(graphQLResponse) = result, case let .failure(graphQLResponseError) = graphQLResponse {
202-
processMutationErrorFromCloud(mutationEvent: mutationEvent,
203-
api: api,
204-
apiError: nil,
205-
graphQLResponseError: graphQLResponseError)
202+
if case let .success(graphQLResponse) = result {
203+
if case let .success(graphQLResult) = graphQLResponse {
204+
completeProcessingEvent(mutationEvent,
205+
mutationSyncMetadata: graphQLResult)
206+
} else if case let .failure(graphQLResponseError) = graphQLResponse {
207+
processMutationErrorFromCloud(mutationEvent: mutationEvent,
208+
api: api,
209+
apiError: nil,
210+
graphQLResponseError: graphQLResponseError)
211+
}
206212
} else if case let .failure(apiError) = result {
207213
processMutationErrorFromCloud(mutationEvent: mutationEvent,
208214
api: api,
209215
apiError: apiError,
210216
graphQLResponseError: nil)
211-
} else {
212-
completeProcessingEvent(mutationEvent)
213217
}
214218
}
215219

@@ -229,12 +233,14 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
229233
let outgoingMutationEvent = mutationEventOptional {
230234
self.outgoingMutationQueueSubject.send(outgoingMutationEvent)
231235
}
232-
self.completeProcessingEvent(mutationEvent)
236+
self.completeProcessingEvent(mutationEvent,
237+
mutationSyncMetadata: nil)
233238
}
234239
operationQueue.addOperation(processMutationErrorFromCloudOperation)
235240
}
236241

237-
private func completeProcessingEvent(_ mutationEvent: MutationEvent) {
242+
private func completeProcessingEvent(_ mutationEvent: MutationEvent,
243+
mutationSyncMetadata: MutationSync<AnyModel>?) {
238244
// This doesn't belong here--need to add a `delete` API to the MutationEventSource and pass a
239245
// reference into the mutation queue.
240246
Amplify.DataStore.delete(mutationEvent) { result in
@@ -245,6 +251,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
245251
self.log.verbose("mutationEvent deleted successfully")
246252
}
247253

254+
if let mutationSyncMetadata = mutationSyncMetadata {
255+
self.dispatchOutboxMutationProcessedEvent(mutationEvent: mutationEvent,
256+
mutationSync: mutationSyncMetadata)
257+
}
248258
self.queryMutationEventsFromStorage {
249259
self.stateMachine.notify(action: .processedEvent)
250260
}
@@ -269,6 +279,38 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
269279
}
270280
}
271281

282+
private func dispatchOutboxMutationProcessedEvent(mutationEvent: MutationEvent,
283+
mutationSync: MutationSync<AnyModel>) {
284+
do {
285+
let localModel = try mutationEvent.decodeModel()
286+
let outboxMutationProcessedEvent = OutboxMutationEvent
287+
.fromModelWithMetadata(modelName: mutationEvent.modelName,
288+
model: localModel,
289+
mutationSync: mutationSync)
290+
let payload = HubPayload(eventName: HubPayload.EventName.DataStore.outboxMutationProcessed,
291+
data: outboxMutationProcessedEvent)
292+
Amplify.Hub.dispatch(to: .dataStore, payload: payload)
293+
} catch {
294+
log.error("\(#function) Couldn't decode local model as \(mutationEvent.modelName)")
295+
return
296+
}
297+
}
298+
299+
private func dispatchOutboxMutationEnqueuedEvent(mutationEvent: MutationEvent) {
300+
do {
301+
let localModel = try mutationEvent.decodeModel()
302+
let outboxMutationEnqueuedEvent = OutboxMutationEvent
303+
.fromModelWithoutMetadata(modelName: mutationEvent.modelName,
304+
model: localModel)
305+
let payload = HubPayload(eventName: HubPayload.EventName.DataStore.outboxMutationEnqueued,
306+
data: outboxMutationEnqueuedEvent)
307+
Amplify.Hub.dispatch(to: .dataStore, payload: payload)
308+
} catch {
309+
log.error("\(#function) Couldn't decode local model as \(mutationEvent.modelName)")
310+
return
311+
}
312+
}
313+
272314
private func dispatchOutboxStatusEvent(isEmpty: Bool) {
273315
let outboxStatusEvent = OutboxStatusEvent(isEmpty: isEmpty)
274316
let outboxStatusEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.outboxStatus,

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/DataStoreHubEventsTests.swift

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,62 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase {
6363
}
6464

6565
waitForExpectations(timeout: networkTimeout, handler: nil)
66+
Amplify.Hub.removeListener(hubListener)
67+
}
68+
69+
/// - Given:
70+
/// - registered two models from `TestModelRegistration`
71+
/// - no pending MutationEvents in MutationEvent database
72+
/// - When:
73+
/// - Call `Amplify.DataStore.save()` to save a Post model
74+
/// - Then:
75+
/// - outboxMutationEnqueued received, payload should be:
76+
/// {modelName: "Post", element: {id: #, content: "some content"}}
77+
/// - outboxMutationProcessed received, payload should be:
78+
/// {modelName: "Post", element: {model: {id: #, content: "some content"}, version: 1, deleted: false, lastChangedAt: "some time"}}
79+
func testOutboxMutationEvents() throws {
80+
81+
let post = Post(title: "title", content: "content", createdAt: .now())
82+
83+
let outboxMutationEnqueuedReceived = expectation(description: "outboxMutationEnqueued received")
84+
let outboxMutationProcessedReceived = expectation(description: "outboxMutationProcessed received")
85+
86+
let hubListener = Amplify.Hub.listen(to: .dataStore) { payload in
87+
if payload.eventName == HubPayload.EventName.DataStore.outboxMutationEnqueued {
88+
guard let outboxMutationEnqueuedEvent = payload.data as? OutboxMutationEvent else {
89+
XCTFail("Failed to cast payload data as OutboxMutationEvent")
90+
return
91+
}
92+
XCTAssertEqual(outboxMutationEnqueuedEvent.modelName, "Post")
93+
XCTAssertEqual(outboxMutationEnqueuedEvent.element.model.modelName, "Post")
94+
XCTAssertNil(outboxMutationEnqueuedEvent.element.version)
95+
XCTAssertNil(outboxMutationEnqueuedEvent.element.lastChangedAt)
96+
XCTAssertNil(outboxMutationEnqueuedEvent.element.deleted)
97+
outboxMutationEnqueuedReceived.fulfill()
98+
}
99+
100+
if payload.eventName == HubPayload.EventName.DataStore.outboxMutationProcessed {
101+
guard let outboxMutationProcessedEvent = payload.data as? OutboxMutationEvent else {
102+
XCTFail("Failed to cast payload data as OutboxMutationEvent")
103+
return
104+
}
105+
XCTAssertEqual(outboxMutationProcessedEvent.modelName, "Post")
106+
XCTAssertEqual(outboxMutationProcessedEvent.element.model.modelName, "Post")
107+
XCTAssertEqual(outboxMutationProcessedEvent.element.version, 1)
108+
XCTAssertNotNil(outboxMutationProcessedEvent.element.lastChangedAt)
109+
XCTAssertEqual(outboxMutationProcessedEvent.element.deleted, false)
110+
outboxMutationProcessedReceived.fulfill()
111+
}
112+
}
113+
114+
guard try HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else {
115+
XCTFail("Listener not registered for hub")
116+
return
117+
}
118+
119+
Amplify.DataStore.save(post) { _ in }
120+
121+
waitForExpectations(timeout: networkTimeout, handler: nil)
122+
Amplify.Hub.removeListener(hubListener)
66123
}
67124
}

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginIntegrationTests/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ The following steps demonstrate how to set up DataStore with a conflict resoluti
2727

2828
4. `amplify push`
2929

30-
5. Copy `awsconfiguration.json` over to the Config folder
30+
5. Copy `amplifyconfiguration.json` over to the Config folder
3131

3232
You should now be able to run all of the tests

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/MutationQueue/OutgoingMutationQueueTests.swift

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,35 @@ class OutgoingMutationQueueTests: SyncEngineTestBase {
3333
var outboxStatusReceivedCurrentCount = 0
3434
let outboxStatusOnStart = expectation(description: "On DataStore start, outboxStatus received")
3535
let outboxStatusOnMutationEnqueued = expectation(description: "Mutation enqueued, outboxStatus received")
36+
let outboxMutationEnqueued = expectation(description: "Mutation enqueued, outboxMutationEnqueued received")
37+
38+
let outboxStatusFilter = HubFilters.forEventName(HubPayload.EventName.DataStore.outboxStatus)
39+
let outboxMutationEnqueuedFilter = HubFilters.forEventName(HubPayload.EventName.DataStore.outboxMutationEnqueued)
40+
let filters = HubFilters.any(filters: outboxStatusFilter, outboxMutationEnqueuedFilter)
41+
let hubListener = Amplify.Hub.listen(to: .dataStore, isIncluded: filters) { payload in
42+
if payload.eventName == HubPayload.EventName.DataStore.outboxStatus {
43+
outboxStatusReceivedCurrentCount += 1
44+
guard let outboxStatusEvent = payload.data as? OutboxStatusEvent else {
45+
XCTFail("Failed to cast payload data as OutboxStatusEvent")
46+
return
47+
}
3648

37-
let filter = HubFilters.forEventName(HubPayload.EventName.DataStore.outboxStatus)
38-
let hubListener = Amplify.Hub.listen(to: .dataStore, isIncluded: filter) { payload in
39-
outboxStatusReceivedCurrentCount += 1
40-
guard let outboxStatusEvent = payload.data as? OutboxStatusEvent else {
41-
XCTFail("Failed to cast payload data as OutboxStatusEvent")
42-
return
49+
if outboxStatusReceivedCurrentCount == 1 {
50+
XCTAssertTrue(outboxStatusEvent.isEmpty)
51+
outboxStatusOnStart.fulfill()
52+
} else {
53+
XCTAssertFalse(outboxStatusEvent.isEmpty)
54+
outboxStatusOnMutationEnqueued.fulfill()
55+
}
4356
}
4457

45-
if outboxStatusReceivedCurrentCount == 1 {
46-
XCTAssertTrue(outboxStatusEvent.isEmpty)
47-
outboxStatusOnStart.fulfill()
48-
} else {
49-
XCTAssertFalse(outboxStatusEvent.isEmpty)
50-
outboxStatusOnMutationEnqueued.fulfill()
58+
if payload.eventName == HubPayload.EventName.DataStore.outboxMutationEnqueued {
59+
guard let outboxStatusEvent = payload.data as? OutboxMutationEvent else {
60+
XCTFail("Failed to cast payload data as OutboxMutationEvent")
61+
return
62+
}
63+
XCTAssertEqual(outboxStatusEvent.modelName, "Post")
64+
outboxMutationEnqueued.fulfill()
5165
}
5266
}
5367

AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/MutationQueue/OutgoingMutationQueueTestsWithMockStateMachine.swift

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,33 +65,58 @@ class OutgoingMutationQueueMockStateTest: XCTestCase {
6565
waitForExpectations(timeout: 1)
6666
}
6767

68-
func testRequestingEvent_subscriptionSetup() {
68+
func testRequestingEvent_subscriptionSetup() throws {
6969
let semaphore = DispatchSemaphore(value: 0)
7070
stateMachine.pushExpectActionCriteria { action in
7171
XCTAssertEqual(action, OutgoingMutationQueue.Action.receivedSubscription)
7272
semaphore.signal()
7373
}
7474
stateMachine.state = .starting(apiBehavior, publisher)
7575
semaphore.wait()
76+
77+
let json = "{\"id\":\"1234\",\"title\":\"t\",\"content\":\"c\",\"createdAt\":\"2020-09-03T22:55:13.424Z\"}"
7678
let futureResult = MutationEvent(modelId: "1",
7779
modelName: "Post",
78-
json: "{}",
80+
json: json,
7981
mutationType: MutationEvent.MutationType.create)
8082
eventSource.pushMutationEvent(futureResult: .success(futureResult))
8183

8284
let enqueueEvent = expectation(description: "state requestingEvent, enqueueEvent")
8385
let processEvent = expectation(description: "state requestingEvent, processedEvent")
86+
8487
stateMachine.pushExpectActionCriteria { action in
8588
XCTAssertEqual(action, OutgoingMutationQueue.Action.enqueuedEvent)
8689
enqueueEvent.fulfill()
8790
}
91+
92+
let mutateAPICallExpecation = expectation(description: "Call to api category for mutate")
93+
var listenerFromRequest: GraphQLOperation<MutationSync<AnyModel>>.ResultListener!
94+
let responder = MutateRequestListenerResponder<MutationSync<AnyModel>> { _, eventListener in
95+
mutateAPICallExpecation.fulfill()
96+
listenerFromRequest = eventListener
97+
return nil
98+
}
99+
apiBehavior.responders[.mutateRequestListener] = responder
100+
101+
stateMachine.state = .requestingEvent
102+
103+
wait(for: [enqueueEvent, mutateAPICallExpecation], timeout: 1)
104+
88105
stateMachine.pushExpectActionCriteria { action in
89106
XCTAssertEqual(action, OutgoingMutationQueue.Action.processedEvent)
90107
processEvent.fulfill()
91108
}
92-
stateMachine.state = .requestingEvent
93109

94-
waitForExpectations(timeout: 1)
110+
let model = MockSynced(id: "id-1")
111+
let anyModel = try model.eraseToAnyModel()
112+
let remoteSyncMetadata = MutationSyncMetadata(id: model.id,
113+
deleted: false,
114+
lastChangedAt: Date().unixSeconds,
115+
version: 2)
116+
let remoteMutationSync = MutationSync(model: anyModel, syncMetadata: remoteSyncMetadata)
117+
listenerFromRequest(.success(.success(remoteMutationSync)))
118+
119+
wait(for: [processEvent], timeout: 1)
95120
}
96121

97122
func testRequestingEvent_nosubscription() {

0 commit comments

Comments
 (0)