Skip to content

Commit 4959c0b

Browse files
committed
make database adapter/connection thread safe for FSM. ditto with file polling
1 parent 73bb08a commit 4959c0b

File tree

3 files changed

+78
-33
lines changed

3 files changed

+78
-33
lines changed

modules/hivemq-edge-module-databases/src/main/java/com/hivemq/edge/adapters/databases/DatabaseConnection.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import java.sql.Connection;
2727
import java.sql.SQLException;
2828
import java.util.Properties;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930

3031
public class DatabaseConnection {
3132
private static final @NotNull Logger log = LoggerFactory.getLogger(DatabaseConnection.class);
3233
private final @NotNull HikariConfig config;
33-
private @Nullable HikariDataSource ds;
34+
private final @NotNull AtomicBoolean connected = new AtomicBoolean(false);
35+
private volatile @Nullable HikariDataSource ds;
3436

3537
public DatabaseConnection(
3638
final @NotNull DatabaseType dbType,
@@ -88,6 +90,10 @@ public DatabaseConnection(
8890
}
8991

9092
public void connect() {
93+
if (!connected.compareAndSet(false, true)) {
94+
log.debug("Database connection already established, skipping connect");
95+
return; // Already connected
96+
}
9197
log.debug("Connection settings : {}", config.toString());
9298
this.ds = new HikariDataSource(config);
9399
}
@@ -100,6 +106,10 @@ public void connect() {
100106
}
101107

102108
public void close() {
109+
if (!connected.compareAndSet(true, false)) {
110+
log.debug("Database connection already closed or not connected");
111+
return; // Already closed or never connected
112+
}
103113
if (ds != null && !ds.isClosed()) {
104114
log.debug("Closing HikariCP datasource");
105115
try {

modules/hivemq-edge-module-databases/src/main/java/com/hivemq/edge/adapters/databases/DatabasesPollingProtocolAdapter.java

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,22 @@
4545
import java.sql.Types;
4646
import java.util.ArrayList;
4747
import java.util.List;
48-
48+
import java.util.concurrent.atomic.AtomicBoolean;
4949

5050
public class DatabasesPollingProtocolAdapter implements BatchPollingProtocolAdapter {
5151

5252
public static final int TIMEOUT = 30;
5353
private static final @NotNull Logger log = LoggerFactory.getLogger(DatabasesPollingProtocolAdapter.class);
5454
private static final @NotNull ObjectMapper OBJECT_MAPPER = new ObjectMapper();
55+
5556
private final @NotNull DatabasesAdapterConfig adapterConfig;
5657
private final @NotNull ProtocolAdapterInformation adapterInformation;
5758
private final @NotNull ProtocolAdapterState protocolAdapterState;
5859
private final @NotNull String adapterId;
5960
private final @NotNull List<Tag> tags;
6061
private final @NotNull DatabaseConnection databaseConnection;
6162
private final @NotNull AdapterFactories adapterFactories;
63+
private final @NotNull AtomicBoolean started;
6264

6365
public DatabasesPollingProtocolAdapter(
6466
final @NotNull ProtocolAdapterInformation adapterInformation,
@@ -69,9 +71,8 @@ public DatabasesPollingProtocolAdapter(
6971
this.protocolAdapterState = input.getProtocolAdapterState();
7072
this.tags = input.getTags();
7173
this.adapterFactories = input.adapterFactories();
72-
74+
this.started = new AtomicBoolean(false);
7375
log.debug("Building connection string");
74-
7576
this.databaseConnection = new DatabaseConnection(adapterConfig.getType(),
7677
adapterConfig.getServer(),
7778
adapterConfig.getPort(),
@@ -91,53 +92,74 @@ public DatabasesPollingProtocolAdapter(
9192
public void start(
9293
final @NotNull ProtocolAdapterStartInput input,
9394
final @NotNull ProtocolAdapterStartOutput output) {
94-
log.debug("Loading PostgreSQL Driver");
95-
try {
96-
Class.forName("org.postgresql.Driver");
97-
} catch (final ClassNotFoundException e) {
98-
output.failStart(e, null);
95+
if (!started.compareAndSet(false, true)) {
96+
log.debug("Database adapter {} already started, returning success", adapterId);
97+
output.startedSuccessfully();
9998
return;
10099
}
101100

102-
log.debug("Loading MariaDB Driver (for MySQL)");
103101
try {
104-
Class.forName("org.mariadb.jdbc.Driver");
105-
} catch (final ClassNotFoundException e) {
106-
output.failStart(e, null);
107-
return;
108-
}
102+
log.debug("Loading PostgreSQL Driver");
103+
try {
104+
Class.forName("org.postgresql.Driver");
105+
} catch (final ClassNotFoundException e) {
106+
output.failStart(e, null);
107+
started.set(false);
108+
return;
109+
}
109110

110-
log.debug("Loading MS SQL Driver");
111-
try {
112-
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDataSource");
113-
} catch (final ClassNotFoundException e) {
114-
output.failStart(e, null);
115-
return;
116-
}
111+
log.debug("Loading MariaDB Driver (for MySQL)");
112+
try {
113+
Class.forName("org.mariadb.jdbc.Driver");
114+
} catch (final ClassNotFoundException e) {
115+
output.failStart(e, null);
116+
started.set(false);
117+
return;
118+
}
117119

120+
log.debug("Loading MS SQL Driver");
121+
try {
122+
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDataSource");
123+
} catch (final ClassNotFoundException e) {
124+
output.failStart(e, null);
125+
started.set(false);
126+
return;
127+
}
118128

119-
databaseConnection.connect();
129+
databaseConnection.connect();
120130

121-
try {
122-
log.debug("Starting connection to the database instance");
123-
if (databaseConnection.getConnection().isValid(TIMEOUT)) {
124-
output.startedSuccessfully();
125-
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
126-
} else {
127-
output.failStart(new Throwable("Error connecting database, please check the configuration"),
128-
"Error connecting database, please check the configuration");
131+
try {
132+
log.debug("Starting connection to the database instance");
133+
if (databaseConnection.getConnection().isValid(TIMEOUT)) {
134+
output.startedSuccessfully();
135+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.CONNECTED);
136+
} else {
137+
output.failStart(new Throwable("Error connecting database, please check the configuration"),
138+
"Error connecting database, please check the configuration");
139+
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
140+
started.set(false);
141+
}
142+
} catch (final Exception e) {
143+
output.failStart(e, null);
129144
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
145+
started.set(false);
130146
}
131147
} catch (final Exception e) {
148+
log.error("Unexpected error during adapter start", e);
132149
output.failStart(e, null);
133-
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.DISCONNECTED);
150+
started.set(false);
134151
}
135152
}
136153

137154
@Override
138155
public void stop(
139156
final @NotNull ProtocolAdapterStopInput protocolAdapterStopInput,
140157
final @NotNull ProtocolAdapterStopOutput protocolAdapterStopOutput) {
158+
if (!started.compareAndSet(true, false)) {
159+
log.debug("Database adapter {} already stopped, returning success", adapterId);
160+
protocolAdapterStopOutput.stoppedSuccessfully();
161+
return;
162+
}
141163
databaseConnection.close();
142164
protocolAdapterStopOutput.stoppedSuccessfully();
143165
}
@@ -247,5 +269,4 @@ public int getPollingIntervalMillis() {
247269
public int getMaxPollingErrorsBeforeRemoval() {
248270
return adapterConfig.getMaxPollingErrorsBeforeRemoval();
249271
}
250-
251272
}

modules/hivemq-edge-module-file/src/main/java/com/hivemq/edge/adapters/file/FilePollingProtocolAdapter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.nio.file.Files;
3838
import java.nio.file.Path;
3939
import java.util.List;
40+
import java.util.concurrent.atomic.AtomicBoolean;
4041

4142

4243
public class FilePollingProtocolAdapter implements BatchPollingProtocolAdapter {
@@ -48,6 +49,7 @@ public class FilePollingProtocolAdapter implements BatchPollingProtocolAdapter {
4849
private final @NotNull ProtocolAdapterInformation adapterInformation;
4950
private final @NotNull ProtocolAdapterState protocolAdapterState;
5051
private final @NotNull List<FileTag> tags;
52+
private final @NotNull AtomicBoolean started;
5153

5254
public FilePollingProtocolAdapter(
5355
final @NotNull String adapterId,
@@ -58,6 +60,7 @@ public FilePollingProtocolAdapter(
5860
this.adapterConfig = input.getConfig();
5961
this.tags = input.getTags().stream().map(tag -> (FileTag)tag).toList();
6062
this.protocolAdapterState = input.getProtocolAdapterState();
63+
this.started = new AtomicBoolean(false);
6164
}
6265

6366
@Override
@@ -68,11 +71,17 @@ public FilePollingProtocolAdapter(
6871
@Override
6972
public void start(
7073
final @NotNull ProtocolAdapterStartInput input, final @NotNull ProtocolAdapterStartOutput output) {
74+
if (!started.compareAndSet(false, true)) {
75+
LOG.debug("File adapter {} already started, returning success", adapterId);
76+
output.startedSuccessfully();
77+
return;
78+
}
7179
// any setup which should be done before the adapter starts polling comes here.
7280
try {
7381
protocolAdapterState.setConnectionStatus(ProtocolAdapterState.ConnectionStatus.STATELESS);
7482
output.startedSuccessfully();
7583
} catch (final Exception e) {
84+
started.set(false);
7685
output.failStart(e, null);
7786
}
7887
}
@@ -81,6 +90,11 @@ public void start(
8190
public void stop(
8291
final @NotNull ProtocolAdapterStopInput protocolAdapterStopInput,
8392
final @NotNull ProtocolAdapterStopOutput protocolAdapterStopOutput) {
93+
if (!started.compareAndSet(true, false)) {
94+
LOG.debug("File adapter {} already stopped, returning success", adapterId);
95+
protocolAdapterStopOutput.stoppedSuccessfully();
96+
return;
97+
}
8498
protocolAdapterStopOutput.stoppedSuccessfully();
8599
}
86100

0 commit comments

Comments
 (0)