2727import com .hivemq .adapter .sdk .api .model .ProtocolAdapterStopOutput ;
2828import com .hivemq .adapter .sdk .api .schema .TagSchemaCreationInput ;
2929import com .hivemq .adapter .sdk .api .schema .TagSchemaCreationOutput ;
30+ import com .hivemq .adapter .sdk .api .services .ModuleServices ;
3031import com .hivemq .adapter .sdk .api .services .ProtocolAdapterMetricsService ;
3132import com .hivemq .adapter .sdk .api .state .ProtocolAdapterState ;
3233import com .hivemq .adapter .sdk .api .writing .WritingContext ;
@@ -78,6 +79,10 @@ public class OpcUaProtocolAdapter implements WritingProtocolAdapter {
7879 private final @ NotNull ScheduledExecutorService retryScheduler = Executors .newSingleThreadScheduledExecutor ();
7980 private final @ NotNull AtomicReference <ScheduledFuture <?>> retryFuture = new AtomicReference <>();
8081
82+ // Stored for reconnection - set during start()
83+ private volatile ParsedConfig parsedConfig ;
84+ private volatile ModuleServices moduleServices ;
85+
8186 public OpcUaProtocolAdapter (
8287 final @ NotNull ProtocolAdapterInformation adapterInformation ,
8388 final @ NotNull ProtocolAdapterInput <OpcUaSpecificAdapterConfig > input ) {
@@ -111,6 +116,9 @@ public synchronized void start(
111116 return ;
112117 } else if (result instanceof Success <ParsedConfig , String >(final ParsedConfig result1 )) {
113118 parsedConfig = result1 ;
119+ // Store for reconnection
120+ this .parsedConfig = result1 ;
121+ this .moduleServices = input .moduleServices ();
114122 } else {
115123 output .failStart (new IllegalStateException ("Unexpected result type: " + result .getClass ().getName ()),
116124 "Failed to parse configuration for OPC UA client" );
@@ -154,6 +162,10 @@ public synchronized void stop(
154162 // Cancel any pending retries
155163 cancelRetry ();
156164
165+ // Clear stored configuration to prevent reconnection after stop
166+ this .parsedConfig = null ;
167+ this .moduleServices = null ;
168+
157169 final OpcUaClientConnection conn = opcUaClientConnection .getAndSet (null );
158170 if (conn != null ) {
159171 conn .stop ();
@@ -163,6 +175,56 @@ public synchronized void stop(
163175 output .stoppedSuccessfully ();
164176 }
165177
178+ /**
179+ * Triggers reconnection by stopping the current connection and creating a new one.
180+ * Used for runtime reconnection when health check detects issues.
181+ * Requires that start() has been called previously to initialize parsedConfig and moduleServices.
182+ */
183+ private void reconnect () {
184+ log .info ("Reconnecting OPC UA adapter '{}'" , adapterId );
185+
186+ // Verify we have the necessary configuration
187+ if (parsedConfig == null || moduleServices == null ) {
188+ log .error ("Cannot reconnect OPC UA adapter '{}' - adapter has not been started yet" , adapterId );
189+ return ;
190+ }
191+
192+ // Cancel any pending retries
193+ cancelRetry ();
194+
195+ // Stop and clean up current connection
196+ final OpcUaClientConnection oldConn = opcUaClientConnection .getAndSet (null );
197+ if (oldConn != null ) {
198+ oldConn .stop ();
199+ log .debug ("Stopped old connection for OPC UA adapter '{}'" , adapterId );
200+ }
201+
202+ // Create new connection
203+ final OpcUaClientConnection newConn = new OpcUaClientConnection (adapterId ,
204+ tagList ,
205+ protocolAdapterState ,
206+ moduleServices .protocolAdapterTagStreamingService (),
207+ dataPointFactory ,
208+ moduleServices .eventService (),
209+ protocolAdapterMetricsService ,
210+ config );
211+
212+ // Set as current connection and attempt connection with retry logic
213+ protocolAdapterState .setConnectionStatus (ProtocolAdapterState .ConnectionStatus .DISCONNECTED );
214+ if (opcUaClientConnection .compareAndSet (null , newConn )) {
215+ // Create a minimal ProtocolAdapterStartInput for attemptConnection
216+ final ProtocolAdapterStartInput input = new ProtocolAdapterStartInput () {
217+ @ Override
218+ public ModuleServices moduleServices () {
219+ return moduleServices ;
220+ }
221+ };
222+ attemptConnection (newConn , parsedConfig , input );
223+ } else {
224+ log .warn ("OPC UA adapter '{}' reconnect failed - another connection was created concurrently" , adapterId );
225+ }
226+ }
227+
166228 @ Override
167229 public void destroy () {
168230 log .info ("Destroying OPC UA protocol adapter {}" , adapterId );
@@ -369,7 +431,7 @@ private void scheduleRetry(
369431 final int retryIntervalSeconds = config .getRetryInterval ();
370432 final ScheduledFuture <?> future = retryScheduler .schedule (() -> {
371433 // Check if adapter was stopped before retry executes
372- if (opcUaClientConnection . get () == null ) {
434+ if (this . parsedConfig == null || this . moduleServices == null ) {
373435 log .debug ("OPC UA adapter '{}' retry cancelled - adapter was stopped" , adapterId );
374436 return ;
375437 }
@@ -380,15 +442,16 @@ private void scheduleRetry(
380442 final OpcUaClientConnection newConn = new OpcUaClientConnection (adapterId ,
381443 tagList ,
382444 protocolAdapterState ,
383- input .moduleServices () .protocolAdapterTagStreamingService (),
445+ this .moduleServices .protocolAdapterTagStreamingService (),
384446 dataPointFactory ,
385- input .moduleServices () .eventService (),
447+ this .moduleServices .eventService (),
386448 protocolAdapterMetricsService ,
387449 config );
388450
389451 // Set as current connection and attempt
452+ protocolAdapterState .setConnectionStatus (ProtocolAdapterState .ConnectionStatus .DISCONNECTED );
390453 if (opcUaClientConnection .compareAndSet (null , newConn )) {
391- attemptConnection (newConn , parsedConfig , input );
454+ attemptConnection (newConn , this . parsedConfig , input );
392455 } else {
393456 log .debug ("OPC UA adapter '{}' retry skipped - connection already exists" , adapterId );
394457 }
0 commit comments