Skip to content

Commit 8e719aa

Browse files
dnys1Jordan-Nelson
andauthored
fix(datastore): Re-emit events on hot restart (#980)
* Add hot restart protection * Add test * Update DataStoreHubEventStreamHandlerTests.swift Fix unit test * Add deinit for test cases * fix(datastore): replay events on Android after a hot restart (#965) * fix: replay events on Android after a hot restart * chore: use mutable list * Update formatting Co-authored-by: Jordan Nelson <[email protected]>
1 parent 75264e8 commit 8e719aa

File tree

4 files changed

+360
-184
lines changed

4 files changed

+360
-184
lines changed

packages/amplify_datastore/android/src/main/kotlin/com/amazonaws/amplify/amplify_datastore/DataStoreHubEventStreamHandler.kt

Lines changed: 124 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ class DataStoreHubEventStreamHandler : EventChannel.StreamHandler {
4040
private val LOG = Amplify.Logging.forNamespace("amplify:flutter:datastore")
4141
private var forwardHubResponse : (event: Map<String, Any>) -> Unit
4242

43+
// DataStore hub event history. Used to track events which may be lost on hot restart, such as sync and ready events.
44+
private var eventHistory: MutableList<HubEvent<*>> = mutableListOf();
45+
46+
// Event types which should be replayed on hot restart.
47+
private val replayEvents: Set<String> = setOf(
48+
DataStoreChannelEventName.NETWORK_STATUS.toString(),
49+
DataStoreChannelEventName.SUBSCRIPTIONS_ESTABLISHED.toString(),
50+
DataStoreChannelEventName.SYNC_QUERIES_STARTED.toString(),
51+
DataStoreChannelEventName.MODEL_SYNCED.toString(),
52+
DataStoreChannelEventName.SYNC_QUERIES_READY.toString(),
53+
DataStoreChannelEventName.READY.toString()
54+
)
55+
4356
constructor(){
4457
forwardHubResponse = {event: Map<String, Any> -> handler.post {
4558
eventSink?.success(event)
@@ -56,125 +69,134 @@ class DataStoreHubEventStreamHandler : EventChannel.StreamHandler {
5669
}
5770

5871
fun getHubListener(): SubscriptionToken {
59-
return Amplify.Hub.subscribe(HubChannel.DATASTORE
60-
) { hubEvent: HubEvent<*> ->
61-
try {
62-
when (hubEvent.name) {
63-
DataStoreChannelEventName.NETWORK_STATUS.toString() -> {
64-
try {
65-
var networkEvent = hubEvent.data as NetworkStatusEvent
66-
var res = FlutterNetworkStatusEvent(hubEvent.name, networkEvent.active)
67-
sendEvent(res.toValueMap())
68-
} catch (e: Exception) {
69-
LOG.error("Failed to parse and send networkStatus event: ", e)
70-
}
72+
for (event in eventHistory) {
73+
if (replayEvents.contains(event.name)) {
74+
sendPayload(event);
75+
}
76+
}
77+
return Amplify.Hub.subscribe(HubChannel.DATASTORE) { hubEvent: HubEvent<*> ->
78+
eventHistory.add(hubEvent)
79+
sendPayload(hubEvent)
80+
}
81+
}
82+
83+
private fun sendPayload(hubEvent: HubEvent<*>) {
84+
try {
85+
when (hubEvent.name) {
86+
DataStoreChannelEventName.NETWORK_STATUS.toString() -> {
87+
try {
88+
var networkEvent = hubEvent.data as NetworkStatusEvent
89+
var res = FlutterNetworkStatusEvent(hubEvent.name, networkEvent.active)
90+
sendEvent(res.toValueMap())
91+
} catch (e: Exception) {
92+
LOG.error("Failed to parse and send networkStatus event: ", e)
7193
}
72-
DataStoreChannelEventName.SUBSCRIPTIONS_ESTABLISHED.toString() -> {
73-
try {
74-
var res = FlutterSubscriptionsEstablishedEvent(hubEvent.name)
75-
sendEvent(res.toValueMap())
76-
} catch (e: Exception) {
77-
LOG.error("Failed to parse and send subscriptionsEstablished event: ", e)
78-
}
94+
}
95+
DataStoreChannelEventName.SUBSCRIPTIONS_ESTABLISHED.toString() -> {
96+
try {
97+
var res = FlutterSubscriptionsEstablishedEvent(hubEvent.name)
98+
sendEvent(res.toValueMap())
99+
} catch (e: Exception) {
100+
LOG.error("Failed to parse and send subscriptionsEstablished event: ", e)
79101
}
80-
DataStoreChannelEventName.SYNC_QUERIES_STARTED.toString() -> {
81-
try {
82-
var syncQueriesStartedEvent = hubEvent.data as SyncQueriesStartedEvent
83-
var res = FlutterSyncQueriesStartedEvent(hubEvent.name, syncQueriesStartedEvent.models)
84-
sendEvent(res.toValueMap())
85-
} catch (e: Exception) {
86-
LOG.error("Failed to parse and send syncQueriesStarted event: ", e)
87-
}
102+
}
103+
DataStoreChannelEventName.SYNC_QUERIES_STARTED.toString() -> {
104+
try {
105+
var syncQueriesStartedEvent = hubEvent.data as SyncQueriesStartedEvent
106+
var res = FlutterSyncQueriesStartedEvent(hubEvent.name, syncQueriesStartedEvent.models)
107+
sendEvent(res.toValueMap())
108+
} catch (e: Exception) {
109+
LOG.error("Failed to parse and send syncQueriesStarted event: ", e)
110+
}
111+
}
112+
DataStoreChannelEventName.MODEL_SYNCED.toString() -> {
113+
try {
114+
var modelSyncedEvent = hubEvent.data as ModelSyncedEvent
115+
var res = FlutterModelSyncedEvent(
116+
hubEvent.name,
117+
modelSyncedEvent.model,
118+
modelSyncedEvent.isFullSync,
119+
modelSyncedEvent.isDeltaSync,
120+
modelSyncedEvent.added,
121+
modelSyncedEvent.updated,
122+
modelSyncedEvent.deleted
123+
)
124+
sendEvent(res.toValueMap())
125+
} catch (e: Exception) {
126+
LOG.error("Failed to parse and send modelSynced event: ", e)
88127
}
89-
DataStoreChannelEventName.MODEL_SYNCED.toString() -> {
90-
try {
91-
var modelSyncedEvent = hubEvent.data as ModelSyncedEvent
92-
var res = FlutterModelSyncedEvent(
93-
hubEvent.name,
94-
modelSyncedEvent.model,
95-
modelSyncedEvent.isFullSync,
96-
modelSyncedEvent.isDeltaSync,
97-
modelSyncedEvent.added,
98-
modelSyncedEvent.updated,
99-
modelSyncedEvent.deleted
100-
)
101-
sendEvent(res.toValueMap())
102-
} catch (e: Exception) {
103-
LOG.error("Failed to parse and send modelSynced event: ", e)
104-
}
105128

129+
}
130+
DataStoreChannelEventName.SYNC_QUERIES_READY.toString() -> {
131+
try {
132+
var res = FlutterSyncQueriesReadyEvent(hubEvent.name)
133+
sendEvent(res.toValueMap())
134+
} catch (e: Exception) {
135+
LOG.error("Failed to parse and send syncQueriesReady event: ", e)
106136
}
107-
DataStoreChannelEventName.SYNC_QUERIES_READY.toString() -> {
108-
try {
109-
var res = FlutterSyncQueriesReadyEvent(hubEvent.name)
110-
sendEvent(res.toValueMap())
111-
} catch (e: Exception) {
112-
LOG.error("Failed to parse and send syncQueriesReady event: ", e)
113-
}
137+
}
138+
DataStoreChannelEventName.READY.toString() -> {
139+
try {
140+
var res = FlutterReadyEvent(hubEvent.name)
141+
sendEvent(res.toValueMap())
142+
} catch (e: Exception) {
143+
LOG.error("Failed to parse and send ready event: ", e)
114144
}
115-
DataStoreChannelEventName.READY.toString() -> {
116-
try {
117-
var res = FlutterReadyEvent(hubEvent.name)
145+
}
146+
DataStoreChannelEventName.OUTBOX_MUTATION_ENQUEUED.toString() -> {
147+
try {
148+
var outboxMutationEnqueued = hubEvent.data as OutboxMutationEvent<*>
149+
if (outboxMutationEnqueued.element.model is SerializedModel) {
150+
var modelName = (outboxMutationEnqueued.element.model as SerializedModel).modelName as String
151+
var res = FlutterOutboxMutationEnqueuedEvent(
152+
hubEvent.name,
153+
modelName,
154+
outboxMutationEnqueued.element
155+
)
118156
sendEvent(res.toValueMap())
119-
} catch (e: Exception) {
120-
LOG.error("Failed to parse and send ready event: ", e)
157+
} else {
158+
LOG.error("Element is not an instance of SerializedModel.")
121159
}
160+
} catch (e: Exception) {
161+
LOG.error("Failed to parse and send outboxMutationEnqueued event: ", e)
122162
}
123-
DataStoreChannelEventName.OUTBOX_MUTATION_ENQUEUED.toString() -> {
124-
try {
125-
var outboxMutationEnqueued = hubEvent.data as OutboxMutationEvent<*>
126-
if (outboxMutationEnqueued.element.model is SerializedModel) {
127-
var modelName = (outboxMutationEnqueued.element.model as SerializedModel).modelName as String
128-
var res = FlutterOutboxMutationEnqueuedEvent(
129-
hubEvent.name,
130-
modelName,
131-
outboxMutationEnqueued.element
132-
)
133-
sendEvent(res.toValueMap())
134-
} else {
135-
LOG.error("Element is not an instance of SerializedModel.")
136-
}
137-
} catch (e: Exception) {
138-
LOG.error("Failed to parse and send outboxMutationEnqueued event: ", e)
139-
}
140163

141-
}
142-
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED.toString() -> {
143-
try {
144-
var outboxMutationProcessed = hubEvent.data as OutboxMutationEvent<*>
145-
if (outboxMutationProcessed.element.model is SerializedModel) {
146-
var modelName = outboxMutationProcessed.modelName
147-
var res = FlutterOutboxMutationProcessedEvent(
148-
hubEvent.name,
149-
modelName,
150-
outboxMutationProcessed.element
151-
)
152-
sendEvent(res.toValueMap())
153-
} else {
154-
LOG.error("Element is not an instance of SerializedModel.")
155-
}
156-
} catch (e: Exception) {
157-
LOG.error("Failed to parse and send outboxMutationProcessed event: ", e)
158-
}
159-
}
160-
DataStoreChannelEventName.OUTBOX_STATUS.toString() -> {
161-
try {
162-
var outboxEvent = hubEvent.data as OutboxStatusEvent
163-
var res = FlutterOutboxStatusEvent(hubEvent.name, outboxEvent.isEmpty)
164+
}
165+
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED.toString() -> {
166+
try {
167+
var outboxMutationProcessed = hubEvent.data as OutboxMutationEvent<*>
168+
if (outboxMutationProcessed.element.model is SerializedModel) {
169+
var modelName = outboxMutationProcessed.modelName
170+
var res = FlutterOutboxMutationProcessedEvent(
171+
hubEvent.name,
172+
modelName,
173+
outboxMutationProcessed.element
174+
)
164175
sendEvent(res.toValueMap())
165-
} catch (e: Exception) {
166-
LOG.error("Failed to parse and send outboxStatus event: ", e)
176+
} else {
177+
LOG.error("Element is not an instance of SerializedModel.")
167178
}
179+
} catch (e: Exception) {
180+
LOG.error("Failed to parse and send outboxMutationProcessed event: ", e)
168181
}
169-
else -> {
170-
LOG.info("Unhandled DataStoreHubEvent: " + hubEvent.name + "\n" + hubEvent.data.toString())
182+
}
183+
DataStoreChannelEventName.OUTBOX_STATUS.toString() -> {
184+
try {
185+
var outboxEvent = hubEvent.data as OutboxStatusEvent
186+
var res = FlutterOutboxStatusEvent(hubEvent.name, outboxEvent.isEmpty)
187+
sendEvent(res.toValueMap())
188+
} catch (e: Exception) {
189+
LOG.error("Failed to parse and send outboxStatus event: ", e)
171190
}
172191
}
173-
} catch (e: Exception) {
174-
LOG.error("Error parsing DataStore Hub event.")
192+
else -> {
193+
LOG.info("Unhandled DataStoreHubEvent: " + hubEvent.name + "\n" + hubEvent.data.toString())
194+
}
175195
}
196+
} catch (e: Exception) {
197+
LOG.error("Error parsing DataStore Hub event.")
176198
}
177-
}
199+
}
178200

179201
fun sendEvent(flutterEvent: Map<String, Any>) {
180202
forwardHubResponse(flutterEvent)

packages/amplify_datastore/android/src/test/kotlin/com/amazonaws/amplify/amplify_datastore/AmplifyDataStoreHubTest.kt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,42 @@ class AmplifyDataStoreHubTest {
367367
verify(hubSpy, times(1)).sendEvent(hubMap.toValueMap())
368368
}
369369

370+
@Test
371+
fun test_replay_events() {
372+
flutterPlugin = AmplifyDataStorePlugin(eventHandler = mockStreamHandler, hubEventHandler = mockHubHandler)
373+
var eventData: HashMap<String, Any> = (readMapFromFile("hub",
374+
"readyEvent.json",
375+
HashMap::class.java) as HashMap<String, Any>)
376+
377+
var event: HubEvent<*> = HubEvent.create(DataStoreChannelEventName.READY)
378+
379+
val latch = CountDownLatch(1)
380+
381+
val realHubHandler = DataStoreHubEventStreamHandler(latch)
382+
383+
val hubSpy = spy(realHubHandler)
384+
val hubMap = FlutterReadyEvent(
385+
eventData["eventName"] as String
386+
)
387+
388+
// initial event
389+
val token: SubscriptionToken = hubSpy.getHubListener()
390+
Amplify.Hub.publish(HubChannel.DATASTORE, event)
391+
Latch.await(latch);
392+
Amplify.Hub.unsubscribe(token)
393+
shadowOf(getMainLooper()).idle()
394+
verify(hubSpy, times(1)).sendEvent(hubMap.toValueMap())
395+
396+
// replay event after getHubListener() is invoked a second time
397+
val token2: SubscriptionToken = hubSpy.getHubListener()
398+
Latch.await(latch);
399+
Amplify.Hub.unsubscribe(token2)
400+
shadowOf(getMainLooper()).idle()
401+
verify(hubSpy, times(2)).sendEvent(hubMap.toValueMap())
402+
403+
}
404+
405+
370406
private fun setFinalStatic(field: Field, newValue: Any?) {
371407
field.isAccessible = true
372408
val modifiersField: Field = Field::class.java.getDeclaredField("modifiers")

packages/amplify_datastore/example/ios/unit_tests/DataStoreHubEventStreamHandlerTests.swift

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,4 +328,73 @@ class DataStoreHubEventStreamHandlerTests: XCTestCase {
328328
waitForExpectations(timeout: 1.0)
329329
hubHandler.onCancel(withArguments: nil)
330330
}
331+
332+
func test_hot_restart_replays_sync_and_ready_events() {
333+
let payloads = [
334+
HubPayload(
335+
eventName: HubPayload.EventName.DataStore.ready,
336+
context: nil,
337+
data: nil
338+
),
339+
HubPayload(
340+
eventName: HubPayload.EventName.DataStore.modelSynced,
341+
context: nil,
342+
data: ModelSyncedEvent(
343+
modelName: "Test",
344+
isFullSync: true,
345+
isDeltaSync: false,
346+
added: 0,
347+
updated: 0,
348+
deleted: 0
349+
)
350+
),
351+
]
352+
353+
var events: Set<String> = []
354+
func buildEventSink(expectCount: Int) -> (FlutterEventSink, XCTestExpectation) {
355+
let expect = expectation(description: "Emits \(expectCount) events")
356+
let eventSink = { (event: Any?) in
357+
guard let eventMap = event as? [String: Any],
358+
let eventName = eventMap["eventName"] as? String else {
359+
XCTFail("Bad event: \(event ?? "nil")")
360+
return
361+
}
362+
events.insert(eventName)
363+
if events.count == expectCount {
364+
expect.fulfill()
365+
}
366+
}
367+
return (eventSink, expect)
368+
}
369+
370+
let hubHandler = DataStoreHubEventStreamHandler()
371+
var (eventSink, expect) = buildEventSink(expectCount: payloads.count)
372+
let listenError = hubHandler.onListen(withArguments: nil, eventSink: eventSink)
373+
XCTAssertNil(listenError)
374+
375+
for payload in payloads {
376+
Amplify.Hub.dispatch(to: .dataStore, payload: payload)
377+
}
378+
379+
let expectedEvents = Set<String>(payloads.map { shortEventName(eventName: $0.eventName) })
380+
381+
wait(for: [expect], timeout: 1)
382+
XCTAssertEqual(events, expectedEvents)
383+
384+
(eventSink, expect) = buildEventSink(expectCount: payloads.count)
385+
func hotRestart() {
386+
events = []
387+
388+
let cancelError = hubHandler.onCancel(withArguments: nil)
389+
XCTAssertNil(cancelError)
390+
391+
let listenError = hubHandler.onListen(withArguments: nil, eventSink: eventSink)
392+
XCTAssertNil(listenError)
393+
}
394+
395+
hotRestart()
396+
397+
wait(for: [expect], timeout: 1)
398+
XCTAssertEqual(events, expectedEvents)
399+
}
331400
}

0 commit comments

Comments
 (0)