Skip to content

Commit 076048a

Browse files
committed
improve thread safety of ProtocolAdapterStateImpl
1 parent 6ec9fc9 commit 076048a

File tree

2 files changed

+47
-37
lines changed

2 files changed

+47
-37
lines changed

hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,42 @@
2020
import com.hivemq.adapter.sdk.api.events.model.Payload;
2121
import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState;
2222
import com.hivemq.edge.modules.api.events.model.EventImpl;
23+
import org.apache.commons.lang3.exception.ExceptionUtils;
2324
import org.jetbrains.annotations.NotNull;
2425
import org.jetbrains.annotations.Nullable;
25-
import org.apache.commons.lang3.exception.ExceptionUtils;
2626

2727
import java.util.concurrent.atomic.AtomicReference;
2828
import java.util.function.Consumer;
2929

3030
public class ProtocolAdapterStateImpl implements ProtocolAdapterState {
31+
protected final @NotNull AtomicReference<RuntimeStatus> runtimeStatus;
32+
protected final @NotNull AtomicReference<ConnectionStatus> connectionStatus;
33+
protected final @NotNull AtomicReference<@Nullable String> lastErrorMessage;
3134
private final @NotNull EventService eventService;
3235
private final @NotNull String adapterId;
3336
private final @NotNull String protocolId;
34-
protected @NotNull AtomicReference<RuntimeStatus> runtimeStatus = new AtomicReference<>(RuntimeStatus.STOPPED);
35-
protected @NotNull AtomicReference<ConnectionStatus> connectionStatus =
36-
new AtomicReference<>(ConnectionStatus.DISCONNECTED);
37-
protected @Nullable String lastErrorMessage;
38-
private final @NotNull AtomicReference<Consumer<ConnectionStatus>> connectionStatusListener = new AtomicReference<>();
37+
private final @NotNull AtomicReference<Consumer<ConnectionStatus>> connectionStatusListener;
3938

40-
public ProtocolAdapterStateImpl(final @NotNull EventService eventService,
41-
final @NotNull String adapterId,
42-
final @NotNull String protocolId) {
39+
public ProtocolAdapterStateImpl(
40+
final @NotNull EventService eventService,
41+
final @NotNull String adapterId,
42+
final @NotNull String protocolId) {
4343
this.eventService = eventService;
4444
this.adapterId = adapterId;
4545
this.protocolId = protocolId;
46+
this.runtimeStatus = new AtomicReference<>(RuntimeStatus.STOPPED);
47+
this.connectionStatus = new AtomicReference<>(ConnectionStatus.DISCONNECTED);
48+
this.lastErrorMessage = new AtomicReference<>(null);
49+
this.connectionStatusListener = new AtomicReference<>();
4650
}
4751

4852
@Override
4953
public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionStatus) {
5054
Preconditions.checkNotNull(connectionStatus);
5155
final var changed = this.connectionStatus.getAndSet(connectionStatus) != connectionStatus;
52-
if(changed) {
56+
if (changed) {
5357
final var listener = connectionStatusListener.get();
54-
if(listener != null) {
58+
if (listener != null) {
5559
listener.accept(connectionStatus);
5660
}
5761
}
@@ -68,11 +72,8 @@ public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionSta
6872
* and the errorMessage to that supplied.
6973
*/
7074
@Override
71-
public void setErrorConnectionStatus(
72-
final @Nullable Throwable t,
73-
final @Nullable String errorMessage) {
74-
final boolean changed = setConnectionStatus(ConnectionStatus.ERROR);
75-
reportErrorMessage( t, errorMessage, changed);
75+
public void setErrorConnectionStatus(final @Nullable Throwable t, final @Nullable String errorMessage) {
76+
reportErrorMessage(t, errorMessage, setConnectionStatus(ConnectionStatus.ERROR));
7677
}
7778

7879
/**
@@ -86,33 +87,40 @@ public void reportErrorMessage(
8687
final @Nullable Throwable throwable,
8788
final @Nullable String errorMessage,
8889
final boolean sendEvent) {
89-
this.lastErrorMessage = errorMessage == null ? throwable == null ? null : throwable.getMessage() : errorMessage;
90+
final String msg = errorMessage == null ? throwable == null ? null : throwable.getMessage() : errorMessage;
91+
this.lastErrorMessage.set(msg);
9092
if (sendEvent) {
91-
eventService.createAdapterEvent(adapterId, protocolId)
93+
final var eventBuilder = eventService.createAdapterEvent(adapterId, protocolId)
9294
.withSeverity(EventImpl.SEVERITY.ERROR)
93-
.withMessage(String.format("Adapter '%s' encountered an error.", adapterId))
94-
.withPayload(Payload.ContentType.PLAIN_TEXT, ExceptionUtils.getStackTrace(throwable))
95-
.fire();
95+
.withMessage(String.format("Adapter '%s' encountered an error.", adapterId));
96+
if (throwable != null) {
97+
eventBuilder.withPayload(Payload.ContentType.PLAIN_TEXT, ExceptionUtils.getStackTrace(throwable));
98+
} else if (errorMessage != null) {
99+
eventBuilder.withPayload(Payload.ContentType.PLAIN_TEXT, errorMessage);
100+
}
101+
eventBuilder.fire();
96102
}
97103
}
98104

99105
@Override
100-
public void setRuntimeStatus(final @NotNull RuntimeStatus runtimeStatus) {
101-
this.runtimeStatus.set(runtimeStatus);
106+
public @NotNull RuntimeStatus getRuntimeStatus() {
107+
return this.runtimeStatus.get();
102108
}
103109

104110
@Override
105-
public @NotNull RuntimeStatus getRuntimeStatus() {
106-
return this.runtimeStatus.get();
111+
public void setRuntimeStatus(final @NotNull RuntimeStatus runtimeStatus) {
112+
this.runtimeStatus.set(runtimeStatus);
107113
}
108114

109115
@Override
110116
public @Nullable String getLastErrorMessage() {
111-
return lastErrorMessage;
117+
return lastErrorMessage.get();
112118
}
113119

114-
public void setConnectionStatusListener(final @NotNull Consumer<ConnectionStatus> connectionStatusListener) {
115-
this.connectionStatusListener.set(connectionStatusListener);
116-
connectionStatusListener.accept(connectionStatus.get());
120+
public void setConnectionStatusListener(final @NotNull Consumer<ConnectionStatus> listener) {
121+
// Capture current status before setting listener to reduce race window
122+
final ConnectionStatus currentStatus = connectionStatus.get();
123+
connectionStatusListener.set(listener);
124+
listener.accept(currentStatus);
117125
}
118126
}

hivemq-edge/src/main/java/com/hivemq/fsm/ProtocolAdapterFSM.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,21 @@ public enum AdapterStateEnum {
6969
AdapterStateEnum.STOPPING, Set.of(AdapterStateEnum.STOPPED)
7070
);
7171

72-
private final AtomicReference<StateEnum> northboundState = new AtomicReference<>(StateEnum.DISCONNECTED);
73-
private final AtomicReference<StateEnum> southboundState = new AtomicReference<>(StateEnum.DISCONNECTED);
74-
private final AtomicReference<AdapterStateEnum> adapterState = new AtomicReference<>(AdapterStateEnum.STOPPED);
75-
76-
private final List<Consumer<State>> stateTransitionListeners = new CopyOnWriteArrayList<>();
72+
private final @NotNull AtomicReference<StateEnum> northboundState;
73+
private final @NotNull AtomicReference<StateEnum> southboundState;
74+
private final @NotNull AtomicReference<AdapterStateEnum> adapterState;
75+
private final @NotNull List<Consumer<State>> stateTransitionListeners;
7776

7877
public record State(AdapterStateEnum state, StateEnum northbound, StateEnum southbound) { }
7978

80-
private final String adapterId;
79+
private final @NotNull String adapterId;
8180

8281
public ProtocolAdapterFSM(final @NotNull String adapterId) {
8382
this.adapterId = adapterId;
83+
this.northboundState = new AtomicReference<>(StateEnum.DISCONNECTED);
84+
this.southboundState = new AtomicReference<>(StateEnum.DISCONNECTED);
85+
this.adapterState = new AtomicReference<>(AdapterStateEnum.STOPPED);
86+
this.stateTransitionListeners = new CopyOnWriteArrayList<>();
8487
}
8588

8689
public abstract boolean onStarting();
@@ -172,7 +175,6 @@ public void accept(final ProtocolAdapterState.ConnectionStatus connectionStatus)
172175
final var transitionResult = switch (connectionStatus) {
173176
case CONNECTED ->
174177
transitionNorthboundState(StateEnum.CONNECTED) && startSouthbound();
175-
176178
case CONNECTING -> transitionNorthboundState(StateEnum.CONNECTING);
177179
case DISCONNECTED -> transitionNorthboundState(StateEnum.DISCONNECTED);
178180
case ERROR -> transitionNorthboundState(StateEnum.ERROR);
@@ -243,7 +245,7 @@ public void unregisterStateTransitionListener(final @NotNull Consumer<State> sta
243245
stateTransitionListeners.remove(stateTransitionListener);
244246
}
245247

246-
public State currentState() {
248+
public @NotNull State currentState() {
247249
return new State(adapterState.get(), northboundState.get(), southboundState.get());
248250
}
249251

0 commit comments

Comments
 (0)