Skip to content

Commit 6ec9fc9

Browse files
committed
improve start/close rest resource for protocol adapter
1 parent d30d8f1 commit 6ec9fc9

File tree

2 files changed

+57
-41
lines changed

2 files changed

+57
-41
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -325,21 +325,37 @@ public int getDepth() {
325325
@SuppressWarnings("unchecked")
326326
@Override
327327
public @NotNull Response updateAdapter(final @NotNull String adapterId, final @NotNull Adapter adapter) {
328-
return systemInformation.isConfigWriteable() ?
329-
configExtractor.getAdapterByAdapterId(adapterId).map(oldInstance -> {
330-
final ProtocolAdapterEntity newConfig = new ProtocolAdapterEntity(oldInstance.getAdapterId(),
331-
oldInstance.getProtocolId(),
332-
oldInstance.getConfigVersion(),
333-
(Map<String, Object>) adapter.getConfig(),
334-
oldInstance.getNorthboundMappings(),
335-
oldInstance.getSouthboundMappings(),
336-
oldInstance.getTags());
337-
if (!configExtractor.updateAdapter(newConfig)) {
338-
return adapterCannotBeUpdatedError(adapterId);
339-
}
340-
return Response.ok().build();
341-
}).orElseGet(adapterNotFoundError(adapterId)) :
342-
errorResponse(new ConfigWritingDisabled());
328+
if (!systemInformation.isConfigWriteable()) {
329+
return errorResponse(new ConfigWritingDisabled());
330+
}
331+
332+
// Validate adapter configuration before updating
333+
final ApiErrorMessages errorMessages = ApiErrorUtils.createErrorContainer();
334+
validateAdapterSchema(errorMessages, adapter);
335+
if (hasRequestErrors(errorMessages)) {
336+
return errorResponse(new AdapterFailedSchemaValidationError(errorMessages.toErrorList()));
337+
}
338+
339+
return configExtractor.getAdapterByAdapterId(adapterId).map(oldInstance -> {
340+
try {
341+
final ProtocolAdapterEntity newConfig = new ProtocolAdapterEntity(oldInstance.getAdapterId(),
342+
oldInstance.getProtocolId(),
343+
oldInstance.getConfigVersion(),
344+
(Map<String, Object>) adapter.getConfig(),
345+
oldInstance.getNorthboundMappings(),
346+
oldInstance.getSouthboundMappings(),
347+
oldInstance.getTags());
348+
if (!configExtractor.updateAdapter(newConfig)) {
349+
return adapterCannotBeUpdatedError(adapterId);
350+
}
351+
return Response.ok().build();
352+
} catch (final @NotNull IllegalArgumentException e) {
353+
if (e.getCause() instanceof final UnrecognizedPropertyException pe) {
354+
addValidationError(errorMessages, pe.getPropertyName(), "Unknown field on adapter configuration");
355+
}
356+
return errorResponse(new AdapterFailedSchemaValidationError(errorMessages.toErrorList()));
357+
}
358+
}).orElseGet(adapterNotFoundError(adapterId));
343359
}
344360

345361
@Override
@@ -388,12 +404,12 @@ public int getDepth() {
388404
}
389405
});
390406
case RESTART -> protocolAdapterManager.stopAsync(adapterId)
391-
.thenRun(() -> protocolAdapterManager.startAsync(adapterId))
407+
.thenCompose(ignored -> protocolAdapterManager.startAsync(adapterId))
392408
.whenComplete((result, throwable) -> {
393409
if (throwable != null) {
394410
log.error("Failed to restart adapter '{}'.", adapterId, throwable);
395411
} else {
396-
log.trace("Adapter '{}' was restarted successfully.", adapterId);
412+
log.info("Adapter '{}' was restarted successfully.", adapterId);
397413
}
398414
});
399415
}
@@ -894,17 +910,15 @@ private void validateAdapterSchema(
894910
}
895911

896912
private @NotNull Adapter toAdapter(final @NotNull ProtocolAdapterWrapper value) {
897-
final Thread currentThread = Thread.currentThread();
898-
final ClassLoader ctxClassLoader = currentThread.getContextClassLoader();
899-
final Map<String, Object> config;
900-
try {
901-
currentThread.setContextClassLoader(value.getAdapterFactory().getClass().getClassLoader());
902-
config = value.getAdapterFactory().unconvertConfigObject(objectMapper, value.getConfigObject());
903-
config.put("id", value.getId());
904-
} finally {
905-
currentThread.setContextClassLoader(ctxClassLoader);
906-
}
907913
final String adapterId = value.getId();
914+
final Map<String, Object> config = runWithContextLoader(
915+
value.getAdapterFactory().getClass().getClassLoader(),
916+
() -> {
917+
final Map<String, Object> cfg = value.getAdapterFactory()
918+
.unconvertConfigObject(objectMapper, value.getConfigObject());
919+
cfg.put("id", value.getId());
920+
return cfg;
921+
});
908922
return new Adapter(adapterId).type(value.getAdapterInformation().getProtocolId())
909923
.config(objectMapper.valueToTree(config))
910924
.status(getAdapterStatusInternal(adapterId));

hivemq-edge/src/main/java/com/hivemq/common/executors/ioc/ExecutorsModule.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@
3131
@Module
3232
public abstract class ExecutorsModule {
3333

34-
private static final Logger log = LoggerFactory.getLogger(ExecutorsModule.class);
34+
private static final @NotNull Logger log = LoggerFactory.getLogger(ExecutorsModule.class);
3535

36-
static final @NotNull String GROUP_NAME = "hivemq-edge-group";
37-
static final @NotNull String SCHEDULED_WORKER_GROUP_NAME = "hivemq-edge-scheduled-group";
38-
static final @NotNull String CACHED_WORKER_GROUP_NAME = "hivemq-edge-cached-group";
36+
private static final @NotNull String GROUP_NAME = "hivemq-edge-group";
37+
private static final @NotNull String SCHEDULED_WORKER_GROUP_NAME = "hivemq-edge-scheduled-group";
38+
private static final @NotNull String CACHED_WORKER_GROUP_NAME = "hivemq-edge-cached-group";
3939
private static final @NotNull ThreadGroup coreGroup = new ThreadGroup(GROUP_NAME);
4040

4141
@Provides
@@ -50,30 +50,33 @@ public abstract class ExecutorsModule {
5050
@Provides
5151
@Singleton
5252
static @NotNull ExecutorService executorService() {
53-
final ExecutorService executor =
54-
Executors.newCachedThreadPool(new HiveMQEdgeThreadFactory(CACHED_WORKER_GROUP_NAME));
55-
registerShutdownHook(executor, CACHED_WORKER_GROUP_NAME);
56-
return executor;
53+
return registerShutdownHook(Executors.newCachedThreadPool(new HiveMQEdgeThreadFactory(CACHED_WORKER_GROUP_NAME)),
54+
CACHED_WORKER_GROUP_NAME);
5755
}
5856

59-
private static void registerShutdownHook(final @NotNull ExecutorService executor, final @NotNull String name) {
57+
private static @NotNull ExecutorService registerShutdownHook(
58+
final @NotNull ExecutorService executor,
59+
final @NotNull String name) {
6060
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
61-
log.debug("Shutting down executor service: {}", name);
61+
if (log.isDebugEnabled()) {
62+
log.debug("Shutting down executor service: {}", name);
63+
}
6264
executor.shutdown();
6365
try {
6466
if (!executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
6567
log.warn("Executor service {} did not terminate in time, forcing shutdown", name);
6668
executor.shutdownNow();
6769
}
6870
} catch (final InterruptedException e) {
69-
log.warn("Interrupted while waiting for executor service {} to terminate", name);
7071
Thread.currentThread().interrupt();
72+
log.warn("Interrupted while waiting for executor service {} to terminate", name);
7173
executor.shutdownNow();
7274
}
7375
}, "shutdown-hook-" + name));
76+
return executor;
7477
}
7578

76-
static class HiveMQEdgeThreadFactory implements ThreadFactory {
79+
private static class HiveMQEdgeThreadFactory implements ThreadFactory {
7780
private final @NotNull String factoryName;
7881
private final @NotNull ThreadGroup group;
7982
private final @NotNull AtomicInteger counter = new AtomicInteger(0);
@@ -87,8 +90,7 @@ public HiveMQEdgeThreadFactory(final @NotNull String factoryName) {
8790
public @NotNull Thread newThread(final @NotNull Runnable r) {
8891
final Thread thread = new Thread(group, r, factoryName + "-" + counter.getAndIncrement());
8992
thread.setDaemon(true);
90-
thread.setUncaughtExceptionHandler((t, e) ->
91-
log.error("Uncaught exception in thread {}", t.getName(), e));
93+
thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread {}", t.getName(), e));
9294
return thread;
9395
}
9496
}

0 commit comments

Comments
 (0)