diff --git a/examples/src/main/java/io/zenoh/ZLiveliness.java b/examples/src/main/java/io/zenoh/ZLiveliness.java index 04ff24c0..039ef780 100644 --- a/examples/src/main/java/io/zenoh/ZLiveliness.java +++ b/examples/src/main/java/io/zenoh/ZLiveliness.java @@ -14,7 +14,6 @@ package io.zenoh; -import io.zenoh.exceptions.ZError; import io.zenoh.keyexpr.KeyExpr; import picocli.CommandLine; diff --git a/examples/src/main/java/io/zenoh/ZPub.java b/examples/src/main/java/io/zenoh/ZPub.java index 42b86ba7..b7cb585d 100644 --- a/examples/src/main/java/io/zenoh/ZPub.java +++ b/examples/src/main/java/io/zenoh/ZPub.java @@ -57,18 +57,17 @@ public Integer call() throws ZError, InterruptedException { Publisher publisher = session.declarePublisher(keyExpr, publisherOptions); System.out.println("Press CTRL-C to quit..."); - ZBytes attachmentBytes = attachment != null ? ZBytes.from(attachment) : null; int idx = 0; while (true) { Thread.sleep(1000); String payload = String.format("[%4d] %s", idx, value); System.out.println("Putting Data ('" + keyExpr + "': '" + payload + "')..."); - if (attachmentBytes != null) { + if (attachment != null) { PutOptions putOptions = new PutOptions(); - putOptions.setAttachment(attachmentBytes); - publisher.put(ZBytes.from(payload), putOptions); + putOptions.setAttachment(attachment); + publisher.put(payload, putOptions); } else { - publisher.put(ZBytes.from(payload)); + publisher.put(payload); } idx++; } diff --git a/examples/src/main/java/io/zenoh/ZPut.java b/examples/src/main/java/io/zenoh/ZPut.java index 720c8b4f..8ee802a8 100644 --- a/examples/src/main/java/io/zenoh/ZPut.java +++ b/examples/src/main/java/io/zenoh/ZPut.java @@ -14,8 +14,6 @@ package io.zenoh; -import io.zenoh.bytes.ZBytes; -import io.zenoh.exceptions.ZError; import io.zenoh.keyexpr.KeyExpr; import io.zenoh.pubsub.PutOptions; import picocli.CommandLine; @@ -44,10 +42,10 @@ public Integer call() throws Exception { System.out.println("Putting Data ('" + keyExpr + "': '" + value + "')..."); if (attachment != null) { var putOptions = new PutOptions(); - putOptions.setAttachment(ZBytes.from(attachment)); - session.put(keyExpr, ZBytes.from(value), putOptions); + putOptions.setAttachment(attachment); + session.put(keyExpr, value, putOptions); } else { - session.put(keyExpr, ZBytes.from(value)); + session.put(keyExpr, value); } } return 0; diff --git a/examples/src/main/java/io/zenoh/ZQuerier.java b/examples/src/main/java/io/zenoh/ZQuerier.java index 2786ea1c..13a2d7f2 100644 --- a/examples/src/main/java/io/zenoh/ZQuerier.java +++ b/examples/src/main/java/io/zenoh/ZQuerier.java @@ -14,7 +14,6 @@ package io.zenoh; -import io.zenoh.bytes.ZBytes; import io.zenoh.exceptions.ZError; import io.zenoh.query.*; import picocli.CommandLine; @@ -63,7 +62,7 @@ private void performQueries(Querier querier, Selector selector) throws ZError, I System.out.println("Querying '" + selector + "' with payload: '" + queryPayload + "'..."); Querier.GetOptions options = new Querier.GetOptions(); - options.setPayload(ZBytes.from(queryPayload)); + options.setPayload(queryPayload); options.setParameters(selector.getParameters()); querier.get(this::handleReply, options); diff --git a/examples/src/main/java/io/zenoh/ZQueryable.java b/examples/src/main/java/io/zenoh/ZQueryable.java index 6cb68e00..62693f7b 100644 --- a/examples/src/main/java/io/zenoh/ZQueryable.java +++ b/examples/src/main/java/io/zenoh/ZQueryable.java @@ -14,7 +14,6 @@ package io.zenoh; -import io.zenoh.bytes.ZBytes; import io.zenoh.exceptions.ZError; import io.zenoh.keyexpr.KeyExpr; import io.zenoh.query.Query; @@ -100,7 +99,7 @@ private void handleQuery(Query query) { System.out.println(">> [Queryable] Received Query '" + query.getSelector() + "'" + valueInfo); var options = new ReplyOptions(); options.setTimeStamp(TimeStamp.getCurrentTime()); - query.reply(query.getKeyExpr(), ZBytes.from(value), options); + query.reply(query.getKeyExpr(), value, options); } catch (Exception e) { System.err.println(">> [Queryable] Error sending reply: " + e.getMessage()); } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt index 48b6399b..5e10c3d6 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt @@ -16,6 +16,7 @@ package io.zenoh import io.zenoh.annotations.Unstable import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import io.zenoh.config.ZenohId import io.zenoh.exceptions.ZError import io.zenoh.handlers.BlockingQueueHandler @@ -115,7 +116,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { * Thread.sleep(1000); * String payload = String.format("[%4d] %s", idx, value); * System.out.println("Putting Data ('" + keyExpr + "': '" + payload + "')..."); - * publisher.put(ZBytes.from(payload)); + * publisher.put(payload); * idx++; * } * } @@ -240,7 +241,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { * break; * } * Query query = wrapper.get(); - * query.reply(query.getKeyExpr(), ZBytes.from("Example reply)); + * query.reply(query.getKeyExpr(), "Example reply"); * } * } * ``` @@ -271,7 +272,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { * @Override * public void handle(Query query) { * var keyExpr = query.getKeyExpr(); - * query.reply(keyExpr, ZBytes.from("Reply #" + counter + "!")); + * query.reply(keyExpr, "Reply #" + counter + "!"); * counter++; * } * @@ -306,7 +307,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { * * ```java * try (Session session = Zenoh.open(config)) { - * var queryable = session.declareQueryable(keyExpr, query -> query.reply(keyExpr, ZBytes.from("Example reply"))); + * var queryable = session.declareQueryable(keyExpr, query -> query.reply(keyExpr, "Example reply")); * //... * } * ``` @@ -338,7 +339,7 @@ class Session private constructor(private val config: Config) : AutoCloseable { * Querier querier = session.declareQuerier(selector.getKeyExpr(), options); * //... * Querier.GetOptions options = new Querier.GetOptions(); - * options.setPayload(ZBytes.from("Example payload")); + * options.setPayload("Example payload"); * querier.get(reply -> {...}, options); * } * ``` @@ -513,6 +514,25 @@ class Session private constructor(private val config: Config) : AutoCloseable { resolvePut(keyExpr, payload, options) } + /** + * Perform a put with the provided [payload] to the specified [keyExpr]. + * + * Example: + * ```java + * session.put(KeyExpr.from("a/b/c"), "Example payload"); + * //... + * ``` + * + * @param keyExpr The [KeyExpr] for performing the put. + * @param payload The payload to put as a string. + * @param options Optional [PutOptions] to configure the put. + */ + @JvmOverloads + @Throws(ZError::class) + fun put(keyExpr: KeyExpr, payload: String, options: PutOptions = PutOptions()) { + resolvePut(keyExpr, ZBytes.from(payload), options) + } + /** * Perform a delete operation to the specified [keyExpr]. * diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/ext/ZDeserializer.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/ext/ZDeserializer.kt index bbae67d2..1d039cbe 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/ext/ZDeserializer.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/ext/ZDeserializer.kt @@ -15,6 +15,7 @@ package io.zenoh.ext import com.google.common.reflect.TypeToken +import io.zenoh.bytes.IntoZBytes import io.zenoh.bytes.ZBytes import io.zenoh.jni.JNIZBytes @@ -103,8 +104,8 @@ abstract class ZDeserializer: TypeToken() { /** * Deserialize the [zbytes] into an element of type [T]. */ - fun deserialize(zbytes: ZBytes): T { + fun deserialize(zbytes: IntoZBytes): T { @Suppress("UNCHECKED_CAST") - return JNIZBytes.deserializeViaJNI(zbytes, this.type) as T + return JNIZBytes.deserializeViaJNI(zbytes.into(), this.type) as T } } diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/DeleteOptions.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/DeleteOptions.kt index 50980816..c9f71a75 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/DeleteOptions.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/DeleteOptions.kt @@ -15,6 +15,7 @@ package io.zenoh.pubsub import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import io.zenoh.qos.CongestionControl import io.zenoh.qos.Priority import io.zenoh.qos.QoS @@ -35,4 +36,6 @@ data class DeleteOptions( var express: Boolean = QoS.defaultQoS.express, var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl, var priority: Priority = QoS.defaultQoS.priority -) +) { + fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } +} diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt index 07521583..d0a9de2e 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt @@ -17,6 +17,7 @@ package io.zenoh.pubsub import io.zenoh.* import io.zenoh.bytes.Encoding import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import io.zenoh.exceptions.ZError import io.zenoh.jni.JNIPublisher import io.zenoh.keyexpr.KeyExpr @@ -41,8 +42,7 @@ import kotlin.Throws * try (Publisher publisher = session.declarePublisher(keyExpr)) { * int i = 0; * while (true) { - * var payload = ZBytes.from("Hello for the " + i + "th time!"); - * publisher.put(payload); + * publisher.put("Hello for the " + i + "th time!"); * Thread.sleep(1000); * i++; * } @@ -88,6 +88,14 @@ class Publisher internal constructor( jniPublisher?.put(payload, options.encoding ?: this.encoding, options.attachment) ?: throw publisherNotValid } + /** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */ + @Throws(ZError::class) + fun put(payload: String) = put(ZBytes.from(payload)) + + /** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */ + @Throws(ZError::class) + fun put(payload: String, options: PutOptions) = put(ZBytes.from(payload), options) + /** * Performs a DELETE operation on the specified [keyExpr] */ diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/PutOptions.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/PutOptions.kt index e5930d61..2b57e5f4 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/PutOptions.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/PutOptions.kt @@ -16,6 +16,7 @@ package io.zenoh.pubsub import io.zenoh.bytes.Encoding import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import io.zenoh.qos.* /** @@ -35,4 +36,6 @@ data class PutOptions( var express: Boolean = QoS.defaultQoS.express, var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl, var priority: Priority = QoS.defaultQoS.priority -) +) { + fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } +} diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt index 08d7a506..59266188 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt @@ -16,6 +16,7 @@ package io.zenoh.query import io.zenoh.bytes.Encoding import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import java.time.Duration /** @@ -35,4 +36,7 @@ data class GetOptions( var payload: IntoZBytes? = null, var encoding: Encoding? = null, var attachment: IntoZBytes? = null -) +) { + fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) } + fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } +} diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt index ef4c5915..7662576f 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt @@ -17,6 +17,7 @@ package io.zenoh.query import io.zenoh.annotations.Unstable import io.zenoh.bytes.Encoding import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.ZBytes import io.zenoh.exceptions.ZError import io.zenoh.handlers.BlockingQueueHandler import io.zenoh.handlers.Callback @@ -65,7 +66,10 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v var payload: IntoZBytes? = null, var encoding: Encoding? = null, var attachment: IntoZBytes? = null - ) + ) { + fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) } + fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } + } /** * Perform a get operation to the [keyExpr] from the Querier and pipe them into a blocking queue. diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Query.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Query.kt index 7fb349d4..0c8eccd1 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Query.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Query.kt @@ -53,6 +53,7 @@ class Query internal constructor( * * @param keyExpr Key expression to reply to. This parameter must not be necessarily the same * as the key expression from the Query, however it must intersect with the query key. + * @param payload The reply payload. * @param options Optional options for configuring the reply. */ @Throws(ZError::class) @@ -65,7 +66,7 @@ class Query internal constructor( SampleKind.PUT, options.timeStamp, QoS(options.congestionControl, options.priority, options.express), - options.attachment + options.attachment?.into() ) jniQuery?.apply { replySuccess(sample) @@ -73,6 +74,18 @@ class Query internal constructor( } ?: throw (ZError("Query is invalid")) } + /** + * Reply to the specified key expression. + * + * @param keyExpr Key expression to reply to. This parameter must not be necessarily the same + * as the key expression from the Query, however it must intersect with the query key. + * @param payload The reply payload as a string. + * @param options Optional options for configuring the reply. + */ + @Throws(ZError::class) + @JvmOverloads + fun reply(keyExpr: KeyExpr, payload: String, options: ReplyOptions = ReplyOptions()) = reply(keyExpr, ZBytes.from(payload), options) + /** * Reply "delete" to the specified key expression. * @@ -109,6 +122,16 @@ class Query internal constructor( } ?: throw (ZError("Query is invalid")) } + /** + * Reply "error" to the specified key expression. + * + * @param message The error message as a String. + * @param options Optional options for configuring the reply. + */ + @JvmOverloads + @Throws(ZError::class) + fun replyErr(message: String, options: ReplyErrOptions = ReplyErrOptions()) = replyErr(ZBytes.from(message), options) + override fun close() { jniQuery?.apply { this.close() diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Queryable.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Queryable.kt index eab11c56..f8159cd9 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Queryable.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Queryable.kt @@ -37,7 +37,7 @@ import io.zenoh.session.SessionDeclaration * break; * } * Query query = wrapper.get(); - * query.reply(query.getKeyExpr(), ZBytes.from("Example reply"); + * query.reply(query.getKeyExpr(), "Example reply"); * } * } * ``` @@ -45,7 +45,7 @@ import io.zenoh.session.SessionDeclaration * Example using a [io.zenoh.handlers.Callback]: * ```java * try (Session session = Zenoh.open(config)) { - * var queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), ZBytes.from("Example reply")); + * var queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), "Example reply"); * } * ``` * @@ -89,7 +89,7 @@ sealed class Queryable( * Example * ```java * try (Session session = Zenoh.open(config)) { - * CallbackQueryable queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), ZBytes.from("Example reply")); + * CallbackQueryable queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), "Example reply"); * } * ``` */ @@ -109,7 +109,7 @@ class CallbackQueryable internal constructor(keyExpr: KeyExpr, jniQueryable: JNI * break; * } * Query query = wrapper.get(); - * query.reply(query.getKeyExpr(), ZBytes.from("Example reply"); + * query.reply(query.getKeyExpr(), "Example reply"); * } * } * ``` diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt index 15b33e47..1317076f 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Reply.kt @@ -16,6 +16,7 @@ package io.zenoh.query import io.zenoh.ZenohType import io.zenoh.bytes.Encoding +import io.zenoh.bytes.IntoZBytes import io.zenoh.bytes.ZBytes import io.zenoh.config.ZenohId import io.zenoh.sample.Sample @@ -90,11 +91,13 @@ sealed class Reply private constructor(val replierId: ZenohId?) : ZenohType { data class ReplyOptions( var encoding: Encoding = Encoding.defaultEncoding(), var timeStamp: TimeStamp? = null, - var attachment: ZBytes? = null, + var attachment: IntoZBytes? = null, var express: Boolean = QoS.defaultQoS.express, var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl, var priority: Priority = QoS.defaultQoS.priority -) +) { + fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } +} /** * Options for performing a Reply Delete to a [Query]. @@ -107,11 +110,13 @@ data class ReplyOptions( */ data class ReplyDelOptions( var timeStamp: TimeStamp? = null, - var attachment: ZBytes? = null, + var attachment: IntoZBytes? = null, var express: Boolean = QoS.defaultQoS.express, var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl, var priority: Priority = QoS.defaultQoS.priority -) +) { + fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) } +} /** diff --git a/zenoh-java/src/jvmTest/java/io/zenoh/EncodingTest.java b/zenoh-java/src/jvmTest/java/io/zenoh/EncodingTest.java index a524672b..964827d5 100644 --- a/zenoh-java/src/jvmTest/java/io/zenoh/EncodingTest.java +++ b/zenoh-java/src/jvmTest/java/io/zenoh/EncodingTest.java @@ -33,7 +33,7 @@ public class EncodingTest { private static final Encoding without_schema = Encoding.TEXT_CSV; private static final Encoding with_schema = Encoding.APPLICATION_JSON.withSchema("test_schema"); - private ZBytes payload = ZBytes.from("test"); + private final ZBytes payload = ZBytes.from("test"); @Test public void encoding_subscriberTest() throws ZError, InterruptedException {