diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt index 594e9f2d..4c6de44a 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt @@ -286,7 +286,10 @@ internal class JNISession { options.attachment?.into()?.bytes, options.payload?.into()?.bytes, options.encoding?.id ?: Encoding.defaultEncoding().id, - options.encoding?.schema + options.encoding?.schema, + options.qos.congestionControl.value, + options.qos.priority.value, + options.qos.express ) } @@ -348,12 +351,14 @@ internal class JNISession { options.attachment?.into()?.bytes, options.payload?.into()?.bytes, options.encoding?.id ?: Encoding.defaultEncoding().id, - options.encoding?.schema + options.encoding?.schema, + options.qos.congestionControl.value, + options.qos.priority.value, + options.qos.express ) return handler.receiver() } - @Throws(ZError::class) fun declareKeyExpr(keyExpr: String): KeyExpr { val ptr = declareKeyExprViaJNI(sessionPtr.get(), keyExpr) @@ -501,6 +506,9 @@ internal class JNISession { payload: ByteArray?, encodingId: Int, encodingSchema: String?, + congestionControl: Int, + priority: Int, + express: Boolean, ) @Throws(ZError::class) diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt index 59266188..fdb2f507 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt @@ -17,6 +17,7 @@ package io.zenoh.query import io.zenoh.bytes.Encoding import io.zenoh.bytes.IntoZBytes import io.zenoh.bytes.ZBytes +import io.zenoh.qos.QoS import java.time.Duration /** @@ -28,6 +29,7 @@ import java.time.Duration * @param payload Optional payload. * @param encoding Encoding of the payload. * @param attachment Optional attachment. + * @param qos The intended [QoS] for the query. */ data class GetOptions( var timeout: Duration = Duration.ofMillis(10000), @@ -35,7 +37,8 @@ data class GetOptions( var consolidation: ConsolidationMode = ConsolidationMode.AUTO, var payload: IntoZBytes? = null, var encoding: Encoding? = null, - var attachment: IntoZBytes? = null + var attachment: IntoZBytes? = null, + var qos: QoS = QoS.defaultQoS ) { fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) } fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } diff --git a/zenoh-jni/src/session.rs b/zenoh-jni/src/session.rs index 83181025..8044c708 100644 --- a/zenoh-jni/src/session.rs +++ b/zenoh-jni/src/session.rs @@ -854,6 +854,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_undeclareKeyExprViaJNI( /// - `payload`: Optional payload for the query. /// - `encoding_id`: The encoding of the payload. /// - `encoding_schema`: The encoding schema of the payload, may be null. +/// - `congestion_control`: The ordinal value of the congestion control enum value. +/// - `priority`: The ordinal value of the priority enum value. +/// - `is_express`: The boolean express value of the QoS provided. /// /// Safety: /// - The function is marked as unsafe due to raw pointer manipulation and JNI interaction. @@ -883,6 +886,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI( payload: /*nullable*/ JByteArray, encoding_id: jint, encoding_schema: /*nullable*/ JString, + congestion_control: jint, + priority: jint, + is_express: jboolean, ) { let session = Arc::from_raw(session_ptr); let _ = || -> ZResult<()> { @@ -893,6 +899,8 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI( let query_target = decode_query_target(target)?; let consolidation = decode_consolidation(consolidation)?; let timeout = Duration::from_millis(timeout_ms as u64); + let congestion_control = decode_congestion_control(congestion_control)?; + let priority = decode_priority(priority)?; let on_close = load_on_close(&java_vm, on_close_global_ref); let selector_params = if selector_params.is_null() { String::new() @@ -902,6 +910,9 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_getViaJNI( let selector = Selector::owned(&key_expr, selector_params); let mut get_builder = session .get(selector) + .congestion_control(congestion_control) + .priority(priority) + .express(is_express != 0) .callback(move |reply| { || -> ZResult<()> { on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure