Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
5de3923
Round one FSM
codepitbull Sep 26, 2025
bfca306
Start/Stop working
codepitbull Sep 26, 2025
70d8d94
Northbound working in FSM
codepitbull Sep 26, 2025
503cea7
CleanUp
codepitbull Sep 26, 2025
8a37383
CleanUp
codepitbull Sep 26, 2025
6929eb4
Stop northbound working
codepitbull Sep 26, 2025
625fcb1
add southbound transition logic
marregui Sep 26, 2025
a72bc00
small improvements
marregui Sep 30, 2025
b0c9934
document and add coverage
marregui Oct 1, 2025
0ac3de0
add license heathers
marregui Oct 20, 2025
34ec98a
integrate adapter FSM into adapter wrapper
marregui Oct 20, 2025
fe281a3
strengthen the thread-safety of the protocol adapter wrapper, clean r…
marregui Oct 20, 2025
bff57b8
fix broken tests due to missing transition to stopped
marregui Oct 20, 2025
eccb1ae
fix retry logic on contention
marregui Oct 20, 2025
2b3737e
fix race condition on stop
marregui Oct 20, 2025
c206682
cleanup not used parameters
marregui Oct 20, 2025
80eb3ec
improve thread model by using a shared executor.
marregui Oct 20, 2025
caf7104
improve tests, remove flakiness
marregui Oct 21, 2025
dc64f7c
improve start/close rest resource for protocol adapter
marregui Oct 21, 2025
1f24e08
improve thread safety of ProtocolAdapterStateImpl
marregui Oct 21, 2025
6e03980
remove race condition on stop
marregui Oct 21, 2025
d9ec4d6
propagate connection errors
marregui Oct 21, 2025
64e439b
stop adapters orderly on shutdown of EDGE
marregui Oct 21, 2025
5df1d7f
coordinate adapter shutdown
marregui Oct 21, 2025
7f7ff31
fix flakes in sql relater adapters
marregui Oct 22, 2025
df2051a
fix more flakes
marregui Oct 22, 2025
658c93f
fix more flakes, this time in the shutdown sequence
marregui Oct 22, 2025
6a80148
this was a mistake, revert
marregui Oct 22, 2025
1702274
cleaner shutdown in tests only
marregui Oct 22, 2025
81f7f33
cosmetic changes
marregui Oct 22, 2025
07f8a3d
fix shutdown sequence, improve code by simplifying
marregui Oct 23, 2025
32b3e64
fix misnomer
marregui Oct 23, 2025
7784973
fixes
marregui Oct 23, 2025
5b7c590
strengthen shutdown
marregui Oct 23, 2025
3a58ce9
strengthen tests
marregui Oct 23, 2025
dae7713
put back this stateless state in http adapter, which I removed in error
marregui Oct 23, 2025
f31d57a
cleanup resources after tests run
marregui Oct 24, 2025
12962b4
strengthen tests
marregui Oct 24, 2025
2f6f2a5
strengthen tests, use awaitability
marregui Oct 24, 2025
c7e49de
make database adapter/connection thread safe for FSM. ditto with file…
marregui Oct 25, 2025
48939c0
make CAS less aggressive, more fair, as per Sam's recommendation
marregui Oct 25, 2025
a2535b4
remove stress over adapter by not restarting it and instead providing…
marregui Oct 25, 2025
61f2a96
simplify code
marregui Oct 25, 2025
dd644d8
strengthen code
marregui Oct 25, 2025
cc1cca7
prevent thread leak
marregui Oct 25, 2025
9e4e8dc
dont propagate error event messages if the adapter is already stopped
marregui Oct 25, 2025
be8fb07
fix slurped delete of adapter
marregui Oct 26, 2025
e3b0e08
remove race condition
marregui Oct 26, 2025
f2ebdee
code improvements
marregui Oct 26, 2025
1b35b41
Changes by jmader
marregui Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions SESSION_0_FSM_REVIEW.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

## Discussions for later
- We need to take care of modifications of the configuration file (Global lock)

## Tasks
- the state machine to functions within transitions (fully unit tested)
- (?) Add a testing protocol adapter for testing (let's see if we need it, mockito might be)
- new PR Epic!! ->
89 changes: 35 additions & 54 deletions hivemq-edge/src/main/java/com/hivemq/HiveMQEdgeMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,32 @@
import com.hivemq.bootstrap.LoggingBootstrap;
import com.hivemq.bootstrap.ioc.Injector;
import com.hivemq.bootstrap.ioc.Persistences;
import com.hivemq.bootstrap.services.AfterHiveMQStartBootstrapService;
import com.hivemq.bootstrap.services.AfterHiveMQStartBootstrapServiceImpl;
import com.hivemq.common.shutdown.ShutdownHooks;
import com.hivemq.configuration.info.SystemInformation;
import com.hivemq.configuration.info.SystemInformationImpl;
import com.hivemq.configuration.service.ApiConfigurationService;
import com.hivemq.configuration.service.ConfigurationService;
import com.hivemq.edge.modules.ModuleLoader;
import com.hivemq.embedded.EmbeddedExtension;
import com.hivemq.exceptions.HiveMQEdgeStartupException;
import com.hivemq.http.JaxrsHttpServer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import com.hivemq.http.JaxrsHttpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

public class HiveMQEdgeMain {
private static final Logger log = LoggerFactory.getLogger(HiveMQEdgeMain.class);
private static final @NotNull Logger log = LoggerFactory.getLogger(HiveMQEdgeMain.class);

private @Nullable ConfigurationService configService;
private final @NotNull ModuleLoader moduleLoader;
private final @NotNull MetricRegistry metricRegistry;
private final @NotNull SystemInformation systemInformation;

private @Nullable ConfigurationService configService;
private @Nullable JaxrsHttpServer jaxrsServer;

private @Nullable Injector injector;
private @Nullable Thread shutdownThread;

Expand All @@ -64,22 +61,30 @@ public HiveMQEdgeMain(
this.moduleLoader = moduleLoader;
}

public void bootstrap() throws HiveMQEdgeStartupException {
// Already bootstrapped.
if (injector != null) {
return;
public static void main(final String @NotNull [] args) throws Exception {
log.info("Starting HiveMQ Edge...");
final long startTime = System.nanoTime();
final SystemInformationImpl systemInformation = new SystemInformationImpl(true);
final ModuleLoader moduleLoader = new ModuleLoader(systemInformation);
final HiveMQEdgeMain server = new HiveMQEdgeMain(systemInformation, new MetricRegistry(), null, moduleLoader);
try {
server.start(null);
log.info("Started HiveMQ Edge in {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
} catch (final HiveMQEdgeStartupException e) {
log.error("HiveMQ Edge start was aborted with error.", e);
}
final HiveMQEdgeBootstrap bootstrap =
new HiveMQEdgeBootstrap(metricRegistry, systemInformation, moduleLoader, configService);

}

injector = bootstrap.bootstrap();
if (configService == null) {
configService = injector.configurationService();
public void bootstrap() throws HiveMQEdgeStartupException {
if (injector == null) {
injector =
new HiveMQEdgeBootstrap(metricRegistry, systemInformation, moduleLoader, configService).bootstrap();
if (configService == null) {
configService = injector.configurationService();
}
}
}


protected void startGateway(final @Nullable EmbeddedExtension embeddedExtension) throws HiveMQEdgeStartupException {
if (injector == null) {
throw new HiveMQEdgeStartupException("invalid startup state");
Expand All @@ -90,9 +95,7 @@ protected void startGateway(final @Nullable EmbeddedExtension embeddedExtension)
throw new HiveMQEdgeStartupException("User aborted.");
}

final HiveMQEdgeGateway instance = injector.edgeGateway();
instance.start(embeddedExtension);

injector.edgeGateway().start(embeddedExtension);
initializeApiServer(injector);
startApiServer();
}
Expand All @@ -102,25 +105,21 @@ protected void stopGateway() {
return;
}
final ShutdownHooks shutdownHooks = injector.shutdownHooks();
// Already shutdown.
if (shutdownHooks.isShuttingDown()) {
return;
}

shutdownHooks.runShutdownHooks();

//clear metrics
metricRegistry.removeMatching(MetricFilter.ALL);

//Stop the API Webserver
stopApiServer();

LoggingBootstrap.resetLogging();
}

protected void initializeApiServer(final @NotNull Injector injector) {
final ApiConfigurationService config = Objects.requireNonNull(configService).apiConfiguration();
if (jaxrsServer == null && config.isEnabled()) {
if (jaxrsServer == null && requireNonNull(configService).apiConfiguration().isEnabled()) {
jaxrsServer = injector.apiServer();
} else {
log.info("API is DISABLED by configuration");
Expand All @@ -142,21 +141,18 @@ protected void stopApiServer() {

protected void afterStart() {
afterHiveMQStartBootstrap();
//hook method
}

private void afterHiveMQStartBootstrap() {
Preconditions.checkNotNull(injector);
final Persistences persistences = injector.persistences();
Preconditions.checkNotNull(persistences);
Preconditions.checkNotNull(configService);

try {
final AfterHiveMQStartBootstrapService afterHiveMQStartBootstrapService =
AfterHiveMQStartBootstrapServiceImpl.decorate(injector.completeBootstrapService(),
injector.commercialModuleLoaderDiscovery()
.afterHiveMQStart(AfterHiveMQStartBootstrapServiceImpl.decorate(injector.completeBootstrapService(),
injector.protocolAdapterManager(),
injector.services().modulesAndExtensionsService());
injector.commercialModuleLoaderDiscovery().afterHiveMQStart(afterHiveMQStartBootstrapService);
injector.services().modulesAndExtensionsService()));
} catch (final Exception e) {
log.warn("Error on bootstrapping modules:", e);
throw new HiveMQEdgeStartupException(e);
Expand All @@ -174,31 +170,16 @@ public void start(final @Nullable EmbeddedExtension embeddedExtension)

public void stop() {
stopGateway();
try {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} catch (final IllegalStateException ignored) {
//ignore
}
}

public static void main(final String @NotNull [] args) throws Exception {
log.info("Starting HiveMQ Edge...");
final long startTime = System.nanoTime();
final SystemInformationImpl systemInformation = new SystemInformationImpl(true);
final ModuleLoader moduleLoader = new ModuleLoader(systemInformation);
final HiveMQEdgeMain server =
new HiveMQEdgeMain(systemInformation, new MetricRegistry(), null, moduleLoader);
try {
server.start(null);
log.info("Started HiveMQ Edge in {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
} catch (final HiveMQEdgeStartupException e) {
log.error("HiveMQ Edge start was aborted with error.", e);
if (shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
} catch (final IllegalStateException ignored) {
//ignore
}
}
}

public @Nullable Injector getInjector() {
return injector;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,28 @@ public class ProtocolAdapter {
private final @NotNull String name;
@JsonProperty("description")
@Schema(description = "The description")
private final @NotNull String description;
private final @Nullable String description;
@JsonProperty("url")
@Schema(description = "The url of the adapter")
private final @NotNull String url;
private final @Nullable String url;
@JsonProperty("version")
@Schema(description = "The installed version of the adapter")
private final @NotNull String version;
@JsonProperty("logoUrl")
@Schema(description = "The logo of the adapter")
private final @NotNull String logoUrl;
private final @Nullable String logoUrl;
@JsonProperty("provisioningUrl")
@Schema(description = "The provisioning url of the adapter")
private final @NotNull String provisioningUrl;
private final @Nullable String provisioningUrl;
@JsonProperty("author")
@Schema(description = "The author of the adapter")
private final @NotNull String author;
@JsonProperty("installed")
@Schema(description = "Is the adapter installed?")
private final @NotNull Boolean installed;
private final @Nullable Boolean installed;
@JsonProperty("category")
@Schema(description = "The category of the adapter")
private final @NotNull ProtocolAdapterCategory category;
private final @Nullable ProtocolAdapterCategory category;
@JsonProperty("tags")
@Schema(description = "The search tags associated with this adapter")
private final @NotNull List<String> tags;
Expand All @@ -72,27 +72,27 @@ public class ProtocolAdapter {
private final @NotNull Set<Capability> capabilities;
@JsonProperty("configSchema")
@Schema(description = "JSONSchema in the 'https://json-schema.org/draft/2020-12/schema' format, which describes the configuration requirements for the adapter.")
private final @NotNull JsonNode configSchema;
private final @Nullable JsonNode configSchema;
@JsonProperty("uiSchema")
@Schema(description = "UISchema (see https://rjsf-team.github.io/react-jsonschema-form/docs/api-reference/uiSchema/), which describes the UI rendering of the configuration for the adapter.")
private final @NotNull JsonNode uiSchema;
private final @Nullable JsonNode uiSchema;

public ProtocolAdapter(
@JsonProperty("id") final @NotNull String id,
@JsonProperty("protocol") final @NotNull String protocol,
@JsonProperty("name") final @NotNull String name,
@JsonProperty("description") final @NotNull String description,
@JsonProperty("url") final @NotNull String url,
@JsonProperty("description") final @Nullable String description,
@JsonProperty("url") final @Nullable String url,
@JsonProperty("version") final @NotNull String version,
@JsonProperty("logoUrl") final @NotNull String logoUrl,
@JsonProperty("logoUrl") final @Nullable String logoUrl,
@JsonProperty("provisioningUrl") final @Nullable String provisioningUrl,
@JsonProperty("author") final @NotNull String author,
@JsonProperty("installed") final @Nullable Boolean installed,
@JsonProperty("capabilities") final @NotNull Set<Capability> capabilities,
@JsonProperty("category") final @Nullable ProtocolAdapterCategory category,
@JsonProperty("tags") final @Nullable List<String> tags,
@JsonProperty("configSchema") final @NotNull JsonNode configSchema,
@JsonProperty("uiSchema") final @NotNull JsonNode uiSchema) {
@JsonProperty("tags") final @NotNull List<String> tags,
@JsonProperty("configSchema") final @Nullable JsonNode configSchema,
@JsonProperty("uiSchema") final @Nullable JsonNode uiSchema) {
this.id = id;
this.protocol = protocol;
this.name = name;
Expand Down Expand Up @@ -122,19 +122,19 @@ public ProtocolAdapter(
return name;
}

public @NotNull String getDescription() {
public @Nullable String getDescription() {
return description;
}

public @NotNull String getUrl() {
public @Nullable String getUrl() {
return url;
}

public @NotNull String getVersion() {
return version;
}

public @NotNull String getLogoUrl() {
public @Nullable String getLogoUrl() {
return logoUrl;
}

Expand All @@ -146,7 +146,7 @@ public ProtocolAdapter(
return author;
}

public @NotNull JsonNode getConfigSchema() {
public @Nullable JsonNode getConfigSchema() {
return configSchema;
}

Expand All @@ -158,7 +158,7 @@ public ProtocolAdapter(
return installed;
}

public @Nullable List<String> getTags() {
public @NotNull List<String> getTags() {
return tags;
}

Expand All @@ -172,7 +172,7 @@ public ProtocolAdapter(

@Override
public boolean equals(final @Nullable Object o) {
return this == o || o instanceof final ProtocolAdapter that && Objects.equals(id, that.id);
return this == o || (o instanceof final ProtocolAdapter that && Objects.equals(id, that.id));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.hivemq.edge.HiveMQEdgeConstants;
import io.swagger.v3.oas.annotations.media.Schema;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import io.swagger.v3.oas.annotations.media.Schema;

/**
* A category is a unique entity and represents a curated grouping of a protocol adapter. A protocol adapter
* maybe in 1 category.
*
* @author Simon L Johnson
*/
public class ProtocolAdapterCategory {

Expand All @@ -34,7 +32,7 @@ public class ProtocolAdapterCategory {
description = "The unique name of the category to be used in API communication.",
format = "string",
minLength = 1,
required = true,
requiredMode = Schema.RequiredMode.REQUIRED,
maxLength = HiveMQEdgeConstants.MAX_NAME_LEN,
pattern = HiveMQEdgeConstants.NAME_REGEX)
private final @NotNull String name;
Expand All @@ -44,20 +42,16 @@ public class ProtocolAdapterCategory {
description = "The display name of the category to be used in HCIs.",
format = "string",
minLength = 1,
required = true)
requiredMode = Schema.RequiredMode.REQUIRED)
private final @NotNull String displayName;

@JsonProperty("description")
@Schema(name = "description",
description = "The description associated with the category.",
format = "string")
private final @NotNull String description;
@Schema(name = "description", description = "The description associated with the category.", format = "string")
private final @Nullable String description;

@JsonProperty("image")
@Schema(name = "image",
description = "The image associated with the category.",
format = "string")
private final @NotNull String image;
@Schema(name = "image", description = "The image associated with the category.", format = "string")
private final @Nullable String image;

public ProtocolAdapterCategory(
@JsonProperty("name") final @NotNull String name,
Expand All @@ -70,19 +64,19 @@ public ProtocolAdapterCategory(
this.image = image;
}

public String getName() {
public @NotNull String getName() {
return name;
}

public String getDisplayName() {
public @NotNull String getDisplayName() {
return displayName;
}

public String getDescription() {
public @Nullable String getDescription() {
return description;
}

public String getImage() {
public @Nullable String getImage() {
return image;
}
}
Loading
Loading