diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/northbound/NorthboundTagConsumer.java b/hivemq-edge/src/main/java/com/hivemq/protocols/northbound/NorthboundTagConsumer.java index e6ec27b625..5f8b4e583b 100644 --- a/hivemq-edge/src/main/java/com/hivemq/protocols/northbound/NorthboundTagConsumer.java +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/northbound/NorthboundTagConsumer.java @@ -112,12 +112,14 @@ public void accept(final @NotNull List dataPoints) { try { final var jsonMap=objectMapper.readValue((String)jsonDataPoint.getTagValue(), typeRef); final var value = jsonMap.get("value"); - if(value!=null) { + if(value!=null && jsonMap.size() == 1) { return dataPointFactory.create(jsonDataPoint.getTagName(), value); - } else { + } else if(value!=null && jsonMap.size() > 1) { + return dataPointFactory.create(jsonDataPoint.getTagName(), jsonMap); + }else { throw new RuntimeException("No value entry in JSON message"); } - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException(e); } }).toList(); diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java index d2bba9fb59..e807ee59a8 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java @@ -15,6 +15,8 @@ */ package com.hivemq.edge.adapters.opcua; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.hivemq.adapter.sdk.api.events.EventService; import com.hivemq.adapter.sdk.api.factories.DataPointFactory; import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService; @@ -55,6 +57,7 @@ class OpcUaClientConnection { private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class); + private static final @NotNull Gson GSON = new GsonBuilder().disableHtmlEscaping().create(); private final @NotNull OpcUaSpecificAdapterConfig config; private final @NotNull List tags; @@ -206,7 +209,7 @@ void destroy() { log.debug("Creating new OPC UA subscription"); final OpcUaSubscription subscription = new OpcUaSubscription(client); subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval()); - subscription.setSubscriptionListener(new OpcUaSubscriptionListener(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory)); + subscription.setSubscriptionListener(new OpcUaSubscriptionListener(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory, GSON)); try { subscription.create(); return subscription diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java index 6bb432096f..255bfc5516 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/config/tag/OpcuaTagDefinition.java @@ -22,6 +22,8 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.util.Objects; + public class OpcuaTagDefinition implements TagDefinition { @JsonProperty(value = "node", required = true) @@ -30,35 +32,45 @@ public class OpcuaTagDefinition implements TagDefinition { required = true) private final @NotNull String node; + @JsonProperty(value = "collectAllProperties") + @ModuleConfigField(title = "Collect all properties of the node", + description = "OPC UA defines a set of properties for each node. If this is enabled, all properties will be collected and sent to the MQTT broker.") + private final @NotNull boolean collectAllProperties; + @JsonCreator - public OpcuaTagDefinition(@JsonProperty(value = "node", required = true) final @NotNull String node) { + public OpcuaTagDefinition( + @JsonProperty(value = "node", required = true) final @NotNull String node, + @JsonProperty(value = "collectAllProperties", defaultValue = "false") final @Nullable Boolean collectAllProperties) { this.node = node; + if(collectAllProperties == null) { + this.collectAllProperties = false; + } else { + this.collectAllProperties = collectAllProperties; + } } public @NotNull String getNode() { return node; } - @Override - public boolean equals(final @Nullable Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } + public boolean isCollectAllProperties() { + return collectAllProperties; + } + @Override + public boolean equals(final Object o) { + if (o == null || getClass() != o.getClass()) return false; final OpcuaTagDefinition that = (OpcuaTagDefinition) o; - return node.equals(that.node); + return isCollectAllProperties() == that.isCollectAllProperties() && Objects.equals(getNode(), that.getNode()); } @Override public int hashCode() { - return node.hashCode(); + return Objects.hash(getNode(), isCollectAllProperties()); } @Override - public @NotNull String toString() { - return "OpcuaTagDefinition{" + "node='" + node + '\'' + '}'; + public String toString() { + return "OpcuaTagDefinition{" + "node='" + node + '\'' + ", collectAllProperties=" + collectAllProperties + '}'; } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionListener.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionListener.java index 513587cc92..bdc2119a37 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionListener.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionListener.java @@ -15,6 +15,8 @@ */ package com.hivemq.edge.adapters.opcua.listeners; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import com.hivemq.adapter.sdk.api.events.EventService; import com.hivemq.adapter.sdk.api.events.model.Event; import com.hivemq.adapter.sdk.api.factories.DataPointFactory; @@ -31,11 +33,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -46,8 +44,6 @@ public class OpcUaSubscriptionListener implements OpcUaSubscription.SubscriptionListener { - private static final Logger log = LoggerFactory.getLogger(OpcUaSubscriptionListener.class); - private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService; private final @NotNull ProtocolAdapterTagStreamingService tagStreamingService; private final @NotNull EventService eventService; @@ -56,6 +52,7 @@ public class OpcUaSubscriptionListener implements OpcUaSubscription.Subscription private final @NotNull Map nodeIdToTag; private final @NotNull OpcUaClient client; private final @NotNull DataPointFactory dataPointFactory; + private final @NotNull Gson gson; public OpcUaSubscriptionListener( final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService, @@ -64,16 +61,19 @@ public OpcUaSubscriptionListener( final @NotNull String adapterId, final @NotNull List tags, final @NotNull OpcUaClient client, - final @NotNull DataPointFactory dataPointFactory) { + final @NotNull DataPointFactory dataPointFactory, + final @NotNull Gson gson) { this.protocolAdapterMetricsService = protocolAdapterMetricsService; this.tagStreamingService = tagStreamingService; this.eventService = eventService; this.adapterId = adapterId; this.client = client; this.dataPointFactory = dataPointFactory; + this.gson = gson; nodeIdToTag = tags.stream() .collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity())); } + @Override public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) { protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT); @@ -106,7 +106,8 @@ public void onDataReceived( } try { protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_DATA_RECEIVED_COUNT); - final String payload = extractPayload(client, values.get(i)); + final var currentValue = values.get(i); + final String payload = extractPayload(client, currentValue, gson, tag.getDefinition().isCollectAllProperties()); tagStreamingService.feed(tn, List.of(dataPointFactory.createJsonDataPoint(tn, payload))); } catch (final Throwable e) { protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_DATA_ERROR_COUNT); @@ -115,15 +116,47 @@ public void onDataReceived( } } - private static @NotNull String extractPayload(final @NotNull OpcUaClient client, final @NotNull DataValue value) - throws UaException { + private static @NotNull String extractPayload( + final @NotNull OpcUaClient client, + final @NotNull DataValue value, + final @NotNull Gson gson, + final boolean collectAllProperties) throws UaException { + if (value.getValue().getValue() == null) { return ""; } + final var jsonObject = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value, gson); + + return jsonObject + .map(json -> { + final var ret = new JsonObject(); + ret.add("value", json); + if(collectAllProperties) { + if(value.getServerPicoseconds() != null) { + ret.addProperty("serverPicoseconds", value.getServerPicoseconds().longValue()); + } else { + ret.addProperty("serverPicoseconds", 0); + } + if(value.getSourcePicoseconds() != null) { + ret.addProperty("sourcePicoseconds", value.getSourcePicoseconds().longValue()); + } else { + ret.addProperty("sourcePicoseconds", 0); + } + if(value.getSourceTime() != null) { + ret.addProperty("sourceTime", value.getSourceTime().getUtcTime()); + } else { + ret.addProperty("sourceTime", 0); + } + if(value.getServerTime() != null) { + ret.addProperty("serverTime", value.getServerTime().getUtcTime()); + } else { + ret.addProperty("serverTime", 0); + } + } + return ret; + }) + .map(gson::toJson) + .orElseGet(() -> gson.toJson(new JsonObject())); - final ByteBuffer byteBuffer = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value); - final byte[] buffer = new byte[byteBuffer.remaining()]; - byteBuffer.get(buffer); - return new String(buffer, StandardCharsets.UTF_8); } } diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverter.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverter.java index 0c355d6715..d1b57188c1 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverter.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverter.java @@ -50,6 +50,7 @@ import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Base64; +import java.util.Optional; import java.util.UUID; import static com.hivemq.edge.adapters.opcua.Constants.EMPTY_BYTES; @@ -59,15 +60,15 @@ public class OpcUaToJsonConverter { private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaToJsonConverter.class); - private static final @NotNull Gson GSON = new GsonBuilder().disableHtmlEscaping().create(); private static final @NotNull Base64.Encoder BASE_64 = Base64.getEncoder(); - public static @NotNull ByteBuffer convertPayload( + public static @NotNull Optional convertPayload( final @NotNull EncodingContext serializationContext, - final @NotNull DataValue dataValue) { + final @NotNull DataValue dataValue, + final @NotNull Gson gson) { final Object value = dataValue.getValue().getValue(); if (value == null) { - return ByteBuffer.wrap(EMPTY_BYTES); + return Optional.empty(); } final JsonObject jsonObject = new JsonObject(); if (value instanceof final DataValue v) { @@ -89,15 +90,15 @@ public class OpcUaToJsonConverter { jsonObject.add("serverPicoseconds", new JsonPrimitive(v.getServerPicoseconds().intValue())); } } - jsonObject.add("value", convertValue(value, serializationContext)); - return ByteBuffer.wrap(GSON.toJson(jsonObject).getBytes(StandardCharsets.UTF_8)); + return Optional.ofNullable(convertValue(value, serializationContext, gson)); } private static JsonElement convertValue( final @NotNull Object value, - final @NotNull EncodingContext serializationContext) { + final @NotNull EncodingContext serializationContext, + final @NotNull Gson gson) { if (value instanceof final DataValue dv) { - return convertValue(dv.getValue(), serializationContext); + return convertValue(dv.getValue(), serializationContext, gson); } else if (value instanceof final Boolean b) { return new JsonPrimitive(b); } else if (value instanceof final Byte b) { @@ -162,31 +163,31 @@ private static JsonElement convertValue( } else if (value instanceof final ExtensionObject eo) { try { final Object decodedValue = eo.decode(serializationContext); - return convertValue(decodedValue, serializationContext); + return convertValue(decodedValue, serializationContext, gson); } catch (final Throwable t) { log.debug("Not able to decode body of OPC UA ExtensionObject, using undecoded body value instead", t); - return convertValue(eo.getBody(), serializationContext); + return convertValue(eo.getBody(), serializationContext, gson); } } else if (value instanceof final Variant variant) { final Object variantValue = variant.getValue(); - return variantValue != null ? convertValue(variantValue, serializationContext) : null; + return variantValue != null ? convertValue(variantValue, serializationContext, gson) : null; } else if (value instanceof final DiagnosticInfo info) { return convertDiagnosticInfo(info); } else if (value instanceof final DynamicStructType struct) { final JsonObject structRoot = new JsonObject(); struct.getMembers() - .forEach((key, value1) -> structRoot.add(key, convertValue(value1, serializationContext))); + .forEach((key, value1) -> structRoot.add(key, convertValue(value1, serializationContext, gson))); return structRoot; } else if (value.getClass().isArray()) { final Object[] values = (Object[]) value; final JsonArray ret = new JsonArray(); - Arrays.asList(values).forEach(in -> ret.add(convertValue(in, serializationContext))); + Arrays.asList(values).forEach(in -> ret.add(convertValue(in, serializationContext, gson))); return ret; } log.warn("No explicit converter for OPC UA type {} falling back to best effort json", value.getClass().getSimpleName()); - return GSON.toJsonTree(value); + return gson.toJsonTree(value); } private static @NotNull JsonObject convertNodeId(final @NotNull NodeId nodeId) { diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java index a7a06b2c98..d7b832a2e4 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/AbstractOpcUaPayloadConverterTest.java @@ -93,7 +93,7 @@ public void before() { } @NotNull - protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcribedNodeId) + protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcribedNodeId, final @Nullable Boolean collectAllProperties) throws Exception { final OpcUaToMqttConfig opcuaToMqttConfig = @@ -107,7 +107,7 @@ protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcr null); when(protocolAdapterInput.getConfig()).thenReturn(config); - when(protocolAdapterInput.getTags()).thenReturn(List.of(new OpcuaTag(subcribedNodeId, "", new OpcuaTagDefinition(subcribedNodeId)))); + when(protocolAdapterInput.getTags()).thenReturn(List.of(new OpcuaTag(subcribedNodeId, "", new OpcuaTagDefinition(subcribedNodeId, collectAllProperties)))); final OpcUaProtocolAdapter protocolAdapter = new OpcUaProtocolAdapter(OpcUaProtocolAdapterInformation.INSTANCE, protocolAdapterInput); diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaStringPayloadConverterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaStringPayloadConverterTest.java index 05c47c51f9..edf0c7d4fc 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaStringPayloadConverterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaStringPayloadConverterTest.java @@ -83,7 +83,7 @@ public void whenTypeSubscriptionPresent_thenReceiveMsg( final String nodeId = opcUaServerExtension.getTestNamespace().addNode("Test" + name + "Node", typeId, () -> serverValue, 999); - final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId); + final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId, null); assertEquals(ProtocolAdapterState.ConnectionStatus.CONNECTED, protocolAdapter.getProtocolAdapterState().getConnectionStatus()); final var received = expectAdapterPublish(); diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverterTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverterTest.java index 65c9e5fc12..40ae7fdf3c 100644 --- a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverterTest.java +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/northbound/OpcUaToJsonConverterTest.java @@ -15,6 +15,9 @@ */ package com.hivemq.edge.adapters.opcua.northbound; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.hivemq.adapter.sdk.api.data.DataPoint; import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopInput; import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState; @@ -32,10 +35,15 @@ import org.junit.jupiter.params.provider.MethodSource; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.LONG; class OpcUaToJsonConverterTest extends AbstractOpcUaPayloadConverterTest { @@ -88,7 +96,7 @@ public void whenTypeSubscriptionPresent_thenReceiveMsg( final String nodeId = opcUaServerExtension.getTestNamespace().addNode("Test" + name + "Node", typeId, () -> value, 999); - final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId); + final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId, null); assertThat(ProtocolAdapterState.ConnectionStatus.CONNECTED).isEqualTo(protocolAdapter.getProtocolAdapterState() .getConnectionStatus()); @@ -102,4 +110,64 @@ public void whenTypeSubscriptionPresent_thenReceiveMsg( .containsExactly(Tuple.tuple(nodeId, "{\"value\":" + jsonValue + "}")); }); } + + @ParameterizedTest(name = "{index} - {0}") + @MethodSource("provideBaseTypes") + @Timeout(10) + public void whenTypeSubscriptionPresent_thenReceiveMsgAndCollectAllProperties( + final @NotNull String name, + final @NotNull NodeId typeId, + final @NotNull Object value, + final @NotNull String jsonValue) throws Exception { + final String nodeId = + opcUaServerExtension.getTestNamespace().addNode("Test" + name + "Node", typeId, () -> value, 999); + + final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId, true); + assertThat(ProtocolAdapterState.ConnectionStatus.CONNECTED).isEqualTo(protocolAdapter.getProtocolAdapterState() + .getConnectionStatus()); + + final var received = expectAdapterPublish(); + protocolAdapter.stop(new ProtocolAdapterStopInput() { + }, new ProtocolAdapterStopOutputImpl()); + + final var typeRef + = new TypeReference>() {}; + var mapper = new ObjectMapper(); + var dataPoints = received.get(nodeId); + var tagNameToValue = dataPoints.stream().collect(Collectors.toMap(DataPoint::getTagName, v -> { + try { + return mapper.readValue((String)v.getTagValue(), typeRef); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + })); + + assertThat(tagNameToValue) + .hasSize(1) + .containsKey(nodeId) + .extractingByKey(nodeId) + .satisfies(innerMap -> { + assertThat(innerMap.get("serverPicoseconds")).isEqualTo(0); + assertThat(innerMap.get("sourcePicoseconds")).isEqualTo(0); + assertThat(innerMap.get("sourceTime")).asInstanceOf(LONG).isGreaterThan(1000L); + assertThat(innerMap.get("serverTime")).asInstanceOf(LONG).isGreaterThan(1000L); + + final String sanitized; + if(jsonValue.startsWith("\"")) { + final var temp = jsonValue.replaceFirst("\"", ""); + sanitized = temp.substring(0, temp.length() - 1); + } else { + sanitized = jsonValue; + } + + final Object toCheck; + if(value instanceof Float) { + //jackson deserializes Float as Double + toCheck = ((Double)innerMap.get("value")).floatValue(); + } else { + toCheck = innerMap.get("value"); + } + assertThat(toCheck).isIn(value, sanitized); + }); + } }