Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.zenoh.query.Queryable
import io.zenoh.sample.Sample
import io.zenoh.session.SessionDeclaration
import io.zenoh.session.SessionInfo
import java.lang.ref.WeakReference
import java.util.*
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingDeque
Expand All @@ -54,7 +55,11 @@ class Session private constructor(private val config: Config) : AutoCloseable {

internal var jniSession: JNISession? = null

private var declarations = mutableListOf<SessionDeclaration>()
// Subscribers and Queryables that keep running despite losing references to them.
private var strongDeclarations = mutableListOf<SessionDeclaration>()

// Publishers and queriers that shouldn't be kept alive when losing all references to them.
private var weakDeclarations = mutableListOf<WeakReference<SessionDeclaration>>()

companion object {

Expand Down Expand Up @@ -83,11 +88,16 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* However, any session declaration that was still alive and bound to the session previous to closing it, will still be alive.
*/
override fun close() {
declarations.removeIf {
strongDeclarations.removeIf {
it.undeclare()
true
}

weakDeclarations.removeIf {
it.get()?.undeclare()
true
}

jniSession?.close()
jniSession = null
}
Expand Down Expand Up @@ -375,7 +385,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
fun declareKeyExpr(keyExpr: String): KeyExpr {
return jniSession?.run {
val keyexpr = declareKeyExpr(keyExpr)
declarations.add(keyexpr)
strongDeclarations.add(keyexpr)
keyexpr
} ?: throw sessionClosedException
}
Expand Down Expand Up @@ -568,7 +578,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
internal fun resolvePublisher(keyExpr: KeyExpr, options: PublisherOptions): Publisher {
return jniSession?.run {
val publisher = declarePublisher(keyExpr, options)
declarations.add(publisher)
weakDeclarations.add(WeakReference(publisher))
publisher
} ?: throw (sessionClosedException)
}
Expand All @@ -579,7 +589,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
): HandlerSubscriber<R> {
return jniSession?.run {
val subscriber = declareSubscriberWithHandler(keyExpr, handler)
declarations.add(subscriber)
strongDeclarations.add(subscriber)
subscriber
} ?: throw (sessionClosedException)
}
Expand All @@ -590,7 +600,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
): CallbackSubscriber {
return jniSession?.run {
val subscriber = declareSubscriberWithCallback(keyExpr, callback)
declarations.add(subscriber)
strongDeclarations.add(subscriber)
subscriber
} ?: throw (sessionClosedException)
}
Expand All @@ -601,7 +611,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
): HandlerQueryable<R> {
return jniSession?.run {
val queryable = declareQueryableWithHandler(keyExpr, handler, options)
declarations.add(queryable)
strongDeclarations.add(queryable)
queryable
} ?: throw (sessionClosedException)
}
Expand All @@ -612,7 +622,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
): CallbackQueryable {
return jniSession?.run {
val queryable = declareQueryableWithCallback(keyExpr, callback, options)
declarations.add(queryable)
strongDeclarations.add(queryable)
queryable
} ?: throw (sessionClosedException)
}
Expand All @@ -623,7 +633,9 @@ class Session private constructor(private val config: Config) : AutoCloseable {
options: QuerierOptions
): Querier {
return jniSession?.run {
declareQuerier(keyExpr, options)
val querier = declareQuerier(keyExpr, options)
weakDeclarations.add(WeakReference(querier))
querier
} ?: throw sessionClosedException
}

Expand Down
Loading