Skip to content

Commit 2f0e88a

Browse files
authored
Merge branch 'main' into issues/208
2 parents e7bf56b + 8cf4e11 commit 2f0e88a

File tree

18 files changed

+5234
-4275
lines changed

18 files changed

+5234
-4275
lines changed

.github/workflows/frontend_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ jobs:
2121
ref: ${{ github.event.pull_request.head.sha }}
2222
token: ${{ github.token }}
2323

24-
- uses: pnpm/action-setup@v3.0.0
24+
- uses: pnpm/action-setup@v4.0.0
2525
with:
26-
version: 8.6.12
26+
version: 9.1.2
2727

2828
- name: Install node
2929
uses: actions/[email protected]

api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
import java.util.HashSet;
77
import java.util.Map;
88
import java.util.Set;
9+
import java.util.stream.Collectors;
910
import lombok.Getter;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.apache.commons.lang3.mutable.MutableLong;
1213
import org.apache.kafka.clients.consumer.Consumer;
1314
import org.apache.kafka.common.TopicPartition;
15+
import org.apache.kafka.common.errors.UnsupportedVersionException;
1416

1517
@Slf4j
1618
@Getter
@@ -34,7 +36,7 @@ class OffsetsInfo {
3436

3537
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
3638
this.consumer = consumer;
37-
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
39+
this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions);
3840
this.endOffsets = consumer.endOffsets(targetPartitions);
3941
endOffsets.forEach((tp, endOffset) -> {
4042
var beginningOffset = beginOffsets.get(tp);
@@ -46,6 +48,28 @@ class OffsetsInfo {
4648
});
4749
}
4850

51+
52+
private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer,
53+
Collection<TopicPartition> partitions) {
54+
try {
55+
// we try to use offsetsForTimes() to find earliest offsets, since for
56+
// some topics (like compacted) beginningOffsets() returns 0 offsets
57+
// even when the effective first offset can be very high
58+
var offsets = consumer.offsetsForTimes(
59+
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
60+
);
61+
// values in the offsetsForTimes() result can be null if message format version < 0.10.0
62+
if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) {
63+
return offsets.entrySet().stream()
64+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
65+
}
66+
} catch (UnsupportedOperationException | UnsupportedVersionException e) {
67+
// offsetsForTimes() not supported
68+
}
69+
// falling back to beginningOffsets() if offsetsForTimes() is not supported
70+
return consumer.beginningOffsets(partitions);
71+
}
72+
4973
boolean assignedPartitionsFullyPolled() {
5074
for (var tp : consumer.assignment()) {
5175
Preconditions.checkArgument(endOffsets.containsKey(tp));

api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import io.kafbat.ui.model.ConsumerPosition;
44
import io.kafbat.ui.model.TopicMessageEventDTO;
55
import java.util.ArrayList;
6+
import java.util.HashSet;
67
import java.util.List;
8+
import java.util.Set;
79
import java.util.TreeMap;
810
import java.util.function.Supplier;
911
import lombok.extern.slf4j.Slf4j;
@@ -84,20 +86,22 @@ private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
8486
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
8587

8688
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
87-
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
89+
Set<TopicPartition> paused = new HashSet<>();
90+
while (!sink.isCancelled() && paused.size() < range.size()) {
8891
var polledRecords = poll(sink, consumer);
8992
range.forEach((tp, fromTo) -> {
9093
polledRecords.records(tp).stream()
9194
.filter(r -> r.offset() < fromTo.to)
9295
.forEach(result::add);
9396

9497
//next position is out of target range -> pausing partition
95-
if (consumer.position(tp) >= fromTo.to) {
98+
if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) {
99+
paused.add(tp);
96100
consumer.pause(List.of(tp));
97101
}
98102
});
99103
}
100-
consumer.resume(consumer.paused());
104+
consumer.resume(paused);
101105
return result;
102106
}
103107
}

api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.kafbat.ui.serdes.BuiltInSerde;
5050
import io.kafbat.ui.util.jsonschema.ProtobufSchemaConverter;
5151
import java.io.ByteArrayInputStream;
52+
import java.nio.file.FileVisitOption;
5253
import java.nio.file.Files;
5354
import java.nio.file.Path;
5455
import java.util.Collection;
@@ -404,7 +405,7 @@ private Loader createFilesLoader(Map<String, ProtoFile> files) {
404405
@SneakyThrows
405406
private Map<String, ProtoFile> loadFilesWithLocations() {
406407
Map<String, ProtoFile> filesByLocations = new HashMap<>();
407-
try (var files = Files.walk(baseLocation)) {
408+
try (var files = Files.walk(baseLocation, FileVisitOption.FOLLOW_LINKS)) {
408409
files.filter(p -> !Files.isDirectory(p) && p.toString().endsWith(".proto"))
409410
.forEach(path -> {
410411
// relative path will be used as "import" statement
Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package io.kafbat.ui.service.rbac.extractor;
22

3-
import io.kafbat.ui.config.auth.LdapProperties;
43
import io.kafbat.ui.model.rbac.Role;
54
import io.kafbat.ui.model.rbac.provider.Provider;
65
import io.kafbat.ui.service.rbac.AccessControlService;
7-
import java.util.List;
8-
import java.util.Map;
96
import java.util.Set;
107
import java.util.stream.Collectors;
118
import lombok.extern.slf4j.Slf4j;
@@ -14,25 +11,26 @@
1411
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
1512
import org.springframework.security.core.GrantedAuthority;
1613
import org.springframework.security.core.authority.SimpleGrantedAuthority;
17-
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
18-
import org.springframework.util.Assert;
14+
import org.springframework.security.ldap.userdetails.NestedLdapAuthoritiesPopulator;
1915

2016
@Slf4j
21-
public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
17+
public class RbacLdapAuthoritiesExtractor extends NestedLdapAuthoritiesPopulator {
2218

2319
private final AccessControlService acs;
24-
private final LdapProperties props;
2520

2621
public RbacLdapAuthoritiesExtractor(ApplicationContext context,
2722
BaseLdapPathContextSource contextSource, String groupFilterSearchBase) {
2823
super(contextSource, groupFilterSearchBase);
2924
this.acs = context.getBean(AccessControlService.class);
30-
this.props = context.getBean(LdapProperties.class);
3125
}
3226

3327
@Override
3428
protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
35-
var ldapGroups = getRoles(user.getNameInNamespace(), username);
29+
var ldapGroups = super.getGroupMembershipRoles(user.getNameInNamespace(), username)
30+
.stream()
31+
.map(GrantedAuthority::getAuthority)
32+
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
33+
.collect(Collectors.toSet());
3634

3735
return acs.getRoles()
3836
.stream()
@@ -47,32 +45,4 @@ protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, St
4745
.map(SimpleGrantedAuthority::new)
4846
.collect(Collectors.toSet());
4947
}
50-
51-
private Set<String> getRoles(String userDn, String username) {
52-
var groupSearchBase = props.getGroupFilterSearchBase();
53-
Assert.notNull(groupSearchBase, "groupSearchBase is empty");
54-
55-
var groupRoleAttribute = props.getGroupRoleAttribute();
56-
if (groupRoleAttribute == null) {
57-
58-
groupRoleAttribute = "cn";
59-
}
60-
61-
log.trace(
62-
"Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
63-
username, userDn, groupRoleAttribute, getGroupSearchFilter(), groupSearchBase);
64-
65-
var ldapTemplate = getLdapTemplate();
66-
ldapTemplate.setIgnoreNameNotFoundException(true);
67-
68-
Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
69-
groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
70-
new String[] {groupRoleAttribute});
71-
72-
return userRoles.stream()
73-
.map(record -> record.get(getGroupRoleAttribute()).get(0))
74-
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
75-
.collect(Collectors.toSet());
76-
}
77-
7848
}

api/src/main/java/io/kafbat/ui/util/GithubReleaseInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class GithubReleaseInfo {
1111
private static final String GITHUB_LATEST_RELEASE_RETRIEVAL_URL =
1212
"https://api.github.com/repos/kafbat/kafka-ui/releases/latest";
1313

14-
private static final Duration GITHUB_API_MAX_WAIT_TIME = Duration.ofSeconds(2);
14+
private static final Duration GITHUB_API_MAX_WAIT_TIME = Duration.ofSeconds(10);
1515

1616
public record GithubReleaseDto(String html_url, String tag_name, String published_at) {
1717

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3471,6 +3471,7 @@ components:
34713471
- PAUSED
34723472
- UNASSIGNED
34733473
- TASK_FAILED
3474+
- RESTARTING
34743475

34753476
ConnectorAction:
34763477
type: string

contract/src/main/resources/swagger/kafka-connect-api.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ components:
447447
- FAILED
448448
- PAUSED
449449
- UNASSIGNED
450+
- RESTARTING
450451
worker_id:
451452
type: string
452453
trace:

documentation/compose/kafbat-ui.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,18 @@ services:
6262
- "9093:9092"
6363
- "9998:9998"
6464
environment:
65-
KAFKA_BROKER_ID: 1
65+
KAFKA_BROKER_ID: 2
6666
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
6767
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
6868
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
6969
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
7070
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
7171
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
7272
KAFKA_JMX_PORT: 9998
73-
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998
73+
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
7474
KAFKA_PROCESS_ROLES: 'broker,controller'
75-
KAFKA_NODE_ID: 1
76-
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
75+
KAFKA_NODE_ID: 2
76+
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@kafka1:29093'
7777
KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
7878
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
7979
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

frontend/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ Install [pnpm](https://pnpm.io/installation)
2626
npm install -g pnpm
2727
```
2828

29+
Update pnpm
30+
```
31+
npm rm -g pnpm
32+
```
33+
Then reinstall it
34+
35+
or use
36+
```
37+
npm install -g pnpm@<version>
38+
```
39+
2940
Install dependencies
3041
```
3142
pnpm install

0 commit comments

Comments
 (0)