Skip to content

Commit afe2f77

Browse files
authored
Merge pull request #587 from hivemq/fix/26487-rework-reconnect
Fix/26487 rework reconnect
2 parents f4b56a9 + 653098e commit afe2f77

File tree

13 files changed

+612
-431
lines changed

13 files changed

+612
-431
lines changed
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
package com.hivemq.edge.adapters.opcua;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.google.common.collect.ImmutableList;
6+
import com.hivemq.adapter.sdk.api.discovery.NodeType;
7+
import com.hivemq.adapter.sdk.api.discovery.ProtocolAdapterDiscoveryInput;
8+
import com.hivemq.adapter.sdk.api.discovery.ProtocolAdapterDiscoveryOutput;
9+
import com.hivemq.adapter.sdk.api.events.model.Event;
10+
import com.hivemq.adapter.sdk.api.services.ModuleServices;
11+
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService;
12+
import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState;
13+
import com.hivemq.adapter.sdk.api.writing.WritingInput;
14+
import com.hivemq.adapter.sdk.api.writing.WritingOutput;
15+
import com.hivemq.edge.adapters.opcua.client.OpcUaClientConfigurator;
16+
import com.hivemq.edge.adapters.opcua.client.OpcUaEndpointFilter;
17+
import com.hivemq.edge.adapters.opcua.config.OpcUaAdapterConfig;
18+
import com.hivemq.edge.adapters.opcua.config.mqtt2opcua.MqttToOpcUaMapping;
19+
import com.hivemq.edge.adapters.opcua.mqtt2opcua.JsonSchemaGenerator;
20+
import com.hivemq.edge.adapters.opcua.mqtt2opcua.JsonToOpcUAConverter;
21+
import com.hivemq.edge.adapters.opcua.mqtt2opcua.OpcUaPayload;
22+
import org.eclipse.milo.opcua.binaryschema.GenericBsdParser;
23+
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
24+
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
25+
import org.eclipse.milo.opcua.sdk.client.api.UaSession;
26+
import org.eclipse.milo.opcua.sdk.client.dtd.DataTypeDictionarySessionInitializer;
27+
import org.eclipse.milo.opcua.stack.core.Identifiers;
28+
import org.eclipse.milo.opcua.stack.core.UaException;
29+
import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
30+
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
31+
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
32+
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
33+
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
34+
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
35+
import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
36+
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
37+
import org.eclipse.milo.opcua.stack.core.types.structured.BrowseResult;
38+
import org.eclipse.milo.opcua.stack.core.types.structured.ReferenceDescription;
39+
import org.jetbrains.annotations.NotNull;
40+
import org.jetbrains.annotations.Nullable;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
44+
import java.util.Objects;
45+
import java.util.Optional;
46+
import java.util.concurrent.CompletableFuture;
47+
import java.util.function.BiConsumer;
48+
49+
import static com.hivemq.adapter.sdk.api.state.ProtocolAdapterState.ConnectionStatus.CONNECTED;
50+
import static com.hivemq.adapter.sdk.api.state.ProtocolAdapterState.ConnectionStatus.DISCONNECTED;
51+
import static java.util.Objects.requireNonNullElse;
52+
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
53+
54+
public class OpcUaClientWrapper {
55+
private static final Logger log = LoggerFactory.getLogger(OpcUaClientWrapper.class);
56+
57+
public final @NotNull OpcUaClient client;
58+
public final @NotNull Optional<JsonToOpcUAConverter> jsonToOpcUAConverter;
59+
public final @NotNull Optional<JsonSchemaGenerator> jsonSchemaGenerator;
60+
public final @NotNull OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle;
61+
62+
public OpcUaClientWrapper(
63+
@NotNull final OpcUaClient client,
64+
@NotNull final OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle,
65+
@NotNull final Optional<JsonToOpcUAConverter> jsonToOpcUAConverter,
66+
@NotNull final Optional<JsonSchemaGenerator> jsonSchemaGenerator) {
67+
this.client = client;
68+
this.jsonToOpcUAConverter = jsonToOpcUAConverter;
69+
this.jsonSchemaGenerator = jsonSchemaGenerator;
70+
this.opcUaSubscriptionLifecycle = opcUaSubscriptionLifecycle;
71+
}
72+
73+
public CompletableFuture<Void> stop() {
74+
return opcUaSubscriptionLifecycle
75+
.stop()
76+
.thenCompose(ignored ->
77+
client
78+
.disconnect()
79+
.thenApply(ignored2 -> null)
80+
);
81+
}
82+
83+
public @NotNull CompletableFuture<@NotNull JsonNode> createMqttPayloadJsonSchema(final @NotNull MqttToOpcUaMapping writeContext) {
84+
return jsonSchemaGenerator
85+
.map(gen -> gen.createJsonSchema(NodeId.parse(writeContext.getNode())))
86+
.orElseGet(() -> CompletableFuture.failedFuture(new NullPointerException()));
87+
}
88+
89+
public void discoverValues(
90+
final @NotNull ProtocolAdapterDiscoveryInput input,
91+
final @NotNull ProtocolAdapterDiscoveryOutput output) {
92+
Objects.requireNonNull(client, "OPC UA Adapter not started yet");
93+
94+
final NodeId browseRoot;
95+
if (input.getRootNode() == null || input.getRootNode().isBlank()) {
96+
browseRoot = Identifiers.ObjectsFolder;
97+
} else {
98+
final Optional<NodeId> parsedNodeId = NodeId.parseSafe(input.getRootNode());
99+
if (parsedNodeId.isEmpty()) {
100+
throw new IllegalArgumentException("OPC UA NodeId '" + input.getRootNode() + "' is not supported");
101+
}
102+
browseRoot = parsedNodeId.get();
103+
}
104+
105+
browse(client, browseRoot, null, (ref, parent) -> {
106+
final String name = ref.getBrowseName() != null ? ref.getBrowseName().getName() : "";
107+
final String displayName = ref.getDisplayName() != null ? ref.getDisplayName().getText() : "";
108+
final NodeType nodeType = getNodeType(ref);
109+
output.getNodeTree()
110+
.addNode(ref.getNodeId().toParseableString(),
111+
requireNonNullElse(name, ""),
112+
ref.getNodeId().toParseableString(),
113+
requireNonNullElse(displayName, ""),
114+
parent != null ? parent.getNodeId().toParseableString() : null,
115+
nodeType != null ? nodeType : NodeType.VALUE,
116+
nodeType == NodeType.VALUE);
117+
}, input.getDepth()).whenComplete((aVoid, t) -> {
118+
if (t != null) {
119+
output.fail(t, null);
120+
} else {
121+
output.finish();
122+
}
123+
});
124+
}
125+
126+
public void write(final @NotNull WritingInput writingInput, final @NotNull WritingOutput writingOutput) {
127+
final OpcUaPayload opcUAWritePayload = (OpcUaPayload) writingInput.getWritingPayload();
128+
final MqttToOpcUaMapping writeContext = (MqttToOpcUaMapping) writingInput.getWritingContext();
129+
log.debug("Write for opcua is invoked with payload '{}' and context '{}' ", opcUAWritePayload, writeContext);
130+
final NodeId nodeId = NodeId.parse(writeContext.getNode());
131+
132+
jsonToOpcUAConverter
133+
.map(conv -> conv.convertToOpcUAValue(opcUAWritePayload.getValue(), nodeId))
134+
.ifPresentOrElse(
135+
opcUaObject -> {
136+
final Variant variant = new Variant(opcUaObject);
137+
final DataValue dataValue = new DataValue(variant, null, null);
138+
final CompletableFuture<StatusCode> writeFuture = client.writeValue(nodeId, dataValue);
139+
writeFuture.whenComplete((statusCode, throwable) -> {
140+
if (throwable != null) {
141+
log.error("Exception while writing to opcua node '{}'", writeContext.getNode(), throwable);
142+
writingOutput.fail(throwable, null);
143+
} else {
144+
log.info("Wrote '{}' to nodeId={}", variant, nodeId);
145+
writingOutput.finish();
146+
}
147+
});
148+
},
149+
() -> writingOutput.fail("JsonToOpcUaConverter not available"));
150+
151+
}
152+
153+
private static @NotNull CompletableFuture<Void> browse(
154+
final @NotNull OpcUaClient client,
155+
final @NotNull NodeId browseRoot,
156+
final @Nullable ReferenceDescription parent,
157+
final @NotNull BiConsumer<ReferenceDescription, ReferenceDescription> callback,
158+
final int depth) {
159+
final BrowseDescription browse = new BrowseDescription(browseRoot,
160+
BrowseDirection.Forward,
161+
null,
162+
true,
163+
uint(0),
164+
uint(BrowseResultMask.All.getValue()));
165+
166+
return client
167+
.browse(browse)
168+
.thenCompose(browseResult -> handleBrowseResult(client, parent, callback, depth, browseResult));
169+
}
170+
171+
private static @NotNull CompletableFuture<Void> handleBrowseResult(
172+
final @NotNull OpcUaClient client,
173+
final @Nullable ReferenceDescription parent,
174+
final @NotNull BiConsumer<ReferenceDescription, ReferenceDescription> callback,
175+
final int depth,
176+
final BrowseResult browseResult) {
177+
final ReferenceDescription[] references = browseResult.getReferences();
178+
179+
final ImmutableList.Builder<CompletableFuture<Void>> childFutures = ImmutableList.builder();
180+
181+
if (references == null) {
182+
return CompletableFuture.completedFuture(null);
183+
}
184+
185+
for (final ReferenceDescription rd : references) {
186+
callback.accept(rd, parent);
187+
// recursively browse to children
188+
if (depth > 1) {
189+
final Optional<NodeId> childNodeId = rd.getNodeId().toNodeId(client.getNamespaceTable());
190+
childNodeId.ifPresent(nodeId -> childFutures.add(browse(client, nodeId, rd, callback, depth - 1)));
191+
}
192+
}
193+
194+
final ByteString continuationPoint = browseResult.getContinuationPoint();
195+
if (continuationPoint != null && !continuationPoint.isNull()) {
196+
childFutures.add(
197+
Objects.requireNonNull(client)
198+
.browseNext(false, continuationPoint)
199+
.thenCompose(nextBrowseResult ->
200+
handleBrowseResult(client, parent, callback, depth, nextBrowseResult)));
201+
}
202+
203+
return CompletableFuture.allOf(childFutures.build().toArray(new CompletableFuture[]{}));
204+
}
205+
206+
private static @Nullable NodeType getNodeType(final @NotNull ReferenceDescription ref) {
207+
switch (ref.getNodeClass()) {
208+
case Object:
209+
case ObjectType:
210+
case VariableType:
211+
case ReferenceType:
212+
case DataType:
213+
return NodeType.OBJECT;
214+
case Variable:
215+
return NodeType.VALUE;
216+
case View:
217+
return NodeType.FOLDER;
218+
default:
219+
return null;
220+
}
221+
}
222+
223+
public static CompletableFuture<OpcUaClientWrapper> createAndConnect(
224+
final @NotNull OpcUaAdapterConfig adapterConfig,
225+
final @NotNull ProtocolAdapterState protocolAdapterState,
226+
final @NotNull ModuleServices moduleServices,
227+
final @NotNull String id,
228+
final @NotNull String protocolId,
229+
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService
230+
) throws UaException {
231+
final String configPolicyUri = adapterConfig.getSecurity().getPolicy().getSecurityPolicy().getUri();
232+
233+
OpcUaClient opcUaClient = OpcUaClient.create(adapterConfig.getUri(),
234+
new OpcUaEndpointFilter(configPolicyUri, adapterConfig),
235+
new OpcUaClientConfigurator(adapterConfig));
236+
//Decoding a struct with custom DataType requires a DataTypeManager, so we register one that updates each time a session is activated.
237+
opcUaClient.addSessionInitializer(new DataTypeDictionarySessionInitializer(new GenericBsdParser()));
238+
239+
//-- Seems to be not connection monitoring hook, use the session activity listener
240+
opcUaClient.addSessionActivityListener(new SessionActivityListener() {
241+
@Override
242+
public void onSessionInactive(final @NotNull UaSession session) {
243+
log.info("OPC UA client of protocol adapter '{}' disconnected: {}", id, session);
244+
protocolAdapterState.setConnectionStatus(DISCONNECTED);
245+
}
246+
247+
@Override
248+
public void onSessionActive(final @NotNull UaSession session) {
249+
log.info("OPC UA client of protocol adapter '{}' connected: {}", id, session);
250+
protocolAdapterState.setConnectionStatus(CONNECTED);
251+
}
252+
});
253+
opcUaClient.addFaultListener(serviceFault -> {
254+
log.info("OPC UA client of protocol adapter '{}' detected a service fault: {}", id, serviceFault);
255+
moduleServices.eventService()
256+
.createAdapterEvent(adapterConfig.getId(), protocolId)
257+
.withSeverity(Event.SEVERITY.ERROR)
258+
.withPayload(serviceFault.getResponseHeader().getServiceResult())
259+
.withMessage("A Service Fault was Detected.")
260+
.fire();
261+
});
262+
263+
return opcUaClient.connect().thenCompose(uaClient -> {
264+
OpcUaSubscriptionLifecycle opcUaSubscriptionLifecycle = new OpcUaSubscriptionLifecycle(
265+
opcUaClient,
266+
adapterConfig.getId(),
267+
protocolId,
268+
protocolAdapterMetricsService,
269+
moduleServices.eventService(),
270+
moduleServices.adapterPublishService());
271+
272+
opcUaClient.getSubscriptionManager()
273+
.addSubscriptionListener(opcUaSubscriptionLifecycle);
274+
275+
try {
276+
Optional<JsonToOpcUAConverter> jsonToOpcUAConverterOpt = Optional.of(new JsonToOpcUAConverter(opcUaClient));
277+
Optional<JsonSchemaGenerator> jsonSchemaGeneratorOpt = Optional.of(new JsonSchemaGenerator(opcUaClient , new ObjectMapper()));
278+
return opcUaSubscriptionLifecycle
279+
.subscribeAll(adapterConfig.getOpcuaToMqttConfig().getOpcuaToMqttMappings())
280+
.thenApply(ignored -> new OpcUaClientWrapper(opcUaClient, opcUaSubscriptionLifecycle, jsonToOpcUAConverterOpt, jsonSchemaGeneratorOpt));
281+
} catch (final UaException e) {
282+
log.error("Unable to create the converters for writing.", e);
283+
Optional<JsonToOpcUAConverter> jsonToOpcUAConverterOpt = Optional.empty();
284+
Optional<JsonSchemaGenerator> jsonSchemaGeneratorOpt = Optional.empty();
285+
return opcUaSubscriptionLifecycle
286+
.subscribeAll(adapterConfig.getOpcuaToMqttConfig().getOpcuaToMqttMappings())
287+
.thenApply(ignored -> new OpcUaClientWrapper(opcUaClient, opcUaSubscriptionLifecycle, jsonToOpcUAConverterOpt, jsonSchemaGeneratorOpt));
288+
}
289+
});
290+
}
291+
292+
}

0 commit comments

Comments
 (0)