From e266b7d240f7c3050b0bc6d5cdc99abba8411970 Mon Sep 17 00:00:00 2001 From: tjroach Date: Mon, 14 Apr 2025 12:33:36 -0400 Subject: [PATCH 1/6] connection ack timeout handling --- .../aws/appsync/events/EventsChannel.kt | 4 +- .../aws/appsync/events/EventsWebSocket.kt | 40 ++++++++++++---- .../appsync/events/data/WebSocketMessage.kt | 3 +- .../events/utils/ConnectionTimeoutTimer.kt | 47 +++++++++++++++++++ 4 files changed, 81 insertions(+), 13 deletions(-) create mode 100644 appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/utils/ConnectionTimeoutTimer.kt diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt index 9e22bb0e7..51e9f73df 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt @@ -116,10 +116,10 @@ class EventsChannel internal constructor( emit(EventsMessage(it.event)) } it is WebSocketMessage.Closed -> { - if (it.userInitiated) { + if (it.reason is DisconnectReason.UserInitiated) { throw UserClosedConnectionException() } else { - throw ConnectionClosedException(it.throwable) + throw ConnectionClosedException(it.reason.throwable) } } else -> Unit diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt index ba5f733c8..3b8460968 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt @@ -21,8 +21,10 @@ import com.amplifyframework.aws.appsync.core.util.Logger import com.amplifyframework.aws.appsync.events.data.ConnectException import com.amplifyframework.aws.appsync.events.data.EventsException import com.amplifyframework.aws.appsync.events.data.WebSocketMessage +import com.amplifyframework.aws.appsync.events.utils.ConnectionTimeoutTimer import com.amplifyframework.aws.appsync.events.utils.HeaderKeys import com.amplifyframework.aws.appsync.events.utils.HeaderValues +import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.MutableSharedFlow @@ -35,7 +37,6 @@ import okhttp3.Request import okhttp3.Response import okhttp3.WebSocket import okhttp3.WebSocketListener -import java.util.concurrent.atomic.AtomicBoolean internal class EventsWebSocket( private val eventsEndpoints: EventsEndpoints, @@ -50,7 +51,8 @@ internal class EventsWebSocket( private lateinit var webSocket: WebSocket internal val isClosed = AtomicBoolean(false) - private var userInitiatedDisconnect = false + private var disconnectReason: DisconnectReason? = null + private val connectionTimeoutTimer = ConnectionTimeoutTimer(onTimeout = ::onTimeout) @Throws(ConnectException::class) suspend fun connect() = coroutineScope { @@ -71,7 +73,7 @@ internal class EventsWebSocket( when (val connectionResponse = deferredConnectResponse.await()) { is WebSocketMessage.Closed -> { webSocket.cancel() - throw ConnectException(connectionResponse.throwable) + throw ConnectException(connectionResponse.reason.throwable) } is WebSocketMessage.Received.ConnectionError -> { webSocket.cancel() @@ -80,13 +82,16 @@ internal class EventsWebSocket( ?: EventsException.unknown() ) } - else -> Unit // It isn't obvious here, but only other connect response type is ConnectionAck + is WebSocketMessage.Received.ConnectionAck -> { + connectionTimeoutTimer.resetTimeoutTimer(connectionResponse.connectionTimeoutMs) + } + else -> Unit // Not obvious here but this block should never run } logger?.debug("Websocket Connection Open") } suspend fun disconnect(flushEvents: Boolean) = coroutineScope { - userInitiatedDisconnect = true + disconnectReason = DisconnectReason.UserInitiated val deferredClosedResponse = async { getClosedResponse() } when (flushEvents) { true -> webSocket.close(NORMAL_CLOSE_CODE, "User initiated disconnect") @@ -102,6 +107,7 @@ internal class EventsWebSocket( } override fun onMessage(webSocket: WebSocket, text: String) { + connectionTimeoutTimer.resetTimeoutTimer() logger?.debug { "Websocket onMessage: $text" } try { val eventMessage = json.decodeFromString(text) @@ -113,7 +119,7 @@ internal class EventsWebSocket( override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { logger?.error(t) { "$TAG onFailure" } - notifyClosed() // onClosed doesn't get called in failure. Treat this block the same as onClosed + handleClosed() // onClosed doesn't get called in failure. Treat this block the same as onClosed } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { @@ -123,12 +129,20 @@ internal class EventsWebSocket( override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { // Events api sends normal close code even in failure // so inspecting code/reason isn't helpful as it should be - logger?.debug("$TAG onClosed: userInitiated = $userInitiatedDisconnect") - notifyClosed() + logger?.debug("$TAG onClosed: reason = $disconnectReason") + handleClosed() + } + + private fun onTimeout() { + disconnectReason = DisconnectReason.Timeout + webSocket.cancel() } - private fun notifyClosed() { - _events.tryEmit(WebSocketMessage.Closed(userInitiated = userInitiatedDisconnect)) + private fun handleClosed() { + connectionTimeoutTimer.stop() + _events.tryEmit( + WebSocketMessage.Closed(reason = disconnectReason ?: DisconnectReason.Service()) + ) isClosed.set(true) } @@ -193,3 +207,9 @@ private class ConnectAppSyncRequest( override val body: String get() = "{}" } + +internal sealed class DisconnectReason(val throwable: Throwable?) { + data object UserInitiated : DisconnectReason(null) + data object Timeout : DisconnectReason(EventsException("Connection timed out.")) + class Service(throwable: Throwable? = null) : DisconnectReason(throwable) +} diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt index db2999005..82a99cfbe 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt @@ -15,6 +15,7 @@ package com.amplifyframework.aws.appsync.events.data +import com.amplifyframework.aws.appsync.events.DisconnectReason import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement @@ -103,7 +104,7 @@ internal sealed class WebSocketMessage { data class Error(val errors: List) } - internal data class Closed(val userInitiated: Boolean, val throwable: Throwable? = null) : WebSocketMessage() + internal data class Closed(val reason: DisconnectReason) : WebSocketMessage() } @Serializable diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/utils/ConnectionTimeoutTimer.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/utils/ConnectionTimeoutTimer.kt new file mode 100644 index 000000000..2bdd3f073 --- /dev/null +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/utils/ConnectionTimeoutTimer.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amplifyframework.aws.appsync.events.utils + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +internal class ConnectionTimeoutTimer(val onTimeout: () -> Unit) { + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private var timeoutInMillis: Long = 30_000L + private var timeoutJob: Job? = null + + fun resetTimeoutTimer(timeoutInMillis: Long = this.timeoutInMillis) { + if (this.timeoutInMillis != timeoutInMillis) { + this.timeoutInMillis = timeoutInMillis + } + + timeoutJob?.cancel() // Cancel existing timer if any + timeoutJob = scope.launch { + delay(timeoutInMillis) + // If this code executes, it means no events were received for 30 seconds + onTimeout() + } + } + + fun stop() { + timeoutJob?.cancel() + timeoutJob = null + } +} From 6c9b061d79d5e94150b29faa470443b2db5a1fb3 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 15 Apr 2025 08:54:07 -0400 Subject: [PATCH 2/6] PR comments --- .../aws/appsync/events/EventsWebSocket.kt | 8 ++---- .../appsync/events/EventsWebSocketProvider.kt | 28 +++++++++---------- .../appsync/events/data/WebSocketMessage.kt | 14 ++++++++-- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt index 3b8460968..b907dbedc 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt @@ -24,7 +24,6 @@ import com.amplifyframework.aws.appsync.events.data.WebSocketMessage import com.amplifyframework.aws.appsync.events.utils.ConnectionTimeoutTimer import com.amplifyframework.aws.appsync.events.utils.HeaderKeys import com.amplifyframework.aws.appsync.events.utils.HeaderValues -import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.MutableSharedFlow @@ -50,7 +49,7 @@ internal class EventsWebSocket( val events = _events.asSharedFlow() // publicly exposed as read-only shared flow private lateinit var webSocket: WebSocket - internal val isClosed = AtomicBoolean(false) + @Volatile internal var isClosed = false private var disconnectReason: DisconnectReason? = null private val connectionTimeoutTimer = ConnectionTimeoutTimer(onTimeout = ::onTimeout) @@ -101,9 +100,8 @@ internal class EventsWebSocket( } override fun onOpen(webSocket: WebSocket, response: Response) { - val connectionInitMessage = json.encodeToString(WebSocketMessage.Send.ConnectionInit()) logger?.debug { "$TAG onOpen: sending connection init" } - webSocket.send(connectionInitMessage) + send(WebSocketMessage.Send.ConnectionInit()) } override fun onMessage(webSocket: WebSocket, text: String) { @@ -143,7 +141,7 @@ internal class EventsWebSocket( _events.tryEmit( WebSocketMessage.Closed(reason = disconnectReason ?: DisconnectReason.Service()) ) - isClosed.set(true) + isClosed = true } inline fun send(webSocketMessage: T) { diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt index d05f6ffd3..4555b0e29 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt @@ -34,53 +34,53 @@ internal class EventsWebSocketProvider( private val logger: Logger? ) { private val mutex = Mutex() - private val _connectResult = AtomicReference?>(null) - private val _connectionInProgress = AtomicReference>?>(null) + private val connectionResultReference = AtomicReference?>(null) + private val connectionInProgressReference = AtomicReference>?>(null) - fun getExistingWebSocket(): EventsWebSocket? = _connectResult.get()?.getOrNull() + fun getExistingWebSocket(): EventsWebSocket? = connectionResultReference.get()?.getOrNull() suspend fun getConnectedWebSocket(): EventsWebSocket = getConnectedWebSocketResult().getOrThrow() private suspend fun getConnectedWebSocketResult(): Result = coroutineScope { // If connection is already established, return it mutex.withLock { - val existingResult = _connectResult.get() + val existingResult = connectionResultReference.get() val existingWebSocket = existingResult?.getOrNull() if (existingWebSocket != null) { - if (existingWebSocket.isClosed.get()) { - _connectResult.set(null) + if (existingWebSocket.isClosed) { + connectionResultReference.set(null) } else { return@coroutineScope existingResult } } } - val deferredInProgressConnection = _connectionInProgress.get() + val deferredInProgressConnection = connectionInProgressReference.get() if (deferredInProgressConnection != null && !deferredInProgressConnection.isCompleted) { return@coroutineScope deferredInProgressConnection.await() } mutex.withLock { - val existingResultInLock = _connectResult.get() + val existingResultInLock = connectionResultReference.get() val existingWebSocket = existingResultInLock?.getOrNull() if (existingWebSocket != null) { - if (existingWebSocket.isClosed.get()) { - _connectResult.set(null) + if (existingWebSocket.isClosed) { + connectionResultReference.set(null) } else { return@coroutineScope existingResultInLock } } - val deferredInProgressConnectionInLock = _connectionInProgress.get() + val deferredInProgressConnectionInLock = connectionInProgressReference.get() if (deferredInProgressConnectionInLock != null && !deferredInProgressConnectionInLock.isCompleted) { return@coroutineScope deferredInProgressConnectionInLock.await() } val newDeferredInProgressConnection = async { attemptConnection() } - _connectionInProgress.set(newDeferredInProgressConnection) + connectionInProgressReference.set(newDeferredInProgressConnection) val connectionResult = newDeferredInProgressConnection.await() - _connectResult.set(connectionResult) - _connectionInProgress.set(null) + connectionResultReference.set(connectionResult) + connectionInProgressReference.set(null) connectionResult } } diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt index 82a99cfbe..3e0f6b6b4 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt @@ -18,6 +18,7 @@ package com.amplifyframework.aws.appsync.events.data import com.amplifyframework.aws.appsync.events.DisconnectReason import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonElement @Serializable @@ -52,7 +53,8 @@ internal sealed class WebSocketMessage { internal data class Publish( val id: String, val channel: String, - val events: List + val events: JsonArray, + val authorization: Map ) : Send() { override val type = "publish" } @@ -98,6 +100,13 @@ internal sealed class WebSocketMessage { override val id: String, val errors: List ) : Subscription() + + @Serializable @SerialName("publish_success") + internal data class PublishSuccess( + override val id: String, + @SerialName("successful") val successfulEvents: List, + @SerialName("failed") val failedEvents: List + ) : Subscription() } @Serializable @SerialName("error") @@ -112,8 +121,9 @@ data class WebSocketError(val errorType: String, val message: String? = null) { // fallback message is only used if WebSocketError didn't provide a message fun toEventsException(fallbackMessage: String? = null): EventsException { + val message = this.message ?: fallbackMessage return when (errorType) { - "UnauthorizedException" -> UnauthorizedException(message ?: fallbackMessage) + "UnauthorizedException" -> UnauthorizedException(message) else -> EventsException(message = "$errorType: $message") } } From 0b9f4d5738e377d3f8212e762254ac6d8f9b78c6 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 15 Apr 2025 10:53:50 -0400 Subject: [PATCH 3/6] PR comments --- .../aws/appsync/core/{util => }/Logger.kt | 12 +++++------- .../amplifyframework/aws/appsync/events/Events.kt | 6 +++--- .../aws/appsync/events/EventsWebSocket.kt | 15 ++++++++------- .../aws/appsync/events/EventsWebSocketProvider.kt | 6 +++--- 4 files changed, 19 insertions(+), 20 deletions(-) rename appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/{util => }/Logger.kt (97%) diff --git a/appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/util/Logger.kt b/appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/Logger.kt similarity index 97% rename from appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/util/Logger.kt rename to appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/Logger.kt index dfa0a5183..0c6e54df3 100644 --- a/appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/util/Logger.kt +++ b/appsync/aws-appsync-core/src/main/java/com/amplifyframework/aws/appsync/core/Logger.kt @@ -13,10 +13,14 @@ * permissions and limitations under the License. */ -package com.amplifyframework.aws.appsync.core.util +package com.amplifyframework.aws.appsync.core import java.util.function.Supplier +fun interface LoggerProvider { + fun getLogger(namespace: String): Logger +} + /** * A component which can emit logs. */ @@ -28,12 +32,6 @@ interface Logger { */ val thresholdLevel: LogLevel - /** - * Gets the namespace of the logger. - * @return namespace for logger - */ - val namespace: String - /** * Logs a message at the [LogLevel.ERROR] level. * @param message An error message diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt index d4ed7dd04..426b1098a 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt @@ -15,7 +15,7 @@ package com.amplifyframework.aws.appsync.events import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer -import com.amplifyframework.aws.appsync.core.util.Logger +import com.amplifyframework.aws.appsync.core.LoggerProvider import com.amplifyframework.aws.appsync.events.data.ChannelAuthorizers import com.amplifyframework.aws.appsync.events.data.EventsException import com.amplifyframework.aws.appsync.events.data.PublishResult @@ -41,7 +41,7 @@ class Events @VisibleForTesting internal constructor( ) { data class Options( - val logger: Logger? = null + val loggerProvider: LoggerProvider? = null ) /** @@ -75,7 +75,7 @@ class Events @VisibleForTesting internal constructor( connectAuthorizer, okHttpClient, json, - options.logger + options.loggerProvider ) /** diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt index b907dbedc..50a9a4652 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt @@ -17,7 +17,7 @@ package com.amplifyframework.aws.appsync.events import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer import com.amplifyframework.aws.appsync.core.AppSyncRequest -import com.amplifyframework.aws.appsync.core.util.Logger +import com.amplifyframework.aws.appsync.core.LoggerProvider import com.amplifyframework.aws.appsync.events.data.ConnectException import com.amplifyframework.aws.appsync.events.data.EventsException import com.amplifyframework.aws.appsync.events.data.WebSocketMessage @@ -42,7 +42,7 @@ internal class EventsWebSocket( private val authorizer: AppSyncAuthorizer, private val okHttpClient: OkHttpClient, private val json: Json, - private val logger: Logger? + loggerProvider: LoggerProvider? ) : WebSocketListener() { private val _events = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) @@ -52,6 +52,7 @@ internal class EventsWebSocket( @Volatile internal var isClosed = false private var disconnectReason: DisconnectReason? = null private val connectionTimeoutTimer = ConnectionTimeoutTimer(onTimeout = ::onTimeout) + private val logger = loggerProvider?.getLogger(TAG) @Throws(ConnectException::class) suspend fun connect() = coroutineScope { @@ -100,7 +101,7 @@ internal class EventsWebSocket( } override fun onOpen(webSocket: WebSocket, response: Response) { - logger?.debug { "$TAG onOpen: sending connection init" } + logger?.debug { "onOpen: sending connection init" } send(WebSocketMessage.Send.ConnectionInit()) } @@ -116,18 +117,18 @@ internal class EventsWebSocket( } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { - logger?.error(t) { "$TAG onFailure" } + logger?.error(t) { "onFailure" } handleClosed() // onClosed doesn't get called in failure. Treat this block the same as onClosed } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { - logger?.debug("$TAG onClosing") + logger?.debug("onClosing") } override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { // Events api sends normal close code even in failure // so inspecting code/reason isn't helpful as it should be - logger?.debug("$TAG onClosed: reason = $disconnectReason") + logger?.debug("onClosed: reason = $disconnectReason") handleClosed() } @@ -146,7 +147,7 @@ internal class EventsWebSocket( inline fun send(webSocketMessage: T) { val message = json.encodeToString(webSocketMessage) - logger?.debug("$TAG send: $message") + logger?.debug("send: $message") webSocket.send(message) } diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt index 4555b0e29..b62710ee8 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt @@ -16,7 +16,7 @@ package com.amplifyframework.aws.appsync.events import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer -import com.amplifyframework.aws.appsync.core.util.Logger +import com.amplifyframework.aws.appsync.core.LoggerProvider import java.util.concurrent.atomic.AtomicReference import kotlinx.coroutines.Deferred import kotlinx.coroutines.async @@ -31,7 +31,7 @@ internal class EventsWebSocketProvider( private val authorizer: AppSyncAuthorizer, private val okHttpClient: OkHttpClient, private val json: Json, - private val logger: Logger? + private val loggerProvider: LoggerProvider? ) { private val mutex = Mutex() private val connectionResultReference = AtomicReference?>(null) @@ -92,7 +92,7 @@ internal class EventsWebSocketProvider( authorizer, okHttpClient, json, - logger + loggerProvider ) eventsWebSocket.connect() Result.success(eventsWebSocket) From 9c74a14b7a780c9abf99da4098c44c9e0a11a085 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 15 Apr 2025 10:56:57 -0400 Subject: [PATCH 4/6] PR comments --- .../main/java/com/amplifyframework/aws/appsync/events/Events.kt | 2 +- .../aws/appsync/events/EventsWebSocketProvider.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt index 426b1098a..f19df0643 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt @@ -133,6 +133,6 @@ class Events @VisibleForTesting internal constructor( * @return a channel to manage subscriptions and publishes. */ suspend fun disconnect(flushEvents: Boolean = true): Unit = coroutineScope { - eventsWebSocketProvider.getExistingWebSocket()?.disconnect(flushEvents) + eventsWebSocketProvider.existingWebSocket?.disconnect(flushEvents) } } diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt index b62710ee8..5c81705c4 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt @@ -37,7 +37,7 @@ internal class EventsWebSocketProvider( private val connectionResultReference = AtomicReference?>(null) private val connectionInProgressReference = AtomicReference>?>(null) - fun getExistingWebSocket(): EventsWebSocket? = connectionResultReference.get()?.getOrNull() + val existingWebSocket: EventsWebSocket? = connectionResultReference.get()?.getOrNull() suspend fun getConnectedWebSocket(): EventsWebSocket = getConnectedWebSocketResult().getOrThrow() From 9d28fca1b499804e4747ab303f0e5d30d3560eca Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 15 Apr 2025 10:57:57 -0400 Subject: [PATCH 5/6] fix mistake --- .../aws/appsync/events/EventsWebSocketProvider.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt index 5c81705c4..c8796443c 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt @@ -37,7 +37,9 @@ internal class EventsWebSocketProvider( private val connectionResultReference = AtomicReference?>(null) private val connectionInProgressReference = AtomicReference>?>(null) - val existingWebSocket: EventsWebSocket? = connectionResultReference.get()?.getOrNull() + val existingWebSocket: EventsWebSocket? + get() = connectionResultReference.get()?.getOrNull() + suspend fun getConnectedWebSocket(): EventsWebSocket = getConnectedWebSocketResult().getOrThrow() From ebe8b0b5642a9effaa81621dad0a00d43e00c9b2 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 15 Apr 2025 11:08:59 -0400 Subject: [PATCH 6/6] Fix logging --- .../amplifyframework/aws/appsync/events/EventsWebSocket.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt index 50a9a4652..550094627 100644 --- a/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt +++ b/appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt @@ -101,7 +101,7 @@ internal class EventsWebSocket( } override fun onOpen(webSocket: WebSocket, response: Response) { - logger?.debug { "onOpen: sending connection init" } + logger?.debug ("onOpen: sending connection init") send(WebSocketMessage.Send.ConnectionInit()) } @@ -128,7 +128,7 @@ internal class EventsWebSocket( override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { // Events api sends normal close code even in failure // so inspecting code/reason isn't helpful as it should be - logger?.debug("onClosed: reason = $disconnectReason") + logger?.debug {"onClosed: reason = $disconnectReason" } handleClosed() } @@ -147,7 +147,7 @@ internal class EventsWebSocket( inline fun send(webSocketMessage: T) { val message = json.encodeToString(webSocketMessage) - logger?.debug("send: $message") + logger?.debug { "send: ${webSocketMessage::class.java}" } webSocket.send(message) }