Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,10 @@ internal class JNISession {
options.attachment?.into()?.bytes,
options.payload?.into()?.bytes,
options.encoding?.id ?: Encoding.defaultEncoding().id,
options.encoding?.schema
options.encoding?.schema,
options.qos.congestionControl.value,
options.qos.priority.value,
options.qos.express
)
}

Expand Down Expand Up @@ -348,12 +351,14 @@ internal class JNISession {
options.attachment?.into()?.bytes,
options.payload?.into()?.bytes,
options.encoding?.id ?: Encoding.defaultEncoding().id,
options.encoding?.schema
options.encoding?.schema,
options.qos.congestionControl.value,
options.qos.priority.value,
options.qos.express
)
return handler.receiver()
}


@Throws(ZError::class)
fun declareKeyExpr(keyExpr: String): KeyExpr {
val ptr = declareKeyExprViaJNI(sessionPtr.get(), keyExpr)
Expand Down Expand Up @@ -501,6 +506,9 @@ internal class JNISession {
payload: ByteArray?,
encodingId: Int,
encodingSchema: String?,
congestionControl: Int,
priority: Int,
express: Boolean,
)

@Throws(ZError::class)
Expand Down
5 changes: 4 additions & 1 deletion zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh.query
import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.qos.QoS
import java.time.Duration

/**
Expand All @@ -28,14 +29,16 @@ import java.time.Duration
* @param payload Optional payload.
* @param encoding Encoding of the payload.
* @param attachment Optional attachment.
* @param qos The intended [QoS] for the query.
*/
data class GetOptions(
var timeout: Duration = Duration.ofMillis(10000),
var target: QueryTarget = QueryTarget.BEST_MATCHING,
var consolidation: ConsolidationMode = ConsolidationMode.AUTO,
var payload: IntoZBytes? = null,
var encoding: Encoding? = null,
var attachment: IntoZBytes? = null
var attachment: IntoZBytes? = null,
var qos: QoS = QoS.defaultQoS
) {
fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) }
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }
Expand Down
11 changes: 11 additions & 0 deletions zenoh-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_undeclareKeyExprViaJNI(
/// - `payload`: Optional payload for the query.
/// - `encoding_id`: The encoding of the payload.
/// - `encoding_schema`: The encoding schema of the payload, may be null.
/// - `congestion_control`: The ordinal value of the congestion control enum value.
/// - `priority`: The ordinal value of the priority enum value.
/// - `is_express`: The boolean express value of the QoS provided.
///
/// Safety:
/// - The function is marked as unsafe due to raw pointer manipulation and JNI interaction.
Expand Down Expand Up @@ -883,6 +886,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
payload: /*nullable*/ JByteArray,
encoding_id: jint,
encoding_schema: /*nullable*/ JString,
congestion_control: jint,
priority: jint,
is_express: jboolean,
) {
let session = Arc::from_raw(session_ptr);
let _ = || -> ZResult<()> {
Expand All @@ -893,6 +899,8 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
let query_target = decode_query_target(target)?;
let consolidation = decode_consolidation(consolidation)?;
let timeout = Duration::from_millis(timeout_ms as u64);
let congestion_control = decode_congestion_control(congestion_control)?;
let priority = decode_priority(priority)?;
let on_close = load_on_close(&java_vm, on_close_global_ref);
let selector_params = if selector_params.is_null() {
String::new()
Expand All @@ -902,6 +910,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI(
let selector = Selector::owned(&key_expr, selector_params);
let mut get_builder = session
.get(selector)
.congestion_control(congestion_control)
.priority(priority)
.express(is_express != 0)
.callback(move |reply| {
|| -> ZResult<()> {
on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure
Expand Down
Loading