Skip to content

Commit 9cfbcb3

Browse files
authored
fix: Revise Protocol Adapter Component to be More Async Aware (#1260)
1 parent 829c73f commit 9cfbcb3

File tree

10 files changed

+322
-413
lines changed

10 files changed

+322
-413
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ public class ProtocolAdaptersResourceImpl extends AbstractApi implements Protoco
124124
// no-op
125125
};
126126
private static final @NotNull Logger log = LoggerFactory.getLogger(ProtocolAdaptersResourceImpl.class);
127+
private static final long RETRY_TIMEOUT_MILLIS = 5000;
128+
private static final long RETRY_INTERVAL_MILLIS = 200;
127129

128130
private final @NotNull HiveMQEdgeRemoteService remoteService;
129131
private final @NotNull ConfigurationService configurationService;

hivemq-edge/src/main/java/com/hivemq/bootstrap/LoggingBootstrap.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ public void onStart(final @NotNull LoggerContext context) {
214214
@Override
215215
public void onReset(final @NotNull LoggerContext context) {
216216
log.trace("logback.xml was changed");
217+
context.getTurboFilterList().remove(logLevelModifierTurboFilter);
217218
context.addTurboFilter(logLevelModifierTurboFilter);
218219
}
219220

hivemq-edge/src/main/java/com/hivemq/bootstrap/factories/WritingServiceProvider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
3334

3435
@Singleton
3536
public class WritingServiceProvider {
@@ -78,12 +79,13 @@ public boolean writingEnabled() {
7879

7980

8081
@Override
81-
public boolean startWriting(
82+
public @NotNull CompletableFuture<Boolean> startWritingAsync(
8283
final @NotNull WritingProtocolAdapter writingProtocolAdapter,
8384
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
8485
final @NotNull List<InternalWritingContext> southboundMappings) {
8586
log.warn("No bidirectional module is currently installed. Writing to PLCs is currently not supported.");
86-
return true; }
87+
return CompletableFuture.completedFuture(true);
88+
}
8789

8890
@Override
8991
public void stopWriting(

hivemq-edge/src/main/java/com/hivemq/edge/modules/ModuleLoader.java

Lines changed: 61 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -20,72 +20,72 @@
2020
import com.hivemq.edge.modules.adapters.impl.IsolatedModuleClassloader;
2121
import com.hivemq.extensions.loader.ClassServiceLoader;
2222
import com.hivemq.http.handlers.AlternativeClassloadingStaticFileHandler;
23+
import jakarta.inject.Inject;
2324
import org.jetbrains.annotations.NotNull;
2425
import org.jetbrains.annotations.Nullable;
2526
import org.slf4j.Logger;
2627
import org.slf4j.LoggerFactory;
2728

28-
import jakarta.inject.Inject;
2929
import java.io.File;
3030
import java.io.IOException;
3131
import java.net.MalformedURLException;
3232
import java.net.URL;
3333
import java.util.ArrayList;
3434
import java.util.Arrays;
35+
import java.util.Collections;
3536
import java.util.Comparator;
36-
import java.util.HashSet;
3737
import java.util.List;
3838
import java.util.Set;
39+
import java.util.concurrent.ConcurrentHashMap;
3940
import java.util.concurrent.atomic.AtomicBoolean;
4041

4142
public class ModuleLoader {
42-
private static final Logger log = LoggerFactory.getLogger(ModuleLoader.class);
43-
44-
private final @NotNull SystemInformation systemInformation;
45-
protected final @NotNull Set<EdgeModule> modules = new HashSet<>();
46-
protected final @NotNull Comparator<File> fileComparator = (o1, o2) -> {
43+
protected static final @NotNull Comparator<File> fileComparator = (o1, o2) -> {
4744
final long delta = o2.lastModified() - o1.lastModified();
48-
// we cna easily get an overflow within months, so we can not use the delta directly by casting it to integer!
49-
if (delta == 0) {
50-
return 0;
51-
} else if (delta < 0) {
52-
return -1;
53-
} else {
54-
return 1;
55-
}
45+
return delta == 0 ? 0 : delta < 0 ? -1 : 1;
5646
};
57-
58-
private final @NotNull ClassServiceLoader classServiceLoader = new ClassServiceLoader();
59-
private final AtomicBoolean loaded = new AtomicBoolean();
47+
private static final @NotNull Logger log = LoggerFactory.getLogger(ModuleLoader.class);
48+
protected final @NotNull Set<EdgeModule> modules;
49+
private final @NotNull SystemInformation systemInformation;
50+
private final @NotNull ClassServiceLoader classServiceLoader;
51+
private final @NotNull AtomicBoolean loaded;
6052

6153
@Inject
6254
public ModuleLoader(final @NotNull SystemInformation systemInformation) {
6355
this.systemInformation = systemInformation;
56+
this.classServiceLoader = new ClassServiceLoader();
57+
this.loaded = new AtomicBoolean(false);
58+
this.modules = Collections.newSetFromMap(new ConcurrentHashMap<>());
59+
}
60+
61+
private static void logException(final @NotNull File file, final @NotNull IOException ioException) {
62+
log.warn("Exception with reason {} while reading module file {}",
63+
ioException.getMessage(),
64+
file.getAbsolutePath());
65+
log.debug("Original exception", ioException);
6466
}
6567

6668
public void loadModules() {
67-
if (loaded.get()) {
68-
// avoid duplicate loads
69-
return;
70-
}
71-
loaded.set(true);
72-
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
73-
if (Boolean.getBoolean(HiveMQEdgeConstants.DEVELOPMENT_MODE)) {
74-
log.info(String.format("Welcome '%s' is starting...", "48 69 76 65 4D 51 45 64 67 65"));
75-
log.warn(
76-
"\n################################################################################################################\n" +
77-
"# You are running HiveMQ Edge in Development Mode and Modules will be loaded from your workspace NOT your #\n" +
78-
"# HIVEMQ_HOME/modules directory. To load runtime modules from your HOME directory please remove #\n" +
79-
"# '-Dhivemq.edge.workspace.modules=true' from your startup script #\n" +
80-
"################################################################################################################");
81-
loadFromWorkspace(contextClassLoader);
82-
// load the commercial module loader from the workspace folder
83-
// the loadFromWorkspace() will not find it.
84-
log.info("Loading the commercial module loader from workspace.");
85-
loadCommercialModuleLoaderFromWorkSpace(contextClassLoader);
86-
} else {
87-
// the commercial module loader will be found here in case of a "normal" running hivemq edge
88-
loadFromModulesDirectory(contextClassLoader);
69+
if (loaded.compareAndSet(false, true)) {
70+
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
71+
if (Boolean.getBoolean(HiveMQEdgeConstants.DEVELOPMENT_MODE)) {
72+
log.info(String.format("Welcome '%s' is starting...", "48 69 76 65 4D 51 45 64 67 65"));
73+
log.warn("""
74+
75+
################################################################################################################
76+
# You are running HiveMQ Edge in Development Mode and Modules will be loaded from your workspace NOT your #
77+
# HIVEMQ_HOME/modules directory. To load runtime modules from your HOME directory please remove #
78+
# '-Dhivemq.edge.workspace.modules=true' from your startup script #
79+
################################################################################################################""");
80+
loadFromWorkspace(contextClassLoader);
81+
// load the commercial module loader from the workspace folder
82+
// the loadFromWorkspace() will not find it.
83+
log.info("Loading the commercial module loader from workspace.");
84+
loadCommercialModuleLoaderFromWorkSpace(contextClassLoader);
85+
} else {
86+
// the commercial module loader will be found here in case of a "normal" running hivemq edge
87+
loadFromModulesDirectory(contextClassLoader);
88+
}
8989
}
9090
}
9191

@@ -98,31 +98,24 @@ private void loadCommercialModuleLoaderFromWorkSpace(final @NotNull ClassLoader
9898
return;
9999
}
100100

101-
102-
final File commercialModuleLoaderLibFolder =
103-
new File(commercialModulesRepoRootFolder, "hivemq-edge-commercial-modules-loader/build/libs");
104-
if (!commercialModuleLoaderLibFolder.exists()) {
101+
final File libs = new File(commercialModulesRepoRootFolder, "hivemq-edge-commercial-modules-loader/build/libs");
102+
if (!libs.exists()) {
105103
log.error("Could not load commercial module loader as the assumed lib folder '{}' does not exist.",
106-
commercialModuleLoaderLibFolder.getAbsolutePath());
104+
libs.getAbsolutePath());
107105
return;
108106
}
109-
110-
final File[] tmp = commercialModuleLoaderLibFolder.listFiles(file -> file.getName().endsWith("proguarded.jar"));
111-
107+
final File[] tmp = libs.listFiles(file -> file.getName().endsWith("proguarded.jar"));
112108
if (tmp == null || tmp.length == 0) {
113-
log.info("No commercial module loader jar was discovered in libs folder '{}'",
114-
commercialModuleLoaderLibFolder);
109+
log.info("No commercial module loader jar was discovered in libs folder '{}'", libs);
115110
return;
116111
}
117112

118-
final List<File> potentialCommercialModuleJars =
119-
new ArrayList<>(Arrays.stream(tmp).sorted(fileComparator).toList());
120-
121-
final String absolutePathJar = potentialCommercialModuleJars.get(0).getAbsolutePath();
122-
if (potentialCommercialModuleJars.size() > 1) {
113+
final List<File> jars = new ArrayList<>(Arrays.stream(tmp).sorted(fileComparator).toList());
114+
final String absolutePathJar = jars.get(0).getAbsolutePath();
115+
if (jars.size() > 1) {
123116
log.debug(
124117
"More than one commercial module loader jar was discovered in libs folder '{}'. Clean unwanted jars to avoid loading the wrong version. The first found jar '{}' will be loaded.",
125-
commercialModuleLoaderLibFolder,
118+
libs,
126119
absolutePathJar);
127120
} else {
128121
log.info("Commercial Module jar '{}' was discovered.", absolutePathJar);
@@ -138,15 +131,14 @@ private void loadCommercialModulesLoaderJar(final File jarFile, final @NotNull C
138131
log.error("", e);
139132
}
140133
log.info("Loading commercial module loader from {}", jarFile.getAbsoluteFile());
141-
final IsolatedModuleClassloader isolatedClassloader =
142-
new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader);
143-
modules.add(new ModuleLoader.EdgeModule(jarFile, isolatedClassloader, false));
134+
modules.add(new ModuleLoader.EdgeModule(jarFile,
135+
new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader),
136+
false));
144137
}
145138

146139
protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader) {
147140
log.debug("Loading modules from development workspace.");
148-
final File userDir = new File(System.getProperty("user.dir"));
149-
loadFromWorkspace(parentClassloader, userDir);
141+
loadFromWorkspace(parentClassloader, new File(System.getProperty("user.dir")));
150142
}
151143

152144
/**
@@ -158,7 +150,7 @@ protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader) {
158150
* @param parentClassloader the parent classloader
159151
* @param currentDir the current dir
160152
*/
161-
protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader, final @NotNull File currentDir) {
153+
private void loadFromWorkspace(final @NotNull ClassLoader parentClassloader, final @NotNull File currentDir) {
162154
if (currentDir.exists() && currentDir.isDirectory()) {
163155
if (currentDir.getName().equals("hivemq-edge")) {
164156
discoverWorkspaceModule(new File(currentDir, "modules"), parentClassloader);
@@ -173,7 +165,7 @@ protected void loadFromWorkspace(final @NotNull ClassLoader parentClassloader, f
173165
}
174166
}
175167

176-
protected void discoverWorkspaceModule(final File dir, final @NotNull ClassLoader parentClassloader) {
168+
protected void discoverWorkspaceModule(final @NotNull File dir, final @NotNull ClassLoader parentClassloader) {
177169
if (dir.exists()) {
178170
final File[] files = dir.listFiles(pathname -> pathname.isDirectory() &&
179171
pathname.canRead() &&
@@ -196,14 +188,11 @@ protected void discoverWorkspaceModule(final File dir, final @NotNull ClassLoade
196188
urls.add(jar.toURI().toURL());
197189
}
198190
}
199-
final IsolatedModuleClassloader isolatedClassloader =
200-
new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader);
201-
modules.add(new EdgeModule(file, isolatedClassloader, true));
191+
modules.add(new EdgeModule(file,
192+
new IsolatedModuleClassloader(urls.toArray(new URL[0]), parentClassloader),
193+
true));
202194
} catch (final IOException ioException) {
203-
log.warn("Exception with reason {} while reading module file {}",
204-
ioException.getMessage(),
205-
file.getAbsolutePath());
206-
log.debug("Original exception", ioException);
195+
logException(file, ioException);
207196
}
208197
}
209198
}
@@ -225,10 +214,7 @@ protected void loadFromModulesDirectory(final @NotNull ClassLoader parentClasslo
225214
log.debug("Ignoring non jar file in module folder {}.", lib.getAbsolutePath());
226215
}
227216
} catch (final IOException ioException) {
228-
log.warn("Exception with reason {} while reading module file {}",
229-
ioException.getMessage(),
230-
lib.getAbsolutePath());
231-
log.debug("Original exception", ioException);
217+
logException(lib, ioException);
232218
}
233219
}
234220
}
@@ -256,15 +242,14 @@ protected void loadFromModulesDirectory(final @NotNull ClassLoader parentClasslo
256242
}
257243

258244
public @NotNull Set<EdgeModule> getModules() {
259-
return modules;
245+
return Collections.unmodifiableSet(modules);
260246
}
261247

262248
public void clear() {
263249
modules.clear();
264250
}
265251

266252
public static class EdgeModule {
267-
268253
private final @NotNull File root;
269254
private final @NotNull ClassLoader classloader;
270255

0 commit comments

Comments
 (0)