Skip to content

Commit 485695b

Browse files
committed
Add better connection and timeout handling
1 parent edf47eb commit 485695b

File tree

2 files changed

+179
-23
lines changed

2 files changed

+179
-23
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,13 @@
4545

4646
import java.util.List;
4747
import java.util.Optional;
48+
import java.util.concurrent.CompletableFuture;
49+
import java.util.concurrent.ExecutionException;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.concurrent.TimeoutException;
4852
import java.util.concurrent.atomic.AtomicReference;
4953

5054
import static com.hivemq.edge.adapters.opcua.Constants.PROTOCOL_ID_OPCUA;
51-
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
5255

5356
public class OpcUaClientConnection {
5457
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
@@ -72,8 +75,7 @@ public class OpcUaClientConnection {
7275
final @NotNull DataPointFactory dataPointFactory,
7376
final @NotNull EventService eventService,
7477
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
75-
final @NotNull OpcUaSpecificAdapterConfig config,
76-
final @NotNull AtomicReference<UInteger> lastSubscriptionId) {
78+
final @NotNull OpcUaSpecificAdapterConfig config) {
7779
this.config = config;
7880
this.tagStreamingService = tagStreamingService;
7981
this.dataPointFactory = dataPointFactory;
@@ -122,12 +124,56 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
122124
ignore -> {},
123125
new OpcUaClientConfigurator(adapterId, parsedConfig));
124126
client.addFaultListener(faultListener);
125-
client.connect();
127+
128+
// Add timeout to connection attempt to prevent hanging forever
129+
// Wrap synchronous connect() call with CompletableFuture timeout
130+
try {
131+
CompletableFuture.runAsync(() -> {
132+
try {
133+
client.connect();
134+
} catch (final UaException e) {
135+
throw new RuntimeException(e);
136+
}
137+
}).get(30, TimeUnit.SECONDS);
138+
log.debug("OPC UA client connected successfully for adapter '{}'", adapterId);
139+
} catch (final TimeoutException e) {
140+
log.error("Connection timeout after 30 seconds for OPC UA adapter '{}'", adapterId);
141+
eventService
142+
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
143+
.withMessage("Connection timeout after 30 seconds for adapter '" + adapterId + "'")
144+
.withSeverity(Event.SEVERITY.ERROR)
145+
.fire();
146+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
147+
quietlyCloseClient(client, false, faultListener, null);
148+
return false;
149+
} catch (final InterruptedException e) {
150+
Thread.currentThread().interrupt();
151+
log.error("Connection interrupted for OPC UA adapter '{}'", adapterId, e);
152+
eventService
153+
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
154+
.withMessage("Connection interrupted for adapter '" + adapterId + "'")
155+
.withSeverity(Event.SEVERITY.ERROR)
156+
.fire();
157+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
158+
quietlyCloseClient(client, false, faultListener, null);
159+
return false;
160+
} catch (final ExecutionException e) {
161+
final Throwable cause = e.getCause();
162+
log.error("Connection failed for OPC UA adapter '{}'", adapterId, cause);
163+
eventService
164+
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
165+
.withMessage("Connection failed for adapter '" + adapterId + "': " + cause.getMessage())
166+
.withSeverity(Event.SEVERITY.ERROR)
167+
.fire();
168+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
169+
quietlyCloseClient(client, false, faultListener, null);
170+
return false;
171+
}
126172
} catch (final UaException e) {
127-
log.error("Unable to connect and subscribe to the OPC UA server for adapter '{}'", adapterId, e);
173+
log.error("Unable to create OPC UA client for adapter '{}'", adapterId, e);
128174
eventService
129175
.createAdapterEvent(adapterId, PROTOCOL_ID_OPCUA)
130-
.withMessage("Unable to connect and subscribe to the OPC UA server for adapter '" + adapterId + "'")
176+
.withMessage("Unable to create OPC UA client for adapter '" + adapterId + "'")
131177
.withSeverity(Event.SEVERITY.ERROR)
132178
.fire();
133179
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java

Lines changed: 127 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
4747
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
4848
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
49-
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
5049
import org.jetbrains.annotations.NotNull;
5150
import org.jetbrains.annotations.VisibleForTesting;
5251
import org.slf4j.Logger;
@@ -55,12 +54,17 @@
5554
import java.util.List;
5655
import java.util.Map;
5756
import java.util.concurrent.CompletableFuture;
57+
import java.util.concurrent.Executors;
58+
import java.util.concurrent.ScheduledExecutorService;
59+
import java.util.concurrent.ScheduledFuture;
60+
import java.util.concurrent.TimeUnit;
5861
import java.util.concurrent.atomic.AtomicReference;
5962
import java.util.function.Function;
6063
import java.util.stream.Collectors;
6164

6265
public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
6366
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaProtocolAdapter.class);
67+
private static final int RETRY_DELAY_SECONDS = 30;
6468

6569
private final @NotNull ProtocolAdapterInformation adapterInformation;
6670
private final @NotNull ProtocolAdapterState protocolAdapterState;
@@ -72,7 +76,8 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
7276
private final @NotNull DataPointFactory dataPointFactory;
7377
private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
7478
private final @NotNull OpcUaSpecificAdapterConfig config;
75-
private final @NotNull AtomicReference<UInteger> lastSubscriptionId = new AtomicReference<>();
79+
private final @NotNull ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor();
80+
private final @NotNull AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
7681

7782
public OpcUaProtocolAdapter(
7883
final @NotNull ProtocolAdapterInformation adapterInformation,
@@ -100,13 +105,13 @@ public synchronized void start(
100105
log.info("Starting OPC UA protocol adapter {}", adapterId);
101106
final ParsedConfig parsedConfig;
102107
final var result = ParsedConfig.fromConfig(config);
103-
if (result instanceof final Failure<ParsedConfig, String> failure) {
104-
log.error("Failed to parse configuration for OPC UA client: {}", failure.failure());
105-
output.failStart(new IllegalStateException(failure.failure()),
108+
if (result instanceof Failure<ParsedConfig, String>(final String failure)) {
109+
log.error("Failed to parse configuration for OPC UA client: {}", failure);
110+
output.failStart(new IllegalStateException(failure),
106111
"Failed to parse configuration for OPC UA client");
107112
return;
108-
} else if (result instanceof final Success<ParsedConfig, String> success) {
109-
parsedConfig = success.result();
113+
} else if (result instanceof Success<ParsedConfig, String>(final ParsedConfig result1)) {
114+
parsedConfig = result1;
110115
} else {
111116
output.failStart(new IllegalStateException("Unexpected result type: " + result.getClass().getName()),
112117
"Failed to parse configuration for OPC UA client");
@@ -121,22 +126,23 @@ public synchronized void start(
121126
dataPointFactory,
122127
input.moduleServices().eventService(),
123128
protocolAdapterMetricsService,
124-
config,
125-
lastSubscriptionId))) {
129+
config))) {
126130

127131
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
128-
CompletableFuture.supplyAsync(() -> conn.start(parsedConfig)).whenComplete((success, throwable) -> {
129-
if (!success || throwable != null) {
130-
this.opcUaClientConnection.set(null);
131-
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
132-
log.error("Failed to start OPC UA client", throwable);
133-
}
134-
});
135132

133+
// Attempt initial connection asynchronously
134+
attemptConnection(conn, parsedConfig, input);
135+
136+
// Adapter starts successfully even if connection isn't established yet
137+
// Hardware may come online later and automatic retry will connect
136138
log.info("Successfully started OPC UA protocol adapter {}", adapterId);
137139
output.startedSuccessfully();
138140
} else {
139-
log.warn("Tried starting already started OPC UA protocol adapter {}", adapterId);
141+
log.error("Cannot start OPC UA protocol adapter '{}' - adapter is already started", adapterId);
142+
output.failStart(
143+
new IllegalStateException("Adapter already started"),
144+
"Cannot start already started adapter. Please stop the adapter first."
145+
);
140146
}
141147
}
142148

@@ -145,6 +151,10 @@ public synchronized void stop(
145151
final @NotNull ProtocolAdapterStopInput input,
146152
final @NotNull ProtocolAdapterStopOutput output) {
147153
log.info("Stopping OPC UA protocol adapter {}", adapterId);
154+
155+
// Cancel any pending retries
156+
cancelRetry();
157+
148158
final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null);
149159
if (conn != null) {
150160
conn.stop();
@@ -157,6 +167,21 @@ public synchronized void stop(
157167
@Override
158168
public void destroy() {
159169
log.info("Destroying OPC UA protocol adapter {}", adapterId);
170+
171+
// Cancel any pending retries
172+
cancelRetry();
173+
174+
// Shutdown retry scheduler
175+
retryScheduler.shutdown();
176+
try {
177+
if (!retryScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
178+
retryScheduler.shutdownNow();
179+
}
180+
} catch (final InterruptedException e) {
181+
Thread.currentThread().interrupt();
182+
retryScheduler.shutdownNow();
183+
}
184+
160185
final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null);
161186
if (conn != null) {
162187
CompletableFuture.runAsync(() -> {
@@ -300,4 +325,89 @@ public void createTagSchema(
300325
public @NotNull ProtocolAdapterState getProtocolAdapterState() {
301326
return protocolAdapterState;
302327
}
328+
329+
/**
330+
* Attempts to establish connection to OPC UA server.
331+
* On failure, schedules automatic retry after RETRY_DELAY_SECONDS.
332+
*/
333+
private void attemptConnection(
334+
final @NotNull OpcUaClientConnection conn,
335+
final @NotNull ParsedConfig parsedConfig,
336+
final @NotNull ProtocolAdapterStartInput input) {
337+
338+
CompletableFuture.supplyAsync(() -> conn.start(parsedConfig)).whenComplete((success, throwable) -> {
339+
if (success && throwable == null) {
340+
// Connection succeeded - cancel any pending retries
341+
cancelRetry();
342+
log.info("OPC UA adapter '{}' connected successfully", adapterId);
343+
} else {
344+
// Connection failed - clean up and schedule retry
345+
this.opcUaClientConnection.set(null);
346+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.ERROR);
347+
348+
if (throwable != null) {
349+
log.warn("OPC UA adapter '{}' connection failed, will retry in {} seconds",
350+
adapterId, RETRY_DELAY_SECONDS, throwable);
351+
} else {
352+
log.warn("OPC UA adapter '{}' connection returned false, will retry in {} seconds",
353+
adapterId, RETRY_DELAY_SECONDS);
354+
}
355+
356+
// Schedule retry attempt
357+
scheduleRetry(parsedConfig, input);
358+
}
359+
});
360+
}
361+
362+
/**
363+
* Schedules a retry attempt after RETRY_DELAY_SECONDS.
364+
*/
365+
private void scheduleRetry(
366+
final @NotNull ParsedConfig parsedConfig,
367+
final @NotNull ProtocolAdapterStartInput input) {
368+
369+
final ScheduledFuture<?> future = retryScheduler.schedule(() -> {
370+
// Check if adapter was stopped before retry executes
371+
if (opcUaClientConnection.get() == null) {
372+
log.debug("OPC UA adapter '{}' retry cancelled - adapter was stopped", adapterId);
373+
return;
374+
}
375+
376+
log.info("Retrying connection for OPC UA adapter '{}'", adapterId);
377+
378+
// Create new connection object for retry
379+
final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId,
380+
tagList,
381+
protocolAdapterState,
382+
input.moduleServices().protocolAdapterTagStreamingService(),
383+
dataPointFactory,
384+
input.moduleServices().eventService(),
385+
protocolAdapterMetricsService,
386+
config);
387+
388+
// Set as current connection and attempt
389+
if (opcUaClientConnection.compareAndSet(null, newConn)) {
390+
attemptConnection(newConn, parsedConfig, input);
391+
} else {
392+
log.debug("OPC UA adapter '{}' retry skipped - connection already exists", adapterId);
393+
}
394+
}, RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
395+
396+
// Store future so it can be cancelled if needed
397+
final ScheduledFuture<?> oldFuture = retryFuture.getAndSet(future);
398+
if (oldFuture != null && !oldFuture.isDone()) {
399+
oldFuture.cancel(false);
400+
}
401+
}
402+
403+
/**
404+
* Cancels any pending retry attempts.
405+
*/
406+
private void cancelRetry() {
407+
final ScheduledFuture<?> future = retryFuture.getAndSet(null);
408+
if (future != null && !future.isDone()) {
409+
future.cancel(false);
410+
log.debug("Cancelled pending retry for OPC UA adapter '{}'", adapterId);
411+
}
412+
}
303413
}

0 commit comments

Comments
 (0)