Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/frontend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ jobs:
ref: ${{ github.event.pull_request.head.sha }}
token: ${{ github.token }}

- uses: pnpm/action-setup@v3.0.0
- uses: pnpm/action-setup@v4.0.0
with:
version: 8.6.12
version: 9.1.2

- name: Install node
uses: actions/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ To run Kafbat UI, you can use either a pre-built Docker image or build it (or a
## Quick start (Demo run)

```
docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true image: ghcr.io/kafbat/kafka-ui
docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true ghcr.io/kafbat/kafka-ui
```

Then access the web UI at [http://localhost:8080](http://localhost:8080)
Expand Down
3 changes: 1 addition & 2 deletions api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#FROM azul/zulu-openjdk-alpine:17-jre-headless
FROM azul/zulu-openjdk-alpine@sha256:d59f1266db40341318e563fd76c21b2880ffa5d371f0c097c29d33f89c3a0010
FROM azul/zulu-openjdk-alpine:17.0.11-jre-headless

RUN apk add --no-cache \
# snappy codec
Expand Down
26 changes: 25 additions & 1 deletion api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;

@Slf4j
@Getter
Expand All @@ -34,7 +36,7 @@ class OffsetsInfo {

OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
endOffsets.forEach((tp, endOffset) -> {
var beginningOffset = beginOffsets.get(tp);
Expand All @@ -46,6 +48,28 @@ class OffsetsInfo {
});
}


private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
try {
// we try to use offsetsForTimes() to find earliest offsets, since for
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
// even when effectively first offset can be very high
var offsets = consumer.offsetsForTimes(
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
);
// result of offsetsForTimes() can be null, if message version < 0.10.0
if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) {
return offsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
}
} catch (UnsupportedOperationException | UnsupportedVersionException e) {
// offsetsForTimes() not supported
}
//falling back to beginningOffsets() if offsetsForTimes() not supported
return consumer.beginningOffsets(partitions);
}

boolean assignedPartitionsFullyPolled() {
for (var tp : consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
Expand Down
10 changes: 7 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,20 +86,22 @@ private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));

List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
Set<TopicPartition> paused = new HashSet<>();
while (!sink.isCancelled() && paused.size() < range.size()) {
var polledRecords = poll(sink, consumer);
range.forEach((tp, fromTo) -> {
polledRecords.records(tp).stream()
.filter(r -> r.offset() < fromTo.to)
.forEach(result::add);

//next position is out of target range -> pausing partition
if (consumer.position(tp) >= fromTo.to) {
if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) {
paused.add(tp);
consumer.pause(List.of(tp));
}
});
}
consumer.resume(consumer.paused());
consumer.resume(paused);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.kafbat.ui.serdes.BuiltInSerde;
import io.kafbat.ui.util.jsonschema.ProtobufSchemaConverter;
import java.io.ByteArrayInputStream;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
Expand Down Expand Up @@ -404,7 +405,7 @@ private Loader createFilesLoader(Map<String, ProtoFile> files) {
@SneakyThrows
private Map<String, ProtoFile> loadFilesWithLocations() {
Map<String, ProtoFile> filesByLocations = new HashMap<>();
try (var files = Files.walk(baseLocation)) {
try (var files = Files.walk(baseLocation, FileVisitOption.FOLLOW_LINKS)) {
files.filter(p -> !Files.isDirectory(p) && p.toString().endsWith(".proto"))
.forEach(path -> {
// relative path will be used as "import" statement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package io.kafbat.ui.service.rbac.extractor;

import io.kafbat.ui.config.auth.LdapProperties;
import io.kafbat.ui.model.rbac.Role;
import io.kafbat.ui.model.rbac.provider.Provider;
import io.kafbat.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -14,25 +11,26 @@
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
import org.springframework.util.Assert;
import org.springframework.security.ldap.userdetails.NestedLdapAuthoritiesPopulator;

@Slf4j
public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
public class RbacLdapAuthoritiesExtractor extends NestedLdapAuthoritiesPopulator {

private final AccessControlService acs;
private final LdapProperties props;

public RbacLdapAuthoritiesExtractor(ApplicationContext context,
BaseLdapPathContextSource contextSource, String groupFilterSearchBase) {
super(contextSource, groupFilterSearchBase);
this.acs = context.getBean(AccessControlService.class);
this.props = context.getBean(LdapProperties.class);
}

@Override
protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
var ldapGroups = getRoles(user.getNameInNamespace(), username);
var ldapGroups = super.getGroupMembershipRoles(user.getNameInNamespace(), username)
.stream()
.map(GrantedAuthority::getAuthority)
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
.collect(Collectors.toSet());

return acs.getRoles()
.stream()
Expand All @@ -47,32 +45,4 @@ protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, St
.map(SimpleGrantedAuthority::new)
.collect(Collectors.toSet());
}

private Set<String> getRoles(String userDn, String username) {
var groupSearchBase = props.getGroupFilterSearchBase();
Assert.notNull(groupSearchBase, "groupSearchBase is empty");

var groupRoleAttribute = props.getGroupRoleAttribute();
if (groupRoleAttribute == null) {

groupRoleAttribute = "cn";
}

log.trace(
"Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
username, userDn, groupRoleAttribute, getGroupSearchFilter(), groupSearchBase);

var ldapTemplate = getLdapTemplate();
ldapTemplate.setIgnoreNameNotFoundException(true);

Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
new String[] {groupRoleAttribute});

return userRoles.stream()
.map(record -> record.get(getGroupRoleAttribute()).get(0))
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
.collect(Collectors.toSet());
}

}
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/util/GithubReleaseInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class GithubReleaseInfo {
private static final String GITHUB_LATEST_RELEASE_RETRIEVAL_URL =
"https://api.github.com/repos/kafbat/kafka-ui/releases/latest";

private static final Duration GITHUB_API_MAX_WAIT_TIME = Duration.ofSeconds(2);
private static final Duration GITHUB_API_MAX_WAIT_TIME = Duration.ofSeconds(10);

public record GithubReleaseDto(String html_url, String tag_name, String published_at) {

Expand Down
1 change: 1 addition & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3471,6 +3471,7 @@ components:
- PAUSED
- UNASSIGNED
- TASK_FAILED
- RESTARTING

ConnectorAction:
type: string
Expand Down
1 change: 1 addition & 0 deletions contract/src/main/resources/swagger/kafka-connect-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ components:
- FAILED
- PAUSED
- UNASSIGNED
- RESTARTING
worker_id:
type: string
trace:
Expand Down
8 changes: 4 additions & 4 deletions documentation/compose/kafbat-ui.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ services:
- "9093:9092"
- "9998:9998"
environment:
KAFKA_BROKER_ID: 1
KAFKA_BROKER_ID: 2
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9998
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
KAFKA_NODE_ID: 2
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@kafka1:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
Expand Down
11 changes: 11 additions & 0 deletions frontend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ Install [pnpm](https://pnpm.io/installation)
npm install -g pnpm
```

Update pnpm
```
npm rm -g pnpm
```
Then reinstall it

or use
```
npm install -g pnpm@<version>
```

Install dependencies
```
pnpm install
Expand Down
4 changes: 2 additions & 2 deletions frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"react-hot-toast": "2.4.1",
"react-is": "18.2.0",
"react-multi-select-component": "4.3.4",
"react-router-dom": "6.4.3",
"react-router-dom": "6.23.0",
"sass": "1.66.1",
"styled-components": "6.1.8",
"use-debounce": "10.0.0",
Expand Down Expand Up @@ -104,7 +104,7 @@
},
"engines": {
"node": "v18.17.1",
"pnpm": "^8.6.12"
"pnpm": "v9.1.2"
},
"pnpm": {
"overrides": {
Expand Down
Loading