Skip to content

Connection lost (32109) - java.io.EOFException in release builds Android 11 #794

@Tritonfc

Description

@Tritonfc

So I am currently testing using version 4.3 specifically on WEAR OS devices. I am not sure exactly how Work managers handles tasks on that hardware but a remote tester keeps losing connection with this exception: Connection lost (32109) - java.io.EOFException. I am unable to get full logs as I am only able to track the issue on POST-HOG, but it works much fine on my wear os emulator which is a debug build. Here is how the client is set up for reference:

import android.util.Base64
import android.util.Log
import com.posthog.PostHog
import com.webw8er.waiter.BuildConfig
import com.webw8er.waiter.presentation.utils.LocalDeviceIDProvider
import com.webw8er.waiter.presentation.utils.loadPemFromRaw
import dagger.hilt.android.qualifiers.ApplicationContext


import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

import com.webw8er.waiter.presentation.utils.copyPemFileFromAssetsToInternalStorage
import com.webw8er.waiter.presentation.utils.loadPemContent
import info.mqtt.android.service.MqttAndroidClient
import info.mqtt.android.service.QoS
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions

import org.eclipse.paho.client.mqttv3.IMqttActionListener
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.IMqttToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage


import java.net.InetAddress

import java.util.UUID
import javax.inject.Inject

private const val TAG = "MQTT MANAGER"
class MQTTManager @Inject constructor(
    @ApplicationContext private val context: Context
) {


    private val clientPassword = BuildConfig.MQTT_PASSWORD
    private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO)

    private val mqttClient: MqttAndroidClient by lazy {
        MqttAndroidClient(context, SERVER_URI, clientId,)
    }


    var onMessageReceived: ((String?,MqttMessage?) -> Unit)? = null

    var onFailureToSubscribe:(Throwable?)-> Unit = {}





    @OptIn(ExperimentalUnsignedTypes::class)
    fun connect(onConnected: () -> Unit = {}, onError: (Throwable) -> Unit = {}) {
        mqttClient.setCallback(object : MqttCallback {
            override fun messageArrived(topic: String?, message: MqttMessage?) {
                Log.d(TAG, "Message Arrived for $topic"
                )


                onMessageReceived?.invoke(topic, message)
            }
            override fun connectionLost(cause: Throwable?) {
                Log.e(TAG, "Connection lost ${cause.toString()}", cause)



                PostHog.capture(
                    event = "MQTT FAILURE",
                    properties = mapOf(
                        "error" to "Connection lost ${cause.toString()}",
                    )
                )
            }

            override fun deliveryComplete(token: IMqttDeliveryToken?) {
                PostHog.capture(
                    event = "MQTT PUBLISH SUCCESS",
                    properties = mapOf(
                        "publish state" to "Vitals published successfully",
                    )
                )
                Log.d(TAG, "Publish Successful")
            }
        })
        val options = MqttConnectOptions().apply {
            isCleanSession = true
            isAutomaticReconnect = true
            userName=BuildConfig.MQTT_USER_NAME
            password = clientPassword.toCharArray()
           connectionTimeout = 0
            keepAliveInterval = 60
        }


        mqttClient.connect(options, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken?) {
                Log.d(TAG, "Connected successfully")
                val disconnectedBufferOptions = DisconnectedBufferOptions().apply {
                    isBufferEnabled = true
                    bufferSize = 100
                    isPersistBuffer = false
                    isDeleteOldestMessages = false
                }
                mqttClient.setBufferOpts(disconnectedBufferOptions)
                PostHog.capture(
                    event = "MQTT SUCCESS",
                    properties = mapOf(
                        "success" to "connected",
                    )
                )


                onConnected()
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                Log.e(TAG,"MQTT connection failed",exception)

                PostHog.capture(
                    event = "MQTT FAILURE",
                    properties = mapOf(
                        "error" to "${exception?.message}  connecting",
                    )
                )
                onError(exception ?: Throwable("MQTT connection failed"))
            }
        })


    }


    @OptIn(ExperimentalUnsignedTypes::class)
    fun publish(topic: String, message: String) {

        val payload = message.toByteArray()

        mqttClient.publish(topic, payload, QoS.AtLeastOnce.value, false)


    }

    fun testNetworkConnectivity() {
        coroutineScope.launch {
            try {
                Log.d("Network", "Testing connection to MQTT server...")
                val inetAddress = InetAddress.getByName("broker.emqx.io")
                Log.d("Network", "DNS resolution successful: ${inetAddress.hostAddress}")
            } catch (e: Exception) {
                Log.e("Network", "DNS resolution failed", e)
            }
        }
    }

    fun subscribe(
        topic: String,
        qos: Int = QoS.AtLeastOnce.value

        ) {
        try {
            mqttClient.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                    onFailureToSubscribe(exception)
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    fun disconnect() {
        if (mqttClient.isConnected) mqttClient.disconnect()
    }

    fun isConnected() = mqttClient.isConnected```

Also when it tries reconnecting sometimes it just fails with a "Connect already in progress error".

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions