Skip to content

Commit 27166e0

Browse files
authored
Fixed a situation where rappidly restarting the adapter could lead to two open clients producing multiple tag reads (#1126)
1 parent 3e71064 commit 27166e0

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class OpcUaClientConnection {
9090
this.lastKnownSubscriptionId = lastSubscriptionId;
9191
}
9292

93-
@NotNull boolean start(final ParsedConfig parsedConfig) {
93+
@NotNull synchronized boolean start(final ParsedConfig parsedConfig) {
9494
final var subscriptionIdOptional = Optional.ofNullable(lastKnownSubscriptionId.get());
9595
log.debug("Subscribing to OPC UA client with subscriptionId: {}", subscriptionIdOptional.orElse(null));
9696
final OpcUaClient client;
@@ -141,7 +141,7 @@ class OpcUaClientConnection {
141141
return true;
142142
}
143143

144-
void stop() {
144+
synchronized void stop() {
145145
log.info("Stopping OPC UA client");
146146
final ConnectionContext ctx = context.get();
147147
if(ctx != null) {

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public OpcUaProtocolAdapter(
9595
}
9696

9797
@Override
98-
public void start(
98+
public synchronized void start(
9999
final @NotNull ProtocolAdapterStartInput input,
100100
final @NotNull ProtocolAdapterStartOutput output) {
101101
log.info("Starting OPC UA protocol adapter {}", adapterId);
@@ -112,9 +112,15 @@ public void start(
112112
return;
113113
}
114114

115+
if (opcUaClientConnection != null) {
116+
log.warn("Tried starting already started OPC UA protocol adapter {}", adapterId);
117+
output.startedSuccessfully();
118+
return;
119+
}
120+
115121
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
116122

117-
final var newOpcUaClientConnection = new OpcUaClientConnection(
123+
opcUaClientConnection = new OpcUaClientConnection(
118124
adapterId,
119125
tagList,
120126
protocolAdapterState,
@@ -126,37 +132,32 @@ public void start(
126132
lastSubscriptionId);
127133

128134
CompletableFuture
129-
.supplyAsync(() -> newOpcUaClientConnection.start(parsedConfig))
135+
.supplyAsync(() -> opcUaClientConnection.start(parsedConfig))
130136
.whenComplete((success, throwable) -> {
131-
if(throwable != null) {
137+
if(!success || throwable != null) {
138+
this.opcUaClientConnection = null;
132139
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
133140
log.error("Failed to start OPC UA client", throwable);
134141
return;
135142
}
136-
if(success) {
137-
this.opcUaClientConnection = newOpcUaClientConnection;
138-
} else {
139-
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
140-
log.error("Failed to start OPC UA client", throwable);
141-
}
142143
});
143144

144145
log.info("Successfully started OPC UA protocol adapter {}", adapterId);
145146
output.startedSuccessfully();
146147
}
147148

148149
@Override
149-
public void stop(final @NotNull ProtocolAdapterStopInput input, final @NotNull ProtocolAdapterStopOutput output) {
150+
public synchronized void stop(final @NotNull ProtocolAdapterStopInput input, final @NotNull ProtocolAdapterStopOutput output) {
150151
log.info("Stopping OPC UA protocol adapter {}", adapterId);
151152
final var tempOpcUaClientConnection = opcUaClientConnection;
152153
if(tempOpcUaClientConnection != null) {
154+
opcUaClientConnection = null;
153155
tempOpcUaClientConnection.stop();
154156
output.stoppedSuccessfully();
155157
} else {
156158
log.info("Tried stopping stopped OPC UA protocol adapter {}", adapterId);
157159
output.stoppedSuccessfully();
158160
}
159-
160161
}
161162

162163
@Override

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void onDataReceived(
107107
try {
108108
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_DATA_RECEIVED_COUNT);
109109
final String payload = extractPayload(client, values.get(i));
110+
log.error("RECEIVED " + payload.replace("\n","") + " " + this.hashCode());
110111
tagStreamingService.feed(tn, List.of(dataPointFactory.createJsonDataPoint(tn, payload)));
111112
} catch (final Throwable e) {
112113
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_DATA_ERROR_COUNT);

0 commit comments

Comments
 (0)