Skip to content

Commit 21ad391

Browse files
committed
Fixes
1 parent 146937c commit 21ad391

File tree

4 files changed

+66
-125
lines changed

4 files changed

+66
-125
lines changed

api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
@ConfigurationProperties("rbac")
1010
public class RoleBasedAccessControlProperties {
1111

12-
private final List<Role> roles = new ArrayList<>();
12+
private List<Role> roles = new ArrayList<>();
1313
// private String haha;
1414

15-
1615
@PostConstruct
1716
public void init() {
1817
roles.forEach(Role::validate);
@@ -22,4 +21,9 @@ public List<Role> getRoles() {
2221
return roles;
2322
}
2423

24+
public void setRoles(List<Role> roles) {
25+
this.roles = roles;
26+
init();
27+
}
28+
2529
}

api/src/main/java/io/kafbat/ui/service/app/ConfigReloadService.java

Lines changed: 31 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import java.io.IOException;
99
import java.io.InputStream;
1010
import java.nio.file.Files;
11+
import java.nio.file.Path;
1112
import java.nio.file.Paths;
1213
import java.util.LinkedHashSet;
14+
import java.util.List;
1315
import java.util.Objects;
1416
import java.util.Properties;
1517
import java.util.stream.Collectors;
@@ -19,17 +21,21 @@
1921
import lombok.RequiredArgsConstructor;
2022
import lombok.extern.slf4j.Slf4j;
2123
import org.springframework.beans.factory.ObjectProvider;
24+
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
2225
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
2326
import org.springframework.boot.context.properties.bind.Binder;
2427
import org.springframework.boot.env.OriginTrackedMapPropertySource;
28+
import org.springframework.boot.env.YamlPropertySourceLoader;
2529
import org.springframework.boot.origin.Origin;
2630
import org.springframework.boot.origin.OriginTrackedValue;
2731
import org.springframework.boot.origin.TextResourceOrigin;
2832
import org.springframework.context.ApplicationContext;
33+
import org.springframework.context.ConfigurableApplicationContext;
2934
import org.springframework.core.env.ConfigurableEnvironment;
3035
import org.springframework.core.env.MutablePropertySources;
3136
import org.springframework.core.env.PropertiesPropertySource;
3237
import org.springframework.core.env.PropertySource;
38+
import org.springframework.core.io.FileSystemResource;
3339
import org.springframework.core.io.Resource;
3440
import org.springframework.stereotype.Service;
3541

@@ -42,14 +48,12 @@ public class ConfigReloadService {
4248
private static final String THREAD_NAME = "config-watcher-thread";
4349

4450
private final ConfigurableEnvironment environment;
45-
private final ApplicationContext appContext;
51+
private final RoleBasedAccessControlProperties rbacProperties;
52+
private final YamlPropertySourceLoader yamlLoader = new YamlPropertySourceLoader();
4653

4754
private Thread watcherThread;
4855
private MultiFileWatcher multiFileWatcher;
4956

50-
private final ObjectProvider<AccessControlService> accessControlService;
51-
private final ObjectProvider<RoleBasedAccessControlProperties> roleBasedAccessControlProperties;
52-
5357
@PostConstruct
5458
public void init() {
5559
var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false)
@@ -86,35 +90,36 @@ public void init() {
8690
log.debug("Auto reload is enabled, will watch for config changes");
8791

8892
try {
89-
this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, path -> {
90-
System.out.println(path);
91-
var propertySources = environment.getPropertySources();
92-
93-
94-
95-
Properties properties = new Properties();
96-
try {
97-
@Cleanup InputStream inputStream = Files.newInputStream(Paths.get("/tmp/kek.yaml"));
98-
properties.load(inputStream);
99-
} catch (IOException e) {
100-
throw new RuntimeException(e);
101-
}
102-
103-
PropertySource<?> origin =
104-
propertySources.stream().filter(ps -> ps.getName().contains("tmp/kek")).findFirst().get();
105-
environment.getPropertySources().replace(origin.getName(), new PropertiesPropertySource(origin.getName(), properties));
106-
107-
System.out.println();
108-
var kekw = appContext.getBean(AccessControlService.class);
109-
return null;
110-
});
93+
this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, this::reloadFile);
11194
this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME);
11295
this.watcherThread.start();
11396
} catch (IOException e) {
11497
log.error("Error while registering watch service", e);
11598
}
11699
}
117100

101+
private void reloadFile(Path path) {
102+
log.info("Reloading file {}", path);
103+
try {
104+
if (path.toString().endsWith(".yml") || path.toString().endsWith(".yaml")) {
105+
String name = String.format("Config resource 'file [%s] via location '%s'",
106+
path.toAbsolutePath().toString(),
107+
path.toAbsolutePath().toString());
108+
109+
List<PropertySource<?>> load = yamlLoader.load(path.toString(), new FileSystemResource(path));
110+
environment.getPropertySources().remove(name);
111+
environment.getPropertySources().addFirst(load.getFirst());
112+
Binder binder = Binder.get(environment);
113+
binder.bind("rbac", RoleBasedAccessControlProperties.class).ifBound(bound ->
114+
rbacProperties.setRoles(bound.getRoles())
115+
);
116+
}
117+
} catch (Throwable e) {
118+
log.error("Error while reloading file {}", path, e);
119+
}
120+
121+
}
122+
118123
@PreDestroy
119124
public void shutdown() {
120125
try {
@@ -127,23 +132,4 @@ public void shutdown() {
127132
this.watcherThread.interrupt();
128133
}
129134
}
130-
131-
private void reload() {
132-
var registry = (DefaultSingletonBeanRegistry) appContext.getAutowireCapableBeanFactory();
133-
134-
registry.destroySingleton("AccessControlService");
135-
136-
Binder.get(environment)
137-
.bind("rbac", RoleBasedAccessControlProperties.class)
138-
.orElseThrow(() -> new IllegalStateException("no rbac config"));
139-
140-
var newProps = appContext.getBean(AccessControlService.class);
141-
newProps.init();
142-
// accessControlService.init();
143-
System.out.println();
144-
145-
146-
}
147-
148-
149135
}

api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import lombok.extern.slf4j.Slf4j;
3333
import org.apache.commons.collections.CollectionUtils;
3434
import org.apache.commons.lang3.StringUtils;
35+
import org.springframework.beans.factory.ObjectProvider;
3536
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3637
import org.springframework.core.env.Environment;
3738
import org.springframework.security.access.AccessDeniedException;
@@ -60,6 +61,7 @@ public class AccessControlService {
6061
@Getter
6162
private Set<ProviderAuthorityExtractor> oauthExtractors = Collections.emptySet();
6263

64+
6365
@PostConstruct
6466
public void init() {
6567
if (CollectionUtils.isEmpty(properties.getRoles())) {

api/src/main/java/io/kafbat/ui/util/MultiFileWatcher.java

Lines changed: 27 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,25 @@
33
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
44
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
55
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
6-
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
76

87
import java.io.IOException;
98
import java.io.UncheckedIOException;
9+
import java.net.URI;
1010
import java.nio.file.ClosedWatchServiceException;
1111
import java.nio.file.FileSystems;
12-
import java.nio.file.Files;
1312
import java.nio.file.Path;
14-
import java.nio.file.Paths;
13+
import java.nio.file.WatchEvent;
1514
import java.nio.file.WatchKey;
1615
import java.nio.file.WatchService;
1716
import java.time.Duration;
1817
import java.util.Collection;
1918
import java.util.HashMap;
19+
import java.util.List;
2020
import java.util.Map;
21-
import java.util.Optional;
2221
import java.util.Set;
23-
import java.util.concurrent.Callable;
2422
import java.util.concurrent.ConcurrentHashMap;
25-
import java.util.function.Function;
23+
import java.util.function.Consumer;
2624
import lombok.extern.slf4j.Slf4j;
27-
import org.jetbrains.annotations.NotNull;
2825
import org.springframework.util.Assert;
2926

3027
@Slf4j
@@ -33,83 +30,62 @@ public final class MultiFileWatcher implements AutoCloseable {
3330
private static final long DEBOUNCE_MS = Duration.ofMillis(1000).toMillis();
3431

3532
private final WatchService watchService = FileSystems.getDefault().newWatchService();
36-
private final Set<Path> watchedFiles = ConcurrentHashMap.newKeySet();
33+
private final Set<URI> watchedFiles = ConcurrentHashMap.newKeySet();
3734
private final Map<WatchKey, Path> watchDirsByKey = new HashMap<>();
38-
private final Function<Path, Void> reloader;
35+
private final Consumer<Path> reloader;
3936

4037
private long lastTriggerAt = 0;
4138

42-
public MultiFileWatcher(Collection<Path> filesToWatch, Function<Path, Void> reloader) throws IOException {
39+
public MultiFileWatcher(Collection<Path> filesToWatch, Consumer<Path> reloader) throws IOException {
4340
Assert.notNull(reloader, "reloader must not be null");
4441
this.reloader = reloader;
4542

4643
if (filesToWatch.isEmpty()) {
4744
log.warn("No files to watch, aborting");
4845
}
4946

47+
48+
List<Path> directories = filesToWatch.stream().map(Path::getParent).distinct().toList();
5049
watchedFiles.addAll(filesToWatch.stream()
51-
.map(p -> {
52-
try {
53-
return Files.exists(p) ? p.toRealPath() : p.toAbsolutePath().normalize();
54-
} catch (IOException e) {
55-
return p.toAbsolutePath().normalize();
56-
}
57-
})
58-
.toList());
50+
.map(p -> p.toAbsolutePath().normalize())
51+
.map(Path::toUri)
52+
.toList()
53+
);
5954

6055
if (watchedFiles.isEmpty()) {
6156
log.warn("No files to watch resolved, aborting");
6257
return;
6358
}
6459

6560
log.debug("Going to watch {} files", watchedFiles.size());
66-
log.trace("Watching files: {}", watchedFiles.stream().map(Path::toString).toList());
61+
log.trace("Watching files: {}", watchedFiles.stream().map(URI::toString).toList());
6762

68-
watchedFiles.stream()
69-
// .map(getParentPath())
70-
.distinct()
71-
.forEach(file -> {
63+
directories
64+
.forEach(dir -> {
7265
try {
73-
var key = getParentPath().apply(file).register(watchService,
74-
ENTRY_MODIFY,
75-
ENTRY_CREATE, ENTRY_DELETE // watch these for atomic replacements
76-
);
77-
watchDirsByKey.put(key, file);
66+
WatchKey key = dir.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE);
67+
watchDirsByKey.put(key, dir);
7868
} catch (IOException e) {
7969
throw new UncheckedIOException(e);
8070
}
8171
});
8272

83-
log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(a -> getParentPath().apply(a)).map(Path::toString).toList());
73+
// log.trace("Watching directories: {}", watchDirsByKey.values().stream().map(a -> getParentPath().apply(a)).map(Path::toString).toList());
8474
}
8575

8676
public void watchLoop() {
8777
while (true) {
8878
try {
8979
var key = watchService.take();
90-
var dir = getParentPath().apply(watchDirsByKey.get(key));
91-
if (dir == null) {
92-
continue;
93-
}
94-
95-
var hit = key.pollEvents()
96-
.stream()
97-
.filter(ev -> ev.kind() != OVERFLOW)
98-
.map(ev -> dir.resolve((Path) ev.context()).normalize().toAbsolutePath())
99-
.anyMatch(this::matchesTarget);
100-
101-
if (hit && shouldTrigger()) {
102-
var filePath = watchDirsByKey.get(key); // TODO
103-
reloader.apply(filePath);
104-
}
105-
106-
if (!key.reset()) {
107-
watchDirsByKey.remove(key);
108-
}
109-
if (watchDirsByKey.isEmpty()) {
110-
break;
80+
Path dir = watchDirsByKey.get(key);
81+
for (WatchEvent<?> event : key.pollEvents()) {
82+
Path relativePath = (Path) event.context();
83+
Path path = dir.resolve(relativePath);
84+
if (watchedFiles.contains(path.toAbsolutePath().normalize().toUri())) {
85+
reloader.accept(path);
86+
}
11187
}
112-
88+
key.reset();
11389
} catch (InterruptedException e) {
11490
Thread.currentThread().interrupt();
11591
break;
@@ -123,36 +99,9 @@ public void watchLoop() {
12399
}
124100
}
125101

126-
private boolean matchesTarget(Path changed) {
127-
if (watchedFiles.contains(changed)) {
128-
return true;
129-
}
130-
try {
131-
return watchedFiles.contains(changed.toRealPath());
132-
} catch (IOException ignored) {
133-
return false;
134-
}
135-
}
136-
137-
private boolean shouldTrigger() {
138-
var now = System.currentTimeMillis();
139-
140-
if (now - lastTriggerAt < DEBOUNCE_MS) {
141-
return false;
142-
}
143-
144-
lastTriggerAt = now;
145-
return true;
146-
}
147-
148102
@Override
149103
public void close() throws IOException {
150104
watchService.close();
151105
}
152-
153-
@NotNull
154-
private static Function<Path, Path> getParentPath() {
155-
return p -> Optional.ofNullable(p.getParent()).orElse(Path.of("."));
156-
}
157106
}
158107

0 commit comments

Comments
 (0)