Skip to content

Commit a647f2a

Browse files
authored
Fix/opcua interruption (#1147)
* Proper reconnect for OPC UA * Cleanup the subscription handling * flatMap to map
1 parent 0382362 commit a647f2a

File tree

6 files changed

+274
-300
lines changed

6 files changed

+274
-300
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ json-schema-inferrer = "0.2.1"
3636
json-schema-validator = "1.5.8"
3737
junit-jupiter = "5.13.2"
3838
logback = "1.5.18"
39-
milo = "1.0.0"
39+
milo = "1.0.5"
4040
mockito = "5.17.0"
4141
mqtt-sn-codec = "838f51d691"
4242
mssql="12.8.1.jre11"

modules/hivemq-edge-module-opcua/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ dependencies {
3636
implementation(libs.milo.encoding.json)
3737
implementation(libs.milo.encoding.xml)
3838
implementation(libs.milo.client)
39-
implementation(libs.milo.server)
4039
implementation(libs.milo.dtd.reader)
4140
implementation(libs.milo.dtd.manager)
4241
errorprone(libs.errorprone)
4342
}
4443

4544
dependencies {
45+
testImplementation(libs.milo.server)
46+
4647
testImplementation("com.hivemq:hivemq-edge")
4748
testImplementation(libs.jackson.databind)
4849
testImplementation(libs.hivemq.edge.adaptersdk)

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 10 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -28,32 +28,26 @@
2828
import com.hivemq.edge.adapters.opcua.config.tag.OpcuaTag;
2929
import com.hivemq.edge.adapters.opcua.listeners.OpcUaServiceFaultListener;
3030
import com.hivemq.edge.adapters.opcua.listeners.OpcUaSessionActivityListener;
31-
import com.hivemq.edge.adapters.opcua.listeners.OpcUaSubscriptionListener;
31+
import com.hivemq.edge.adapters.opcua.listeners.OpcUaSubscriptionLifecycleHandler;
3232
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
3333
import org.eclipse.milo.opcua.sdk.client.ServiceFaultListener;
3434
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
35-
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaMonitoredItem;
3635
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
3736
import org.eclipse.milo.opcua.stack.core.UaException;
38-
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
3937
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
40-
import org.eclipse.milo.opcua.stack.core.types.structured.TransferSubscriptionsResponse;
4138
import org.jetbrains.annotations.NotNull;
4239
import org.jetbrains.annotations.Nullable;
4340
import org.slf4j.Logger;
4441
import org.slf4j.LoggerFactory;
4542

4643
import java.util.List;
47-
import java.util.Map;
4844
import java.util.Optional;
4945
import java.util.concurrent.atomic.AtomicReference;
50-
import java.util.function.Function;
51-
import java.util.stream.Collectors;
5246

5347
import static com.hivemq.edge.adapters.opcua.Constants.PROTOCOL_ID_OPCUA;
5448
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
5549

56-
class OpcUaClientConnection {
50+
public class OpcUaClientConnection {
5751
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
5852

5953
private final @NotNull OpcUaSpecificAdapterConfig config;
@@ -65,8 +59,6 @@ class OpcUaClientConnection {
6559
private final @NotNull ProtocolAdapterState protocolAdapterState;
6660
private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
6761

68-
private final @NotNull AtomicReference<UInteger> lastKnownSubscriptionId;
69-
7062
private final @NotNull AtomicReference<ConnectionContext> context = new AtomicReference<>();
7163

7264
OpcUaClientConnection(
@@ -87,12 +79,10 @@ class OpcUaClientConnection {
8779
this.adapterId = adapterId;
8880
this.protocolAdapterState = protocolAdapterState;
8981
this.tags = tags;
90-
this.lastKnownSubscriptionId = lastSubscriptionId;
9182
}
9283

93-
@NotNull synchronized boolean start(final ParsedConfig parsedConfig) {
94-
final var subscriptionIdOptional = Optional.ofNullable(lastKnownSubscriptionId.get());
95-
log.debug("Subscribing to OPC UA client with subscriptionId: {}", subscriptionIdOptional.orElse(null));
84+
synchronized boolean start(final ParsedConfig parsedConfig) {
85+
log.debug("Subscribing to OPC UA client");
9686
final OpcUaClient client;
9787
final var faultListener = new OpcUaServiceFaultListener(protocolAdapterMetricsService, eventService, adapterId);
9888
final var activityListener = new OpcUaSessionActivityListener(protocolAdapterMetricsService, eventService, adapterId, protocolAdapterState);
@@ -117,7 +107,9 @@ class OpcUaClientConnection {
117107
return false;
118108
}
119109

120-
final var subscriptionOptional = subscribe(client, subscriptionIdOptional);
110+
final var subscriptionLifecycleHandler = new OpcUaSubscriptionLifecycleHandler(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory, config);
111+
112+
final var subscriptionOptional = subscriptionLifecycleHandler.subscribe(client);
121113

122114
if(subscriptionOptional.isEmpty()) {
123115
log.error("Failed to create or transfer OPC UA subscription. Closing client connection.");
@@ -167,156 +159,6 @@ void destroy() {
167159
return Optional.empty();
168160
}
169161

170-
/**
171-
* Subscribes to the OPC UA client.
172-
* If a subscription ID is provided, it attempts to transfer the subscription.
173-
* If the transfer fails or no ID is provided, it creates a new subscription.
174-
* It then synchronizes the tags and monitored items.
175-
*
176-
* @param client the OPC UA client
177-
* @param subscriptionOptional an Optional containing the subscription ID if available
178-
* @return an Optional containing the created or transferred subscription, or empty if failed
179-
*/
180-
private @NotNull Optional<OpcUaSubscription> subscribe(final @NotNull OpcUaClient client, final @NotNull Optional<UInteger> subscriptionOptional) {
181-
return subscriptionOptional
182-
.flatMap(subscriptionId -> transferSubscription(client, subscriptionId))
183-
.or(() -> createNewSubscription(client))
184-
.flatMap(subscription -> {
185-
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
186-
subscription.setSubscriptionListener(new OpcUaSubscriptionListener(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory));
187-
if(syncTagsAndMonitoredItems(subscription, tags, config)) {
188-
return Optional.of(subscription);
189-
} else {
190-
return Optional.empty();
191-
}
192-
});
193-
}
194-
195-
/**
196-
* Creates a new OPC UA subscription.
197-
* If the subscription is created successfully, it returns an Optional containing the subscription.
198-
* If the subscription creation fails, it returns an empty Optional.
199-
*
200-
* @param client the OPC UA client
201-
* @return an Optional containing the created subscription or empty if creation failed
202-
*/
203-
private @NotNull Optional<OpcUaSubscription> createNewSubscription(final @NotNull OpcUaClient client) {
204-
log.debug("Creating new OPC UA subscription");
205-
final OpcUaSubscription subscription = new OpcUaSubscription(client);
206-
try {
207-
subscription.create();
208-
return subscription
209-
.getSubscriptionId()
210-
.map(subscriptionId -> {
211-
log.trace("New subscription ID: {}", subscriptionId);
212-
lastKnownSubscriptionId.set(subscriptionId);
213-
return subscription;
214-
})
215-
.or(() -> {
216-
log.error("Subscription not created on the server");
217-
return Optional.empty();
218-
});
219-
} catch (final UaException e) {
220-
log.error("Failed to create subscription", e);
221-
}
222-
return Optional.empty();
223-
}
224-
225-
/**
226-
* Transfers an existing subscription to the current client.
227-
* If the subscription is not found, it will return an empty Optional.
228-
*
229-
* @param client the OPC UA client
230-
* @param subscriptionId the subscription ID to transfer
231-
* @return an Optional containing the transferred subscription or empty if not found
232-
*/
233-
private static @NotNull Optional<OpcUaSubscription> transferSubscription(final @NotNull OpcUaClient client, final @NotNull UInteger subscriptionId) {
234-
log.debug("Transfer OPC UA subscription: {}", subscriptionId);
235-
final TransferSubscriptionsResponse response;
236-
try {
237-
response = client.transferSubscriptions(List.of(subscriptionId), true);
238-
} catch (final UaException e) {
239-
log.debug("OPC UA subscription not transferred to connection", e);
240-
return Optional.empty();
241-
}
242-
243-
final var results = response.getResults();
244-
if (results != null && results.length > 0) {
245-
if (results[0].getStatusCode().isGood()) {
246-
return client.getSubscriptions().stream()
247-
.filter(subscription ->
248-
subscription
249-
.getSubscriptionId()
250-
.map(currentSubscriptionId -> currentSubscriptionId.equals(subscriptionId))
251-
.orElse(false))
252-
.findFirst();
253-
} else {
254-
log.debug("OPC UA subscription not transferred to connection: {}", results[0].getStatusCode().toString());
255-
return Optional.empty();
256-
}
257-
} else {
258-
log.error("OPC UA subscription not transferred to connection: no results returned");
259-
return Optional.empty();
260-
}
261-
262-
}
263-
264-
/**
265-
* Synchronizes the tags and monitored items in the subscription.
266-
* It removes monitored items that are not in the tags list and adds new monitored items from the tags list.
267-
* It also updates existing monitored items with the configured queue size and sampling interval.
268-
*
269-
* @param subscription the OPC UA subscription
270-
* @param tags the list of tags to synchronize
271-
* @param config the configuration for the OPC UA adapter
272-
* @return true if synchronization was successful, false otherwise
273-
*/
274-
private static boolean syncTagsAndMonitoredItems(final @NotNull OpcUaSubscription subscription, final @NotNull List<OpcuaTag> tags, final @NotNull OpcUaSpecificAdapterConfig config) {
275-
276-
final var nodeIdToTag = tags.stream().collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
277-
final var nodeIdToMonitoredItem = subscription.getMonitoredItems().stream().collect(Collectors.toMap(monitoredItem -> monitoredItem.getReadValueId().getNodeId(), Function.identity()));
278-
279-
final var monitoredItemsToRemove = nodeIdToMonitoredItem.entrySet().stream().filter(entry -> !nodeIdToTag.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
280-
final var monitoredItemsToAdd = nodeIdToTag.entrySet().stream().filter(entry -> !nodeIdToMonitoredItem.containsKey(entry.getKey())).map(Map.Entry::getValue).toList();
281-
282-
//clear deleted monitored items
283-
if(!monitoredItemsToRemove.isEmpty()) {
284-
subscription.removeMonitoredItems(monitoredItemsToRemove);
285-
log.debug("Removed monitored items: {}", monitoredItemsToRemove.stream().map(item -> item.getReadValueId().getNodeId()));
286-
}
287-
288-
//update existing monitored items
289-
subscription.getMonitoredItems().forEach(monitoredItem -> {
290-
//TODO: allow to configure these values per TAG!!!!
291-
monitoredItem.setQueueSize(uint(config.getOpcuaToMqttConfig().serverQueueSize()));
292-
monitoredItem.setSamplingInterval(config.getOpcuaToMqttConfig().publishingInterval());
293-
});
294-
295-
//add new monitored items
296-
if(!monitoredItemsToAdd.isEmpty()) {
297-
monitoredItemsToAdd.forEach(opcuaTag -> {
298-
final String nodeId = opcuaTag.getDefinition().getNode();
299-
final var monitoredItem = OpcUaMonitoredItem.newDataItem(NodeId.parse(nodeId));
300-
monitoredItem.setQueueSize(uint(config.getOpcuaToMqttConfig().serverQueueSize()));
301-
monitoredItem.setSamplingInterval(config.getOpcuaToMqttConfig().publishingInterval());
302-
subscription.addMonitoredItem(monitoredItem);
303-
});
304-
log.debug("Added monitored items: {}", monitoredItemsToAdd.stream().map(item -> item.getDefinition().getNode()).toList());
305-
}
306-
307-
try {
308-
subscription.synchronizeMonitoredItems();
309-
log.info("All monitored items synchronized successfully");
310-
return true;
311-
} catch (final UaException e) {
312-
log.error("Failed to synchronize monitored items: {} {}", e.getStatusCode(), e.getMessage(), e);
313-
return false;
314-
}
315-
}
316-
317-
private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
318-
}
319-
320162
private static void quietlyDeleteSubscription(
321163
final @NotNull OpcUaClient client,
322164
final @NotNull OpcUaSubscription subscription) {
@@ -367,4 +209,7 @@ private static void quietlyCloseClient(
367209
log.error("Failed to disconnect: {}", e.getMessage());
368210
}
369211
}
212+
213+
private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
214+
}
370215
}

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/client/OpcUaClientConfigurator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ public OpcUaClientConfigurator(final @NotNull String adapterId, final @NotNull P
3939

4040
@Override
4141
public void accept(final @NotNull OpcUaClientConfigBuilder configBuilder) {
42-
configBuilder.setApplicationName(LocalizedText.english(Constants.OPCUA_APPLICATION_NAME));
43-
configBuilder.setApplicationUri(Constants.OPCUA_APPLICATION_URI);
44-
configBuilder.setProductUri(Constants.OPCUA_PRODUCT_URI);
45-
configBuilder.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId);
42+
configBuilder
43+
.setApplicationName(LocalizedText.english(Constants.OPCUA_APPLICATION_NAME))
44+
.setApplicationUri(Constants.OPCUA_APPLICATION_URI)
45+
.setProductUri(Constants.OPCUA_PRODUCT_URI)
46+
.setSessionName(() -> Constants.OPCUA_SESSION_NAME_PREFIX + adapterId);
4647

4748
log.info("TLS is enabled: {}", parsedConfig.tlsEnabled());
4849
if (parsedConfig.tlsEnabled()) {

0 commit comments

Comments
 (0)