Skip to content

Commit 8189460

Browse files
authored
fix(api): storing cancelablles with actor methods in AppSyncRTC (#3824)
* fix(api): storing cancelablles with actor methods in AppSyncRTC * add unit test cases * remove internal modifier
1 parent 673a075 commit 8189460

File tree

2 files changed

+70
-24
lines changed

2 files changed

+70
-24
lines changed

AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
165165
// Placing the actual subscription work in a deferred task and
166166
// promptly returning the filtered publisher for downstream consumption of all error messages.
167167
defer {
168-
Task { [weak self] in
168+
let task = Task { [weak self] in
169169
guard let self = self else { return }
170170
if !(await self.isConnected) {
171171
try await connect()
172172
try await waitForState(.connected)
173173
}
174-
await self.bindCancellableToConnection(try await self.startSubscription(id))
175-
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
174+
await self.storeInConnectionCancellables(try await self.startSubscription(id))
175+
}
176+
self.storeInConnectionCancellables(task.toAnyCancellable)
176177
}
177178

178179
return filterAppSyncSubscriptionEvent(with: id)
@@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
236237
}
237238

238239
private func subscribeToWebSocketEvent() async {
239-
await self.webSocketClient.publisher.sink { [weak self] _ in
240+
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
240241
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
241242
} receiveValue: { webSocketEvent in
242243
Task { [weak self] in
243-
await self?.onWebSocketEvent(webSocketEvent)
244-
}.toAnyCancellable.store(in: &self.cancellables)
244+
let task = Task { [weak self] in
245+
await self?.onWebSocketEvent(webSocketEvent)
246+
}
247+
await self?.storeInCancellables(task.toAnyCancellable)
248+
}
245249
}
246-
.store(in: &cancellables)
250+
self.storeInCancellables(cancellable)
247251
}
248252

249253
private func resumeExistingSubscriptions() {
250254
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
251255
for (id, _) in self.subscriptions {
252-
Task {
256+
Task { [weak self] in
253257
do {
254-
try await self.startSubscription(id).store(in: &cancellablesBindToConnection)
258+
if let cancellable = try await self?.startSubscription(id) {
259+
await self?.storeInConnectionCancellables(cancellable)
260+
}
255261
} catch {
256-
log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
262+
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
257263
}
258264
}
259265
}
@@ -286,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
286292
subject.filter {
287293
switch $0 {
288294
case .success(let response): return response.id == id || response.type == .connectionError
289-
case .failure(let error): return true
295+
case .failure: return true
290296
}
291297
}
292298
.map { result -> AppSyncSubscriptionEvent? in
@@ -350,10 +356,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
350356
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
351357
}
352358

353-
private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
354-
cancellable.store(in: &cancellablesBindToConnection)
355-
}
356-
357359
}
358360

359361
// MARK: - On WebSocket Events
@@ -366,8 +368,11 @@ extension AppSyncRealTimeClient {
366368
if self.state.value == .connectionDropped {
367369
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
368370
Task { [weak self] in
369-
try? await self?.connect()
370-
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
371+
let task = Task { [weak self] in
372+
try? await self?.connect()
373+
}
374+
await self?.storeInConnectionCancellables(task.toAnyCancellable)
375+
}
371376
}
372377

373378
case let .disconnected(closeCode, reason): //
@@ -425,24 +430,37 @@ extension AppSyncRealTimeClient {
425430
}
426431
}
427432

428-
private func monitorHeartBeats(_ connectionAck: JSONValue?) {
433+
func monitorHeartBeats(_ connectionAck: JSONValue?) {
429434
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
430435
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
431-
heartBeats.eraseToAnyPublisher()
436+
let cancellable = heartBeats.eraseToAnyPublisher()
432437
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
433438
.first()
434-
.sink(receiveValue: {
435-
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
439+
.sink(receiveValue: { [weak self] in
440+
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
436441
Task { [weak self] in
437-
await self?.reconnect()
438-
}.toAnyCancellable.store(in: &self.cancellables)
442+
let task = Task { [weak self] in
443+
await self?.reconnect()
444+
}
445+
await self?.storeInCancellables(task.toAnyCancellable)
446+
}
439447
})
440-
.store(in: &cancellablesBindToConnection)
448+
self.storeInConnectionCancellables(cancellable)
441449
// start counting down
442450
heartBeats.send(())
443451
}
444452
}
445453

454+
extension AppSyncRealTimeClient {
455+
private func storeInCancellables(_ cancellable: AnyCancellable) {
456+
self.cancellables.insert(cancellable)
457+
}
458+
459+
private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
460+
self.cancellablesBindToConnection.insert(cancellable)
461+
}
462+
}
463+
446464
extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
447465
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
448466
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in

AmplifyPlugins/API/Tests/AWSAPIPluginTests/AppSyncRealTimeClient/AppSyncRealTimeClientTests.swift

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,4 +551,32 @@ class AppSyncRealTimeClientTests: XCTestCase {
551551
await fulfillment(of: [startTriggered, errorReceived], timeout: 2)
552552

553553
}
554+
555+
func testReconnect_whenHeartBeatSignalIsNotReceived() async throws {
556+
var cancellables = Set<AnyCancellable>()
557+
let timeout = 1.0
558+
let mockWebSocketClient = MockWebSocketClient()
559+
let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor()
560+
let appSyncClient = AppSyncRealTimeClient(
561+
endpoint: URL(string: "https://example.com")!,
562+
requestInterceptor: mockAppSyncRequestInterceptor,
563+
webSocketClient: mockWebSocketClient
564+
)
565+
566+
// start monitoring
567+
await appSyncClient.monitorHeartBeats(.object([
568+
"connectionTimeoutMs": 100
569+
]))
570+
571+
let reconnect = expectation(description: "webSocket triggers event to connection")
572+
await mockWebSocketClient.actionSubject.sink { action in
573+
switch action {
574+
case .connect:
575+
reconnect.fulfill()
576+
default: break
577+
}
578+
}.store(in: &cancellables)
579+
await fulfillment(of: [reconnect], timeout: 2)
580+
}
581+
554582
}

0 commit comments

Comments
 (0)