Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/src/main/java/io/zenoh/ZLiveliness.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package io.zenoh;

import io.zenoh.exceptions.ZError;
import io.zenoh.keyexpr.KeyExpr;
import picocli.CommandLine;

Expand Down
9 changes: 4 additions & 5 deletions examples/src/main/java/io/zenoh/ZPub.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,17 @@ public Integer call() throws ZError, InterruptedException {
Publisher publisher = session.declarePublisher(keyExpr, publisherOptions);

System.out.println("Press CTRL-C to quit...");
ZBytes attachmentBytes = attachment != null ? ZBytes.from(attachment) : null;
int idx = 0;
while (true) {
Thread.sleep(1000);
String payload = String.format("[%4d] %s", idx, value);
System.out.println("Putting Data ('" + keyExpr + "': '" + payload + "')...");
if (attachmentBytes != null) {
if (attachment != null) {
PutOptions putOptions = new PutOptions();
putOptions.setAttachment(attachmentBytes);
publisher.put(ZBytes.from(payload), putOptions);
putOptions.setAttachment(attachment);
publisher.put(payload, putOptions);
} else {
publisher.put(ZBytes.from(payload));
publisher.put(payload);
}
idx++;
}
Expand Down
8 changes: 3 additions & 5 deletions examples/src/main/java/io/zenoh/ZPut.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package io.zenoh;

import io.zenoh.bytes.ZBytes;
import io.zenoh.exceptions.ZError;
import io.zenoh.keyexpr.KeyExpr;
import io.zenoh.pubsub.PutOptions;
import picocli.CommandLine;
Expand Down Expand Up @@ -44,10 +42,10 @@ public Integer call() throws Exception {
System.out.println("Putting Data ('" + keyExpr + "': '" + value + "')...");
if (attachment != null) {
var putOptions = new PutOptions();
putOptions.setAttachment(ZBytes.from(attachment));
session.put(keyExpr, ZBytes.from(value), putOptions);
putOptions.setAttachment(attachment);
session.put(keyExpr, value, putOptions);
} else {
session.put(keyExpr, ZBytes.from(value));
session.put(keyExpr, value);
}
}
return 0;
Expand Down
3 changes: 1 addition & 2 deletions examples/src/main/java/io/zenoh/ZQuerier.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package io.zenoh;

import io.zenoh.bytes.ZBytes;
import io.zenoh.exceptions.ZError;
import io.zenoh.query.*;
import picocli.CommandLine;
Expand Down Expand Up @@ -63,7 +62,7 @@ private void performQueries(Querier querier, Selector selector) throws ZError, I
System.out.println("Querying '" + selector + "' with payload: '" + queryPayload + "'...");

Querier.GetOptions options = new Querier.GetOptions();
options.setPayload(ZBytes.from(queryPayload));
options.setPayload(queryPayload);
options.setParameters(selector.getParameters());

querier.get(this::handleReply, options);
Expand Down
3 changes: 1 addition & 2 deletions examples/src/main/java/io/zenoh/ZQueryable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package io.zenoh;

import io.zenoh.bytes.ZBytes;
import io.zenoh.exceptions.ZError;
import io.zenoh.keyexpr.KeyExpr;
import io.zenoh.query.Query;
Expand Down Expand Up @@ -100,7 +99,7 @@ private void handleQuery(Query query) {
System.out.println(">> [Queryable] Received Query '" + query.getSelector() + "'" + valueInfo);
var options = new ReplyOptions();
options.setTimeStamp(TimeStamp.getCurrentTime());
query.reply(query.getKeyExpr(), ZBytes.from(value), options);
query.reply(query.getKeyExpr(), value, options);
} catch (Exception e) {
System.err.println(">> [Queryable] Error sending reply: " + e.getMessage());
}
Expand Down
30 changes: 25 additions & 5 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package io.zenoh

import io.zenoh.annotations.Unstable
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.config.ZenohId
import io.zenoh.exceptions.ZError
import io.zenoh.handlers.BlockingQueueHandler
Expand Down Expand Up @@ -115,7 +116,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* Thread.sleep(1000);
* String payload = String.format("[%4d] %s", idx, value);
* System.out.println("Putting Data ('" + keyExpr + "': '" + payload + "')...");
* publisher.put(ZBytes.from(payload));
* publisher.put(payload);
* idx++;
* }
* }
Expand Down Expand Up @@ -240,7 +241,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* break;
* }
* Query query = wrapper.get();
* query.reply(query.getKeyExpr(), ZBytes.from("Example reply));
* query.reply(query.getKeyExpr(), "Example reply");
* }
* }
* ```
Expand Down Expand Up @@ -271,7 +272,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* @Override
* public void handle(Query query) {
* var keyExpr = query.getKeyExpr();
* query.reply(keyExpr, ZBytes.from("Reply #" + counter + "!"));
* query.reply(keyExpr, "Reply #" + counter + "!");
* counter++;
* }
*
Expand Down Expand Up @@ -306,7 +307,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
*
* ```java
* try (Session session = Zenoh.open(config)) {
* var queryable = session.declareQueryable(keyExpr, query -> query.reply(keyExpr, ZBytes.from("Example reply")));
* var queryable = session.declareQueryable(keyExpr, query -> query.reply(keyExpr, "Example reply"));
* //...
* }
* ```
Expand Down Expand Up @@ -338,7 +339,7 @@ class Session private constructor(private val config: Config) : AutoCloseable {
* Querier querier = session.declareQuerier(selector.getKeyExpr(), options);
* //...
* Querier.GetOptions options = new Querier.GetOptions();
* options.setPayload(ZBytes.from("Example payload"));
* options.setPayload("Example payload");
* querier.get(reply -> {...}, options);
* }
* ```
Expand Down Expand Up @@ -513,6 +514,25 @@ class Session private constructor(private val config: Config) : AutoCloseable {
resolvePut(keyExpr, payload, options)
}

/**
* Perform a put with the provided [payload] to the specified [keyExpr].
*
* Example:
* ```java
* session.put(KeyExpr.from("a/b/c"), "Example payload");
* //...
* ```
*
* @param keyExpr The [KeyExpr] for performing the put.
* @param payload The payload to put as a string.
* @param options Optional [PutOptions] to configure the put.
*/
@JvmOverloads
@Throws(ZError::class)
fun put(keyExpr: KeyExpr, payload: String, options: PutOptions = PutOptions()) {
resolvePut(keyExpr, ZBytes.from(payload), options)
}

/**
* Perform a delete operation to the specified [keyExpr].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.zenoh.ext

import com.google.common.reflect.TypeToken
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.jni.JNIZBytes

Expand Down Expand Up @@ -103,8 +104,8 @@ abstract class ZDeserializer<T>: TypeToken<T>() {
/**
* Deserialize the [zbytes] into an element of type [T].
*/
fun deserialize(zbytes: ZBytes): T {
fun deserialize(zbytes: IntoZBytes): T {
@Suppress("UNCHECKED_CAST")
return JNIZBytes.deserializeViaJNI(zbytes, this.type) as T
return JNIZBytes.deserializeViaJNI(zbytes.into(), this.type) as T
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.zenoh.pubsub

import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.qos.CongestionControl
import io.zenoh.qos.Priority
import io.zenoh.qos.QoS
Expand All @@ -35,4 +36,6 @@ data class DeleteOptions(
var express: Boolean = QoS.defaultQoS.express,
var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl,
var priority: Priority = QoS.defaultQoS.priority
)
) {
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }
}
12 changes: 10 additions & 2 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/pubsub/Publisher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh.pubsub
import io.zenoh.*
import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.exceptions.ZError
import io.zenoh.jni.JNIPublisher
import io.zenoh.keyexpr.KeyExpr
Expand All @@ -41,8 +42,7 @@ import kotlin.Throws
* try (Publisher publisher = session.declarePublisher(keyExpr)) {
* int i = 0;
* while (true) {
* var payload = ZBytes.from("Hello for the " + i + "th time!");
* publisher.put(payload);
* publisher.put("Hello for the " + i + "th time!");
* Thread.sleep(1000);
* i++;
* }
Expand Down Expand Up @@ -88,6 +88,14 @@ class Publisher internal constructor(
jniPublisher?.put(payload, options.encoding ?: this.encoding, options.attachment) ?: throw publisherNotValid
}

/** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */
@Throws(ZError::class)
fun put(payload: String) = put(ZBytes.from(payload))

/** Performs a PUT operation on the specified [keyExpr] with the specified [payload]. */
@Throws(ZError::class)
fun put(payload: String, options: PutOptions) = put(ZBytes.from(payload), options)

/**
* Performs a DELETE operation on the specified [keyExpr]
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package io.zenoh.pubsub

import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.qos.*

/**
Expand All @@ -35,4 +36,6 @@ data class PutOptions(
var express: Boolean = QoS.defaultQoS.express,
var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl,
var priority: Priority = QoS.defaultQoS.priority
)
) {
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }
}
6 changes: 5 additions & 1 deletion zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package io.zenoh.query

import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import java.time.Duration

/**
Expand All @@ -35,4 +36,7 @@ data class GetOptions(
var payload: IntoZBytes? = null,
var encoding: Encoding? = null,
var attachment: IntoZBytes? = null
)
) {
fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) }
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }
}
6 changes: 5 additions & 1 deletion zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.zenoh.query
import io.zenoh.annotations.Unstable
import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.ZBytes
import io.zenoh.exceptions.ZError
import io.zenoh.handlers.BlockingQueueHandler
import io.zenoh.handlers.Callback
Expand Down Expand Up @@ -65,7 +66,10 @@ class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private v
var payload: IntoZBytes? = null,
var encoding: Encoding? = null,
var attachment: IntoZBytes? = null
)
) {
fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) }
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }
}

/**
* Perform a get operation to the [keyExpr] from the Querier and pipe them into a blocking queue.
Expand Down
25 changes: 24 additions & 1 deletion zenoh-java/src/commonMain/kotlin/io/zenoh/query/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Query internal constructor(
*
* @param keyExpr Key expression to reply to. This parameter must not be necessarily the same
* as the key expression from the Query, however it must intersect with the query key.
* @param payload The reply payload.
* @param options Optional options for configuring the reply.
*/
@Throws(ZError::class)
Expand All @@ -65,14 +66,26 @@ class Query internal constructor(
SampleKind.PUT,
options.timeStamp,
QoS(options.congestionControl, options.priority, options.express),
options.attachment
options.attachment?.into()
)
jniQuery?.apply {
replySuccess(sample)
jniQuery = null
} ?: throw (ZError("Query is invalid"))
}

/**
* Reply to the specified key expression.
*
* @param keyExpr Key expression to reply to. This parameter must not be necessarily the same
* as the key expression from the Query, however it must intersect with the query key.
* @param payload The reply payload as a string.
* @param options Optional options for configuring the reply.
*/
@Throws(ZError::class)
@JvmOverloads
fun reply(keyExpr: KeyExpr, payload: String, options: ReplyOptions = ReplyOptions()) = reply(keyExpr, ZBytes.from(payload), options)

/**
* Reply "delete" to the specified key expression.
*
Expand Down Expand Up @@ -109,6 +122,16 @@ class Query internal constructor(
} ?: throw (ZError("Query is invalid"))
}

/**
* Reply "error" to the specified key expression.
*
* @param message The error message as a String.
* @param options Optional options for configuring the reply.
*/
@JvmOverloads
@Throws(ZError::class)
fun replyErr(message: String, options: ReplyErrOptions = ReplyErrOptions()) = replyErr(ZBytes.from(message), options)

override fun close() {
jniQuery?.apply {
this.close()
Expand Down
8 changes: 4 additions & 4 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/query/Queryable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ import io.zenoh.session.SessionDeclaration
* break;
* }
* Query query = wrapper.get();
* query.reply(query.getKeyExpr(), ZBytes.from("Example reply");
* query.reply(query.getKeyExpr(), "Example reply");
* }
* }
* ```
*
* Example using a [io.zenoh.handlers.Callback]:
* ```java
* try (Session session = Zenoh.open(config)) {
* var queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), ZBytes.from("Example reply"));
* var queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), "Example reply");
* }
* ```
*
Expand Down Expand Up @@ -89,7 +89,7 @@ sealed class Queryable(
* Example
* ```java
* try (Session session = Zenoh.open(config)) {
* CallbackQueryable queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), ZBytes.from("Example reply"));
* CallbackQueryable queryable = session.declareQueryable(keyExpr, query -> query.reply(query.getKeyExpr(), "Example reply");
* }
* ```
*/
Expand All @@ -109,7 +109,7 @@ class CallbackQueryable internal constructor(keyExpr: KeyExpr, jniQueryable: JNI
* break;
* }
* Query query = wrapper.get();
* query.reply(query.getKeyExpr(), ZBytes.from("Example reply");
* query.reply(query.getKeyExpr(), "Example reply");
* }
* }
* ```
Expand Down
Loading
Loading