Skip to content

Commit 26de8d9

Browse files
committed
make CAS less aggressive, more fair, as per Sam's recommendation
1 parent 4959c0b commit 26de8d9

File tree

2 files changed

+45
-14
lines changed

2 files changed

+45
-14
lines changed

hivemq-edge/src/main/java/com/hivemq/fsm/ProtocolAdapterFSM.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Set;
2626
import java.util.concurrent.CopyOnWriteArrayList;
2727
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.concurrent.locks.LockSupport;
2829
import java.util.function.Consumer;
2930

3031
public abstract class ProtocolAdapterFSM implements Consumer<ProtocolAdapterState.ConnectionStatus> {
@@ -120,15 +121,24 @@ public void stopAdapter() {
120121
}
121122

122123
public boolean transitionAdapterState(final @NotNull AdapterStateEnum newState) {
124+
int retryCount = 0;
123125
while (true) {
124126
final var currentState = adapterState.get();
125127
if (canTransition(currentState, newState)) {
126128
if (adapterState.compareAndSet(currentState, newState)) {
129+
final State snapshotState = new State(newState, northboundState.get(), southboundState.get());
127130
log.debug("Adapter state transition from {} to {} for adapter {}", currentState, newState, adapterId);
128-
notifyListenersAboutStateTransition(currentState());
131+
notifyListenersAboutStateTransition(snapshotState);
129132
return true;
130133
}
131-
// CAS failed due to concurrent modification, retry
134+
retryCount++;
135+
if (retryCount > 3) {
136+
// progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
137+
// reduces CPU consumption and cache line contention under high load
138+
final long backoffNanos = Math.min(1_000L * (1L << (retryCount - 4)), 100_000L);
139+
LockSupport.parkNanos(backoffNanos);
140+
}
141+
// Fast retry for attempts 1-3 (optimizes for low contention case)
132142
} else {
133143
// Transition not allowed from current state
134144
throw new IllegalStateException("Cannot transition adapter state to " + newState);
@@ -137,15 +147,23 @@ public boolean transitionAdapterState(final @NotNull AdapterStateEnum newState)
137147
}
138148

139149
public boolean transitionNorthboundState(final @NotNull StateEnum newState) {
150+
int retryCount = 0;
140151
while (true) {
141152
final var currentState = northboundState.get();
142153
if (canTransition(currentState, newState)) {
143154
if (northboundState.compareAndSet(currentState, newState)) {
155+
final State snapshotState = new State(adapterState.get(), newState, southboundState.get());
144156
log.debug("Northbound state transition from {} to {} for adapter {}", currentState, newState, adapterId);
145-
notifyListenersAboutStateTransition(currentState());
157+
notifyListenersAboutStateTransition(snapshotState);
146158
return true;
147159
}
148-
// CAS failed due to concurrent modification, retry
160+
retryCount++;
161+
if (retryCount > 3) {
162+
// progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
163+
final long backoffNanos = Math.min(1_000L * (1L << (retryCount - 4)), 100_000L);
164+
LockSupport.parkNanos(backoffNanos);
165+
}
166+
// Fast retry for attempts 1-3 (optimizes for low contention case)
149167
} else {
150168
// Transition not allowed from current state
151169
throw new IllegalStateException("Cannot transition northbound state to " + newState);
@@ -154,15 +172,23 @@ public boolean transitionNorthboundState(final @NotNull StateEnum newState) {
154172
}
155173

156174
public boolean transitionSouthboundState(final @NotNull StateEnum newState) {
175+
int retryCount = 0;
157176
while (true) {
158177
final var currentState = southboundState.get();
159178
if (canTransition(currentState, newState)) {
160179
if (southboundState.compareAndSet(currentState, newState)) {
180+
final State snapshotState = new State(adapterState.get(), northboundState.get(), newState);
161181
log.debug("Southbound state transition from {} to {} for adapter {}", currentState, newState, adapterId);
162-
notifyListenersAboutStateTransition(currentState());
182+
notifyListenersAboutStateTransition(snapshotState);
163183
return true;
164184
}
165-
// CAS failed due to concurrent modification, retry
185+
retryCount++;
186+
if (retryCount > 3) {
187+
// progressive backoff: 1μs, 2μs, 4μs, 8μs, ..., capped at 100μs
188+
final long backoffNanos = Math.min(1_000L * (1L << (retryCount - 4)), 100_000L);
189+
LockSupport.parkNanos(backoffNanos);
190+
}
191+
// Fast retry for attempts 1-3 (optimizes for low contention case)
166192
} else {
167193
// Transition not allowed from current state
168194
throw new IllegalStateException("Cannot transition southbound state to " + newState);

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public class ProtocolAdapterWrapper extends ProtocolAdapterFSM {
8484
private @Nullable CompletableFuture<Void> currentStartFuture;
8585
private @Nullable CompletableFuture<Void> currentStopFuture;
8686
private @Nullable Consumer<ProtocolAdapterState.ConnectionStatus> connectionStatusListener;
87+
private volatile boolean startOperationInProgress;
88+
private volatile boolean stopOperationInProgress;
8789

8890
public ProtocolAdapterWrapper(
8991
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
@@ -164,22 +166,22 @@ public boolean startSouthbound() {
164166
public @NotNull CompletableFuture<Void> startAsync(final @NotNull ModuleServices moduleServices) {
165167
operationLock.lock();
166168
try {
167-
// validate state
168-
if (currentStartFuture != null && !currentStartFuture.isDone()) {
169+
if (startOperationInProgress) {
169170
log.info("Start operation already in progress for adapter '{}'", getId());
170171
return currentStartFuture;
171172
}
172173
if (adapterState.get() == AdapterStateEnum.STARTED) {
173174
log.info("Adapter '{}' is already started, returning success", getId());
174175
return CompletableFuture.completedFuture(null);
175176
}
176-
if (currentStopFuture != null && !currentStopFuture.isDone()) {
177+
if (stopOperationInProgress) {
177178
log.warn("Cannot start adapter '{}' while stop operation is in progress", getId());
178179
return CompletableFuture.failedFuture(new IllegalStateException("Cannot start adapter '" +
179180
adapter.getId() +
180181
"' while stop operation is in progress"));
181182
}
182183

184+
startOperationInProgress = true;
183185
lastStartAttemptTime = System.currentTimeMillis();
184186
currentStartFuture =
185187
CompletableFuture.supplyAsync(startProtocolAdapter(moduleServices), sharedAdapterExecutor)
@@ -196,6 +198,7 @@ public boolean startSouthbound() {
196198
}
197199
operationLock.lock();
198200
try {
201+
startOperationInProgress = false;
199202
currentStartFuture = null;
200203
} finally {
201204
operationLock.unlock();
@@ -210,8 +213,7 @@ public boolean startSouthbound() {
210213
public @NotNull CompletableFuture<Void> stopAsync() {
211214
operationLock.lock();
212215
try {
213-
// validate state
214-
if (currentStopFuture != null && !currentStopFuture.isDone()) {
216+
if (stopOperationInProgress) {
215217
log.info("Stop operation already in progress for adapter '{}'", getId());
216218
return currentStopFuture;
217219
}
@@ -220,13 +222,14 @@ public boolean startSouthbound() {
220222
log.info("Adapter '{}' is already stopped, returning success", getId());
221223
return CompletableFuture.completedFuture(null);
222224
}
223-
if (currentStartFuture != null && !currentStartFuture.isDone()) {
225+
if (startOperationInProgress) {
224226
log.warn("Cannot stop adapter '{}' while start operation is in progress", getId());
225227
return CompletableFuture.failedFuture(new IllegalStateException("Cannot stop adapter '" +
226228
adapter.getId() +
227229
"' while start operation is in progress"));
228230
}
229231

232+
stopOperationInProgress = true;
230233
log.debug("Adapter '{}': Creating stop operation future", getId());
231234
currentStopFuture = CompletableFuture.supplyAsync(this::stopProtocolAdapter, sharedAdapterExecutor)
232235
.thenCompose(Function.identity())
@@ -244,6 +247,7 @@ public boolean startSouthbound() {
244247
}
245248
operationLock.lock();
246249
try {
250+
stopOperationInProgress = false;
247251
currentStopFuture = null;
248252
} finally {
249253
operationLock.unlock();
@@ -342,9 +346,10 @@ public boolean isBatchPolling() {
342346
}
343347

344348
private void cleanupConnectionStatusListener() {
345-
if (connectionStatusListener != null) {
346-
protocolAdapterState.setConnectionStatusListener(CONNECTION_STATUS_NOOP_CONSUMER);
349+
final Consumer<ProtocolAdapterState.ConnectionStatus> listenerToClean = connectionStatusListener;
350+
if (listenerToClean != null) {
347351
connectionStatusListener = null;
352+
protocolAdapterState.setConnectionStatusListener(CONNECTION_STATUS_NOOP_CONSUMER);
348353
}
349354
}
350355

0 commit comments

Comments
 (0)