Skip to content

Commit fc07f56

Browse files
Merge branch 'main' into issues/348
2 parents d789bae + 1196f9f commit fc07f56

File tree

34 files changed

+5361
-4331
lines changed

34 files changed

+5361
-4331
lines changed

.github/workflows/frontend_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ jobs:
2121
ref: ${{ github.event.pull_request.head.sha }}
2222
token: ${{ github.token }}
2323

24-
- uses: pnpm/action-setup@v3.0.0
24+
- uses: pnpm/action-setup@v4.0.0
2525
with:
26-
version: 8.6.12
26+
version: 9.1.2
2727

2828
- name: Install node
2929
uses: actions/[email protected]

api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import io.kafbat.ui.api.ApplicationConfigApi;
77
import io.kafbat.ui.config.ClustersProperties;
8+
import io.kafbat.ui.model.ActionDTO;
89
import io.kafbat.ui.model.ApplicationConfigDTO;
910
import io.kafbat.ui.model.ApplicationConfigPropertiesDTO;
1011
import io.kafbat.ui.model.ApplicationConfigValidationDTO;
@@ -18,6 +19,7 @@
1819
import io.kafbat.ui.util.ApplicationRestarter;
1920
import io.kafbat.ui.util.DynamicConfigOperations;
2021
import java.util.Map;
22+
import java.util.Optional;
2123
import javax.annotation.Nullable;
2224
import lombok.RequiredArgsConstructor;
2325
import lombok.extern.slf4j.Slf4j;
@@ -46,6 +48,12 @@ interface PropertiesMapper {
4648
DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);
4749

4850
ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure);
51+
52+
default ActionDTO stringToActionDto(String str) {
53+
return Optional.ofNullable(str)
54+
.map(s -> Enum.valueOf(ActionDTO.class, s.toUpperCase()))
55+
.orElseThrow();
56+
}
4957
}
5058

5159
private final DynamicConfigOperations dynamicConfigOperations;
@@ -75,7 +83,7 @@ public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExch
7583
@Override
7684
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
7785
ServerWebExchange exchange) {
78-
var context = AccessContext.builder()
86+
var context = AccessContext.builder()
7987
.applicationConfigActions(EDIT)
8088
.operationName("restartWithConfig")
8189
.build();

api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
import java.util.HashSet;
77
import java.util.Map;
88
import java.util.Set;
9+
import java.util.stream.Collectors;
910
import lombok.Getter;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.apache.commons.lang3.mutable.MutableLong;
1213
import org.apache.kafka.clients.consumer.Consumer;
1314
import org.apache.kafka.common.TopicPartition;
15+
import org.apache.kafka.common.errors.UnsupportedVersionException;
1416

1517
@Slf4j
1618
@Getter
@@ -34,7 +36,7 @@ class OffsetsInfo {
3436

3537
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
3638
this.consumer = consumer;
37-
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
39+
this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions);
3840
this.endOffsets = consumer.endOffsets(targetPartitions);
3941
endOffsets.forEach((tp, endOffset) -> {
4042
var beginningOffset = beginOffsets.get(tp);
@@ -46,6 +48,28 @@ class OffsetsInfo {
4648
});
4749
}
4850

51+
52+
private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer,
53+
Collection<TopicPartition> partitions) {
54+
try {
55+
// we try to use offsetsForTimes() to find earliest offsets, since for
56+
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
57+
// even when effectively first offset can be very high
58+
var offsets = consumer.offsetsForTimes(
59+
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
60+
);
61+
// result of offsetsForTimes() can be null, if message version < 0.10.0
62+
if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) {
63+
return offsets.entrySet().stream()
64+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
65+
}
66+
} catch (UnsupportedOperationException | UnsupportedVersionException e) {
67+
// offsetsForTimes() not supported
68+
}
69+
//falling back to beginningOffsets() if offsetsForTimes() not supported
70+
return consumer.beginningOffsets(partitions);
71+
}
72+
4973
boolean assignedPartitionsFullyPolled() {
5074
for (var tp : consumer.assignment()) {
5175
Preconditions.checkArgument(endOffsets.containsKey(tp));

api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import io.kafbat.ui.model.ConsumerPosition;
44
import io.kafbat.ui.model.TopicMessageEventDTO;
55
import java.util.ArrayList;
6+
import java.util.HashSet;
67
import java.util.List;
8+
import java.util.Set;
79
import java.util.TreeMap;
810
import java.util.function.Supplier;
911
import lombok.extern.slf4j.Slf4j;
@@ -84,20 +86,22 @@ private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
8486
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
8587

8688
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
87-
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
89+
Set<TopicPartition> paused = new HashSet<>();
90+
while (!sink.isCancelled() && paused.size() < range.size()) {
8891
var polledRecords = poll(sink, consumer);
8992
range.forEach((tp, fromTo) -> {
9093
polledRecords.records(tp).stream()
9194
.filter(r -> r.offset() < fromTo.to)
9295
.forEach(result::add);
9396

9497
//next position is out of target range -> pausing partition
95-
if (consumer.position(tp) >= fromTo.to) {
98+
if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) {
99+
paused.add(tp);
96100
consumer.pause(List.of(tp));
97101
}
98102
});
99103
}
100-
consumer.resume(consumer.paused());
104+
consumer.resume(paused);
101105
return result;
102106
}
103107
}

api/src/main/java/io/kafbat/ui/model/rbac/Permission.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
44

55
import com.google.common.base.Preconditions;
6+
import io.kafbat.ui.model.ActionDTO;
67
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
78
import java.util.List;
89
import java.util.regex.Pattern;
@@ -50,7 +51,7 @@ public void transform() {
5051
if (value != null) {
5152
this.compiledValuePattern = Pattern.compile(value);
5253
}
53-
if (actions.stream().anyMatch("ALL"::equalsIgnoreCase)) {
54+
if (actions.stream().anyMatch(ActionDTO.ALL.name()::equalsIgnoreCase)) {
5455
this.parsedActions = resource.allActions();
5556
} else {
5657
this.parsedActions = resource.parseActionsWithDependantsUnnest(actions);

api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.kafbat.ui.serdes.BuiltInSerde;
5050
import io.kafbat.ui.util.jsonschema.ProtobufSchemaConverter;
5151
import java.io.ByteArrayInputStream;
52+
import java.nio.file.FileVisitOption;
5253
import java.nio.file.Files;
5354
import java.nio.file.Path;
5455
import java.util.Collection;
@@ -404,7 +405,7 @@ private Loader createFilesLoader(Map<String, ProtoFile> files) {
404405
@SneakyThrows
405406
private Map<String, ProtoFile> loadFilesWithLocations() {
406407
Map<String, ProtoFile> filesByLocations = new HashMap<>();
407-
try (var files = Files.walk(baseLocation)) {
408+
try (var files = Files.walk(baseLocation, FileVisitOption.FOLLOW_LINKS)) {
408409
files.filter(p -> !Files.isDirectory(p) && p.toString().endsWith(".proto"))
409410
.forEach(path -> {
410411
// relative path will be used as "import" statement
Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package io.kafbat.ui.service.rbac.extractor;
22

3-
import io.kafbat.ui.config.auth.LdapProperties;
43
import io.kafbat.ui.model.rbac.Role;
54
import io.kafbat.ui.model.rbac.provider.Provider;
65
import io.kafbat.ui.service.rbac.AccessControlService;
7-
import java.util.List;
8-
import java.util.Map;
96
import java.util.Set;
107
import java.util.stream.Collectors;
118
import lombok.extern.slf4j.Slf4j;
@@ -14,25 +11,26 @@
1411
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
1512
import org.springframework.security.core.GrantedAuthority;
1613
import org.springframework.security.core.authority.SimpleGrantedAuthority;
17-
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
18-
import org.springframework.util.Assert;
14+
import org.springframework.security.ldap.userdetails.NestedLdapAuthoritiesPopulator;
1915

2016
@Slf4j
21-
public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
17+
public class RbacLdapAuthoritiesExtractor extends NestedLdapAuthoritiesPopulator {
2218

2319
private final AccessControlService acs;
24-
private final LdapProperties props;
2520

2621
public RbacLdapAuthoritiesExtractor(ApplicationContext context,
2722
BaseLdapPathContextSource contextSource, String groupFilterSearchBase) {
2823
super(contextSource, groupFilterSearchBase);
2924
this.acs = context.getBean(AccessControlService.class);
30-
this.props = context.getBean(LdapProperties.class);
3125
}
3226

3327
@Override
3428
protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
35-
var ldapGroups = getRoles(user.getNameInNamespace(), username);
29+
var ldapGroups = super.getGroupMembershipRoles(user.getNameInNamespace(), username)
30+
.stream()
31+
.map(GrantedAuthority::getAuthority)
32+
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
33+
.collect(Collectors.toSet());
3634

3735
return acs.getRoles()
3836
.stream()
@@ -47,32 +45,4 @@ protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, St
4745
.map(SimpleGrantedAuthority::new)
4846
.collect(Collectors.toSet());
4947
}
50-
51-
private Set<String> getRoles(String userDn, String username) {
52-
var groupSearchBase = props.getGroupFilterSearchBase();
53-
Assert.notNull(groupSearchBase, "groupSearchBase is empty");
54-
55-
var groupRoleAttribute = props.getGroupRoleAttribute();
56-
if (groupRoleAttribute == null) {
57-
58-
groupRoleAttribute = "cn";
59-
}
60-
61-
log.trace(
62-
"Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
63-
username, userDn, groupRoleAttribute, getGroupSearchFilter(), groupSearchBase);
64-
65-
var ldapTemplate = getLdapTemplate();
66-
ldapTemplate.setIgnoreNameNotFoundException(true);
67-
68-
Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
69-
groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
70-
new String[] {groupRoleAttribute});
71-
72-
return userRoles.stream()
73-
.map(record -> record.get(getGroupRoleAttribute()).get(0))
74-
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
75-
.collect(Collectors.toSet());
76-
}
77-
7848
}

api/src/main/java/io/kafbat/ui/util/GithubReleaseInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class GithubReleaseInfo {
1111
private static final String GITHUB_LATEST_RELEASE_RETRIEVAL_URL =
1212
"https://api.github.com/repos/kafbat/kafka-ui/releases/latest";
1313

14-
private static final Duration GITHUB_API_MAX_WAIT_TIME = Duration.ofSeconds(2);
14+
private static final Duration GITHUB_API_MAX_WAIT_TIME = Duration.ofSeconds(10);
1515

1616
public record GithubReleaseDto(String html_url, String tag_name, String published_at) {
1717

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3471,6 +3471,7 @@ components:
34713471
- PAUSED
34723472
- UNASSIGNED
34733473
- TASK_FAILED
3474+
- RESTARTING
34743475

34753476
ConnectorAction:
34763477
type: string
@@ -3820,6 +3821,7 @@ components:
38203821
Action:
38213822
type: string
38223823
enum:
3824+
- ALL
38233825
- VIEW
38243826
- EDIT
38253827
- CREATE

contract/src/main/resources/swagger/kafka-connect-api.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ components:
447447
- FAILED
448448
- PAUSED
449449
- UNASSIGNED
450+
- RESTARTING
450451
worker_id:
451452
type: string
452453
trace:

0 commit comments

Comments
 (0)