Skip to content

Commit b00680e

Browse files
committed
simplify code
1 parent c20e978 commit b00680e

File tree

3 files changed

+191
-176
lines changed

3 files changed

+191
-176
lines changed

hivemq-edge/src/main/java/com/hivemq/fsm/ProtocolAdapterFSM.java

Lines changed: 134 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -26,57 +26,61 @@
2626
import java.util.concurrent.CopyOnWriteArrayList;
2727
import java.util.concurrent.atomic.AtomicReference;
2828
import java.util.concurrent.locks.LockSupport;
29+
import java.util.function.BiFunction;
30+
import java.util.function.BiPredicate;
2931
import java.util.function.Consumer;
3032

3133
public abstract class ProtocolAdapterFSM implements Consumer<ProtocolAdapterState.ConnectionStatus> {
3234

33-
private static final @NotNull Logger log = LoggerFactory.getLogger(ProtocolAdapterFSM.class);
34-
35-
public enum StateEnum {
36-
DISCONNECTED,
37-
CONNECTING,
38-
CONNECTED,
39-
DISCONNECTING,
40-
ERROR_CLOSING,
41-
CLOSING,
42-
ERROR,
43-
CLOSED,
44-
NOT_SUPPORTED
45-
}
46-
47-
public static final @NotNull Map<StateEnum, Set<StateEnum>> possibleTransitions = Map.of(
48-
StateEnum.DISCONNECTED, Set.of(StateEnum.DISCONNECTED, StateEnum.CONNECTING, StateEnum.CONNECTED, StateEnum.CLOSED, StateEnum.NOT_SUPPORTED), //allow idempotent DISCONNECTED->DISCONNECTED transitions; for compatibility, we allow to go from CONNECTING to CONNECTED directly, and allow testing transition to CLOSED; NOT_SUPPORTED for adapters without southbound
49-
StateEnum.CONNECTING, Set.of(StateEnum.CONNECTING, StateEnum.CONNECTED, StateEnum.ERROR, StateEnum.DISCONNECTED), // allow idempotent CONNECTING->CONNECTING; can go back to DISCONNECTED
50-
StateEnum.CONNECTED, Set.of(StateEnum.CONNECTED, StateEnum.DISCONNECTING, StateEnum.CONNECTING, StateEnum.CLOSING, StateEnum.ERROR_CLOSING, StateEnum.DISCONNECTED), // allow idempotent CONNECTED->CONNECTED; transition to CONNECTING in case of recovery, DISCONNECTED for direct transition
51-
StateEnum.DISCONNECTING, Set.of(StateEnum.DISCONNECTED, StateEnum.CLOSING), // can go to DISCONNECTED or CLOSING
52-
StateEnum.CLOSING, Set.of(StateEnum.CLOSED),
53-
StateEnum.ERROR_CLOSING, Set.of(StateEnum.ERROR),
54-
StateEnum.ERROR, Set.of(StateEnum.ERROR, StateEnum.CONNECTING, StateEnum.DISCONNECTED), // allow idempotent ERROR->ERROR; can recover from error
55-
StateEnum.CLOSED, Set.of(StateEnum.DISCONNECTED, StateEnum.CLOSING), // can restart from closed or go to closing
56-
StateEnum.NOT_SUPPORTED, Set.of(StateEnum.NOT_SUPPORTED) // Terminal state for adapters without southbound support; allow idempotent transitions
35+
public static final @NotNull Map<StateEnum, Set<StateEnum>> possibleTransitions = Map.of(StateEnum.DISCONNECTED,
36+
Set.of(StateEnum.DISCONNECTED,
37+
StateEnum.CONNECTING,
38+
StateEnum.CONNECTED,
39+
StateEnum.CLOSED,
40+
StateEnum.NOT_SUPPORTED),
41+
//allow idempotent DISCONNECTED->DISCONNECTED transitions; for compatibility, we allow to go from CONNECTING to CONNECTED directly, and allow testing transition to CLOSED; NOT_SUPPORTED for adapters without southbound
42+
StateEnum.CONNECTING,
43+
Set.of(StateEnum.CONNECTING, StateEnum.CONNECTED, StateEnum.ERROR, StateEnum.DISCONNECTED),
44+
// allow idempotent CONNECTING->CONNECTING; can go back to DISCONNECTED
45+
StateEnum.CONNECTED,
46+
Set.of(StateEnum.CONNECTED,
47+
StateEnum.DISCONNECTING,
48+
StateEnum.CONNECTING,
49+
StateEnum.CLOSING,
50+
StateEnum.ERROR_CLOSING,
51+
StateEnum.DISCONNECTED),
52+
// allow idempotent CONNECTED->CONNECTED; transition to CONNECTING in case of recovery, DISCONNECTED for direct transition
53+
StateEnum.DISCONNECTING,
54+
Set.of(StateEnum.DISCONNECTED, StateEnum.CLOSING),
55+
// can go to DISCONNECTED or CLOSING
56+
StateEnum.CLOSING,
57+
Set.of(StateEnum.CLOSED),
58+
StateEnum.ERROR_CLOSING,
59+
Set.of(StateEnum.ERROR),
60+
StateEnum.ERROR,
61+
Set.of(StateEnum.ERROR, StateEnum.CONNECTING, StateEnum.DISCONNECTED),
62+
// allow idempotent ERROR->ERROR; can recover from error
63+
StateEnum.CLOSED,
64+
Set.of(StateEnum.DISCONNECTED, StateEnum.CLOSING),
65+
// can restart from closed or go to closing
66+
StateEnum.NOT_SUPPORTED,
67+
Set.of(StateEnum.NOT_SUPPORTED)
68+
// Terminal state for adapters without southbound support; allow idempotent transitions
5769
);
58-
59-
public enum AdapterStateEnum {
60-
STARTING,
61-
STARTED,
62-
STOPPING,
63-
STOPPED
64-
}
65-
6670
public static final Map<AdapterStateEnum, Set<AdapterStateEnum>> possibleAdapterStateTransitions = Map.of(
67-
AdapterStateEnum.STOPPED, Set.of(AdapterStateEnum.STARTING),
68-
AdapterStateEnum.STARTING, Set.of(AdapterStateEnum.STARTED, AdapterStateEnum.STOPPED),
69-
AdapterStateEnum.STARTED, Set.of(AdapterStateEnum.STOPPING),
70-
AdapterStateEnum.STOPPING, Set.of(AdapterStateEnum.STOPPED)
71-
);
72-
71+
AdapterStateEnum.STOPPED,
72+
Set.of(AdapterStateEnum.STARTING),
73+
AdapterStateEnum.STARTING,
74+
Set.of(AdapterStateEnum.STARTED, AdapterStateEnum.STOPPED),
75+
AdapterStateEnum.STARTED,
76+
Set.of(AdapterStateEnum.STOPPING),
77+
AdapterStateEnum.STOPPING,
78+
Set.of(AdapterStateEnum.STOPPED));
79+
private static final @NotNull Logger log = LoggerFactory.getLogger(ProtocolAdapterFSM.class);
80+
protected final @NotNull AtomicReference<AdapterStateEnum> adapterState;
7381
private final @NotNull AtomicReference<StateEnum> northboundState;
7482
private final @NotNull AtomicReference<StateEnum> southboundState;
75-
protected final @NotNull AtomicReference<AdapterStateEnum> adapterState;
7683
private final @NotNull List<Consumer<State>> stateTransitionListeners;
77-
78-
public record State(AdapterStateEnum adapter, StateEnum northbound, StateEnum southbound) { }
79-
8084
private final @NotNull String adapterId;
8185

8286
public ProtocolAdapterFSM(final @NotNull String adapterId) {
@@ -87,6 +91,11 @@ public ProtocolAdapterFSM(final @NotNull String adapterId) {
8791
this.stateTransitionListeners = new CopyOnWriteArrayList<>();
8892
}
8993

94+
private static boolean canTransition(final @NotNull StateEnum currentState, final @NotNull StateEnum newState) {
95+
final var allowedTransitions = possibleTransitions.get(currentState);
96+
return allowedTransitions != null && allowedTransitions.contains(newState);
97+
}
98+
9099
public abstract boolean onStarting();
91100

92101
public abstract void onStopping();
@@ -95,10 +104,10 @@ public ProtocolAdapterFSM(final @NotNull String adapterId) {
95104

96105
// ADAPTER signals
97106
public void startAdapter() {
98-
if(transitionAdapterState(AdapterStateEnum.STARTING)) {
107+
if (transitionAdapterState(AdapterStateEnum.STARTING)) {
99108
log.debug("Protocol adapter {} starting", adapterId);
100-
if(onStarting()) {
101-
if(!transitionAdapterState(AdapterStateEnum.STARTED)) {
109+
if (onStarting()) {
110+
if (!transitionAdapterState(AdapterStateEnum.STARTED)) {
102111
log.warn("Protocol adapter {} already started", adapterId);
103112
}
104113
} else {
@@ -110,9 +119,9 @@ public void startAdapter() {
110119
}
111120

112121
public void stopAdapter() {
113-
if(transitionAdapterState(AdapterStateEnum.STOPPING)) {
122+
if (transitionAdapterState(AdapterStateEnum.STOPPING)) {
114123
onStopping();
115-
if(!transitionAdapterState(AdapterStateEnum.STOPPED)) {
124+
if (!transitionAdapterState(AdapterStateEnum.STOPPED)) {
116125
log.warn("Protocol adapter {} already stopped", adapterId);
117126
}
118127
} else {
@@ -121,99 +130,95 @@ public void stopAdapter() {
121130
}
122131

123132
public boolean transitionAdapterState(final @NotNull AdapterStateEnum newState) {
124-
int retryCount = 0;
125-
while (true) {
126-
final var currentState = adapterState.get();
127-
if (canTransition(currentState, newState)) {
128-
if (adapterState.compareAndSet(currentState, newState)) {
129-
final State snapshotState = new State(newState, northboundState.get(), southboundState.get());
130-
log.debug("Adapter state transition from {} to {} for adapter {}", currentState, newState, adapterId);
131-
notifyListenersAboutStateTransition(snapshotState);
132-
return true;
133-
}
134-
retryCount++;
135-
if (retryCount > 3) {
136-
// progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
137-
// reduces CPU consumption and cache line contention under high load
138-
final long backoffNanos = Math.min(1_000L * (1L << (retryCount - 4)), 100_000L);
139-
LockSupport.parkNanos(backoffNanos);
140-
}
141-
// Fast retry for attempts 1-3 (optimizes for low contention case)
142-
} else {
143-
// Transition not allowed from current state
144-
throw new IllegalStateException("Cannot transition adapter state to " + newState);
145-
}
146-
}
133+
return performStateTransition(adapterState,
134+
newState,
135+
this::canTransition,
136+
"Adapter",
137+
(current, next) -> new State(next, northboundState.get(), southboundState.get()));
147138
}
148139

149140
public boolean transitionNorthboundState(final @NotNull StateEnum newState) {
150-
int retryCount = 0;
151-
while (true) {
152-
final var currentState = northboundState.get();
153-
if (canTransition(currentState, newState)) {
154-
if (northboundState.compareAndSet(currentState, newState)) {
155-
final State snapshotState = new State(adapterState.get(), newState, southboundState.get());
156-
log.debug("Northbound state transition from {} to {} for adapter {}", currentState, newState, adapterId);
157-
notifyListenersAboutStateTransition(snapshotState);
158-
return true;
159-
}
160-
retryCount++;
161-
if (retryCount > 3) {
162-
// progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
163-
final long backoffNanos = Math.min(1_000L * (1L << (retryCount - 4)), 100_000L);
164-
LockSupport.parkNanos(backoffNanos);
165-
}
166-
// Fast retry for attempts 1-3 (optimizes for low contention case)
167-
} else {
168-
// Transition not allowed from current state
169-
throw new IllegalStateException("Cannot transition northbound state to " + newState);
170-
}
171-
}
141+
return performStateTransition(northboundState,
142+
newState,
143+
ProtocolAdapterFSM::canTransition,
144+
"Northbound",
145+
(current, next) -> new State(adapterState.get(), next, southboundState.get()));
172146
}
173147

174148
public boolean transitionSouthboundState(final @NotNull StateEnum newState) {
149+
return performStateTransition(southboundState,
150+
newState,
151+
ProtocolAdapterFSM::canTransition,
152+
"Southbound",
153+
(current, next) -> new State(adapterState.get(), northboundState.get(), next));
154+
}
155+
156+
/**
157+
* Generic state transition implementation with retry logic and exponential backoff.
158+
* Eliminates code duplication across adapter, northbound, and southbound transitions.
159+
*
160+
* @param stateRef the atomic reference to the state being transitioned
161+
* @param newState the target state
162+
* @param canTransitionFn function to check if transition is valid
163+
* @param stateName name of the state type for logging
164+
* @param stateSnapshotFn function to create state snapshot after successful transition
165+
* @param <T> the state type (StateEnum or AdapterStateEnum)
166+
* @return true if transition succeeded
167+
* @throws IllegalStateException if transition is not allowed from current state
168+
*/
169+
private <T> boolean performStateTransition(
170+
final @NotNull AtomicReference<T> stateRef,
171+
final @NotNull T newState,
172+
final @NotNull BiPredicate<T, T> canTransitionFn,
173+
final @NotNull String stateName,
174+
final @NotNull BiFunction<T, T, State> stateSnapshotFn) {
175175
int retryCount = 0;
176176
while (true) {
177-
final var currentState = southboundState.get();
178-
if (canTransition(currentState, newState)) {
179-
if (southboundState.compareAndSet(currentState, newState)) {
180-
final State snapshotState = new State(adapterState.get(), northboundState.get(), newState);
181-
log.debug("Southbound state transition from {} to {} for adapter {}", currentState, newState, adapterId);
177+
final T currentState = stateRef.get();
178+
if (canTransitionFn.test(currentState, newState)) {
179+
if (stateRef.compareAndSet(currentState, newState)) {
180+
final State snapshotState = stateSnapshotFn.apply(currentState, newState);
181+
log.debug("{} state transition from {} to {} for adapter {}",
182+
stateName,
183+
currentState,
184+
newState,
185+
adapterId);
182186
notifyListenersAboutStateTransition(snapshotState);
183187
return true;
184188
}
185189
retryCount++;
186190
if (retryCount > 3) {
187191
// progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
192+
// reduces CPU consumption and cache line contention under high load
188193
final long backoffNanos = Math.min(1_000L * (1L << (retryCount - 4)), 100_000L);
189194
LockSupport.parkNanos(backoffNanos);
190195
}
191196
// Fast retry for attempts 1-3 (optimizes for low contention case)
192197
} else {
193198
// Transition not allowed from current state
194-
throw new IllegalStateException("Cannot transition southbound state to " + newState);
199+
throw new IllegalStateException("Cannot transition " +
200+
stateName.toLowerCase() +
201+
" state to " +
202+
newState);
195203
}
196204
}
197205
}
198206

199207
@Override
200208
public void accept(final ProtocolAdapterState.ConnectionStatus connectionStatus) {
201209
final var transitionResult = switch (connectionStatus) {
202-
case CONNECTED ->
203-
transitionNorthboundState(StateEnum.CONNECTED) && startSouthbound();
210+
case CONNECTED -> transitionNorthboundState(StateEnum.CONNECTED) && startSouthbound();
204211
case CONNECTING -> transitionNorthboundState(StateEnum.CONNECTING);
205212
case DISCONNECTED -> transitionNorthboundState(StateEnum.DISCONNECTED);
206213
case ERROR -> transitionNorthboundState(StateEnum.ERROR);
207214
case UNKNOWN -> transitionNorthboundState(StateEnum.DISCONNECTED);
208215
case STATELESS -> transitionNorthboundState(StateEnum.NOT_SUPPORTED);
209216
};
210-
if(!transitionResult) {
217+
if (!transitionResult) {
211218
log.warn("Failed to transition connection state to {} for adapter {}", connectionStatus, adapterId);
212219
}
213220
}
214221

215-
// Additional methods to support full state machine functionality
216-
217222
public boolean startDisconnecting() {
218223
return transitionNorthboundState(StateEnum.DISCONNECTING);
219224
}
@@ -222,6 +227,8 @@ public boolean startClosing() {
222227
return transitionNorthboundState(StateEnum.CLOSING);
223228
}
224229

230+
// Additional methods to support full state machine functionality
231+
225232
public boolean startErrorClosing() {
226233
return transitionNorthboundState(StateEnum.ERROR_CLOSING);
227234
}
@@ -279,14 +286,33 @@ private void notifyListenersAboutStateTransition(final @NotNull State newState)
279286
stateTransitionListeners.forEach(listener -> listener.accept(newState));
280287
}
281288

282-
private static boolean canTransition(final @NotNull StateEnum currentState, final @NotNull StateEnum newState) {
283-
final var allowedTransitions = possibleTransitions.get(currentState);
289+
private boolean canTransition(
290+
final @NotNull AdapterStateEnum currentState,
291+
final @NotNull AdapterStateEnum newState) {
292+
final var allowedTransitions = possibleAdapterStateTransitions.get(currentState);
284293
return allowedTransitions != null && allowedTransitions.contains(newState);
285294
}
286295

287-
private static boolean canTransition(final @NotNull AdapterStateEnum currentState, final @NotNull AdapterStateEnum newState) {
288-
final var allowedTransitions = possibleAdapterStateTransitions.get(currentState);
289-
return allowedTransitions != null && allowedTransitions.contains(newState);
296+
public enum StateEnum {
297+
DISCONNECTED,
298+
CONNECTING,
299+
CONNECTED,
300+
DISCONNECTING,
301+
ERROR_CLOSING,
302+
CLOSING,
303+
ERROR,
304+
CLOSED,
305+
NOT_SUPPORTED
306+
}
307+
308+
public enum AdapterStateEnum {
309+
STARTING,
310+
STARTED,
311+
STOPPING,
312+
STOPPED
313+
}
314+
315+
public record State(AdapterStateEnum adapter, StateEnum northbound, StateEnum southbound) {
290316
}
291317
}
292318

0 commit comments

Comments
 (0)