diff --git a/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java b/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java index 5ad7affe6f..ea43d41ef6 100644 --- a/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java @@ -124,6 +124,8 @@ public class ProtocolAdaptersResourceImpl extends AbstractApi implements Protoco // no-op }; private static final @NotNull Logger log = LoggerFactory.getLogger(ProtocolAdaptersResourceImpl.class); + private static final long RETRY_TIMEOUT_MILLIS = 5000; + private static final long RETRY_INTERVAL_MILLIS = 200; private final @NotNull HiveMQEdgeRemoteService remoteService; private final @NotNull ConfigurationService configurationService; diff --git a/hivemq-edge/src/main/java/com/hivemq/bootstrap/LoggingBootstrap.java b/hivemq-edge/src/main/java/com/hivemq/bootstrap/LoggingBootstrap.java index b2c8b6b657..a718867932 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bootstrap/LoggingBootstrap.java +++ b/hivemq-edge/src/main/java/com/hivemq/bootstrap/LoggingBootstrap.java @@ -214,6 +214,7 @@ public void onStart(final @NotNull LoggerContext context) { @Override public void onReset(final @NotNull LoggerContext context) { log.trace("logback.xml was changed"); + context.getTurboFilterList().remove(logLevelModifierTurboFilter); context.addTurboFilter(logLevelModifierTurboFilter); } diff --git a/hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java b/hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java index ad28ef09d5..c8a9ead2cb 100644 --- a/hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java +++ b/hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.CompletableFuture; @Singleton public class WritingServiceProvider { @@ -78,12 +79,13 @@ public boolean writingEnabled() { @Override - public boolean startWriting( + public @NotNull CompletableFuture startWritingAsync( final @NotNull WritingProtocolAdapter writingProtocolAdapter, final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService, final @NotNull List southboundMappings) { log.warn("No bidirectional module is currently installed. Writing to PLCs is currently not supported."); - return true; } + return CompletableFuture.completedFuture(true); + } @Override public void stopWriting( diff --git a/hivemq-edge/src/main/java/com/hivemq/edge/modules/ModuleLoader.java b/hivemq-edge/src/main/java/com/hivemq/edge/modules/ModuleLoader.java index 19a63fc5d0..a20d2a7d20 100644 --- a/hivemq-edge/src/main/java/com/hivemq/edge/modules/ModuleLoader.java +++ b/hivemq-edge/src/main/java/com/hivemq/edge/modules/ModuleLoader.java @@ -20,72 +20,72 @@ import com.hivemq.edge.modules.adapters.impl.IsolatedModuleClassloader; import com.hivemq.extensions.loader.ClassServiceLoader; import com.hivemq.http.handlers.AlternativeClassloadingStaticFileHandler; +import jakarta.inject.Inject; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Inject; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; -import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; public class ModuleLoader { - private static final Logger log = LoggerFactory.getLogger(ModuleLoader.class); - - private final @NotNull SystemInformation systemInformation; - protected final @NotNull Set modules = new HashSet<>(); - protected final @NotNull Comparator fileComparator = (o1, o2) -> { + protected static final @NotNull Comparator fileComparator = (o1, o2) -> { final long delta = o2.lastModified() - o1.lastModified(); - // we cna easily get an overflow within months, so we can not use the delta directly by casting it to integer! - if (delta == 0) { - return 0; - } else if (delta < 0) { - return -1; - } else { - return 1; - } + return delta == 0 ? 0 : delta < 0 ? -1 : 1; }; - - private final @NotNull ClassServiceLoader classServiceLoader = new ClassServiceLoader(); - private final AtomicBoolean loaded = new AtomicBoolean(); + private static final @NotNull Logger log = LoggerFactory.getLogger(ModuleLoader.class); + protected final @NotNull Set modules; + private final @NotNull SystemInformation systemInformation; + private final @NotNull ClassServiceLoader classServiceLoader; + private final @NotNull AtomicBoolean loaded; @Inject public ModuleLoader(final @NotNull SystemInformation systemInformation) { this.systemInformation = systemInformation; + this.classServiceLoader = new ClassServiceLoader(); + this.loaded = new AtomicBoolean(false); + this.modules = Collections.newSetFromMap(new ConcurrentHashMap<>()); + } + + private static void logException(final @NotNull File file, final @NotNull IOException ioException) { + log.warn("Exception with reason {} while reading module file {}", + ioException.getMessage(), + file.getAbsolutePath()); + log.debug("Original exception", ioException); } public void loadModules() { - if (loaded.get()) { - // avoid duplicate loads - return; - } - loaded.set(true); - final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); - if (Boolean.getBoolean(HiveMQEdgeConstants.DEVELOPMENT_MODE)) { - log.info(String.format("Welcome '%s' is starting...", "48 69 76 65 4D 51 45 64 67 65")); - log.warn( - "\n################################################################################################################\n" + - "# You are running HiveMQ Edge in Development Mode and Modules will be loaded from your workspace NOT your #\n" + - "# HIVEMQ_HOME/modules directory. To load runtime modules from your HOME directory please remove #\n" + - "# '-Dhivemq.edge.workspace.modules=true' from your startup script #\n" + - "################################################################################################################"); - loadFromWorkspace(contextClassLoader); - // load the commercial module loader from the workspace folder - // the loadFromWorkspace() will not find it. - log.info("Loading the commercial module loader from workspace."); - loadCommercialModuleLoaderFromWorkSpace(contextClassLoader); - } else { - // the commercial module loader will be found here in case of a "normal" running hivemq edge - loadFromModulesDirectory(contextClassLoader); + if (loaded.compareAndSet(false, true)) { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + if (Boolean.getBoolean(HiveMQEdgeConstants.DEVELOPMENT_MODE)) { + log.info(String.format("Welcome '%s' is starting...", "48 69 76 65 4D 51 45 64 67 65")); + log.warn(""" + + ################################################################################################################ + # You are running HiveMQ Edge in Development Mode and Modules will be loaded from your workspace NOT your # + # HIVEMQ_HOME/modules directory. To load runtime modules from your HOME directory please remove # + # '-Dhivemq.edge.workspace.modules=true' from your startup script # + ################################################################################################################"""); + loadFromWorkspace(contextClassLoader); + // load the commercial module loader from the workspace folder + // the loadFromWorkspace() will not find it. + log.info("Loading the commercial module loader from workspace."); + loadCommercialModuleLoaderFromWorkSpace(contextClassLoader); + } else { + // the commercial module loader will be found here in case of a "normal" running hivemq edge + loadFromModulesDirectory(contextClassLoader); + } } } @@ -98,31 +98,24 @@ private void loadCommercialModuleLoaderFromWorkSpace(final @NotNull ClassLoader return; } - - final File commercialModuleLoaderLibFolder = - new File(commercialModulesRepoRootFolder, "hivemq-edge-commercial-modules-loader/build/libs"); - if (!commercialModuleLoaderLibFolder.exists()) { + final File libs = new File(commercialModulesRepoRootFolder, "hivemq-edge-commercial-modules-loader/build/libs"); + if (!libs.exists()) { log.error("Could not load commercial module loader as the assumed lib folder '{}' does not exist.", - commercialModuleLoaderLibFolder.getAbsolutePath()); + libs.getAbsolutePath()); return; } - - final File[] tmp = commercialModuleLoaderLibFolder.listFiles(file -> file.getName().endsWith("proguarded.jar")); - + final File[] tmp = libs.listFiles(file -> file.getName().endsWith("proguarded.jar")); if (tmp == null || tmp.length == 0) { - log.info("No commercial module loader jar was discovered in libs folder '{}'", - commercialModuleLoaderLibFolder); + log.info("No commercial module loader jar was discovered in libs folder '{}'", libs); return; } - final List potentialCommercialModuleJars = - new ArrayList<>(Arrays.stream(tmp).sorted(fileComparator).toList()); - - final String absolutePathJar = potentialCommercialModuleJars.get(0).getAbsolutePath(); - if (potentialCommercialModuleJars.size() > 1) { + final List jars = new ArrayList<>(Arrays.stream(tmp).sorted(fileComparator).toList()); + final String absolutePathJar = jars.get(0).getAbsolutePath(); + if (jars.size() > 1) { log.debug( "More than one commercial module loader jar was discovered in libs folder '{}'. Clean unwanted jars to avoid loading the wrong version. The first found jar '{}' will be loaded.", - commercialModuleLoaderLibFolder, + libs, absolutePathJar); } else { log.info("Commercial Module jar '{}' was discovered.", absolutePathJar); @@ -138,15 +131,14 @@ private void loadCommercialModulesLoaderJar(final File jarFile, final @NotNull C log.error("", e); } log.info("Loading commercial module loader from {}", jarFile.getAbsoluteFile()); - final IsolatedModuleClassloader isolatedClassloader = - new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader); - modules.add(new ModuleLoader.EdgeModule(jarFile, isolatedClassloader, false)); + modules.add(new ModuleLoader.EdgeModule(jarFile, + new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader), + false)); } protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader) { log.debug("Loading modules from development workspace."); - final File userDir = new File(System.getProperty("user.dir")); - loadFromWorkspace(parentClassloader, userDir); + loadFromWorkspace(parentClassloader, new File(System.getProperty("user.dir"))); } /** @@ -158,7 +150,7 @@ protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader) { * @param parentClassloader the parent classloader * @param currentDir the current dir */ - protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader, final @NotNull File currentDir) { + private void loadFromWorkspace(final @NotNull ClassLoader parentClassloader, final @NotNull File currentDir) { if (currentDir.exists() && currentDir.isDirectory()) { if (currentDir.getName().equals("hivemq-edge")) { discoverWorkspaceModule(new File(currentDir, "modules"), parentClassloader); @@ -173,7 +165,7 @@ protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader, f } } - protected void discoverWorkspaceModule(final File dir, final @NotNull ClassLoader parentClassloader) { + protected void discoverWorkspaceModule(final @NotNull File dir, final @NotNull ClassLoader parentClassloader) { if (dir.exists()) { final File[] files = dir.listFiles(pathname -> pathname.isDirectory() && pathname.canRead() && @@ -196,14 +188,11 @@ protected void discoverWorkspaceModule(final File dir, final @NotNull ClassLoade urls.add(jar.toURI().toURL()); } } - final IsolatedModuleClassloader isolatedClassloader = - new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader); - modules.add(new EdgeModule(file, isolatedClassloader, true)); + modules.add(new EdgeModule(file, + new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader), + true)); } catch (final IOException ioException) { - log.warn("Exception with reason {} while reading module file {}", - ioException.getMessage(), - file.getAbsolutePath()); - log.debug("Original exception", ioException); + logException(file, ioException); } } } @@ -225,10 +214,7 @@ protected void loadFromModulesDirectory(final @NotNull ClassLoader parentClasslo log.debug("Ignoring non jar file in module folder {}.", lib.getAbsolutePath()); } } catch (final IOException ioException) { - log.warn("Exception with reason {} while reading module file {}", - ioException.getMessage(), - lib.getAbsolutePath()); - log.debug("Original exception", ioException); + logException(lib, ioException); } } } @@ -256,7 +242,7 @@ protected void loadFromModulesDirectory(final @NotNull ClassLoader parentClasslo } public @NotNull Set getModules() { - return modules; + return Collections.unmodifiableSet(modules); } public void clear() { @@ -264,7 +250,6 @@ public void clear() { } public static class EdgeModule { - private final @NotNull File root; private final @NotNull ClassLoader classloader; diff --git a/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java b/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java index 7a87ca2fed..7b08c12d96 100644 --- a/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java +++ b/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/ProtocolAdapterStateImpl.java @@ -20,38 +20,42 @@ import com.hivemq.adapter.sdk.api.events.model.Payload; import com.hivemq.adapter.sdk.api.state.ProtocolAdapterState; import com.hivemq.edge.modules.api.events.model.EventImpl; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class ProtocolAdapterStateImpl implements ProtocolAdapterState { + private final @NotNull AtomicReference runtimeStatus; + private final @NotNull AtomicReference connectionStatus; + private final @NotNull AtomicReference<@Nullable String> lastErrorMessage; private final @NotNull EventService eventService; private final @NotNull String adapterId; private final @NotNull String protocolId; - protected @NotNull AtomicReference runtimeStatus = new AtomicReference<>(RuntimeStatus.STOPPED); - protected @NotNull AtomicReference connectionStatus = - new AtomicReference<>(ConnectionStatus.DISCONNECTED); - protected @Nullable String lastErrorMessage; - private final AtomicReference> connectionStatusListener = new AtomicReference<>(); + private final @NotNull AtomicReference> connectionStatusListener; - public ProtocolAdapterStateImpl(final @NotNull EventService eventService, - final @NotNull String adapterId, - final @NotNull String protocolId) { + public ProtocolAdapterStateImpl( + final @NotNull EventService eventService, + final @NotNull String adapterId, + final @NotNull String protocolId) { this.eventService = eventService; this.adapterId = adapterId; this.protocolId = protocolId; + this.runtimeStatus = new AtomicReference<>(RuntimeStatus.STOPPED); + this.connectionStatus = new AtomicReference<>(ConnectionStatus.DISCONNECTED); + this.lastErrorMessage = new AtomicReference<>(null); + this.connectionStatusListener = new AtomicReference<>(); } @Override public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionStatus) { Preconditions.checkNotNull(connectionStatus); final var changed = this.connectionStatus.getAndSet(connectionStatus) != connectionStatus; - if(changed) { + if (changed) { final var listener = connectionStatusListener.get(); - if(listener != null) { + if (listener != null) { listener.accept(connectionStatus); } } @@ -68,52 +72,50 @@ public boolean setConnectionStatus(final @NotNull ConnectionStatus connectionSta * and the errorMessage to that supplied. */ @Override - public void setErrorConnectionStatus( - final @Nullable Throwable t, - final @Nullable String errorMessage) { - final boolean changed = setConnectionStatus(ConnectionStatus.ERROR); - reportErrorMessage( t, errorMessage, changed); + public void setErrorConnectionStatus(final @Nullable Throwable t, final @Nullable String errorMessage) { + reportErrorMessage(t, errorMessage, setConnectionStatus(ConnectionStatus.ERROR)); } - /** - * Sets the last error message associated with the adapter runtime. This is can be sent through the API to - * give an indication of the status of an adapter runtime. - * - * @param errorMessage - */ @Override public void reportErrorMessage( final @Nullable Throwable throwable, final @Nullable String errorMessage, final boolean sendEvent) { - this.lastErrorMessage = errorMessage == null ? throwable == null ? null : throwable.getMessage() : errorMessage; + // Sets the last error message associated with the adapter runtime. + // This is can be sent through the API to give an indication of the + // status of an adapter runtime. + lastErrorMessage.set(errorMessage == null ? throwable == null ? null : throwable.getMessage() : errorMessage); if (sendEvent) { - eventService.createAdapterEvent(adapterId, protocolId) + final var eventBuilder = eventService.createAdapterEvent(adapterId, protocolId) .withSeverity(EventImpl.SEVERITY.ERROR) - .withMessage(String.format("Adapter '%s' encountered an error.", adapterId)) - .withPayload(Payload.ContentType.PLAIN_TEXT, ExceptionUtils.getStackTrace(throwable)) - .fire(); + .withMessage(String.format("Adapter '%s' encountered an error.", adapterId)); + if (throwable != null) { + eventBuilder.withPayload(Payload.ContentType.PLAIN_TEXT, ExceptionUtils.getStackTrace(throwable)); + } else if (errorMessage != null) { + eventBuilder.withPayload(Payload.ContentType.PLAIN_TEXT, errorMessage); + } + eventBuilder.fire(); } } @Override - public void setRuntimeStatus(final @NotNull RuntimeStatus runtimeStatus) { - this.runtimeStatus.set(runtimeStatus); + public @NotNull RuntimeStatus getRuntimeStatus() { + return runtimeStatus.get(); } @Override - public @NotNull RuntimeStatus getRuntimeStatus() { - return this.runtimeStatus.get(); + public void setRuntimeStatus(final @NotNull RuntimeStatus status) { + runtimeStatus.set(status); } @Override public @Nullable String getLastErrorMessage() { - return lastErrorMessage; + return lastErrorMessage.get(); } - public void setConnectionStatusListener(final @NotNull Consumer connectionStatusListener) { - this.connectionStatusListener.set(connectionStatusListener); - connectionStatusListener.accept(connectionStatus.get()); + public void setConnectionStatusListener(final @NotNull Consumer listener) { + final ConnectionStatus currentStatus = connectionStatus.get(); + connectionStatusListener.set(listener); + listener.accept(currentStatus); } - } diff --git a/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java b/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java index bfe9f4e653..85ab22daaa 100644 --- a/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java +++ b/hivemq-edge/src/main/java/com/hivemq/edge/modules/adapters/impl/polling/PollingTask.java @@ -19,9 +19,9 @@ import com.hivemq.adapter.sdk.api.events.model.Event; import com.hivemq.configuration.service.InternalConfigurations; import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingSampler; -import org.jetbrains.annotations.NotNull; import com.hivemq.util.ExceptionUtils; import com.hivemq.util.NanoTimeProvider; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +29,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; public class PollingTask implements Runnable { @@ -42,12 +44,11 @@ public class PollingTask implements Runnable { private final @NotNull ScheduledExecutorService scheduledExecutorService; private final @NotNull EventService eventService; private final @NotNull NanoTimeProvider nanoTimeProvider; - private final @NotNull AtomicInteger watchdogErrorCount = new AtomicInteger(); - private final @NotNull AtomicInteger applicationErrorCount = new AtomicInteger(); - + private final @NotNull AtomicInteger watchdogErrorCount; + private final @NotNull AtomicInteger applicationErrorCount; + private final @NotNull AtomicBoolean continueScheduling; + private final @NotNull AtomicReference> currentScheduledFuture; private volatile long nanosOfLastPolling; - private final @NotNull AtomicBoolean continueScheduling = new AtomicBoolean(true); - public PollingTask( final @NotNull ProtocolAdapterPollingSampler sampler, @@ -58,6 +59,19 @@ public PollingTask( this.scheduledExecutorService = scheduledExecutorService; this.eventService = eventService; this.nanoTimeProvider = nanoTimeProvider; + this.watchdogErrorCount = new AtomicInteger(); + this.applicationErrorCount = new AtomicInteger(); + this.continueScheduling = new AtomicBoolean(true); + this.currentScheduledFuture = new AtomicReference<>(); + } + + private static long getBackoff(final int errorCount) { + //-- This will backoff up to a max of about a day (unless the max provided is less) + final long max = InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get(); + long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100); + f += ThreadLocalRandom.current().nextInt(0, errorCount * 100); + f = Math.min(f, max); + return f; } @Override @@ -90,6 +104,10 @@ public void run() { public void stopScheduling() { continueScheduling.set(false); + final ScheduledFuture future = currentScheduledFuture.getAndSet(null); + if (future != null) { + future.cancel(true); + } } private void handleInterruptionException(final @NotNull Throwable throwable) { @@ -99,7 +117,8 @@ private void handleInterruptionException(final @NotNull Throwable throwable) { final var errorCountTotal = watchdogErrorCount.incrementAndGet(); final var stopBecauseOfTooManyErrors = errorCountTotal > InternalConfigurations.ADAPTER_RUNTIME_WATCHDOG_TIMEOUT_ERRORS_BEFORE_INTERRUPT.get(); - final var milliSecondsSinceLastPoll = TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling); + final var milliSecondsSinceLastPoll = + TimeUnit.NANOSECONDS.toMillis(nanoTimeProvider.nanoTime() - nanosOfLastPolling); if (stopBecauseOfTooManyErrors) { log.warn( "Detected bad system process {} in sampler {} - terminating process to maintain health ({}ms runtime)", @@ -121,7 +140,6 @@ private void handleInterruptionException(final @NotNull Throwable throwable) { } } - private void handleExceptionDuringPolling(final @NotNull Throwable throwable) { final int errorCountTotal = applicationErrorCount.incrementAndGet(); final int maxErrorsBeforeRemoval = sampler.getMaxErrorsBeforeRemoval(); @@ -145,11 +163,12 @@ private void handleExceptionDuringPolling(final @NotNull Throwable throwable) { notifyOnError(sampler, throwable, false); // no rescheduling } - } private void notifyOnError( - final @NotNull ProtocolAdapterPollingSampler sampler, final @NotNull Throwable t, final boolean continuing) { + final @NotNull ProtocolAdapterPollingSampler sampler, + final @NotNull Throwable t, + final boolean continuing) { try { sampler.error(t, continuing); } catch (final Throwable samplerError) { @@ -178,12 +197,10 @@ private void reschedule(final int errorCountTotal) { } final long nonNegativeDelay = Math.max(0, delayInMillis); - if (errorCountTotal == 0) { schedule(nonNegativeDelay); } else { - final long backoff = getBackoff(errorCountTotal, - InternalConfigurations.ADAPTER_RUNTIME_MAX_APPLICATION_ERROR_BACKOFF.get()); + final long backoff = getBackoff(errorCountTotal); final long effectiveDelay = Math.max(nonNegativeDelay, backoff); schedule(effectiveDelay); } @@ -193,8 +210,10 @@ private void reschedule(final int errorCountTotal) { void schedule(final long nonNegativeDelay) { if (continueScheduling.get()) { try { - scheduledExecutorService.schedule(this, nonNegativeDelay, TimeUnit.MILLISECONDS); - } catch (final RejectedExecutionException rejectedExecutionException) { + currentScheduledFuture.set(scheduledExecutorService.schedule(this, + nonNegativeDelay, + TimeUnit.MILLISECONDS)); + } catch (final RejectedExecutionException ignored) { // ignore. This is fine during shutdown. } } @@ -204,12 +223,4 @@ private void resetErrorStats() { applicationErrorCount.set(0); watchdogErrorCount.set(0); } - - private static long getBackoff(final int errorCount, final long max) { - //-- This will backoff up to a max of about a day (unless the max provided is less) - long f = (long) (Math.pow(2, Math.min(errorCount, 20)) * 100); - f += ThreadLocalRandom.current().nextInt(0, errorCount * 100); - f = Math.min(f, max); - return f; - } } diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java b/hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java index 187442cc50..a9c645dd60 100644 --- a/hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/InternalProtocolAdapterWritingService.java @@ -17,7 +17,6 @@ import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService; import com.hivemq.adapter.sdk.api.services.ProtocolAdapterWritingService; -import com.hivemq.adapter.sdk.api.writing.WritingContext; import com.hivemq.adapter.sdk.api.writing.WritingProtocolAdapter; import org.jetbrains.annotations.NotNull; @@ -26,13 +25,14 @@ public interface InternalProtocolAdapterWritingService extends ProtocolAdapterWritingService { - - boolean startWriting( + @NotNull CompletableFuture startWritingAsync( @NotNull WritingProtocolAdapter writingProtocolAdapter, @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService, @NotNull List writingContexts); - void stopWriting(@NotNull WritingProtocolAdapter writingProtocolAdapter, final @NotNull List writingContexts); + void stopWriting( + @NotNull WritingProtocolAdapter writingProtocolAdapter, + final @NotNull List writingContexts); void addWritingChangedCallback(@NotNull WritingChangedCallback callback); diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java index 8df0b009bb..1c00808372 100644 --- a/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java @@ -46,7 +46,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -56,16 +55,6 @@ public class ProtocolAdapterWrapper { private static final Logger log = LoggerFactory.getLogger(ProtocolAdapterWrapper.class); - - /** - * Represents the current operation state of the adapter to handle concurrent start/stop operations. - */ - private enum OperationState { - IDLE, - STARTING, - STOPPING - } - private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService; private final @NotNull ProtocolAdapter adapter; private final @NotNull ProtocolAdapterFactory adapterFactory; @@ -76,31 +65,11 @@ private enum OperationState { private final @NotNull ProtocolAdapterConfig config; private final @NotNull NorthboundConsumerFactory northboundConsumerFactory; private final @NotNull TagManager tagManager; - protected @Nullable Long lastStartAttemptTime; private final List consumers = new ArrayList<>(); - private final AtomicReference> startFutureRef = new AtomicReference<>(null); private final AtomicReference> stopFutureRef = new AtomicReference<>(null); - private final AtomicReference operationState = new AtomicReference<>(OperationState.IDLE); - - /** - * Exception thrown when attempting to start an adapter while a stop operation is in progress. - */ - public static class AdapterStartConflictException extends RuntimeException { - public AdapterStartConflictException(final String adapterId) { - super("Cannot start adapter '" + adapterId + "' while stop operation is in progress"); - } - } - - /** - * Exception thrown when attempting to stop an adapter while a start operation is in progress. - */ - public static class AdapterStopConflictException extends RuntimeException { - public AdapterStopConflictException(final String adapterId) { - super("Cannot stop adapter '" + adapterId + "' while start operation is in progress"); - } - } + protected @Nullable Long lastStartAttemptTime; public ProtocolAdapterWrapper( final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService, @@ -129,7 +98,9 @@ public ProtocolAdapterWrapper( final boolean writingEnabled, final @NotNull ModuleServices moduleServices) { final var existingStartFuture = getOngoingOperationIfPresent(operationState.get(), OperationState.STARTING); - if (existingStartFuture != null) return existingStartFuture; + if (existingStartFuture != null) { + return existingStartFuture; + } // Try to atomically transition from IDLE to STARTING if (!operationState.compareAndSet(OperationState.IDLE, OperationState.STARTING)) { // State changed between check and set, retry @@ -138,44 +109,46 @@ public ProtocolAdapterWrapper( initStartAttempt(); final var output = new ProtocolAdapterStartOutputImpl(); final var input = new ProtocolAdapterStartInputImpl(moduleServices); - final var startFuture = CompletableFuture - .supplyAsync(() -> { - try { - adapter.start(input, output); - } catch (final Throwable throwable) { - output.getStartFuture().completeExceptionally(throwable); - } - return output.getStartFuture(); - }) - .thenCompose(Function.identity()) - .handle((ignored, error) -> { - if(error != null) { - log.error("Error starting adapter", error); + final var startFuture = CompletableFuture.supplyAsync(() -> { + try { + adapter.start(input, output); + } catch (final Throwable throwable) { + output.getStartFuture().completeExceptionally(throwable); + } + return output.getStartFuture(); + }).thenCompose(Function.identity()).handle((ignored, error) -> { + if (error != null) { + log.error("Error starting adapter", error); + stopAfterFailedStart(); + protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); + //we still return the initial error since that's the most significant information + return CompletableFuture.failedFuture(error); + } else { + return attemptStartingConsumers(writingEnabled, + moduleServices.eventService()).handle((success, startException) -> { + if (startException == null) { + protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED); + if (success) { + log.debug("Successfully started adapter with id {}", adapter.getId()); + } else { + log.debug("Partially started adapter with id {}", adapter.getId()); + } + } else { + log.error("Failed to start adapter with id {}", adapter.getId(), startException); stopAfterFailedStart(); - protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); //we still return the initial error since that's the most significant information - return CompletableFuture.failedFuture(error); - } else { - return attemptStartingConsumers(writingEnabled, moduleServices.eventService()) - .map(startException -> { - log.error("Failed to start adapter with id {}", adapter.getId(), startException); - stopAfterFailedStart(); - protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); - //we still return the initial error since that's the most significant information - return CompletableFuture.failedFuture(startException); - }) - .orElseGet(() -> { - protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED); - return CompletableFuture.completedFuture(null); - }); + protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); + throw new RuntimeException("Failed to start adapter with id " + adapter.getId(), + startException); } - }) - .thenApply(ignored -> (Void)null) - .whenComplete((result, throwable) -> { - //always clean up state - startFutureRef.set(null); - operationState.set(OperationState.IDLE); + return success; }); + } + }).thenApply(ignored -> (Void) null).whenComplete((result, throwable) -> { + //always clean up state + startFutureRef.set(null); + operationState.set(OperationState.IDLE); + }); startFutureRef.set(startFuture); return startFuture; @@ -200,36 +173,61 @@ private void stopAfterFailedStart() { } } - private Optional attemptStartingConsumers(final boolean writingEnabled, final @NotNull EventService eventService) { + private @NotNull CompletableFuture attemptStartingConsumers( + final boolean writingEnabled, + final @NotNull EventService eventService) { + final CompletableFuture future = new CompletableFuture<>(); try { //Adapter started successfully, now start the consumers createAndSubscribeTagConsumer(); startPolling(protocolAdapterPollingService, eventService); - if(writingEnabled && isWriting()) { - final var started = new AtomicBoolean(false); + if (writingEnabled && isWriting()) { + final AtomicBoolean futureCompleted = new AtomicBoolean(false); protocolAdapterState.setConnectionStatusListener(status -> { - if(status == ProtocolAdapterState.ConnectionStatus.CONNECTED) { - if(started.compareAndSet(false, true)) { - if(startWriting(protocolAdapterWritingService)) { - log.info("Successfully started adapter with id {}", adapter.getId()); - } else { - log.error("Protocol adapter start failed as data hub is not available."); + switch (status) { + case CONNECTED -> { + if (futureCompleted.compareAndSet(false, true)) { + try { + if (startWritingAsync(protocolAdapterWritingService).get()) { + log.info("Successfully started adapter with id {}", adapter.getId()); + future.complete(true); + } else { + log.error("Protocol adapter start writing failed as data hub is not available."); + future.complete(true); + } + } catch (final Exception e) { + log.error("Failed to start writing for adapter with id {}.", adapter.getId(), e); + future.complete(true); + } + } + } + case ERROR -> { + if (futureCompleted.compareAndSet(false, true)) { + log.error("Failed to start writing for adapter with id {} because the status is {}.", + adapter.getId(), + status); + future.complete(true); } } } }); + } else { + log.debug("Protocol adapter with id {} does not support Southbound.", adapter.getId()); + future.complete(true); } } catch (final Throwable e) { - log.error("Protocol adapter start failed"); - return Optional.of(e); + log.error("Protocol adapter with id {} failed to be started.", adapter.getId()); + future.completeExceptionally(e); } - return Optional.empty(); + return future; } public @NotNull CompletableFuture stopAsync(final boolean destroy) { final var existingStopFuture = getOngoingOperationIfPresent(operationState.get(), OperationState.STOPPING); - if (existingStopFuture != null) return existingStopFuture; + if (existingStopFuture != null) { + return existingStopFuture; + } // Try to atomically transition from IDLE to STOPPING if (!operationState.compareAndSet(OperationState.IDLE, OperationState.STOPPING)) { @@ -247,46 +245,47 @@ private Optional attemptStartingConsumers(final boolean writingEnable final var input = new ProtocolAdapterStopInputImpl(); final var output = new ProtocolAdapterStopOutputImpl(); - final var stopFuture = CompletableFuture - .supplyAsync(() -> { - stopPolling(protocolAdapterPollingService); - stopWriting(protocolAdapterWritingService); - try { - adapter.stop(input, output); - } catch (final Throwable throwable) { - output.getOutputFuture().completeExceptionally(throwable); - } - return output.getOutputFuture(); - }) - .thenCompose(Function.identity()) - .whenComplete((result, throwable) -> { - if (destroy) { - log.info("Destroying adapter with id '{}'", getId()); - adapter.destroy(); - } - if (throwable == null) { - log.info("Stopped adapter with id {}", adapter.getId()); - } else { - log.error("Error stopping adapter with id {}", adapter.getId(), throwable); - } - protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); - stopFutureRef.set(null); - operationState.set(OperationState.IDLE); - }); + final var stopFuture = CompletableFuture.supplyAsync(() -> { + stopPolling(protocolAdapterPollingService); + stopWriting(protocolAdapterWritingService); + try { + adapter.stop(input, output); + } catch (final Throwable throwable) { + output.getOutputFuture().completeExceptionally(throwable); + } + return output.getOutputFuture(); + }).thenCompose(Function.identity()).whenComplete((result, throwable) -> { + if (destroy) { + log.info("Destroying adapter with id '{}'", getId()); + adapter.destroy(); + } + if (throwable == null) { + log.info("Stopped adapter with id {}", adapter.getId()); + } else { + log.error("Error stopping adapter with id {}", adapter.getId(), throwable); + } + protocolAdapterState.setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); + stopFutureRef.set(null); + operationState.set(OperationState.IDLE); + }); stopFutureRef.set(stopFuture); return stopFuture; } - private @Nullable CompletableFuture getOngoingOperationIfPresent(final @NotNull OperationState currentState, final @NotNull OperationState targetState) { + private @Nullable CompletableFuture getOngoingOperationIfPresent( + final @NotNull OperationState currentState, + final @NotNull OperationState targetState) { switch (currentState) { case STARTING: - if(targetState == OperationState.STARTING) { + if (targetState == OperationState.STARTING) { // If already starting, return existing future final var existingStartFuture = startFutureRef.get(); if (existingStartFuture != null) { - log.info("Start operation already in progress for adapter with id '{}', returning existing future", getId()); + log.info( + "Start operation already in progress for adapter with id '{}', returning existing future", + getId()); return existingStartFuture; } } else { @@ -295,11 +294,13 @@ private Optional attemptStartingConsumers(final boolean writingEnable } break; case STOPPING: - if(targetState == OperationState.STOPPING) { + if (targetState == OperationState.STOPPING) { // If already stopping, return existing future final var existingStopFuture = stopFutureRef.get(); if (existingStopFuture != null) { - log.info("Stop operation already in progress for adapter with id '{}', returning existing future", getId()); + log.info( + "Stop operation already in progress for adapter with id '{}', returning existing future", + getId()); return existingStopFuture; } break; @@ -319,7 +320,8 @@ private Optional attemptStartingConsumers(final boolean writingEnable } public void discoverValues( - final @NotNull ProtocolAdapterDiscoveryInput input, final @NotNull ProtocolAdapterDiscoveryOutput output) { + final @NotNull ProtocolAdapterDiscoveryInput input, + final @NotNull ProtocolAdapterDiscoveryOutput output) { adapter.discoverValues(input, output); } @@ -331,6 +333,10 @@ public void discoverValues( return protocolAdapterState.getRuntimeStatus(); } + public void setRuntimeStatus(final @NotNull ProtocolAdapterState.RuntimeStatus runtimeStatus) { + protocolAdapterState.setRuntimeStatus(runtimeStatus); + } + public @Nullable String getErrorMessage() { return protocolAdapterState.getLastErrorMessage(); } @@ -387,10 +393,6 @@ public void setErrorConnectionStatus(final @NotNull Throwable exception, final @ protocolAdapterState.setErrorConnectionStatus(exception, errorMessage); } - public void setRuntimeStatus(final @NotNull ProtocolAdapterState.RuntimeStatus runtimeStatus) { - protocolAdapterState.setRuntimeStatus(runtimeStatus); - } - public boolean isWriting() { return adapter instanceof WritingProtocolAdapter; } @@ -409,27 +411,23 @@ private void startPolling( if (isBatchPolling()) { log.debug("Schedule batch polling for protocol adapter with id '{}'", getId()); - final PerAdapterSampler sampler = - new PerAdapterSampler(this, eventService, tagManager); + final PerAdapterSampler sampler = new PerAdapterSampler(this, eventService, tagManager); protocolAdapterPollingService.schedulePolling(sampler); } if (isPolling()) { config.getTags().forEach(tag -> { - final PerContextSampler sampler = - new PerContextSampler( - this, - new PollingContextWrapper( - "unused", - tag.getName(), - MessageHandlingOptions.MQTTMessagePerTag, - false, - false, - List.of(), - 1, - -1), - eventService, - tagManager); + final PerContextSampler sampler = new PerContextSampler(this, + new PollingContextWrapper("unused", + tag.getName(), + MessageHandlingOptions.MQTTMessagePerTag, + false, + false, + List.of(), + 1, + -1), + eventService, + tagManager); protocolAdapterPollingService.schedulePolling(sampler); }); } @@ -443,39 +441,66 @@ private void stopPolling( } } - private @NotNull boolean startWriting(final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) { + private @NotNull CompletableFuture startWritingAsync( + final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) { log.debug("Start writing for protocol adapter with id '{}'", getId()); final var southboundMappings = getSouthboundMappings(); final var writingContexts = southboundMappings.stream() - .map(InternalWritingContextImpl::new) - .collect(Collectors.toList()); + .map(InternalWritingContextImpl::new) + .collect(Collectors.toList()); - return protocolAdapterWritingService - .startWriting( - (WritingProtocolAdapter) getAdapter(), - getProtocolAdapterMetricsService(), - writingContexts); + return protocolAdapterWritingService.startWritingAsync((WritingProtocolAdapter) getAdapter(), + getProtocolAdapterMetricsService(), + writingContexts); } private void stopWriting(final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) { //no check for 'writing is enabled', as we have to stop it anyway since the license could have been removed in the meantime. if (isWriting()) { log.debug("Stopping writing for protocol adapter with id '{}'", getId()); - final var writingContexts = - getSouthboundMappings().stream() - .map(mapping -> (InternalWritingContext)new InternalWritingContextImpl(mapping)) - .toList(); + final var writingContexts = getSouthboundMappings().stream() + .map(mapping -> (InternalWritingContext) new InternalWritingContextImpl(mapping)) + .toList(); protocolAdapterWritingService.stopWriting((WritingProtocolAdapter) getAdapter(), writingContexts); } } private void createAndSubscribeTagConsumer() { getNorthboundMappings().stream() - .map(northboundMapping -> northboundConsumerFactory.build(this, northboundMapping, protocolAdapterMetricsService)) + .map(northboundMapping -> northboundConsumerFactory.build(this, + northboundMapping, + protocolAdapterMetricsService)) .forEach(northboundTagConsumer -> { tagManager.addConsumer(northboundTagConsumer); consumers.add(northboundTagConsumer); }); } + + /** + * Represents the current operation state of the adapter to handle concurrent start/stop operations. + */ + private enum OperationState { + IDLE, + STARTING, + STOPPING + } + + /** + * Exception thrown when attempting to start an adapter while a stop operation is in progress. + */ + public static class AdapterStartConflictException extends RuntimeException { + public AdapterStartConflictException(final String adapterId) { + super("Cannot start adapter '" + adapterId + "' while stop operation is in progress"); + } + } + + /** + * Exception thrown when attempting to stop an adapter while a start operation is in progress. + */ + public static class AdapterStopConflictException extends RuntimeException { + public AdapterStopConflictException(final String adapterId) { + super("Cannot stop adapter '" + adapterId + "' while start operation is in progress"); + } + } } diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterConnectionState.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterConnectionState.java new file mode 100644 index 0000000000..17d87c49ad --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterConnectionState.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +import org.jetbrains.annotations.NotNull; + +import java.util.function.Function; + +public enum ProtocolAdapterConnectionState { + Closed(context -> 0), + Closing(context -> 0), + Connected(context -> 0), + Connecting(context -> 0), + Disconnected(context -> 0), + Disconnecting(context -> 0), + Error(context -> 0), + ErrorClosing(context -> 0), + ; + + private final @NotNull Function transitionFunction; + + ProtocolAdapterConnectionState(@NotNull final Function transitionFunction) { + this.transitionFunction = transitionFunction; + } + + public @NotNull Integer transition( + final @NotNull ProtocolAdapterConnectionState targetState, + final @NotNull Object context) { + return transitionFunction.apply(context); + } +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterInstance.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterInstance.java new file mode 100644 index 0000000000..5a9b53674e --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterInstance.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +import org.jetbrains.annotations.NotNull; + +public class ProtocolAdapterInstance { + protected @NotNull ProtocolAdapterState state; + protected @NotNull ProtocolAdapterConnectionState connectionState; + + public ProtocolAdapterInstance() { + connectionState = ProtocolAdapterConnectionState.Closed; + state = ProtocolAdapterState.Stopped; + } + + public @NotNull ProtocolAdapterState getState() { + return state; + } + + public @NotNull ProtocolAdapterConnectionState getConnectionState() { + return connectionState; + } + + public synchronized @NotNull ProtocolAdapterTransitionResponse transitionTo(final @NotNull ProtocolAdapterState newState) { + final ProtocolAdapterTransitionResponse response = state.transition(newState, this); + if (response.status() == ProtocolAdapterTransitionStatus.Success) { + this.state = response.state(); + } else { + // Handle error (logging, throwing exception, etc.) + switch (response.status()) { + default -> { + // TODO + } + } + } + return response; + } +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterOperator.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterOperator.java new file mode 100644 index 0000000000..4bfede77ae --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterOperator.java @@ -0,0 +1,20 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +public class ProtocolAdapterOperator { +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterState.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterState.java new file mode 100644 index 0000000000..d9e683b37b --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterState.java @@ -0,0 +1,145 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +import org.jetbrains.annotations.NotNull; + +import java.util.function.BiFunction; + +public enum ProtocolAdapterState { + Starting(ProtocolAdapterState::transitionFromStarting), + Started(ProtocolAdapterState::transitionFromStarted), + Stopping(ProtocolAdapterState::transitionFromStopping), + Stopped(ProtocolAdapterState::transitionFromStopped), + Error(ProtocolAdapterState::transitionFromError), + ; + + private final @NotNull BiFunction + transitionFunction; + + ProtocolAdapterState(@NotNull final BiFunction transitionFunction) { + this.transitionFunction = transitionFunction; + } + + public static ProtocolAdapterTransitionResponse transitionFromStarted( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance instance) { + switch (toState) { + case Starting: + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Starting); + case Started: + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Started); + case Stopping: + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopping); + case Stopped: + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopped); + default: + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error); + } + } + + public static ProtocolAdapterTransitionResponse transitionFromStarting( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance instance) { + return switch (toState) { + case Starting -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Starting); + case Started -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Started); + case Stopping -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopping); + case Stopped -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopped); + default -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error); + }; + } + + public static @NotNull ProtocolAdapterTransitionResponse transitionFromStopped( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance instance) { + return switch (toState) { + case Starting -> transitionFromStoppedToStarting(instance); + case Started, Stopping -> transitionToError(ProtocolAdapterState.Stopped, toState); + case Stopped -> transitionWithoutChanges(toState); + case Error -> transitionFromStoppedToError(instance); + }; + } + + public static ProtocolAdapterTransitionResponse transitionFromStoppedToError(final @NotNull ProtocolAdapterInstance instance) { + try { + // Do something to error. +// instance.doSomething(); + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error); + } catch (final Exception e) { + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error, + ProtocolAdapterTransitionStatus.Failure, + "Failed transition from Stopped to Error.", + e); + } + } + + public static ProtocolAdapterTransitionResponse transitionFromStoppedToStarting(final @NotNull ProtocolAdapterInstance instance) { + try { + // Do something to start the protocol adapter. + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Starting); + } catch (final Exception e) { + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopped, + ProtocolAdapterTransitionStatus.Failure, + "Failed transition from Stopped to Starting.", + e); + } + } + + public static @NotNull ProtocolAdapterTransitionResponse transitionWithoutChanges(final @NotNull ProtocolAdapterState toState) { + return new ProtocolAdapterTransitionResponse(toState); + } + + public static @NotNull ProtocolAdapterTransitionResponse transitionToError( + final @NotNull ProtocolAdapterState fromState, + final @NotNull ProtocolAdapterState toState) { + return new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error, + ProtocolAdapterTransitionStatus.Failure, + "Unable to transition from " + fromState + " to " + toState + ".", + null); + } + + public static ProtocolAdapterTransitionResponse transitionFromStopping( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance instance) { + return switch (toState) { + case Starting -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Starting); + case Started -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Started); + case Stopping -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopping); + case Stopped -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopped); + default -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error); + }; + } + + public static ProtocolAdapterTransitionResponse transitionFromError( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance instance) { + return switch (toState) { + case Starting -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Starting); + case Started -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Started); + case Stopping -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopping); + case Stopped -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Stopped); + default -> new ProtocolAdapterTransitionResponse(ProtocolAdapterState.Error); + }; + } + + public @NotNull ProtocolAdapterTransitionResponse transition( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance instance) { + return transitionFunction.apply(toState, instance); + } +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransition.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransition.java new file mode 100644 index 0000000000..b28b114a98 --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransition.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +import org.jetbrains.annotations.NotNull; + +public interface ProtocolAdapterTransition { + @NotNull ProtocolAdapterTransitionResponse transition( + final @NotNull ProtocolAdapterState toState, + final @NotNull ProtocolAdapterInstance session, + final @NotNull ProtocolAdapterTransitionRequest request); +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionRequest.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionRequest.java new file mode 100644 index 0000000000..18f0e56edb --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionRequest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.TimeUnit; + +public record ProtocolAdapterTransitionRequest(boolean requestWriteLock, long timeout, TimeUnit timeUnit) { + public static class Builder { + private boolean requestWriteLock; + private long timeout; + private @Nullable TimeUnit timeUnit; + + public @NotNull Builder requestWriteLock(final boolean requestWriteLock) { + this.requestWriteLock = requestWriteLock; + return this; + } + + public @NotNull Builder timeout(final long timeout) { + this.timeout = timeout; + return this; + } + + public @NotNull Builder timeUnit(final @NotNull TimeUnit timeUnit) { + this.timeUnit = timeUnit; + return this; + } + + public @NotNull ProtocolAdapterTransitionRequest build() { + return new ProtocolAdapterTransitionRequest(requestWriteLock, timeout, timeUnit); + } + } +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionResponse.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionResponse.java new file mode 100644 index 0000000000..d572a51894 --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionResponse.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +import org.jetbrains.annotations.NotNull; + +public record ProtocolAdapterTransitionResponse(ProtocolAdapterState state, ProtocolAdapterTransitionStatus status, + String message, Throwable error) { + public ProtocolAdapterTransitionResponse(final @NotNull ProtocolAdapterState state) { + this(state, ProtocolAdapterTransitionStatus.Success, null, null); + } +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionStatus.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionStatus.java new file mode 100644 index 0000000000..201e447d38 --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/ProtocolAdapterTransitionStatus.java @@ -0,0 +1,22 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsm; + +public enum ProtocolAdapterTransitionStatus { + Success, + Failure, +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/state-machine.md b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/state-machine.md new file mode 100644 index 0000000000..0ab83e444c --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/state-machine.md @@ -0,0 +1,534 @@ +# Protocol Adapter Finite State Machine Specification + +## Overview + +`ProtocolAdapterFSM` manages three concurrent state machines: + +1. **Adapter State** - Lifecycle control (start/stop) +2. **Northbound State** - Connection to HiveMQ broker +3. **Southbound State** - Connection to physical device + +All three must coordinate. The adapter controls overall lifecycle. Northbound handles MQTT publishing. Southbound handles device communication. + +--- + +## Adapter State Machine + +Four states control adapter lifecycle. + +### States + +1. **STOPPED** + - Default state + - Adapter inactive + - Transitions: → STARTING + +2. **STARTING** + - Initialization in progress + - Calls `onStarting()` hook + - Transitions: → STARTED (success), → STOPPED (failure), → ERROR (non recoverable error) + +3. **STARTED** + - Adapter operational + - Transitions: → STOPPING, → ERROR (non recoverable error) + +4. **STOPPING** + - Cleanup in progress + - Calls `onStopping()` hook + - Transitions: → STOPPED + +5. **ERROR** + - Non-recoverable error, adapter is dead + - This is a terminal state + - Transitions: → STARTING + +### Transition Rules + +``` + + ERROR + +STOPPED → STARTING → STARTED → STOPPING → STOPPED + ↑ ↓ + └────────────────────────────────┘ +``` + +**Constraint:** Must transition through intermediate states. Cannot jump STOPPED → STARTED. + +--- + +## Connection State Machines + +Nine states apply to both northbound and southbound connections. Each connection state machine operates independently but coordinates via the adapter lifecycle. + +### States + +**Note:** The state-machine.png diagram visualizes these states with color coding for clarity. + +1. **DISCONNECTED** + - No connection established + - Initial state + - Transitions: → CONNECTING, → CONNECTED (legacy), → CLOSED (testing) + +2. **CONNECTING** + - Connection attempt in progress + - Transitions: → CONNECTED, → ERROR, → DISCONNECTED + +3. **CONNECTED** + - Active connection established + - Data flow operational + - Transitions: → DISCONNECTING, → CONNECTING, → CLOSING, → ERROR_CLOSING, → DISCONNECTED + +4. **DISCONNECTING** + - Graceful connection teardown + - Transitions: → DISCONNECTED, → CLOSING + +5. **CLOSING** + - Permanent closure in progress + - Transitions: → CLOSED + +6. **CLOSED** + - Connection permanently closed + - Transitions: → DISCONNECTED (restart), → CLOSING (verification) + +7. **ERROR** + - Connection failure + - Transitions: → CONNECTING (recovery), → DISCONNECTED (abort) + +8. **ERROR_CLOSING** + - Error during closure + - Transitions: → ERROR + +9. **NOT_SUPPORTED** + - Stateless operation mode + - No transitions allowed + +### Transition Graph + +![State Machine Diagram](state-machine.png) + +The diagram shows: +- **Left side:** Complete connection state transition graph with all possible states and transitions +- **Right side:** Ideal operational sequence (steps 1-8) showing coordinated northbound and southbound transitions +- **Color coding:** + - Orange (DISCONNECTED) - Initial/inactive state + - Blue (CONNECTING, CONNECTED, DISCONNECTING, CLOSING) - Normal operational states + - Red (ERROR, ERROR_CLOSING, CLOSED) - Error or terminal states + +**Ideal Transition Sequence (Right side of diagram, steps 1-8):** +1. **Step 1:** Both connections DISCONNECTED (initial state) +2. **Step 2:** Northbound transitions to CONNECTING while southbound remains DISCONNECTED +3. **Step 3:** Northbound reaches CONNECTED (automatically triggers `startSouthbound()`), southbound still DISCONNECTED +4. **Step 4:** Southbound transitions to CONNECTING, northbound maintains CONNECTED +5. **Step 5:** Southbound transitions to CLOSING (shutdown initiated), northbound maintains CONNECTED +6. **Step 6:** Southbound reaches CLOSED (terminal state), northbound maintains CONNECTED +7. **Step 7:** Northbound transitions to CLOSING, southbound remains CLOSED +8. **Step 8:** Both connections reach CLOSED (shutdown complete) + +**ASCII Alternative:** +``` + ┌──────┐ + │ERROR │←──────┐ + └───┬──┘ │ + │ │ + ┌──DISCONNECTED──┐ ↓ │ + │ │ │ │ + ↓ │ ↓ ┌────────────┐ +CONNECTING ←─────────┴─→CONNECTED→ERROR_CLOSING + │ │ │ + ↓ ↓ ↓ +DISCONNECTED DISCONNECTING ERROR + ↑ │ + │ ↓ + │ DISCONNECTED + │ │ +CLOSED←─CLOSING←─────────┘ + │ ↑ + └───────┘ +``` + +--- + +## Operational Sequences + +### Startup Sequence + +1. Initial state: Adapter STOPPED, both connections DISCONNECTED +2. Call `startAdapter()` + - Adapter → STARTING + - Execute `onStarting()` + - Adapter → STARTED (success) or STOPPED (failure) +3. Transition northbound: DISCONNECTED → CONNECTING → CONNECTED +4. Automatic southbound start when northbound reaches CONNECTED + - `startSouthbound()` called automatically + - Override to control behavior + +### Normal Operation + +``` +Adapter: STARTED +Northbound: CONNECTED +Southbound: CONNECTED +``` + +### Connection Failure (Northbound) + +``` +Adapter: STARTED +Northbound: ERROR +Southbound: DISCONNECTED +``` + +Southbound never starts because northbound failed to reach CONNECTED. + +### Connection Failure (Southbound) + +``` +Adapter: STARTED +Northbound: CONNECTED +Southbound: ERROR +``` + +Valid state. Adapter can communicate with broker but not with device. + +### Shutdown Sequence + +**Option 1: Adapter Stop Only** +1. Call `stopAdapter()` + - Adapter → STOPPING + - Execute `onStopping()` + - Adapter → STOPPED +2. Connection states preserved + - Connections maintain current state after adapter stops + - Northbound and southbound remain in their current states (e.g., CONNECTED) + +**Option 2: Full Connection Closure (Ideal Sequence)** +1. Close southbound connection + - Southbound: CONNECTED → CLOSING → CLOSED +2. Close northbound connection + - Northbound: CONNECTED → CLOSING → CLOSED +3. Stop adapter + - Adapter → STOPPING → STOPPED + +The diagram's "Ideal State Transition" (steps 5-8) demonstrates Option 2, where connections are explicitly closed before stopping the adapter. This ensures clean resource cleanup and proper connection termination. + +--- + +## API Methods + +### Adapter Lifecycle + +- `startAdapter()` - Transition STOPPED → STARTING → STARTED +- `stopAdapter()` - Transition STARTED → STOPPING → STOPPED +- `onStarting()` - Override for initialization logic +- `onStopping()` - Override for cleanup logic + +### Northbound Control + +- `transitionNorthboundState(ConnectionStatus)` - Manual state transition +- `accept(ConnectionStatus)` - Transition + trigger southbound on CONNECTED +- `startDisconnecting()` - Begin graceful disconnect +- `startClosing()` - Begin permanent closure +- `startErrorClosing()` - Error-state closure +- `markAsClosed()` - Confirm CLOSED state +- `recoverFromError()` - Attempt recovery from ERROR +- `restartFromClosed()` - Restart from CLOSED state + +### Southbound Control + +- `transitionSouthboundState(ConnectionStatus)` - Manual state transition +- `startSouthbound()` - Override to implement southbound startup logic +- `startSouthboundDisconnecting()` +- `startSouthboundClosing()` +- `startSouthboundErrorClosing()` +- `markSouthboundAsClosed()` +- `recoverSouthboundFromError()` +- `restartSouthboundFromClosed()` + +### State Queries + +- `getNorthboundConnectionStatus()` - Current northbound state +- `getSouthboundConnectionStatus()` - Current southbound state +- `getAdapterState()` - Current adapter state + +--- + +## Concurrency + +### Thread Safety + +- State transitions use `AtomicReference` with `compareAndSet` +- Multiple threads can safely call transition methods +- CAS loop ensures atomic state updates +- Return value indicates success/failure + +### Example + +```java +boolean success = fsm.transitionNorthboundState(StateEnum.CONNECTING); +if (!success) { + // Another thread changed state concurrently + // Handle race condition +} +``` + +### State Listeners + +- Register listeners via `registerAdapterStateListener()` or `registerConnectionStateListener()` +- Listeners use `CopyOnWriteArrayList` - thread-safe during iteration +- Can add/remove listeners during state transitions + +--- + +## Implementation Requirements + +### Adapter Implementation + +```java +public class MyAdapter extends ProtocolAdapterFSM { + + @Override + protected boolean onStarting() { + // Initialize resources + // Return true on success, false on failure + return initializeConnection(); + } + + @Override + protected void onStopping() { + // Clean up resources + closeConnection(); + } + + @Override + public boolean startSouthbound() { + // Called automatically when northbound reaches CONNECTED + return transitionSouthboundState(StateEnum.CONNECTING); + } +} +``` + +### State Transition Logic + +```java +// Start adapter +fsm.startAdapter(); + +// Connect northbound +fsm.transitionNorthboundState(StateEnum.CONNECTING); +fsm.transitionNorthboundState(StateEnum.CONNECTED); + +// Southbound starts automatically via startSouthbound() + +// Stop adapter +fsm.stopAdapter(); +``` + +--- + +## Valid State Combinations + +| Adapter State | Northbound | Southbound | Valid | Notes | +|---------------|----------------|----------------|-------|-------| +| STOPPED | DISCONNECTED | DISCONNECTED | Yes | Initial state | +| STARTING | DISCONNECTED | DISCONNECTED | Yes | Startup in progress | +| STARTED | CONNECTED | CONNECTED | Yes | Normal operation | +| STARTED | CONNECTED | ERROR | Yes | Device communication failed | +| STARTED | ERROR | DISCONNECTED | Yes | Broker communication failed | +| STARTED | DISCONNECTED | CONNECTED | No | Invalid - northbound must connect first | +| STOPPING | * | * | Yes | Any connection state during shutdown | + +--- + +## Validation Rules + +1. **Adapter must be STARTED** before northbound/southbound transitions +2. **Transitions must follow** `possibleTransitions` map +3. **Cannot skip intermediate states** in adapter lifecycle +4. **Southbound activation** requires northbound CONNECTED state +5. **Return values** indicate transition success - must be checked + +--- + +## Error Handling + +### Transition Failures + +```java +// Check return value +if (!fsm.transitionNorthboundState(StateEnum.CONNECTED)) { + log.error("Transition failed - state changed concurrently"); + // Retry or handle error +} +``` + +### Illegal Transitions + +```java +// Throws IllegalStateException +try { + fsm.transitionNorthboundState(StateEnum.CLOSING); // From DISCONNECTED +} catch (IllegalStateException e) { + log.error("Invalid transition: " + e.getMessage()); +} +``` + +### Startup Failures + +```java +@Override +protected boolean onStarting() { + try { + initializeResources(); + return true; + } catch (Exception e) { + log.error("Startup failed", e); + return false; // Adapter transitions to STOPPED + } +} +``` + +--- + +## Testing Scenarios + +### Test 1: Legacy Connection + +```java +fsm.startAdapter(); +fsm.accept(StateEnum.CONNECTED); // Direct jump to CONNECTED +// Southbound: DISCONNECTED (startSouthbound() not implemented) +``` + +### Test 2: Standard Connection Flow + +```java +fsm.startAdapter(); +fsm.accept(StateEnum.CONNECTING); +fsm.accept(StateEnum.CONNECTED); +// Southbound: Starts automatically +``` + +### Test 3: Northbound Error + +```java +fsm.startAdapter(); +fsm.transitionNorthboundState(StateEnum.CONNECTING); +fsm.transitionNorthboundState(StateEnum.ERROR); +// Southbound: DISCONNECTED (never started) +``` + +### Test 4: Southbound Error + +```java +fsm.startAdapter(); +fsm.accept(StateEnum.CONNECTED); // Northbound CONNECTED +// startSouthbound() called automatically +fsm.transitionSouthboundState(StateEnum.CONNECTING); +fsm.transitionSouthboundState(StateEnum.ERROR); +// Northbound: Still CONNECTED +// Southbound: ERROR +``` + +--- + +## State Transition Tables + +### Adapter State Transitions + +| From | To | Method | Condition | +|----------|----------|-------------------|-----------| +| STOPPED | STARTING | `startAdapter()` | Always | +| STARTING | STARTED | Internal | `onStarting()` returns true | +| STARTING | STOPPED | Internal | `onStarting()` returns false | +| STARTED | STOPPING | `stopAdapter()` | Always | +| STOPPING | STOPPED | Internal | After `onStopping()` | + +### Connection State Transitions + +| From | To | Allowed | +|-----------------|-----------------|---------| +| DISCONNECTED | CONNECTING | Yes | +| DISCONNECTED | CONNECTED | Yes (legacy) | +| DISCONNECTED | CLOSED | Yes (testing) | +| CONNECTING | CONNECTED | Yes | +| CONNECTING | ERROR | Yes | +| CONNECTING | DISCONNECTED | Yes | +| CONNECTED | DISCONNECTING | Yes | +| CONNECTED | CONNECTING | Yes | +| CONNECTED | CLOSING | Yes | +| CONNECTED | ERROR_CLOSING | Yes | +| CONNECTED | DISCONNECTED | Yes | +| DISCONNECTING | DISCONNECTED | Yes | +| DISCONNECTING | CLOSING | Yes | +| CLOSING | CLOSED | Yes | +| CLOSED | DISCONNECTED | Yes | +| CLOSED | CLOSING | Yes | +| ERROR | CONNECTING | Yes | +| ERROR | DISCONNECTED | Yes | +| ERROR_CLOSING | ERROR | Yes | +| NOT_SUPPORTED | * | No | + +--- + +## Common Implementation Errors + +### Error 1: Adapter Not Started + +```java +// Incorrect +fsm.transitionNorthboundState(StateEnum.CONNECTING); +// Throws IllegalStateException - adapter not started + +// Correct +fsm.startAdapter(); +fsm.transitionNorthboundState(StateEnum.CONNECTING); +``` + +### Error 2: Ignoring Return Values + +```java +// Incorrect +fsm.transitionNorthboundState(StateEnum.CONNECTING); +// May fail silently due to concurrent modification + +// Correct +boolean success = fsm.transitionNorthboundState(StateEnum.CONNECTING); +if (!success) { + // Handle race condition +} +``` + +### Error 3: Missing startSouthbound Implementation + +```java +// Incorrect +@Override +public boolean startSouthbound() { + log.info("Starting southbound"); + return true; // Doesn't actually transition state +} + +// Correct +@Override +public boolean startSouthbound() { + return transitionSouthboundState(StateEnum.CONNECTING); +} +``` + +--- + +## Key Constraints + +1. Adapter state transitions are sequential - no state skipping +2. Northbound must reach CONNECTED before southbound starts +3. Connection states allow multiple valid transition paths +4. All transitions are atomic via CAS operations +5. State listeners execute synchronously during transitions +6. Adapter ID included in all log messages for debugging + +--- + +## Summary + +The `ProtocolAdapterFSM` coordinates three state machines with defined transition rules. Adapter state controls lifecycle. Northbound handles broker communication. Southbound handles device communication. All operations are thread-safe. State transitions must follow defined paths. Return values indicate success. Override `onStarting()`, `onStopping()`, and `startSouthbound()` to implement adapter-specific behavior. diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/transitions.md b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/transitions.md new file mode 100644 index 0000000000..f1c3a6d48c --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsm/transitions.md @@ -0,0 +1,539 @@ +# Understanding Protocol Adapter State Machines: A Test-Driven Journey + +*5 min read • Learn how three coordinated state machines manage adapter lifecycle and connections* + +--- + +## What Are We Building? + +Protocol adapters connect HiveMQ Edge to physical devices. They need to manage: +1. **Adapter lifecycle** (starting/stopping) +2. **Northbound connection** (to MQTT broker) +3. **Southbound connection** (to physical device) + +These three state machines coordinate to ensure reliable communication. Let's explore them through their tests. + +--- + +## The Three State Machines + +```mermaid +graph TB + subgraph Adapter Lifecycle + STOPPED -->|startAdapter| STARTING + STARTING -->|success| STARTED + STARTING -->|non recoverable error| ERROR + STARTED -->|stopAdapter| STOPPING + STARTED -->|non recoverable error| ERROR + STOPPING --> STOPPED + end +``` + +```mermaid +graph TB + subgraph Connection States - Northbound & Southbound + DISCONNECTED -->|connect| CONNECTING + DISCONNECTED -->|legacy| CONNECTED + CONNECTING -->|success| CONNECTED + CONNECTING -->|fail| ERROR + CONNECTING -->|abort| DISCONNECTED + CONNECTED -->|reconnect| CONNECTING + CONNECTED -->|graceful| DISCONNECTING + CONNECTED -->|terminate| CLOSING + CONNECTED -->|error| ERROR_CLOSING + CONNECTED -->|instant| DISCONNECTED + DISCONNECTING --> DISCONNECTED + DISCONNECTING --> CLOSING + ERROR -->|recover| CONNECTING + ERROR -->|give up| DISCONNECTED + ERROR_CLOSING --> ERROR + CLOSING --> CLOSED + CLOSED -->|restart| DISCONNECTED + CLOSED -->|verify| CLOSING + end +``` + +--- + +## Adapter Lifecycle: The Boss + +The adapter lifecycle controls everything. No connections can transition without the adapter being STARTED. + +### Test: Successful Startup + +```java +@Test +void adapter_successfulStartup() { + fsm.startAdapter(); + assertState(fsm, STARTED, DISCONNECTED, DISCONNECTED); +} +``` + +**What happens:** +- Initial: `STOPPED → STARTING → STARTED` +- Connections stay `DISCONNECTED` +- Ready to establish connections + +### Test: Failed Startup + +```java +@Test +void adapter_failedStartup_returnsToStopped() { + // onStarting() returns false + fsm.startAdapter(); + assertState(fsm, STOPPED, DISCONNECTED, DISCONNECTED); +} +``` + +**What happens:** +- Attempted: `STOPPED → STARTING → STOPPED` +- Initialization failed, rolled back +- Connections never activated + +### Test: Stop Preserves Connections + +```java +@Test +void adapter_stopPreservesConnectionStates() { + fsm.startAdapter(); + fsm.transitionNorthboundState(CONNECTED); + fsm.stopAdapter(); + + assertState(fsm, STOPPED, CONNECTED, DISCONNECTED); +} +``` + +**Key insight:** Stopping the adapter doesn't disconnect connections. They maintain their state. + +--- + +## Northbound: Connection to MQTT Broker + +Northbound manages the connection to HiveMQ broker. It has the most complex state graph. + +### Test: Standard Connection Flow + +```java +@Test +void northbound_standardConnectFlow() { + fsm.startAdapter(); + fsm.accept(CONNECTING); + fsm.accept(CONNECTED); +} +``` + +```mermaid +sequenceDiagram + participant A as Adapter + participant N as Northbound + A->>A: STOPPED → STARTED + N->>N: DISCONNECTED → CONNECTING + N->>N: CONNECTING → CONNECTED +``` + +### Test: Legacy Direct Connect + +```java +@Test +void northbound_legacyDirectConnect() { + fsm.startAdapter(); + fsm.accept(CONNECTED); // Skip CONNECTING +} +``` + +**Why:** Backward compatibility. Old code could jump directly to CONNECTED. + +### Test: Error Handling + +```java +@Test +void northbound_errorRecovery() { + fsm.startAdapter(); + fsm.transitionNorthboundState(CONNECTING); + fsm.transitionNorthboundState(ERROR); + + fsm.recoverFromError(); // ERROR → CONNECTING +} +``` + +**The error recovery cycle:** +```mermaid +graph LR + CONNECTING -->|connection failed| ERROR + ERROR -->|retry| CONNECTING + ERROR -->|give up| DISCONNECTED +``` + +### Test: Graceful Shutdown + +```java +@Test +void northbound_closingSequence() { + fsm.transitionNorthboundState(CONNECTED); + fsm.startClosing(); // CONNECTED → CLOSING + fsm.markAsClosed(); // CLOSING → CLOSED +} +``` + +### Test: Emergency Shutdown + +```java +@Test +void northbound_errorClosingSequence() { + fsm.transitionNorthboundState(CONNECTED); + fsm.startErrorClosing(); // CONNECTED → ERROR_CLOSING + // ERROR_CLOSING → ERROR +} +``` + +**Two shutdown paths:** +```mermaid +graph TB + CONNECTED -->|graceful| CLOSING + CONNECTED -->|error while closing| ERROR_CLOSING + CLOSING --> CLOSED + ERROR_CLOSING --> ERROR +``` + +### Test: Reconnection + +```java +@Test +void northbound_reconnectFromConnected() { + fsm.transitionNorthboundState(CONNECTED); + fsm.transitionNorthboundState(CONNECTING); // Reconnect +} +``` + +**Use case:** Connection degraded, attempt reconnection without full disconnect. + +--- + +## Southbound: Connection to Physical Device + +Southbound mirrors northbound but has a special behavior: **automatic activation**. + +### Test: Automatic Start + +```java +@Test +void southbound_startsWhenNorthboundConnects() { + fsm = createFSMWithAutoSouthbound(); + fsm.startAdapter(); + fsm.accept(CONNECTED); // Northbound connects + + // Southbound automatically starts CONNECTING +} +``` + +**Why:** Device connection only makes sense when broker connection is established. + +```mermaid +sequenceDiagram + participant N as Northbound + participant S as Southbound + N->>N: CONNECTING → CONNECTED + N->>S: Trigger startSouthbound() + S->>S: DISCONNECTED → CONNECTING +``` + +### Test: Southbound Error While Northbound Connected + +```java +@Test +void southbound_errorWhileNorthboundConnected() { + fsm.accept(CONNECTED); // Northbound OK + // Southbound transitions to ERROR asynchronously + + assertState(fsm, STARTED, CONNECTED, ERROR); +} +``` + +**This is valid!** Broker connection works, device doesn't. Adapter can receive commands but can't execute them. + +### Test: Full Lifecycle + +```java +@Test +void southbound_fullLifecycle() { + fsm.accept(CONNECTED); // Auto-start + fsm.transitionSouthboundState(CONNECTED); // Connection succeeds + fsm.startSouthboundClosing(); // Begin shutdown + fsm.markSouthboundAsClosed(); // Complete shutdown +} +``` + +```mermaid +graph LR + DISCONNECTED -->|auto| CONNECTING + CONNECTING --> CONNECTED + CONNECTED --> CLOSING + CLOSING --> CLOSED +``` + +--- + +## Coordination: The Ideal Shutdown Sequence + +The most complex test demonstrates all three state machines coordinating. + +```java +@Test +void diagramSequence_idealShutdown() { + // Step 1: Initial state + assertState(fsm, STOPPED, DISCONNECTED, DISCONNECTED); + + // Step 2: Start adapter, begin northbound connection + fsm.startAdapter(); + fsm.transitionNorthboundState(CONNECTING); + + // Step 3: Northbound connects (triggers southbound) + fsm.accept(CONNECTED); + + // Step 4: Southbound connects + fsm.transitionSouthboundState(CONNECTED); + + // Step 5: Close southbound first + fsm.startSouthboundClosing(); + + // Step 6: Southbound closed + fsm.markSouthboundAsClosed(); + + // Step 7: Close northbound + fsm.startClosing(); + + // Step 8: Northbound closed + fsm.markAsClosed(); +} +``` + +```mermaid +sequenceDiagram + participant A as Adapter + participant N as Northbound + participant S as Southbound + + Note over A,S: Step 1: Initial State + A->>A: STOPPED + + Note over A,S: Step 2-3: Startup + A->>A: STOPPED → STARTED + N->>N: DISCONNECTED → CONNECTING → CONNECTED + N->>S: Trigger startSouthbound() + + Note over A,S: Step 4: Southbound Connects + S->>S: CONNECTING → CONNECTED + + Note over A,S: Step 5-8: Graceful Shutdown + S->>S: CONNECTED → CLOSING → CLOSED + N->>N: CONNECTED → CLOSING → CLOSED +``` + +**Why this order?** Close device connection before broker connection. Ensures clean resource cleanup. + +--- + +## Error Scenarios + +### Test: Northbound Fails, Southbound Never Starts + +```java +@Test +void northbound_errorState() { + fsm.startAdapter(); + fsm.accept(CONNECTING); + fsm.accept(ERROR); + + assertState(fsm, STARTED, ERROR, DISCONNECTED); +} +``` + +**Southbound stays DISCONNECTED** because northbound never reached CONNECTED. + +### Test: Concurrent State Changes + +```java +@Test +void concurrentTransition_casFailure() { + fsm.transitionNorthboundState(CONNECTING); + fsm.transitionNorthboundState(CONNECTED); // Succeeds + fsm.transitionNorthboundState(CONNECTING); // Also succeeds (reconnect) +} +``` + +**Thread-safe:** Uses atomic compare-and-set operations. Multiple threads can safely transition states. + +--- + +## Invalid Transitions + +Not all transitions are allowed. Tests verify enforcement: + +```java +@Test +void invalidTransition_disconnectedToClosing() { + fsm.startAdapter(); + assertThatThrownBy(() -> fsm.transitionNorthboundState(CLOSING)) + .isInstanceOf(IllegalStateException.class); +} +``` + +**Rule:** Cannot close what isn't connected. Must go through CONNECTED first. + +```java +@Test +void invalidTransition_connectingToErrorClosing() { + fsm.transitionNorthboundState(CONNECTING); + assertThatThrownBy(() -> fsm.transitionNorthboundState(ERROR_CLOSING)) + .isInstanceOf(IllegalStateException.class); +} +``` + +**Rule:** ERROR_CLOSING only valid from CONNECTED state. + +--- + +## State Observers + +### Test: Notifications + +```java +@Test +void stateListener_multipleNotifications() { + fsm.registerStateTransitionListener(state -> stateCount.incrementAndGet()); + + fsm.startAdapter(); // +2 (STOPPED→STARTING→STARTED) + fsm.transitionNorthboundState(CONNECTING); // +1 + fsm.transitionNorthboundState(CONNECTED); // +1 + + assertThat(stateCount.get()).isEqualTo(4); +} +``` + +**Key insight:** `startAdapter()` triggers **two** transitions internally. + +```mermaid +graph LR + STOPPED -->|1️⃣| STARTING + STARTING -->|2️⃣| STARTED +``` + +### Test: Unregister + +```java +@Test +void stateListener_unregister() { + fsm.registerStateTransitionListener(listener); + fsm.startAdapter(); // Notified + + fsm.unregisterStateTransitionListener(listener); + fsm.transitionNorthboundState(CONNECTING); // NOT notified +} +``` + +--- + +## State Machine Properties + +From analyzing the tests, we can identify key properties: + +### 1. **Thread-Safety** +All transitions use atomic operations. Concurrent calls are safe. + +### 2. **Separation of Concerns** +Three independent state machines coordinate via triggers, not tight coupling. + +### 3. **Fail-Safe** +- Startup failure returns to STOPPED +- Error states have recovery paths +- Invalid transitions throw exceptions + +### 4. **Flexibility** +- Legacy direct-connect supported +- Multiple shutdown paths (graceful, error, instant) +- Reconnection without full disconnect + +### 5. **Observability** +State listeners enable monitoring and reactive behavior. + +--- + +## Complete State Graphs + +### Adapter States + +```mermaid +stateDiagram-v2 + [*] --> STOPPED + STOPPED --> STARTING: startAdapter() + STARTING --> STARTED: onStarting() = true + STARTING --> STOPPED: onStarting() = false + STARTED --> STOPPING: stopAdapter() + STOPPING --> STOPPED + STOPPED --> [*] +``` + +### Connection States (Both Northbound & Southbound) + +```mermaid +stateDiagram-v2 + [*] --> DISCONNECTED + DISCONNECTED --> CONNECTING: connect attempt + DISCONNECTED --> CONNECTED: legacy direct connect + DISCONNECTED --> CLOSED: testing + + CONNECTING --> CONNECTED: success + CONNECTING --> ERROR: failure + CONNECTING --> DISCONNECTED: abort + + CONNECTED --> CONNECTING: reconnect + CONNECTED --> DISCONNECTING: graceful disconnect + CONNECTED --> CLOSING: terminate + CONNECTED --> ERROR_CLOSING: error during close + CONNECTED --> DISCONNECTED: instant disconnect + + DISCONNECTING --> DISCONNECTED + DISCONNECTING --> CLOSING + + CLOSING --> CLOSED + + CLOSED --> DISCONNECTED: restart + CLOSED --> CLOSING: verification + + ERROR --> CONNECTING: recovery + ERROR --> DISCONNECTED: give up + + ERROR_CLOSING --> ERROR + + DISCONNECTED --> [*] +``` + +--- + +## Key Takeaways + +1. **Three coordinated state machines** manage adapter lifecycle and bi-directional connections +2. **Adapter must be STARTED** before connections can transition +3. **Northbound triggers southbound** when reaching CONNECTED state +4. **Error states have recovery paths** - systems can self-heal +5. **Multiple shutdown paths** handle graceful and emergency scenarios +6. **Thread-safe by design** using atomic operations +7. **State preservation** when adapter stops - connections maintain state +8. **Observability built-in** via state transition listeners + +--- + +## Test Coverage + +**32 tests** verify: +- ✅ Adapter lifecycle (startup, failure, stop) +- ✅ Northbound transitions (13 scenarios) +- ✅ Southbound transitions (7 scenarios) +- ✅ Invalid transition rejection (3 guards) +- ✅ State listeners (register, notify, unregister) +- ✅ Concurrent modifications (CAS validation) +- ✅ Ideal shutdown sequence (8-step coordination) + +--- + +*Read the code: [`ProtocolAdapterFSMTest.java`](src/test/java/com/hivemq/fsm/ProtocolAdapterFSMTest.java)* diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/FsmExperiment.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/FsmExperiment.java new file mode 100644 index 0000000000..f5357417db --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/FsmExperiment.java @@ -0,0 +1,83 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsmjochen; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class FsmExperiment { + + private volatile State currentState = State.Stopped; + + private final ProtocolAdapterFsmJochen protocolAdapterFsmJochen; + + public enum State { + Stopped, + Starting, + Started, + Stopping, + Error + } + + final Map> stateToStatesMap = Map.of( + State.Stopped, List.of(State.Starting), + State.Starting, List.of(State.Started, State.Stopping, State.Error), + State.Started, List.of(State.Stopping, State.Error), + State.Stopping, List.of(State.Error, State.Stopped), + State.Error, List.of(State.Starting) + ); + + final Map> transitionsMap = Map.of( + State.Stopped, pa -> State.Stopped, + State.Stopping, pa -> { + try { + pa.stop(); + return State.Stopping; + } catch (Exception e) { + return State.Error; + } + }, + State.Started, pa -> State.Started, + State.Starting, pa -> { + pa.start(); + return State.Stopping; + }, + State.Error, pa -> State.Error + ); + + + public FsmExperiment(ProtocolAdapterFsmJochen protocolAdapterFsmJochen) { + this.protocolAdapterFsmJochen = protocolAdapterFsmJochen; + } + + public synchronized boolean transitionTo(State targetState) { + if(stateToStatesMap.get(currentState).contains(targetState)) { + currentState = transitionsMap.get(targetState).apply(protocolAdapterFsmJochen); + return true; + } else { + return false; + } + } + + + public State getCurrentState() { + return currentState; + } + +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/PATest.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/PATest.java new file mode 100644 index 0000000000..6cea7675f7 --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/PATest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsmjochen; + +import java.util.concurrent.CompletableFuture; + +public class PATest implements ProtocolAdapterFsmJochen { + FsmExperiment fsmExperiment; + + public PATest(FsmExperiment fsmExperiment) { + this.fsmExperiment = fsmExperiment; + } + + public void start() { + CompletableFuture.runAsync(() -> { + if (true) { + fsmExperiment.transitionTo(FsmExperiment.State.Started); + } else{ + fsmExperiment.transitionTo(FsmExperiment.State.Error); + } + }); + }; + + public void stop() { + CompletableFuture.runAsync(() -> { + + //DO ALL THE WORK + + if (true) { + fsmExperiment.transitionTo(FsmExperiment.State.Stopped); + } else{ + fsmExperiment.transitionTo(FsmExperiment.State.Error); + } + }); + } +} diff --git a/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/ProtocolAdapterFsmJochen.java b/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/ProtocolAdapterFsmJochen.java new file mode 100644 index 0000000000..3435326fbf --- /dev/null +++ b/hivemq-edge/src/main/java/com/hivemq/protocols/fsmjochen/ProtocolAdapterFsmJochen.java @@ -0,0 +1,22 @@ +/* + * Copyright 2019-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.protocols.fsmjochen; + +public interface ProtocolAdapterFsmJochen { + void start(); + void stop(); +} diff --git a/hivemq-edge/src/test/java/com/hivemq/protocols/ProtocolAdapterManagerTest.java b/hivemq-edge/src/test/java/com/hivemq/protocols/ProtocolAdapterManagerTest.java index bf723dd685..9f269339c8 100644 --- a/hivemq-edge/src/test/java/com/hivemq/protocols/ProtocolAdapterManagerTest.java +++ b/hivemq-edge/src/test/java/com/hivemq/protocols/ProtocolAdapterManagerTest.java @@ -47,6 +47,7 @@ import org.junit.jupiter.api.Test; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -100,9 +101,9 @@ void test_startWritingAdapterSucceeded_eventsFired() throws Exception { final EventBuilder eventBuilder = new EventBuilderImpl(mock()); when(protocolAdapterWritingService.writingEnabled()).thenReturn(true); - when(protocolAdapterWritingService.startWriting(any(), + when(protocolAdapterWritingService.startWritingAsync(any(), any(), - any())).thenReturn(true); + any())).thenReturn(CompletableFuture.completedFuture(true)); when(eventService.createAdapterEvent(anyString(), anyString())).thenReturn(eventBuilder); final var adapterState = new ProtocolAdapterStateImpl(eventService, "test-adapter", "test-protocol"); final ProtocolAdapterWrapper adapterWrapper = new ProtocolAdapterWrapper(mock(), @@ -145,7 +146,7 @@ void test_startWritingNotEnabled_writingNotStarted() throws Exception { assertThat(adapterWrapper.getRuntimeStatus()).isEqualTo(ProtocolAdapterState.RuntimeStatus.STARTED); verify(remoteService).fireUsageEvent(any()); - verify(protocolAdapterWritingService, never()).startWriting(any(), any(), any()); + verify(protocolAdapterWritingService, never()).startWritingAsync(any(), any(), any()); } @Test @@ -154,8 +155,8 @@ void test_startWriting_adapterFailedStart_resourcesCleanedUp() throws Exception{ when(protocolAdapterWritingService.writingEnabled()).thenReturn(true); when(protocolAdapterWritingService - .startWriting(any(), any(), any())) - .thenReturn(true); + .startWritingAsync(any(), any(), any())) + .thenReturn(CompletableFuture.completedFuture(true)); when(eventService.createAdapterEvent(anyString(), anyString())).thenReturn(eventBuilder); final var adapterState = new ProtocolAdapterStateImpl(eventService, "test-adapter", "test-protocol"); @@ -186,9 +187,9 @@ void test_startWriting_eventServiceFailedStart_resourcesCleanedUp() throws Excep when(eventService.createAdapterEvent(anyString(), anyString())).thenReturn(eventBuilder); when(protocolAdapterWritingService.writingEnabled()).thenReturn(true); - when(protocolAdapterWritingService.startWriting(any(), + when(protocolAdapterWritingService.startWritingAsync(any(), any(), - any())).thenReturn(true); + any())).thenReturn(CompletableFuture.completedFuture(true)); final var adapterState = new ProtocolAdapterStateImpl(eventService, "test-adapter", "test-protocol"); final ProtocolAdapterWrapper adapterWrapper = new ProtocolAdapterWrapper(mock(), diff --git a/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java b/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java deleted file mode 100644 index bcf57998ae..0000000000 --- a/src/main/java/com/hivemq/protocols/ProtocolAdapterWrapper.java +++ /dev/null @@ -1,120 +0,0 @@ -// ...existing code... - -import java.util.concurrent.atomic.AtomicReference; - -// ...existing code... - - private final AtomicBoolean stateChangeOngoing = new AtomicBoolean(false); - private final AtomicReference> startFutureRef = new AtomicReference<>(null); - private final AtomicReference> stopFutureRef = new AtomicReference<>(null); - - // ...existing code... - - public @NotNull CompletableFuture startAsync( - final boolean writingEnabled, - final @NotNull ModuleServices moduleServices) { - CompletableFuture currentFuture = startFutureRef.get(); - if (currentFuture != null && !currentFuture.isDone()) { - log.warn("Start already in progress for adapter with id '{}', returning existing future", getId()); - return currentFuture; - } - if(stateChangeOngoing.compareAndSet(false, true)) { - initStartAttempt(); - final ProtocolAdapterStartOutputImpl output = new ProtocolAdapterStartOutputImpl(); - final ProtocolAdapterStartInputImpl input = new ProtocolAdapterStartInputImpl(moduleServices); - - CompletableFuture startFuture = CompletableFuture - .supplyAsync(() -> { - adapter.start(input, output); - return output.getStartFuture(); - }) - .thenCompose(Function.identity()) - .handle((ignored, t) -> { - if(t == null) { - createAndSubscribeTagConsumer(); - startPolling(protocolAdapterPollingService, input.moduleServices().eventService()); - return startWriting(writingEnabled, protocolAdapterWritingService) - .thenApply(v -> { - log.info("Successfully started adapter with id {}", adapter.getId()); - setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STARTED); - return true; - }); - } else { - log.error("Error starting protocol adapter", t); - stopPolling(protocolAdapterPollingService); - return stopWriting(protocolAdapterWritingService) - .thenApply(v -> { - log.error("Error starting adapter with id {}", adapter.getId(), t); - setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); - return false; - }); - } - }) - .thenCompose(Function.identity()) - .whenComplete((ignored, throwable) -> { - stateChangeOngoing.set(false); - startFutureRef.set(null); - }); - startFutureRef.set(startFuture); - return startFuture; - } else { - log.warn("State change for adapter with id '{}' is already ongoing, returning existing start future", getId()); - currentFuture = startFutureRef.get(); - if (currentFuture != null) { - return currentFuture; - } - } - return CompletableFuture.completedFuture(true); - } - - public @NotNull CompletableFuture stopAsync(final boolean destroy) { - CompletableFuture currentFuture = stopFutureRef.get(); - if (currentFuture != null && !currentFuture.isDone()) { - log.warn("Stop already in progress for adapter with id '{}', returning existing future", getId()); - return currentFuture; - } - if(stateChangeOngoing.compareAndSet(false, true)) { - consumers.forEach(tagManager::removeConsumer); - final ProtocolAdapterStopInputImpl input = new ProtocolAdapterStopInputImpl(); - final ProtocolAdapterStopOutputImpl output = new ProtocolAdapterStopOutputImpl(); - - CompletableFuture stopFuture = CompletableFuture - .supplyAsync(() -> { - stopPolling(protocolAdapterPollingService); - return stopWriting(protocolAdapterWritingService); - }) - .thenCompose(Function.identity()) - .handle((stopped, t) -> { - adapter.stop(input, output); - return output.getOutputFuture(); - }) - .thenCompose(Function.identity()) - .handle((ignored, throwable) -> { - stateChangeOngoing.set(false); - stopFutureRef.set(null); - if(destroy) { - log.info("Destroying adapter with id '{}'", getId()); - adapter.destroy(); - } - if(throwable == null) { - setRuntimeStatus(ProtocolAdapterState.RuntimeStatus.STOPPED); - log.info("Stopped adapter with id {}", adapter.getId()); - return true; - } else { - log.error("Error stopping adapter with id {}", adapter.getId(), throwable); - return false; - } - }); - stopFutureRef.set(stopFuture); - return stopFuture; - } else { - log.warn("State change for adapter with id '{}' is already ongoing, returning existing stop future", getId()); - currentFuture = stopFutureRef.get(); - if (currentFuture != null) { - return currentFuture; - } - } - return CompletableFuture.completedFuture(true); - } - -// ...existing code...