2626import java .util .Optional ;
2727import java .util .concurrent .CompletableFuture ;
2828import java .util .concurrent .ConcurrentHashMap ;
29+ import java .util .concurrent .ExecutionException ;
2930
3031public class OpcUaSubscriptionLifecycle implements UaSubscriptionManager .SubscriptionListener {
3132
@@ -63,18 +64,29 @@ public void onKeepAlive(final @NotNull UaSubscription subscription, final @NotNu
6364 @ Override
6465 public void onSubscriptionTransferFailed (
6566 final @ NotNull UaSubscription subscription , final @ NotNull StatusCode statusCode ) {
67+
6668 protocolAdapterMetricsService .increment ("subscription.transfer.failed.count" );
69+
70+ //clear subscription from the map since it is dead
6771 final OpcUaSubscriptionConsumer .SubscriptionResult subscriptionResult =
68- subscriptionMap .get (subscription .getSubscriptionId ());
72+ subscriptionMap .remove (subscription .getSubscriptionId ());
73+
6974 if (subscriptionResult != null ) {
7075 subscribe (subscriptionResult .subscription )
7176 .exceptionally (ex -> {
7277 if (ex instanceof UaServiceFaultException ) {
7378 UaServiceFaultException cause = (UaServiceFaultException ) ex .getCause ();
7479 if (cause .getStatusCode ().getValue () == StatusCodes .Bad_SubscriptionIdInvalid ) {
75- log .warn ("Resubscribing to OPC UA after transfer failure {}" , statusCode , ex );
76- subscriptionMap .remove (subscription .getSubscriptionId ());
77- subscribe (subscriptionResult .subscription );
80+ log .warn ("Resubscribing to OPC UA after transfer failure: {}" , statusCode , ex );
81+ try {
82+ subscribe (subscriptionResult .subscription )
83+ .exceptionally (t -> {
84+ log .error ("Problem resucbscribing after subscription {} failed with {}" , subscription .getSubscriptionId (), statusCode , t );
85+ return null ;
86+ }).get ();
87+ } catch (InterruptedException | ExecutionException e ) {
88+ log .error ("Problem resucbscribing after subscription {} failed with {}" , subscription .getSubscriptionId (), statusCode , e );
89+ }
7890 } else {
7991 log .error ("Not able to recreate OPC UA subscription after transfer failure" , ex );
8092 }
@@ -89,6 +101,7 @@ public void onSubscriptionTransferFailed(
89101 }
90102
91103 public CompletableFuture <Void > subscribe (final @ NotNull OpcUaToMqttMapping subscription ) {
104+ log .info ("Subscribing to OPC UA node {}" , subscription .getNode ());
92105 final ReadValueId readValueId = new ReadValueId (NodeId .parse (subscription .getNode ()),
93106 AttributeId .Value .uid (),
94107 null ,
0 commit comments