Skip to content

Commit c1607d6

Browse files
authored
Added support for zipping fragment content in k8s (#1047)
1 parent 987331f commit c1607d6

File tree

22 files changed

+156
-63
lines changed

22 files changed

+156
-63
lines changed

hivemq-edge/src/main/java/com/hivemq/api/model/adapters/AdapterStatusModelConversionUtils.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,29 +47,20 @@ public class AdapterStatusModelConversionUtils {
4747

4848
public static @NotNull Status.ConnectionEnum convertConnectionStatus(final @NotNull ProtocolAdapterState.ConnectionStatus connectionStatus) {
4949
Preconditions.checkNotNull(connectionStatus);
50-
switch (connectionStatus) {
51-
case DISCONNECTED:
52-
return Status.ConnectionEnum.DISCONNECTED;
53-
case CONNECTED:
54-
return Status.ConnectionEnum.CONNECTED;
55-
case ERROR:
56-
return Status.ConnectionEnum.ERROR;
57-
case STATELESS:
58-
return Status.ConnectionEnum.STATELESS;
59-
default:
60-
case UNKNOWN:
61-
return Status.ConnectionEnum.UNKNOWN;
62-
}
50+
return switch (connectionStatus) {
51+
case DISCONNECTED -> Status.ConnectionEnum.DISCONNECTED;
52+
case CONNECTED -> Status.ConnectionEnum.CONNECTED;
53+
case ERROR -> Status.ConnectionEnum.ERROR;
54+
case STATELESS -> Status.ConnectionEnum.STATELESS;
55+
default -> Status.ConnectionEnum.UNKNOWN;
56+
};
6357
}
6458

6559
public static @NotNull Status.RuntimeEnum convertRuntimeStatus(final @NotNull ProtocolAdapterState.RuntimeStatus runtimeStatus) {
6660
Preconditions.checkNotNull(runtimeStatus);
67-
switch (runtimeStatus) {
68-
case STARTED:
69-
return Status.RuntimeEnum.STARTED;
70-
default:
71-
case STOPPED:
72-
return Status.RuntimeEnum.STOPPED;
61+
if(ProtocolAdapterState.RuntimeStatus.STARTED.equals(runtimeStatus)) {
62+
return Status.RuntimeEnum.STARTED;
7363
}
64+
return Status.RuntimeEnum.STOPPED;
7465
}
7566
}

hivemq-edge/src/main/java/com/hivemq/configuration/ConfigurationBootstrap.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ public class ConfigurationBootstrap {
7070

7171
final ConfigurationFile configurationFile = ConfigurationFileProvider.get(systemInformation);
7272

73-
final ConfigFileReaderWriter configFileReader = new ConfigFileReaderWriter(configurationFile,
73+
final ConfigFileReaderWriter configFileReader = new ConfigFileReaderWriter(
74+
systemInformation,
75+
configurationFile,
7476
List.of(
7577
new RestrictionConfigurator(configurationService.restrictionsConfiguration()),
7678
new SecurityConfigurator(configurationService.securityConfiguration()),

hivemq-edge/src/main/java/com/hivemq/configuration/EnvironmentVariables.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public class EnvironmentVariables {
5858
*/
5959
public static final String CONFIG_WRITEABLE = "HIVEMQ_CONFIG_WRITEABLE";
6060

61+
/**
62+
* Name of the environment variable for indicating that the config fragment will be zipped and base64 encoded
63+
*/
64+
public static final String CONFIG_FRAGMENT_BASE64ZIP = "HIVEMQ_CONFIG_FRAGMENT_BASE64ZIP";
65+
6166
/**
6267
* Name of the environment variable for configuring the data folder.
6368
*/

hivemq-edge/src/main/java/com/hivemq/configuration/SystemProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class SystemProperties {
2828
public static final String CONFIG_FOLDER_SECONDARY = "hivemq.config.secondary";
2929
public static final String CONFIG_REFRESH_INTERVAL = "hivemq.config.refreshinterval";
3030
public static final String CONFIG_WRITEABLE = "hivemq.config.writeable";
31+
public static final String CONFIG_FRAGMENT_BASE64ZIP = "hivemq.config.fragment.base64zip";
3132
public static final String LICENSE_FOLDER = "hivemq.license.folder";
3233

3334
public static final String DATA_FOLDER = "hivemq.data.folder";

hivemq-edge/src/main/java/com/hivemq/configuration/info/SystemInformation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ public interface SystemInformation {
104104
*/
105105
boolean isEmbedded();
106106

107+
/**
108+
* @return should the fragment config be treated as bing zipped and base64 encoded
109+
*/
110+
boolean isConfigFragmentBase64Zip();
111+
107112
/**
108113
* @return the interval between refreshing config files, 0 means no refreshing
109114
*/

hivemq-edge/src/main/java/com/hivemq/configuration/info/SystemInformationImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public class SystemInformationImpl implements SystemInformation {
5959

6060
private final boolean configWriteable;
6161

62+
private final boolean configFragmentBase64Zip;
63+
6264
public SystemInformationImpl() {
6365
this(false);
6466
}
@@ -78,6 +80,7 @@ public SystemInformationImpl(
7880
final @Nullable File licenseFolder) {
7981
final String refreshInterval = getSystemPropertyOrEnvironmentVariable(SystemProperties.CONFIG_REFRESH_INTERVAL, EnvironmentVariables.CONFIG_REFRESH_INTERVAL);
8082
final String configWriteable = getSystemPropertyOrEnvironmentVariable(SystemProperties.CONFIG_WRITEABLE, EnvironmentVariables.CONFIG_WRITEABLE);
83+
final String configFragmentBase64ZipString = getSystemPropertyOrEnvironmentVariable(SystemProperties.CONFIG_FRAGMENT_BASE64ZIP, EnvironmentVariables.CONFIG_FRAGMENT_BASE64ZIP);
8184
this.usePathOfRunningJar = usePathOfRunningJar;
8285
this.embedded = embedded;
8386
this.configFolder = configFolder;
@@ -89,6 +92,7 @@ public SystemInformationImpl(
8992
this.runningSince = System.currentTimeMillis();
9093
this.configRefreshIntervalInMs = Long.parseLong(Objects.requireNonNullElse(refreshInterval, "-1"));
9194
this.configWriteable = Boolean.parseBoolean((configWriteable == null || configWriteable.isEmpty()) ? "true" : configWriteable );
95+
this.configFragmentBase64Zip = Boolean.parseBoolean((configFragmentBase64ZipString == null || configFragmentBase64ZipString.isEmpty()) ? "false" : configFragmentBase64ZipString );
9296
processorCount = getPhysicalProcessorCount();
9397
}
9498

@@ -366,4 +370,9 @@ public boolean isEmbedded() {
366370
public boolean isConfigWriteable() {
367371
return configWriteable;
368372
}
373+
374+
@Override
375+
public boolean isConfigFragmentBase64Zip() {
376+
return configFragmentBase64Zip;
377+
}
369378
}

hivemq-edge/src/main/java/com/hivemq/configuration/reader/ConfigFileReaderWriter.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,20 @@
2929
import com.hivemq.configuration.entity.listener.WebsocketListenerEntity;
3030
import com.hivemq.configuration.entity.listener.tls.KeystoreEntity;
3131
import com.hivemq.configuration.entity.listener.tls.TruststoreEntity;
32+
import com.hivemq.configuration.info.SystemInformation;
3233
import com.hivemq.edge.HiveMQEdgeConstants;
3334
import com.hivemq.exceptions.UnrecoverableException;
3435
import com.hivemq.util.ThreadFactoryUtil;
3536
import com.hivemq.util.render.EnvVarUtil;
3637
import com.hivemq.util.render.FileFragmentUtil;
3738
import com.hivemq.util.render.IfUtil;
39+
import jakarta.xml.bind.JAXBContext;
40+
import jakarta.xml.bind.JAXBElement;
41+
import jakarta.xml.bind.JAXBException;
42+
import jakarta.xml.bind.Marshaller;
43+
import jakarta.xml.bind.Unmarshaller;
44+
import jakarta.xml.bind.ValidationEvent;
45+
import jakarta.xml.bind.ValidationEventLocator;
3846
import org.apache.commons.io.FileUtils;
3947
import org.jetbrains.annotations.NotNull;
4048
import org.jetbrains.annotations.Nullable;
@@ -44,17 +52,11 @@
4452
import org.xml.sax.SAXException;
4553

4654
import javax.xml.XMLConstants;
47-
import jakarta.xml.bind.JAXBContext;
48-
import jakarta.xml.bind.JAXBElement;
49-
import jakarta.xml.bind.JAXBException;
50-
import jakarta.xml.bind.Marshaller;
51-
import jakarta.xml.bind.Unmarshaller;
52-
import jakarta.xml.bind.ValidationEvent;
53-
import jakarta.xml.bind.ValidationEventLocator;
5455
import javax.xml.transform.stream.StreamSource;
5556
import javax.xml.validation.Schema;
5657
import javax.xml.validation.SchemaFactory;
5758
import java.io.ByteArrayInputStream;
59+
import java.io.ByteArrayOutputStream;
5860
import java.io.File;
5961
import java.io.FileWriter;
6062
import java.io.IOException;
@@ -74,13 +76,16 @@
7476
import java.util.HashMap;
7577
import java.util.List;
7678
import java.util.Map;
79+
import java.util.Optional;
7780
import java.util.concurrent.ConcurrentHashMap;
7881
import java.util.concurrent.Executors;
7982
import java.util.concurrent.ScheduledExecutorService;
8083
import java.util.concurrent.ThreadFactory;
8184
import java.util.concurrent.TimeUnit;
8285
import java.util.concurrent.atomic.AtomicLong;
8386
import java.util.stream.Collectors;
87+
import java.util.zip.DataFormatException;
88+
import java.util.zip.Inflater;
8489

8590
import static java.util.Objects.requireNonNullElse;
8691

@@ -91,7 +96,7 @@ public class ConfigFileReaderWriter {
9196
public static final String CONFIG_FRAGMENT_PATH = "/fragment/config";
9297

9398
private final @NotNull ConfigurationFile configurationFile;
94-
protected volatile @NotNull HiveMQConfigEntity configEntity;
99+
protected volatile HiveMQConfigEntity configEntity;
95100
private final Object lock = new Object();
96101
private boolean defaultBackupConfig = true;
97102
private volatile @Nullable ScheduledExecutorService scheduledExecutorService = null;
@@ -103,10 +108,12 @@ public class ConfigFileReaderWriter {
103108
private final @NotNull DataCombiningExtractor dataCombiningExtractor;
104109
private final @NotNull UnsExtractor unsExtractor;
105110
private final @NotNull List<ReloadableExtractor> reloadableExtractors;
111+
private final @NotNull SystemInformation systemInformation;
106112

107113
private final @NotNull AtomicLong lastWrite = new AtomicLong(0L);
108114

109115
public ConfigFileReaderWriter(
116+
final @NotNull SystemInformation systemInformation,
110117
final @NotNull ConfigurationFile configurationFile,
111118
final @NotNull List<Configurator<?>> configurators) {
112119
this.configurationFile = configurationFile;
@@ -115,6 +122,7 @@ public ConfigFileReaderWriter(
115122
this.protocolAdapterExtractor = new ProtocolAdapterExtractor(this);
116123
this.dataCombiningExtractor = new DataCombiningExtractor(this);
117124
this.unsExtractor = new UnsExtractor(this);
125+
this.systemInformation = systemInformation;
118126
reloadableExtractors = List.of(
119127
bridgeExtractor,
120128
protocolAdapterExtractor,
@@ -201,14 +209,17 @@ private void checkMonitoredFilesForChanges(
201209

202210
pathsToCheck.putAll(fileModificationTimestamps);
203211

204-
pathsToCheck.entrySet().forEach(pathToTs -> {
212+
pathsToCheck.forEach((key, value) -> {
205213
try {
206-
if (!pathToTs.getKey().toString().equals(CONFIG_FRAGMENT_PATH) && Files.getFileAttributeView(pathToTs.getKey().toRealPath(LinkOption.NOFOLLOW_LINKS), BasicFileAttributeView.class).readAttributes().lastModifiedTime().toMillis() > pathToTs.getValue()) {
207-
log.error("Restarting because a required file was updated: {}", pathToTs.getKey());
214+
if (!key.toString().equals(CONFIG_FRAGMENT_PATH) &&
215+
Files.getFileAttributeView(key.toRealPath(LinkOption.NOFOLLOW_LINKS),
216+
BasicFileAttributeView.class).readAttributes().lastModifiedTime().toMillis() >
217+
value) {
218+
log.error("Restarting because a required file was updated: {}", key);
208219
System.exit(0);
209220
}
210221
} catch (final IOException e) {
211-
throw new RuntimeException("Unable to read last modified time for " + pathToTs.getKey(), e);
222+
throw new RuntimeException("Unable to read last modified time for " + key, e);
212223
}
213224
});
214225
}
@@ -339,8 +350,7 @@ protected JAXBContext createContext() throws JAXBException {
339350
.build()
340351
.toArray(new Class<?>[0]);
341352

342-
final JAXBContext context = JAXBContext.newInstance(classes);
343-
return context;
353+
return JAXBContext.newInstance(classes);
344354
}
345355

346356
private void writeConfigToXML(final @NotNull ConfigurationFile outputFile, final boolean rollConfig) {
@@ -442,7 +452,10 @@ public void writeConfigToXML(final @NotNull Writer writer) {
442452

443453
//replace environment variable placeholders
444454
String configFileContent = Files.readString(configFile.toPath());
445-
final FileFragmentUtil.FragmentResult fragmentResult = FileFragmentUtil.replaceFragmentPlaceHolders(configFileContent);
455+
final var fragmentResult = FileFragmentUtil
456+
.replaceFragmentPlaceHolders(
457+
configFileContent,
458+
systemInformation.isConfigFragmentBase64Zip());
446459

447460
fragmentToModificationTime.putAll(fragmentResult.getFragmentToModificationTime());
448461

@@ -461,7 +474,6 @@ public void writeConfigToXML(final @NotNull Writer writer) {
461474
return true;
462475

463476
});
464-
465477
final JAXBElement<? extends HiveMQConfigEntity> result =
466478
unmarshaller.unmarshal(streamSource, getConfigEntityClass());
467479

@@ -587,8 +599,7 @@ protected Schema loadSchema() throws IOException, SAXException {
587599
if (resource != null) {
588600
try (final InputStream is = uncachedStream(resource)) {
589601
final SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
590-
final Schema schema = sf.newSchema(new StreamSource(is));
591-
return schema;
602+
return sf.newSchema(new StreamSource(is));
592603
}
593604
}
594605
log.warn("No schema loaded for validation of config xml.");

hivemq-edge/src/main/java/com/hivemq/util/render/FileFragmentUtil.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,25 @@
1717

1818
import com.hivemq.exceptions.UnrecoverableException;
1919
import org.jetbrains.annotations.NotNull;
20+
import org.jose4j.base64url.Base64;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

24+
import java.io.ByteArrayInputStream;
25+
import java.io.ByteArrayOutputStream;
2326
import java.io.IOException;
2427
import java.nio.charset.StandardCharsets;
2528
import java.nio.file.Files;
2629
import java.nio.file.Path;
2730
import java.util.HashMap;
2831
import java.util.Map;
32+
import java.util.Optional;
2933
import java.util.regex.Matcher;
3034
import java.util.regex.Pattern;
35+
import java.util.zip.DataFormatException;
36+
import java.util.zip.Inflater;
37+
import java.util.zip.ZipEntry;
38+
import java.util.zip.ZipInputStream;
3139

3240
public class FileFragmentUtil {
3341
private static final Logger log = LoggerFactory.getLogger(EnvVarUtil.class);
@@ -41,9 +49,9 @@ public class FileFragmentUtil {
4149
* @return replacement result containing the actual string and file and their modification time used in the replacement
4250
* @throws UnrecoverableException if a fragment used in a placeholder can't be loaded
4351
*/
44-
public static @NotNull FragmentResult replaceFragmentPlaceHolders(final @NotNull String text) {
52+
public static @NotNull FragmentResult replaceFragmentPlaceHolders(final @NotNull String text, final boolean contentIsZippedAndBase64Encoded) {
4553

46-
final StringBuffer resultString = new StringBuffer();
54+
final StringBuilder resultString = new StringBuilder();
4755
final Map<Path, Long> fragmentToModificationTime = new HashMap<>();
4856
final Matcher matcher = Pattern.compile(FRAGMENT_VAR_PATTERN)
4957
.matcher(text);
@@ -60,8 +68,19 @@ public class FileFragmentUtil {
6068
try {
6169
final Path fragmentPath = Path.of(pathString);
6270
final Long modificationTime = fragmentPath.toRealPath().toFile().lastModified();
63-
final String replacement = Files.readString(fragmentPath, StandardCharsets.UTF_8);
6471
fragmentToModificationTime.put(fragmentPath, modificationTime);
72+
73+
final String replacement;
74+
if (contentIsZippedAndBase64Encoded) {
75+
log.info("Config fragment {} is expected to be zipped and base64 encoded, extracting content.", pathString);
76+
replacement =
77+
deflate(Base64.decode(Files.readString(fragmentPath, StandardCharsets.UTF_8)))
78+
.map(deflated -> new String(deflated, StandardCharsets.UTF_8))
79+
.orElseThrow(UnrecoverableException::new);
80+
} else {
81+
replacement = Files.readString(fragmentPath, StandardCharsets.UTF_8);
82+
}
83+
6584
//sets replacement for this match
6685
matcher.appendReplacement(resultString, escapeReplacement(replacement));
6786
} catch (IOException e) {
@@ -100,4 +119,16 @@ public FragmentResult(
100119
return renderResult;
101120
}
102121
}
122+
123+
public static Optional<byte[]> deflate(final byte[] zipData) {
124+
try (final var zipStream = new ZipInputStream(new ByteArrayInputStream(zipData))) {
125+
if (zipStream.getNextEntry() != null) {
126+
return Optional.of(zipStream.readAllBytes());
127+
}
128+
return Optional.empty();
129+
} catch (final IOException e) {
130+
log.error("Error while extracting from ZIP: {}", e.getMessage());
131+
return Optional.empty();
132+
}
133+
}
103134
}

hivemq-edge/src/test/java/com/hivemq/configuration/reader/AbstractConfigurationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public void setUp() throws Exception {
8888

8989
final ConfigurationFile configurationFile = new ConfigurationFile(xmlFile);
9090
reader = new ConfigFileReaderWriter(
91+
systemInformation,
9192
configurationFile,
9293
List.of(
9394
new RestrictionConfigurator(restrictionsConfigurationService),

hivemq-edge/src/test/java/com/hivemq/configuration/reader/ConfigFileReaderTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.hivemq.configuration.entity.HiveMQConfigEntity;
2020
import com.hivemq.configuration.entity.adapter.MqttUserPropertyEntity;
2121
import com.hivemq.configuration.entity.adapter.ProtocolAdapterEntity;
22+
import com.hivemq.configuration.info.SystemInformation;
2223
import com.hivemq.exceptions.UnrecoverableException;
2324
import org.apache.commons.io.FileUtils;
2425
import org.jetbrains.annotations.NotNull;
@@ -145,7 +146,7 @@ public void whenUserPropertie_thenMapCorrectlyFilled() throws Exception {
145146
assertTrue(userPropertiesAfterReload.contains(new MqttUserPropertyEntity("my-name", "my-value2")));
146147
}
147148

148-
private static @NotNull ConfigFileReaderWriter getConfigFileReaderWriter(File tempFile) {
149+
private static @NotNull ConfigFileReaderWriter getConfigFileReaderWriter(final File tempFile) {
149150
final ConfigurationFile configurationFile = new ConfigurationFile(tempFile);
150151

151152
final RestrictionConfigurator restrictionConfigurator = mock(RestrictionConfigurator.class);
@@ -184,10 +185,12 @@ public void whenUserPropertie_thenMapCorrectlyFilled() throws Exception {
184185
final InternalConfigurator internalConfigurator = mock(InternalConfigurator.class);
185186
when(internalConfigurator.applyConfig(any())).thenReturn(Configurator.ConfigResult.SUCCESS);
186187

187-
188-
final ConfigFileReaderWriter configFileReader = new ConfigFileReaderWriter(
188+
final var sysInfo = mock(SystemInformation.class);
189+
//ALways set to true for the test to ensure the fragment zipping code doesn't interfer with regular file rendering
190+
when(sysInfo.isConfigFragmentBase64Zip()).thenReturn(true);
191+
return new ConfigFileReaderWriter(
192+
sysInfo,
189193
configurationFile,
190194
List.of());
191-
return configFileReader;
192195
}
193196
}

0 commit comments

Comments
 (0)