Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Getter
@ConfigurationProperties("rbac")
public class RoleBasedAccessControlProperties {

private final List<Role> roles = new ArrayList<>();
private volatile List<Role> roles = new ArrayList<>();

private DefaultRole defaultRole;

Expand All @@ -23,8 +25,9 @@ public void init() {
}
}

public List<Role> getRoles() {
return roles;
public void setRoles(List<Role> roles) {
this.roles = roles;
init();
}

public void setDefaultRole(DefaultRole defaultRole) {
Expand Down
125 changes: 125 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package io.kafbat.ui.service.app;

import io.kafbat.ui.config.auth.RoleBasedAccessControlProperties;
import io.kafbat.ui.util.MultiFileWatcher;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.env.OriginTrackedMapPropertySource;
import org.springframework.boot.env.YamlPropertySourceLoader;
import org.springframework.boot.origin.Origin;
import org.springframework.boot.origin.OriginTrackedValue;
import org.springframework.boot.origin.TextResourceOrigin;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
@ConditionalOnProperty(value = "config.autoreload", havingValue = "true")
public class ConfigReloadService {

private static final String THREAD_NAME = "config-watcher-thread";

private final ConfigurableEnvironment environment;
private final RoleBasedAccessControlProperties rbacProperties;
private final YamlPropertySourceLoader yamlLoader = new YamlPropertySourceLoader();

private Thread watcherThread;
private MultiFileWatcher multiFileWatcher;

@PostConstruct
public void init() {
var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false)
.filter(OriginTrackedMapPropertySource.class::isInstance)
.map(OriginTrackedMapPropertySource.class::cast)
.flatMap(ps -> ps.getSource().values().stream())
.map(v -> (v instanceof OriginTrackedValue otv) ? otv.getOrigin() : null)
.filter(Objects::nonNull)
.flatMap(o -> Stream.iterate(o, Objects::nonNull, Origin::getParent))
.filter(TextResourceOrigin.class::isInstance)
.map(TextResourceOrigin.class::cast)
.map(TextResourceOrigin::getResource)
.filter(Objects::nonNull)
.filter(Resource::exists)
.filter(Resource::isReadable)
.filter(Resource::isFile)
.map(r -> {
try {
return r.getURI();
} catch (IOException e) {
log.error("can't retrieve resource URL", e);
return null;
}
})
.filter(Objects::nonNull)
.map(Paths::get)
.collect(Collectors.toCollection(LinkedHashSet::new));

if (propertySourcePaths.isEmpty()) {
log.debug("No config files found, auto reload is disabled");
return;
}

log.debug("Auto reload is enabled, will watch for config changes");

try {
this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, this::reloadFile);
this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME);
this.watcherThread.start();
} catch (IOException e) {
log.error("Error while registering watch service", e);
}
}

private void reloadFile(Path path) {
log.info("Reloading file {}", path);
try {
if (!path.toString().endsWith(".yml") && !path.toString().endsWith(".yaml")) {
log.trace("Skipping non-YML file {}", path);
}

String name = String.format("Config resource 'file [%s] via location '%s'",
path.toAbsolutePath(),
path.toAbsolutePath()); // TODO extract an obj reference from env

List<PropertySource<?>> load = yamlLoader.load(path.toString(), new FileSystemResource(path));
environment.getPropertySources().remove(name);
environment.getPropertySources().addFirst(load.getFirst());
Binder binder = Binder.get(environment);

binder.bind("rbac", RoleBasedAccessControlProperties.class)
.ifBound(bound -> rbacProperties.setRoles(bound.getRoles()));
} catch (Throwable e) {
log.error("Error while reloading file {}", path, e);
}
}

@PreDestroy
public void shutdown() {
try {
if (multiFileWatcher != null) {
multiFileWatcher.close();
}
} catch (IOException ignored) {
}
if (watcherThread != null) {
this.watcherThread.interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class AccessControlService {
@Getter
private Set<ProviderAuthorityExtractor> oauthExtractors = Collections.emptySet();


@PostConstruct
public void init() {
if (CollectionUtils.isEmpty(properties.getRoles()) && properties.getDefaultRole() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import org.yaml.snakeyaml.introspector.BeanAccess;
import org.yaml.snakeyaml.introspector.Property;
import org.yaml.snakeyaml.introspector.PropertyUtils;
Expand Down Expand Up @@ -79,7 +77,7 @@ public Optional<PropertySource<?>> loadDynamicPropertySource() {
if (dynamicConfigEnabled()) {
Path configPath = dynamicConfigFilePath();
if (!Files.exists(configPath) || !Files.isReadable(configPath)) {
log.warn("Dynamic config file {} doesnt exist or not readable", configPath);
log.warn("Dynamic config file {} doesnt exist or is not readable", configPath);
return Optional.empty();
}
var propertySource = new CompositePropertySource("dynamicProperties");
Expand Down
110 changes: 110 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.kafbat.ui.util;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;

@Slf4j
public final class MultiFileWatcher implements AutoCloseable {

private final WatchService watchService = FileSystems.getDefault().newWatchService();
private final Set<URI> watchedFiles = ConcurrentHashMap.newKeySet();
private final Map<WatchKey, Path> watchDirsByKey = new HashMap<>();
private final Consumer<Path> reloader;

public MultiFileWatcher(Collection<Path> filesToWatch, Consumer<Path> reloader) throws IOException {
Assert.notNull(reloader, "reloader must not be null");
this.reloader = reloader;

if (filesToWatch.isEmpty()) {
log.warn("No files to watch, aborting");
}


watchedFiles.addAll(filesToWatch.stream()
.map(p -> p.toAbsolutePath().normalize())
.map(Path::toUri)
.toList()
);

if (watchedFiles.isEmpty()) {
log.warn("No files to watch resolved, aborting");
return;
}

log.debug("Going to watch {} files", watchedFiles.size());
log.trace("Watching files: {}", watchedFiles.stream().map(URI::toString).toList());

var directories = filesToWatch
.stream()
.map(Path::getParent)
.distinct()
.toList();

directories
.forEach(dir -> {
try {
var key = dir.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE);
watchDirsByKey.put(key, dir);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

log.trace("Watching directories: {}", directories.stream().map(Path::toString).toList());
}

public void watchLoop() {
while (true) {
try {
var key = watchService.take();
Path dir = watchDirsByKey.get(key);
if (dir == null) {
continue;
}

for (WatchEvent<?> event : key.pollEvents()) {
Path relativePath = (Path) event.context();
Path path = dir.resolve(relativePath);
if (watchedFiles.contains(path.toAbsolutePath().normalize().toUri())) {
reloader.accept(path);
}
}
key.reset();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (ClosedWatchServiceException e) {
log.trace("Watch service closed, exiting watcher thread");
break;
} catch (Exception e) {
log.error("Error while calling the reloader", e);
break;
}
}
}

@Override
public void close() throws IOException {
watchService.close();
}
}

Loading