Skip to content

Commit c20e978

Browse files
committed
remove stress over adapter by not restarting it and instead providing hot reload for mappings and tags (cheaper). will see how this improves stability.
1 parent 26de8d9 commit c20e978

File tree

4 files changed

+300
-64
lines changed

4 files changed

+300
-64
lines changed

hivemq-edge/src/main/java/com/hivemq/api/resources/impl/ProtocolAdaptersResourceImpl.java

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
import com.hivemq.edge.api.model.StatusTransitionResult;
7171
import com.hivemq.edge.api.model.TagSchema;
7272
import com.hivemq.edge.modules.adapters.impl.ProtocolAdapterDiscoveryOutputImpl;
73+
import com.hivemq.persistence.mappings.NorthboundMapping;
74+
import com.hivemq.persistence.mappings.SouthboundMapping;
7375
import com.hivemq.persistence.topicfilter.TopicFilterPersistence;
7476
import com.hivemq.persistence.topicfilter.TopicFilterPojo;
7577
import com.hivemq.protocols.InternalProtocolAdapterWritingService;
@@ -803,22 +805,31 @@ public int getDepth() {
803805
missingTags));
804806
}
805807

806-
return configExtractor.getAdapterByAdapterId(adapterId)
807-
.map(cfg -> new ProtocolAdapterEntity(cfg.getAdapterId(),
808-
cfg.getProtocolId(),
809-
cfg.getConfigVersion(),
810-
cfg.getConfig(),
811-
converted,
812-
cfg.getSouthboundMappings(),
813-
cfg.getTags()))
814-
.map(newCfg -> {
815-
if (!configExtractor.updateAdapter(newCfg)) {
816-
return adapterCannotBeUpdatedError(adapterId);
817-
}
818-
log.info("Successfully updated northbound mappings for adapter '{}'.", adapterId);
819-
return Response.ok(northboundMappings).build();
820-
})
821-
.orElseGet(adapterNotUpdatedError(adapterId));
808+
final List<NorthboundMapping> convertedMappings =
809+
converted.stream().map(NorthboundMappingEntity::toPersistence).toList();
810+
if (protocolAdapterManager.updateNorthboundMappingsHotReload(adapterId, convertedMappings)) {
811+
// update config persistence
812+
return configExtractor.getAdapterByAdapterId(adapterId)
813+
.map(cfg -> new ProtocolAdapterEntity(cfg.getAdapterId(),
814+
cfg.getProtocolId(),
815+
cfg.getConfigVersion(),
816+
cfg.getConfig(),
817+
converted,
818+
cfg.getSouthboundMappings(),
819+
cfg.getTags()))
820+
.map(newCfg -> {
821+
if (!configExtractor.updateAdapter(newCfg)) {
822+
return adapterCannotBeUpdatedError(adapterId);
823+
}
824+
log.info("Successfully updated northbound mappings for adapter '{}' via hot-reload.",
825+
adapterId);
826+
return Response.ok(northboundMappings).build();
827+
})
828+
.orElseGet(adapterNotUpdatedError(adapterId));
829+
} else {
830+
log.error("Hot-reload failed for northbound mappings on adapter '{}'", adapterId);
831+
return errorResponse(new InternalServerError("Failed to hot-reload northbound mappings"));
832+
}
822833
};
823834
}
824835

@@ -841,22 +852,31 @@ public int getDepth() {
841852
missingTags));
842853
}
843854

844-
return configExtractor.getAdapterByAdapterId(adapterId)
845-
.map(cfg -> new ProtocolAdapterEntity(cfg.getAdapterId(),
846-
cfg.getProtocolId(),
847-
cfg.getConfigVersion(),
848-
cfg.getConfig(),
849-
cfg.getNorthboundMappings(),
850-
converted,
851-
cfg.getTags()))
852-
.map(newCfg -> {
853-
if (!configExtractor.updateAdapter(newCfg)) {
854-
return adapterCannotBeUpdatedError(adapterId);
855-
}
856-
log.info("Successfully updated fromMappings for adapter '{}'.", adapterId);
857-
return Response.ok(southboundMappings).build();
858-
})
859-
.orElseGet(adapterNotUpdatedError(adapterId));
855+
final List<SouthboundMapping> convertedMappings =
856+
converted.stream().map(entity -> entity.toPersistence(objectMapper)).toList();
857+
if (protocolAdapterManager.updateSouthboundMappingsHotReload(adapterId, convertedMappings)) {
858+
// update config persistence
859+
return configExtractor.getAdapterByAdapterId(adapterId)
860+
.map(cfg -> new ProtocolAdapterEntity(cfg.getAdapterId(),
861+
cfg.getProtocolId(),
862+
cfg.getConfigVersion(),
863+
cfg.getConfig(),
864+
cfg.getNorthboundMappings(),
865+
converted,
866+
cfg.getTags()))
867+
.map(newCfg -> {
868+
if (!configExtractor.updateAdapter(newCfg)) {
869+
return adapterCannotBeUpdatedError(adapterId);
870+
}
871+
log.info("Successfully updated southbound mappings for adapter '{}' via hot-reload.",
872+
adapterId);
873+
return Response.ok(southboundMappings).build();
874+
})
875+
.orElseGet(adapterNotUpdatedError(adapterId));
876+
} else {
877+
log.error("Hot-reload failed for southbound mappings on adapter '{}'", adapterId);
878+
return errorResponse(new InternalServerError("Failed to hot-reload southbound mappings"));
879+
}
860880
};
861881
}
862882

@@ -911,11 +931,10 @@ private void validateAdapterSchema(
911931

912932
private @NotNull Adapter toAdapter(final @NotNull ProtocolAdapterWrapper value) {
913933
final String adapterId = value.getId();
914-
final Map<String, Object> config = runWithContextLoader(
915-
value.getAdapterFactory().getClass().getClassLoader(),
916-
() -> {
917-
final Map<String, Object> cfg = value.getAdapterFactory()
918-
.unconvertConfigObject(objectMapper, value.getConfigObject());
934+
final Map<String, Object> config =
935+
runWithContextLoader(value.getAdapterFactory().getClass().getClassLoader(), () -> {
936+
final Map<String, Object> cfg =
937+
value.getAdapterFactory().unconvertConfigObject(objectMapper, value.getConfigObject());
919938
cfg.put("id", value.getId());
920939
return cfg;
921940
});

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterConfig.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@
3030
public class ProtocolAdapterConfig {
3131

3232
private final @NotNull ProtocolSpecificAdapterConfig adapterConfig;
33-
private final @NotNull List<? extends Tag> tags;
33+
private @NotNull List<? extends Tag> tags;
3434
private final @NotNull String adapterId;
3535
private final @NotNull String protocolId;
3636
private final int configVersion;
37-
private final @NotNull List<SouthboundMapping> southboundMappings;
38-
private final @NotNull List<NorthboundMapping> northboundMappings;
37+
private @NotNull List<SouthboundMapping> southboundMappings;
38+
private @NotNull List<NorthboundMapping> northboundMappings;
3939

4040
public ProtocolAdapterConfig(
4141
final @NotNull String adapterId,
@@ -99,6 +99,36 @@ public int getConfigVersion() {
9999
return configVersion;
100100
}
101101

102+
/**
103+
* Updates the tags for hot-reload support.
104+
* This method is used to update tags without restarting the adapter.
105+
*
106+
* @param tags the new tags
107+
*/
108+
public void setTags(final @NotNull List<? extends Tag> tags) {
109+
this.tags = tags;
110+
}
111+
112+
/**
113+
* Updates the northbound mappings for hot-reload support.
114+
* This method is used to update northbound mappings without restarting the adapter.
115+
*
116+
* @param northboundMappings the new northbound mappings
117+
*/
118+
public void setNorthboundMappings(final @NotNull List<NorthboundMapping> northboundMappings) {
119+
this.northboundMappings = northboundMappings;
120+
}
121+
122+
/**
123+
* Updates the southbound mappings for hot-reload support.
124+
* This method is used to update southbound mappings without restarting the adapter.
125+
*
126+
* @param southboundMappings the new southbound mappings
127+
*/
128+
public void setSouthboundMappings(final @NotNull List<SouthboundMapping> southboundMappings) {
129+
this.southboundMappings = southboundMappings;
130+
}
131+
102132
@Override
103133
public boolean equals(final Object o) {
104134
if (o == null || getClass() != o.getClass()) return false;

hivemq-edge/src/main/java/com/hivemq/protocols/ProtocolAdapterManager.java

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import com.hivemq.edge.modules.api.adapters.ProtocolAdapterPollingService;
4141
import com.hivemq.persistence.domain.DomainTag;
4242
import com.hivemq.persistence.domain.DomainTagAddResult;
43+
import com.hivemq.persistence.mappings.NorthboundMapping;
44+
import com.hivemq.persistence.mappings.SouthboundMapping;
4345
import com.hivemq.protocols.northbound.NorthboundConsumerFactory;
4446
import jakarta.inject.Inject;
4547
import jakarta.inject.Singleton;
@@ -305,28 +307,68 @@ public boolean isWritingEnabled() {
305307
if (alreadyExists) {
306308
return DomainTagAddResult.failed(ALREADY_EXISTS, adapterId);
307309
}
308-
deleteAdapterInternal(wrapper.getId());
310+
309311
try {
310-
tags.add(configConverter.domainTagToTag(wrapper.getProtocolAdapterInformation().getProtocolId(),
311-
domainTag));
312-
startAsync(createAdapterInternal(new ProtocolAdapterConfig(wrapper.getId(),
313-
wrapper.getAdapterInformation().getProtocolId(),
314-
wrapper.getAdapterInformation().getCurrentConfigVersion(),
315-
wrapper.getConfigObject(),
316-
wrapper.getSouthboundMappings(),
317-
wrapper.getNorthboundMappings(),
318-
tags), versionProvider.getVersion())).get();
312+
final var convertedTag =
313+
configConverter.domainTagToTag(wrapper.getProtocolAdapterInformation().getProtocolId(),
314+
domainTag);
315+
316+
// Use hot-reload to add tag without restarting the adapter
317+
log.debug("Adding tag '{}' to adapter '{}' via hot-reload", domainTag.getTagName(), adapterId);
318+
wrapper.addTagHotReload(convertedTag, eventService);
319+
320+
log.info("Successfully added tag '{}' to adapter '{}' via hot-reload",
321+
domainTag.getTagName(),
322+
adapterId);
319323
return DomainTagAddResult.success();
320-
} catch (final InterruptedException e) {
321-
Thread.currentThread().interrupt();
322-
log.error("Interrupted while async execution: ", e.getCause());
324+
} catch (final IllegalStateException e) {
325+
log.error("Cannot hot-reload tag, adapter not in correct state: {}", e.getMessage());
326+
return DomainTagAddResult.failed(ADAPTER_FAILED_TO_START, adapterId);
323327
} catch (final Throwable e) {
324-
log.error("Exception happened while async execution: ", e.getCause());
328+
log.error("Exception happened while adding tag via hot-reload: ", e);
329+
return DomainTagAddResult.failed(ADAPTER_FAILED_TO_START, adapterId);
325330
}
326-
return DomainTagAddResult.failed(ADAPTER_FAILED_TO_START, adapterId);
327331
}).orElse(DomainTagAddResult.failed(ADAPTER_MISSING, adapterId));
328332
}
329333

334+
public boolean updateNorthboundMappingsHotReload(
335+
final @NotNull String adapterId,
336+
final @NotNull List<NorthboundMapping> northboundMappings) {
337+
return getProtocolAdapterWrapperByAdapterId(adapterId).map(wrapper -> {
338+
try {
339+
log.debug("Updating northbound mappings for adapter '{}' via hot-reload", adapterId);
340+
wrapper.updateMappingsHotReload(northboundMappings, null, eventService);
341+
log.info("Successfully updated northbound mappings for adapter '{}' via hot-reload", adapterId);
342+
return true;
343+
} catch (final IllegalStateException e) {
344+
log.error("Cannot hot-reload northbound mappings, adapter not in correct state: {}", e.getMessage());
345+
return false;
346+
} catch (final Throwable e) {
347+
log.error("Exception happened while updating northbound mappings via hot-reload: ", e);
348+
return false;
349+
}
350+
}).orElse(false);
351+
}
352+
353+
public boolean updateSouthboundMappingsHotReload(
354+
final @NotNull String adapterId,
355+
final @NotNull List<SouthboundMapping> southboundMappings) {
356+
return getProtocolAdapterWrapperByAdapterId(adapterId).map(wrapper -> {
357+
try {
358+
log.debug("Updating southbound mappings for adapter '{}' via hot-reload", adapterId);
359+
wrapper.updateMappingsHotReload(null, southboundMappings, eventService);
360+
log.info("Successfully updated southbound mappings for adapter '{}' via hot-reload", adapterId);
361+
return true;
362+
} catch (final IllegalStateException e) {
363+
log.error("Cannot hot-reload southbound mappings, adapter not in correct state: {}", e.getMessage());
364+
return false;
365+
} catch (final Throwable e) {
366+
log.error("Exception happened while updating southbound mappings via hot-reload: ", e);
367+
return false;
368+
}
369+
}).orElse(false);
370+
}
371+
330372
public @NotNull List<DomainTag> getDomainTags() {
331373
return protocolAdapters.values()
332374
.stream()

0 commit comments

Comments
 (0)