diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt index 4e9dfae7..62221e4a 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt @@ -32,6 +32,7 @@ import io.zenoh.query.Queryable import io.zenoh.sample.Sample import io.zenoh.session.SessionDeclaration import io.zenoh.session.SessionInfo +import java.lang.ref.WeakReference import java.util.* import java.util.concurrent.BlockingQueue import java.util.concurrent.LinkedBlockingDeque @@ -54,7 +55,11 @@ class Session private constructor(private val config: Config) : AutoCloseable { internal var jniSession: JNISession? = null - private var declarations = mutableListOf() + // Subscribers and Queryables that keep running despite losing references to them. + private var strongDeclarations = mutableListOf() + + // Publishers and queriers that shouldn't be kept alive when losing all references to them. + private var weakDeclarations = mutableListOf>() companion object { @@ -83,11 +88,16 @@ class Session private constructor(private val config: Config) : AutoCloseable { * However, any session declaration that was still alive and bound to the session previous to closing it, will still be alive. */ override fun close() { - declarations.removeIf { + strongDeclarations.removeIf { it.undeclare() true } + weakDeclarations.removeIf { + it.get()?.undeclare() + true + } + jniSession?.close() jniSession = null } @@ -375,7 +385,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { fun declareKeyExpr(keyExpr: String): KeyExpr { return jniSession?.run { val keyexpr = declareKeyExpr(keyExpr) - declarations.add(keyexpr) + strongDeclarations.add(keyexpr) keyexpr } ?: throw sessionClosedException } @@ -568,7 +578,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { internal fun resolvePublisher(keyExpr: KeyExpr, options: PublisherOptions): Publisher { return jniSession?.run { val publisher = declarePublisher(keyExpr, options) - declarations.add(publisher) + weakDeclarations.add(WeakReference(publisher)) publisher } ?: throw (sessionClosedException) } @@ -579,7 +589,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { ): HandlerSubscriber { return jniSession?.run { val subscriber = declareSubscriberWithHandler(keyExpr, handler) - declarations.add(subscriber) + strongDeclarations.add(subscriber) subscriber } ?: throw (sessionClosedException) } @@ -590,7 +600,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { ): CallbackSubscriber { return jniSession?.run { val subscriber = declareSubscriberWithCallback(keyExpr, callback) - declarations.add(subscriber) + strongDeclarations.add(subscriber) subscriber } ?: throw (sessionClosedException) } @@ -601,7 +611,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { ): HandlerQueryable { return jniSession?.run { val queryable = declareQueryableWithHandler(keyExpr, handler, options) - declarations.add(queryable) + strongDeclarations.add(queryable) queryable } ?: throw (sessionClosedException) } @@ -612,7 +622,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { ): CallbackQueryable { return jniSession?.run { val queryable = declareQueryableWithCallback(keyExpr, callback, options) - declarations.add(queryable) + strongDeclarations.add(queryable) queryable } ?: throw (sessionClosedException) } @@ -623,7 +633,9 @@ class Session private constructor(private val config: Config) : AutoCloseable { options: QuerierOptions ): Querier { return jniSession?.run { - declareQuerier(keyExpr, options) + val querier = declareQuerier(keyExpr, options) + weakDeclarations.add(WeakReference(querier)) + querier } ?: throw sessionClosedException }