Skip to content

Commit 48a296b

Browse files
committed
Extract reconnect and isHealthy methods
1 parent 0e6372c commit 48a296b

File tree

2 files changed

+106
-6
lines changed

2 files changed

+106
-6
lines changed

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaClientConnection.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ synchronized boolean start(final ParsedConfig parsedConfig) {
204204
final var subscription = subscriptionOptional.get();
205205
log.trace("Creating Subscription for OPC UA client");
206206

207-
context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener));
207+
context.set(new ConnectionContext(subscription.getClient(), faultListener, activityListener, subscriptionLifecycleHandler));
208208
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
209209
client.addSessionActivityListener(activityListener);
210210

@@ -250,6 +250,40 @@ void destroy() {
250250
}
251251
}
252252

253+
/**
254+
* Checks if the connection is healthy by verifying both:
255+
* 1. Session is active (client connection exists and session is present)
256+
* 2. Keep-alive messages are being received (subscription is healthy)
257+
*
258+
* @return true if connection is healthy, false otherwise
259+
*/
260+
public boolean isHealthy() {
261+
final ConnectionContext ctx = context.get();
262+
if (ctx == null) {
263+
log.debug("Connection health check failed: no connection context");
264+
return false;
265+
}
266+
267+
try {
268+
// Check 1: Session is active
269+
if (ctx.client().getSession().isEmpty()) {
270+
log.debug("Connection health check failed: session inactive for adapter '{}'", adapterId);
271+
return false;
272+
}
273+
274+
// Check 2: Keep-alive is healthy
275+
if (!ctx.subscriptionHandler().isKeepAliveHealthy()) {
276+
log.debug("Connection health check failed: keep-alive timeout for adapter '{}'", adapterId);
277+
return false;
278+
}
279+
280+
return true;
281+
} catch (final UaException e) {
282+
log.debug("Connection health check failed with exception for adapter '{}'", adapterId, e);
283+
return false;
284+
}
285+
}
286+
253287
@NotNull Optional<OpcUaClient> client() {
254288
final ConnectionContext ctx = context.get();
255289
if(ctx != null) {
@@ -361,6 +395,9 @@ private void cancelHealthCheck() {
361395
}
362396
}
363397

364-
private record ConnectionContext(@NotNull OpcUaClient client, @NotNull ServiceFaultListener faultListener, @NotNull SessionActivityListener activityListener) {
398+
private record ConnectionContext(@NotNull OpcUaClient client,
399+
@NotNull ServiceFaultListener faultListener,
400+
@NotNull SessionActivityListener activityListener,
401+
@NotNull OpcUaSubscriptionLifecycleHandler subscriptionHandler) {
365402
}
366403
}

modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/OpcUaProtocolAdapter.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.hivemq.adapter.sdk.api.model.ProtocolAdapterStopOutput;
2828
import com.hivemq.adapter.sdk.api.schema.TagSchemaCreationInput;
2929
import com.hivemq.adapter.sdk.api.schema.TagSchemaCreationOutput;
30+
import com.hivemq.adapter.sdk.api.services.ModuleServices;
3031
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService;
3132
import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState;
3233
import com.hivemq.adapter.sdk.api.writing.WritingContext;
@@ -78,6 +79,10 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
7879
private final @NotNull ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor();
7980
private final @NotNull AtomicReference<ScheduledFuture<?>> retryFuture = new AtomicReference<>();
8081

82+
// Stored for reconnection - set during start()
83+
private volatile ParsedConfig parsedConfig;
84+
private volatile ModuleServices moduleServices;
85+
8186
public OpcUaProtocolAdapter(
8287
final @NotNull ProtocolAdapterInformation adapterInformation,
8388
final @NotNull ProtocolAdapterInput<OpcUaSpecificAdapterConfig> input) {
@@ -111,6 +116,9 @@ public synchronized void start(
111116
return;
112117
} else if (result instanceof Success<ParsedConfig, String>(final ParsedConfig result1)) {
113118
parsedConfig = result1;
119+
// Store for reconnection
120+
this.parsedConfig = result1;
121+
this.moduleServices = input.moduleServices();
114122
} else {
115123
output.failStart(new IllegalStateException("Unexpected result type: " + result.getClass().getName()),
116124
"Failed to parse configuration for OPC UA client");
@@ -154,6 +162,10 @@ public synchronized void stop(
154162
// Cancel any pending retries
155163
cancelRetry();
156164

165+
// Clear stored configuration to prevent reconnection after stop
166+
this.parsedConfig = null;
167+
this.moduleServices = null;
168+
157169
final OpcUaClientConnection conn = opcUaClientConnection.getAndSet(null);
158170
if (conn != null) {
159171
conn.stop();
@@ -163,6 +175,56 @@ public synchronized void stop(
163175
output.stoppedSuccessfully();
164176
}
165177

178+
/**
179+
* Triggers reconnection by stopping the current connection and creating a new one.
180+
* Used for runtime reconnection when health check detects issues.
181+
* Requires that start() has been called previously to initialize parsedConfig and moduleServices.
182+
*/
183+
private void reconnect() {
184+
log.info("Reconnecting OPC UA adapter '{}'", adapterId);
185+
186+
// Verify we have the necessary configuration
187+
if (parsedConfig == null || moduleServices == null) {
188+
log.error("Cannot reconnect OPC UA adapter '{}' - adapter has not been started yet", adapterId);
189+
return;
190+
}
191+
192+
// Cancel any pending retries
193+
cancelRetry();
194+
195+
// Stop and clean up current connection
196+
final OpcUaClientConnection oldConn = opcUaClientConnection.getAndSet(null);
197+
if (oldConn != null) {
198+
oldConn.stop();
199+
log.debug("Stopped old connection for OPC UA adapter '{}'", adapterId);
200+
}
201+
202+
// Create new connection
203+
final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId,
204+
tagList,
205+
protocolAdapterState,
206+
moduleServices.protocolAdapterTagStreamingService(),
207+
dataPointFactory,
208+
moduleServices.eventService(),
209+
protocolAdapterMetricsService,
210+
config);
211+
212+
// Set as current connection and attempt connection with retry logic
213+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
214+
if (opcUaClientConnection.compareAndSet(null, newConn)) {
215+
// Create a minimal ProtocolAdapterStartInput for attemptConnection
216+
final ProtocolAdapterStartInput input = new ProtocolAdapterStartInput() {
217+
@Override
218+
public ModuleServices moduleServices() {
219+
return moduleServices;
220+
}
221+
};
222+
attemptConnection(newConn, parsedConfig, input);
223+
} else {
224+
log.warn("OPC UA adapter '{}' reconnect failed - another connection was created concurrently", adapterId);
225+
}
226+
}
227+
166228
@Override
167229
public void destroy() {
168230
log.info("Destroying OPC UA protocol adapter {}", adapterId);
@@ -369,7 +431,7 @@ private void scheduleRetry(
369431
final int retryIntervalSeconds = config.getRetryInterval();
370432
final ScheduledFuture<?> future = retryScheduler.schedule(() -> {
371433
// Check if adapter was stopped before retry executes
372-
if (opcUaClientConnection.get() == null) {
434+
if (this.parsedConfig == null || this.moduleServices == null) {
373435
log.debug("OPC UA adapter '{}' retry cancelled - adapter was stopped", adapterId);
374436
return;
375437
}
@@ -380,15 +442,16 @@ private void scheduleRetry(
380442
final OpcUaClientConnection newConn = new OpcUaClientConnection(adapterId,
381443
tagList,
382444
protocolAdapterState,
383-
input.moduleServices().protocolAdapterTagStreamingService(),
445+
this.moduleServices.protocolAdapterTagStreamingService(),
384446
dataPointFactory,
385-
input.moduleServices().eventService(),
447+
this.moduleServices.eventService(),
386448
protocolAdapterMetricsService,
387449
config);
388450

389451
// Set as current connection and attempt
452+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
390453
if (opcUaClientConnection.compareAndSet(null, newConn)) {
391-
attemptConnection(newConn, parsedConfig, input);
454+
attemptConnection(newConn, this.parsedConfig, input);
392455
} else {
393456
log.debug("OPC UA adapter '{}' retry skipped - connection already exists", adapterId);
394457
}

0 commit comments

Comments
 (0)