2626import java .util .concurrent .CopyOnWriteArrayList ;
2727import java .util .concurrent .atomic .AtomicReference ;
2828import java .util .concurrent .locks .LockSupport ;
29+ import java .util .function .BiFunction ;
30+ import java .util .function .BiPredicate ;
2931import java .util .function .Consumer ;
3032
3133public abstract class ProtocolAdapterFSM implements Consumer <ProtocolAdapterState .ConnectionStatus > {
3234
33- private static final @ NotNull Logger log = LoggerFactory .getLogger (ProtocolAdapterFSM .class );
34-
35- public enum StateEnum {
36- DISCONNECTED ,
37- CONNECTING ,
38- CONNECTED ,
39- DISCONNECTING ,
40- ERROR_CLOSING ,
41- CLOSING ,
42- ERROR ,
43- CLOSED ,
44- NOT_SUPPORTED
45- }
46-
47- public static final @ NotNull Map <StateEnum , Set <StateEnum >> possibleTransitions = Map .of (
48- StateEnum .DISCONNECTED , Set .of (StateEnum .DISCONNECTED , StateEnum .CONNECTING , StateEnum .CONNECTED , StateEnum .CLOSED , StateEnum .NOT_SUPPORTED ), //allow idempotent DISCONNECTED->DISCONNECTED transitions; for compatibility, we allow to go from CONNECTING to CONNECTED directly, and allow testing transition to CLOSED; NOT_SUPPORTED for adapters without southbound
49- StateEnum .CONNECTING , Set .of (StateEnum .CONNECTING , StateEnum .CONNECTED , StateEnum .ERROR , StateEnum .DISCONNECTED ), // allow idempotent CONNECTING->CONNECTING; can go back to DISCONNECTED
50- StateEnum .CONNECTED , Set .of (StateEnum .CONNECTED , StateEnum .DISCONNECTING , StateEnum .CONNECTING , StateEnum .CLOSING , StateEnum .ERROR_CLOSING , StateEnum .DISCONNECTED ), // allow idempotent CONNECTED->CONNECTED; transition to CONNECTING in case of recovery, DISCONNECTED for direct transition
51- StateEnum .DISCONNECTING , Set .of (StateEnum .DISCONNECTED , StateEnum .CLOSING ), // can go to DISCONNECTED or CLOSING
52- StateEnum .CLOSING , Set .of (StateEnum .CLOSED ),
53- StateEnum .ERROR_CLOSING , Set .of (StateEnum .ERROR ),
54- StateEnum .ERROR , Set .of (StateEnum .ERROR , StateEnum .CONNECTING , StateEnum .DISCONNECTED ), // allow idempotent ERROR->ERROR; can recover from error
55- StateEnum .CLOSED , Set .of (StateEnum .DISCONNECTED , StateEnum .CLOSING ), // can restart from closed or go to closing
56- StateEnum .NOT_SUPPORTED , Set .of (StateEnum .NOT_SUPPORTED ) // Terminal state for adapters without southbound support; allow idempotent transitions
35+ public static final @ NotNull Map <StateEnum , Set <StateEnum >> possibleTransitions = Map .of (StateEnum .DISCONNECTED ,
36+ Set .of (StateEnum .DISCONNECTED ,
37+ StateEnum .CONNECTING ,
38+ StateEnum .CONNECTED ,
39+ StateEnum .CLOSED ,
40+ StateEnum .NOT_SUPPORTED ),
41+ //allow idempotent DISCONNECTED->DISCONNECTED transitions; for compatibility, we allow to go from CONNECTING to CONNECTED directly, and allow testing transition to CLOSED; NOT_SUPPORTED for adapters without southbound
42+ StateEnum .CONNECTING ,
43+ Set .of (StateEnum .CONNECTING , StateEnum .CONNECTED , StateEnum .ERROR , StateEnum .DISCONNECTED ),
44+ // allow idempotent CONNECTING->CONNECTING; can go back to DISCONNECTED
45+ StateEnum .CONNECTED ,
46+ Set .of (StateEnum .CONNECTED ,
47+ StateEnum .DISCONNECTING ,
48+ StateEnum .CONNECTING ,
49+ StateEnum .CLOSING ,
50+ StateEnum .ERROR_CLOSING ,
51+ StateEnum .DISCONNECTED ),
52+ // allow idempotent CONNECTED->CONNECTED; transition to CONNECTING in case of recovery, DISCONNECTED for direct transition
53+ StateEnum .DISCONNECTING ,
54+ Set .of (StateEnum .DISCONNECTED , StateEnum .CLOSING ),
55+ // can go to DISCONNECTED or CLOSING
56+ StateEnum .CLOSING ,
57+ Set .of (StateEnum .CLOSED ),
58+ StateEnum .ERROR_CLOSING ,
59+ Set .of (StateEnum .ERROR ),
60+ StateEnum .ERROR ,
61+ Set .of (StateEnum .ERROR , StateEnum .CONNECTING , StateEnum .DISCONNECTED ),
62+ // allow idempotent ERROR->ERROR; can recover from error
63+ StateEnum .CLOSED ,
64+ Set .of (StateEnum .DISCONNECTED , StateEnum .CLOSING ),
65+ // can restart from closed or go to closing
66+ StateEnum .NOT_SUPPORTED ,
67+ Set .of (StateEnum .NOT_SUPPORTED )
68+ // Terminal state for adapters without southbound support; allow idempotent transitions
5769 );
58-
59- public enum AdapterStateEnum {
60- STARTING ,
61- STARTED ,
62- STOPPING ,
63- STOPPED
64- }
65-
6670 public static final Map <AdapterStateEnum , Set <AdapterStateEnum >> possibleAdapterStateTransitions = Map .of (
67- AdapterStateEnum .STOPPED , Set .of (AdapterStateEnum .STARTING ),
68- AdapterStateEnum .STARTING , Set .of (AdapterStateEnum .STARTED , AdapterStateEnum .STOPPED ),
69- AdapterStateEnum .STARTED , Set .of (AdapterStateEnum .STOPPING ),
70- AdapterStateEnum .STOPPING , Set .of (AdapterStateEnum .STOPPED )
71- );
72-
71+ AdapterStateEnum .STOPPED ,
72+ Set .of (AdapterStateEnum .STARTING ),
73+ AdapterStateEnum .STARTING ,
74+ Set .of (AdapterStateEnum .STARTED , AdapterStateEnum .STOPPED ),
75+ AdapterStateEnum .STARTED ,
76+ Set .of (AdapterStateEnum .STOPPING ),
77+ AdapterStateEnum .STOPPING ,
78+ Set .of (AdapterStateEnum .STOPPED ));
79+ private static final @ NotNull Logger log = LoggerFactory .getLogger (ProtocolAdapterFSM .class );
80+ protected final @ NotNull AtomicReference <AdapterStateEnum > adapterState ;
7381 private final @ NotNull AtomicReference <StateEnum > northboundState ;
7482 private final @ NotNull AtomicReference <StateEnum > southboundState ;
75- protected final @ NotNull AtomicReference <AdapterStateEnum > adapterState ;
7683 private final @ NotNull List <Consumer <State >> stateTransitionListeners ;
77-
78- public record State (AdapterStateEnum adapter , StateEnum northbound , StateEnum southbound ) { }
79-
8084 private final @ NotNull String adapterId ;
8185
8286 public ProtocolAdapterFSM (final @ NotNull String adapterId ) {
@@ -87,6 +91,11 @@ public ProtocolAdapterFSM(final @NotNull String adapterId) {
8791 this .stateTransitionListeners = new CopyOnWriteArrayList <>();
8892 }
8993
94+ private static boolean canTransition (final @ NotNull StateEnum currentState , final @ NotNull StateEnum newState ) {
95+ final var allowedTransitions = possibleTransitions .get (currentState );
96+ return allowedTransitions != null && allowedTransitions .contains (newState );
97+ }
98+
9099 public abstract boolean onStarting ();
91100
92101 public abstract void onStopping ();
@@ -95,10 +104,10 @@ public ProtocolAdapterFSM(final @NotNull String adapterId) {
95104
96105 // ADAPTER signals
97106 public void startAdapter () {
98- if (transitionAdapterState (AdapterStateEnum .STARTING )) {
107+ if (transitionAdapterState (AdapterStateEnum .STARTING )) {
99108 log .debug ("Protocol adapter {} starting" , adapterId );
100- if (onStarting ()) {
101- if (!transitionAdapterState (AdapterStateEnum .STARTED )) {
109+ if (onStarting ()) {
110+ if (!transitionAdapterState (AdapterStateEnum .STARTED )) {
102111 log .warn ("Protocol adapter {} already started" , adapterId );
103112 }
104113 } else {
@@ -110,9 +119,9 @@ public void startAdapter() {
110119 }
111120
112121 public void stopAdapter () {
113- if (transitionAdapterState (AdapterStateEnum .STOPPING )) {
122+ if (transitionAdapterState (AdapterStateEnum .STOPPING )) {
114123 onStopping ();
115- if (!transitionAdapterState (AdapterStateEnum .STOPPED )) {
124+ if (!transitionAdapterState (AdapterStateEnum .STOPPED )) {
116125 log .warn ("Protocol adapter {} already stopped" , adapterId );
117126 }
118127 } else {
@@ -121,99 +130,95 @@ public void stopAdapter() {
121130 }
122131
123132 public boolean transitionAdapterState (final @ NotNull AdapterStateEnum newState ) {
124- int retryCount = 0 ;
125- while (true ) {
126- final var currentState = adapterState .get ();
127- if (canTransition (currentState , newState )) {
128- if (adapterState .compareAndSet (currentState , newState )) {
129- final State snapshotState = new State (newState , northboundState .get (), southboundState .get ());
130- log .debug ("Adapter state transition from {} to {} for adapter {}" , currentState , newState , adapterId );
131- notifyListenersAboutStateTransition (snapshotState );
132- return true ;
133- }
134- retryCount ++;
135- if (retryCount > 3 ) {
136- // progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
137- // reduces CPU consumption and cache line contention under high load
138- final long backoffNanos = Math .min (1_000L * (1L << (retryCount - 4 )), 100_000L );
139- LockSupport .parkNanos (backoffNanos );
140- }
141- // Fast retry for attempts 1-3 (optimizes for low contention case)
142- } else {
143- // Transition not allowed from current state
144- throw new IllegalStateException ("Cannot transition adapter state to " + newState );
145- }
146- }
133+ return performStateTransition (adapterState ,
134+ newState ,
135+ this ::canTransition ,
136+ "Adapter" ,
137+ (current , next ) -> new State (next , northboundState .get (), southboundState .get ()));
147138 }
148139
149140 public boolean transitionNorthboundState (final @ NotNull StateEnum newState ) {
150- int retryCount = 0 ;
151- while (true ) {
152- final var currentState = northboundState .get ();
153- if (canTransition (currentState , newState )) {
154- if (northboundState .compareAndSet (currentState , newState )) {
155- final State snapshotState = new State (adapterState .get (), newState , southboundState .get ());
156- log .debug ("Northbound state transition from {} to {} for adapter {}" , currentState , newState , adapterId );
157- notifyListenersAboutStateTransition (snapshotState );
158- return true ;
159- }
160- retryCount ++;
161- if (retryCount > 3 ) {
162- // progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
163- final long backoffNanos = Math .min (1_000L * (1L << (retryCount - 4 )), 100_000L );
164- LockSupport .parkNanos (backoffNanos );
165- }
166- // Fast retry for attempts 1-3 (optimizes for low contention case)
167- } else {
168- // Transition not allowed from current state
169- throw new IllegalStateException ("Cannot transition northbound state to " + newState );
170- }
171- }
141+ return performStateTransition (northboundState ,
142+ newState ,
143+ ProtocolAdapterFSM ::canTransition ,
144+ "Northbound" ,
145+ (current , next ) -> new State (adapterState .get (), next , southboundState .get ()));
172146 }
173147
174148 public boolean transitionSouthboundState (final @ NotNull StateEnum newState ) {
149+ return performStateTransition (southboundState ,
150+ newState ,
151+ ProtocolAdapterFSM ::canTransition ,
152+ "Southbound" ,
153+ (current , next ) -> new State (adapterState .get (), northboundState .get (), next ));
154+ }
155+
156+ /**
157+ * Generic state transition implementation with retry logic and exponential backoff.
158+ * Eliminates code duplication across adapter, northbound, and southbound transitions.
159+ *
160+ * @param stateRef the atomic reference to the state being transitioned
161+ * @param newState the target state
162+ * @param canTransitionFn function to check if transition is valid
163+ * @param stateName name of the state type for logging
164+ * @param stateSnapshotFn function to create state snapshot after successful transition
165+ * @param <T> the state type (StateEnum or AdapterStateEnum)
166+ * @return true if transition succeeded
167+ * @throws IllegalStateException if transition is not allowed from current state
168+ */
169+ private <T > boolean performStateTransition (
170+ final @ NotNull AtomicReference <T > stateRef ,
171+ final @ NotNull T newState ,
172+ final @ NotNull BiPredicate <T , T > canTransitionFn ,
173+ final @ NotNull String stateName ,
174+ final @ NotNull BiFunction <T , T , State > stateSnapshotFn ) {
175175 int retryCount = 0 ;
176176 while (true ) {
177- final var currentState = southboundState .get ();
178- if (canTransition (currentState , newState )) {
179- if (southboundState .compareAndSet (currentState , newState )) {
180- final State snapshotState = new State (adapterState .get (), northboundState .get (), newState );
181- log .debug ("Southbound state transition from {} to {} for adapter {}" , currentState , newState , adapterId );
177+ final T currentState = stateRef .get ();
178+ if (canTransitionFn .test (currentState , newState )) {
179+ if (stateRef .compareAndSet (currentState , newState )) {
180+ final State snapshotState = stateSnapshotFn .apply (currentState , newState );
181+ log .debug ("{} state transition from {} to {} for adapter {}" ,
182+ stateName ,
183+ currentState ,
184+ newState ,
185+ adapterId );
182186 notifyListenersAboutStateTransition (snapshotState );
183187 return true ;
184188 }
185189 retryCount ++;
186190 if (retryCount > 3 ) {
187191 // progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
192+ // reduces CPU consumption and cache line contention under high load
188193 final long backoffNanos = Math .min (1_000L * (1L << (retryCount - 4 )), 100_000L );
189194 LockSupport .parkNanos (backoffNanos );
190195 }
191196 // Fast retry for attempts 1-3 (optimizes for low contention case)
192197 } else {
193198 // Transition not allowed from current state
194- throw new IllegalStateException ("Cannot transition southbound state to " + newState );
199+ throw new IllegalStateException ("Cannot transition " +
200+ stateName .toLowerCase () +
201+ " state to " +
202+ newState );
195203 }
196204 }
197205 }
198206
199207 @ Override
200208 public void accept (final ProtocolAdapterState .ConnectionStatus connectionStatus ) {
201209 final var transitionResult = switch (connectionStatus ) {
202- case CONNECTED ->
203- transitionNorthboundState (StateEnum .CONNECTED ) && startSouthbound ();
210+ case CONNECTED -> transitionNorthboundState (StateEnum .CONNECTED ) && startSouthbound ();
204211 case CONNECTING -> transitionNorthboundState (StateEnum .CONNECTING );
205212 case DISCONNECTED -> transitionNorthboundState (StateEnum .DISCONNECTED );
206213 case ERROR -> transitionNorthboundState (StateEnum .ERROR );
207214 case UNKNOWN -> transitionNorthboundState (StateEnum .DISCONNECTED );
208215 case STATELESS -> transitionNorthboundState (StateEnum .NOT_SUPPORTED );
209216 };
210- if (!transitionResult ) {
217+ if (!transitionResult ) {
211218 log .warn ("Failed to transition connection state to {} for adapter {}" , connectionStatus , adapterId );
212219 }
213220 }
214221
215- // Additional methods to support full state machine functionality
216-
217222 public boolean startDisconnecting () {
218223 return transitionNorthboundState (StateEnum .DISCONNECTING );
219224 }
@@ -222,6 +227,8 @@ public boolean startClosing() {
222227 return transitionNorthboundState (StateEnum .CLOSING );
223228 }
224229
230+ // Additional methods to support full state machine functionality
231+
225232 public boolean startErrorClosing () {
226233 return transitionNorthboundState (StateEnum .ERROR_CLOSING );
227234 }
@@ -279,14 +286,33 @@ private void notifyListenersAboutStateTransition(final @NotNull State newState)
279286 stateTransitionListeners .forEach (listener -> listener .accept (newState ));
280287 }
281288
282- private static boolean canTransition (final @ NotNull StateEnum currentState , final @ NotNull StateEnum newState ) {
283- final var allowedTransitions = possibleTransitions .get (currentState );
289+ private boolean canTransition (
290+ final @ NotNull AdapterStateEnum currentState ,
291+ final @ NotNull AdapterStateEnum newState ) {
292+ final var allowedTransitions = possibleAdapterStateTransitions .get (currentState );
284293 return allowedTransitions != null && allowedTransitions .contains (newState );
285294 }
286295
287- private static boolean canTransition (final @ NotNull AdapterStateEnum currentState , final @ NotNull AdapterStateEnum newState ) {
288- final var allowedTransitions = possibleAdapterStateTransitions .get (currentState );
289- return allowedTransitions != null && allowedTransitions .contains (newState );
296+ public enum StateEnum {
297+ DISCONNECTED ,
298+ CONNECTING ,
299+ CONNECTED ,
300+ DISCONNECTING ,
301+ ERROR_CLOSING ,
302+ CLOSING ,
303+ ERROR ,
304+ CLOSED ,
305+ NOT_SUPPORTED
306+ }
307+
308+ public enum AdapterStateEnum {
309+ STARTING ,
310+ STARTED ,
311+ STOPPING ,
312+ STOPPED
313+ }
314+
315+ public record State (AdapterStateEnum adapter , StateEnum northbound , StateEnum southbound ) {
290316 }
291317}
292318
0 commit comments