Old connections get notified about new connection failures, lib version 4.3 #799

@davidburstromvourity

Description

The following log and stack traces show that the collector for the shared flow is not cleaned up in MqttAndroidClient:

        clientScope?.launch {
            mqttService?.collect(::onReceive)
        }

Each new client instance registers a coroutine that collects events from the shared flow, and the collector of a previous instance is not cleaned up. As a result, the previous connection is also notified of the new connection's failures.
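To make the mechanism concrete, here is a minimal sketch that reproduces the effect outside the library. Everything in it is an assumption for illustration: `SketchClient` is a hypothetical stand-in for MqttAndroidClient, a plain `MutableSharedFlow<String>` stands in for the service's event flow, and `close(cancelCollector)` is not the library's API. It only shows that a collector whose Job is never cancelled keeps receiving events meant for a later instance, and that cancelling the Job on close stops that.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a client that subscribes to a shared event flow.
class SketchClient(events: SharedFlow<String>, scope: CoroutineScope) {
    val received = mutableListOf<String>()

    // Keep the Job returned by launch so the collector can be cancelled later.
    // UNDISPATCHED makes the collector subscribe before the constructor returns.
    private val collector: Job = scope.launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { received += it }
    }

    // With cancelCollector = false the collector stays subscribed (the reported behaviour).
    suspend fun close(cancelCollector: Boolean) {
        if (cancelCollector) collector.cancelAndJoin()
    }
}

// Returns the events seen by the first (closed) client and by the second client.
fun runSketch(cancelCollector: Boolean): Pair<List<String>, List<String>> = runBlocking {
    val events = MutableSharedFlow<String>()

    val first = SketchClient(events, this)
    first.close(cancelCollector)

    // Later, a new client instance is created and its connection fails.
    val second = SketchClient(events, this)
    events.emit("connection failure for second client")

    val result = first.received.toList() to second.received.toList()
    // Stop any collectors that are still running so runBlocking can return.
    coroutineContext.cancelChildren()
    result
}

fun main() {
    println("leaking collector:   " + runSketch(cancelCollector = false))
    println("cancelled collector: " + runSketch(cancelCollector = true))
}
```

With `cancelCollector = false`, the first client's list contains the failure that belongs to the second client, which matches the duplicated "Connection failure" entries in the log below. With `cancelCollector = true`, only the second client sees it.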

Connecting, token info.mqtt.android.service.MqttTokenAndroid@b3aea18
Connection failure for info.mqtt.android.service.MqttTokenAndroid@b3aea18
java.lang.Exception: Client is closed (32111)
	at MqttClientHandler$connect$connectToken$1.onFailure(MqttClientHandler.kt:103)
	at info.mqtt.android.service.MqttTokenAndroid.notifyFailure(MqttTokenAndroid.kt:66)
	at info.mqtt.android.service.MqttAndroidClient.simpleAction(MqttAndroidClient.kt:1020)
	at info.mqtt.android.service.MqttAndroidClient.connectAction(MqttAndroidClient.kt:959)
	at info.mqtt.android.service.MqttAndroidClient.onReceive(MqttAndroidClient.kt:898)
	at info.mqtt.android.service.MqttAndroidClient.access$onReceive(MqttAndroidClient.kt:45)
	at info.mqtt.android.service.MqttAndroidClient$collect$1$1.invoke(MqttAndroidClient.kt:241)
	at info.mqtt.android.service.MqttAndroidClient$collect$1$1.invoke(MqttAndroidClient.kt:241)
	at info.mqtt.android.service.MqttService.collect$suspendConversion0(MqttService.kt:259)
	at info.mqtt.android.service.MqttService.access$collect$suspendConversion0(MqttService.kt:179)
	at info.mqtt.android.service.MqttService$collect$2.emit(MqttService.kt:259)
	at info.mqtt.android.service.MqttService$collect$2.emit(MqttService.kt:259)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:397)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
Caused by: Client is closed (32111)
	at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:732)
	at info.mqtt.android.service.MqttConnection.connect(MqttConnection.kt:168)
	at info.mqtt.android.service.MqttService$connect$1.invokeSuspend(MqttService.kt:295)
	... 8 more

// Later, connecting using a new client instance.

Connecting, token info.mqtt.android.service.MqttTokenAndroid@d72ff89
Connection failure for info.mqtt.android.service.MqttTokenAndroid@b3aea18
java.lang.Exception: Client is closed (32111)
	at MqttClientHandler$connect$connectToken$1.onFailure(MqttClientHandler.kt:103)
	at info.mqtt.android.service.MqttTokenAndroid.notifyFailure(MqttTokenAndroid.kt:66)
	at info.mqtt.android.service.MqttAndroidClient.simpleAction(MqttAndroidClient.kt:1020)
	at info.mqtt.android.service.MqttAndroidClient.connectAction(MqttAndroidClient.kt:959)
	at info.mqtt.android.service.MqttAndroidClient.onReceive(MqttAndroidClient.kt:898)
	at info.mqtt.android.service.MqttAndroidClient.access$onReceive(MqttAndroidClient.kt:45)
	at info.mqtt.android.service.MqttAndroidClient$collect$1$1.invoke(MqttAndroidClient.kt:241)
	at info.mqtt.android.service.MqttAndroidClient$collect$1$1.invoke(MqttAndroidClient.kt:241)
	at info.mqtt.android.service.MqttService.collect$suspendConversion0(MqttService.kt:259)
	at info.mqtt.android.service.MqttService.access$collect$suspendConversion0(MqttService.kt:179)
	at info.mqtt.android.service.MqttService$collect$2.emit(MqttService.kt:259)
	at info.mqtt.android.service.MqttService$collect$2.emit(MqttService.kt:259)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:397)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
Caused by: Client is closed (32111)
	at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:732)
	at info.mqtt.android.service.MqttConnection.connect(MqttConnection.kt:168)
	at info.mqtt.android.service.MqttService$connect$1.invokeSuspend(MqttService.kt:295)
	... 8 more
Connection failure for info.mqtt.android.service.MqttTokenAndroid@d72ff89
java.lang.Exception: Client is closed (32111)
	at MqttClientHandler$connect$connectToken$1.onFailure(MqttClientHandler.kt:103)
	at info.mqtt.android.service.MqttTokenAndroid.notifyFailure(MqttTokenAndroid.kt:66)
	at info.mqtt.android.service.MqttAndroidClient.simpleAction(MqttAndroidClient.kt:1020)
	at info.mqtt.android.service.MqttAndroidClient.connectAction(MqttAndroidClient.kt:959)
	at info.mqtt.android.service.MqttAndroidClient.onReceive(MqttAndroidClient.kt:898)
	at info.mqtt.android.service.MqttAndroidClient.access$onReceive(MqttAndroidClient.kt:45)
	at info.mqtt.android.service.MqttAndroidClient$collect$1$1.invoke(MqttAndroidClient.kt:241)
	at info.mqtt.android.service.MqttAndroidClient$collect$1$1.invoke(MqttAndroidClient.kt:241)
	at info.mqtt.android.service.MqttService.collect$suspendConversion0(MqttService.kt:259)
	at info.mqtt.android.service.MqttService.access$collect$suspendConversion0(MqttService.kt:179)
	at info.mqtt.android.service.MqttService$collect$2.emit(MqttService.kt:259)
	at info.mqtt.android.service.MqttService$collect$2.emit(MqttService.kt:259)
	at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:397)
	at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:89)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:820)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:717)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:704)
Caused by: Client is closed (32111)
	at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:732)
	at info.mqtt.android.service.MqttConnection.connect(MqttConnection.kt:168)
	at info.mqtt.android.service.MqttService$connect$1.invokeSuspend(MqttService.kt:295)
	... 8 more
