Skip to content

Commit 6d31b98

Browse files
authored
Merge branch 'main' into kafbat/203
2 parents 7843560 + 654978b commit 6d31b98

File tree

12 files changed

+101
-114
lines changed

12 files changed

+101
-114
lines changed

api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
import dev.cel.common.types.StructType;
1919
import dev.cel.compiler.CelCompiler;
2020
import dev.cel.compiler.CelCompilerFactory;
21+
import dev.cel.extensions.CelExtensions;
2122
import dev.cel.parser.CelStandardMacro;
2223
import dev.cel.runtime.CelEvaluationException;
2324
import dev.cel.runtime.CelRuntime;
2425
import dev.cel.runtime.CelRuntimeFactory;
2526
import io.kafbat.ui.exception.CelException;
26-
import io.kafbat.ui.model.MessageFilterTypeDTO;
2727
import io.kafbat.ui.model.TopicMessageDTO;
2828
import java.util.HashMap;
2929
import java.util.Map;
@@ -42,8 +42,7 @@ public class MessageFilters {
4242
private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName();
4343

4444
private static final CelCompiler CEL_COMPILER = createCompiler();
45-
private static final CelRuntime CEL_RUNTIME = CelRuntimeFactory.standardCelRuntimeBuilder()
46-
.build();
45+
private static final CelRuntime CEL_RUNTIME = createRuntime();
4746

4847
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
4948

@@ -143,6 +142,7 @@ private static CelCompiler createCompiler() {
143142
return CelCompilerFactory.standardCelCompilerBuilder()
144143
.setOptions(CelOptions.DEFAULT)
145144
.setStandardMacros(CelStandardMacro.STANDARD_MACROS)
145+
.addLibraries(CelExtensions.strings(), CelExtensions.encoders())
146146
.addVar(CEL_RECORD_VAR_NAME, recordType)
147147
.setResultType(SimpleType.BOOL)
148148
.setTypeProvider(new CelTypeProvider() {
@@ -159,6 +159,12 @@ public Optional<CelType> findType(String typeName) {
159159
.build();
160160
}
161161

162+
private static CelRuntime createRuntime() {
163+
return CelRuntimeFactory.standardCelRuntimeBuilder()
164+
.addLibraries(CelExtensions.strings(), CelExtensions.encoders())
165+
.build();
166+
}
167+
162168
@Nullable
163169
private static Object parseToJsonOrReturnAsIs(@Nullable String str) {
164170
if (str == null) {

api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.util.List;
1111
import java.util.Properties;
1212
import java.util.UUID;
13-
import java.util.stream.Collectors;
1413
import java.util.stream.Stream;
1514
import lombok.extern.slf4j.Slf4j;
1615
import lombok.val;
@@ -32,7 +31,6 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
3231
@Test
3332
void shouldNotFoundWhenNoSuchConsumerGroupId() {
3433
String groupId = "groupA";
35-
String expError = "The group id does not exist";
3634
webTestClient
3735
.delete()
3836
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
@@ -47,12 +45,13 @@ void shouldOkWhenConsumerGroupIsNotActive() {
4745

4846
//Create a consumer and subscribe to the topic
4947
String groupId = UUID.randomUUID().toString();
50-
val consumer = createTestConsumerWithGroupId(groupId);
51-
consumer.subscribe(List.of(topicName));
52-
consumer.poll(Duration.ofMillis(100));
48+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
49+
consumer.subscribe(List.of(topicName));
50+
consumer.poll(Duration.ofMillis(100));
5351

54-
//Unsubscribe from all topics to be able to delete this consumer
55-
consumer.unsubscribe();
52+
//Unsubscribe from all topics to be able to delete this consumer
53+
consumer.unsubscribe();
54+
}
5655

5756
//Delete the consumer when it's INACTIVE and check
5857
webTestClient
@@ -69,24 +68,24 @@ void shouldBeBadRequestWhenConsumerGroupIsActive() {
6968

7069
//Create a consumer and subscribe to the topic
7170
String groupId = UUID.randomUUID().toString();
72-
val consumer = createTestConsumerWithGroupId(groupId);
73-
consumer.subscribe(List.of(topicName));
74-
consumer.poll(Duration.ofMillis(100));
71+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
72+
consumer.subscribe(List.of(topicName));
73+
consumer.poll(Duration.ofMillis(100));
7574

76-
//Try to delete the consumer when it's ACTIVE
77-
String expError = "The group is not empty";
78-
webTestClient
79-
.delete()
80-
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
81-
.exchange()
82-
.expectStatus()
83-
.isBadRequest();
75+
//Try to delete the consumer when it's ACTIVE
76+
webTestClient
77+
.delete()
78+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
79+
.exchange()
80+
.expectStatus()
81+
.isBadRequest();
82+
}
8483
}
8584

8685
@Test
8786
void shouldReturnConsumerGroupsWithPagination() throws Exception {
88-
try (var groups1 = startConsumerGroups(3, "cgPageTest1");
89-
var groups2 = startConsumerGroups(2, "cgPageTest2")) {
87+
try (var ignored = startConsumerGroups(3, "cgPageTest1");
88+
var ignored1 = startConsumerGroups(2, "cgPageTest2")) {
9089
webTestClient
9190
.get()
9291
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest", LOCAL)
@@ -114,19 +113,19 @@ void shouldReturnConsumerGroupsWithPagination() throws Exception {
114113
});
115114

116115
webTestClient
117-
.get()
118-
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
119-
+ "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
120-
.exchange()
121-
.expectStatus()
122-
.isOk()
123-
.expectBody(ConsumerGroupsPageResponseDTO.class)
124-
.value(page -> {
125-
assertThat(page.getPageCount()).isEqualTo(1);
126-
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
127-
assertThat(page.getConsumerGroups())
128-
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
129-
});
116+
.get()
117+
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
118+
+ "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
119+
.exchange()
120+
.expectStatus()
121+
.isOk()
122+
.expectBody(ConsumerGroupsPageResponseDTO.class)
123+
.value(page -> {
124+
assertThat(page.getPageCount()).isEqualTo(1);
125+
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
126+
assertThat(page.getConsumerGroups())
127+
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
128+
});
130129

131130
webTestClient
132131
.get()
@@ -156,7 +155,7 @@ private Closeable startConsumerGroups(int count, String consumerGroupPrefix) {
156155
return consumer;
157156
})
158157
.limit(count)
159-
.collect(Collectors.toList());
158+
.toList();
160159
return () -> {
161160
consumers.forEach(KafkaConsumer::close);
162161
deleteTopic(topicName);

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010
import io.kafbat.ui.exception.CelException;
1111
import io.kafbat.ui.model.TopicMessageDTO;
1212
import java.time.OffsetDateTime;
13-
import java.time.temporal.ChronoUnit;
1413
import java.util.ArrayList;
14+
import java.util.Base64;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.UUID;
1718
import java.util.function.Predicate;
1819
import org.apache.commons.lang3.RandomStringUtils;
1920
import org.junit.jupiter.api.Nested;
@@ -100,7 +101,7 @@ void canCheckTimestampMs() {
100101
var ts = OffsetDateTime.now();
101102
var f = celScriptFilter("record.timestampMs == " + ts.toInstant().toEpochMilli());
102103
assertTrue(f.test(msg().timestamp(ts)));
103-
assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS))));
104+
assertFalse(f.test(msg().timestamp(ts.plusSeconds(1L))));
104105
}
105106

106107
@Test
@@ -177,6 +178,7 @@ void filterSpeedIsAtLeast5kPerSec() {
177178
toFilter.add(msg().content(jsonContent).key(randString));
178179
}
179180
// first iteration for warmup
181+
// noinspection ResultOfMethodCallIgnored
180182
toFilter.stream().filter(f).count();
181183

182184
long before = System.currentTimeMillis();
@@ -188,10 +190,15 @@ void filterSpeedIsAtLeast5kPerSec() {
188190
}
189191
}
190192

193+
@Test
194+
void testBase64DecodingWorks() {
195+
var uuid = UUID.randomUUID().toString();
196+
var msg = "test." + Base64.getEncoder().encodeToString(uuid.getBytes());
197+
var f = celScriptFilter("string(base64.decode(record.value.split('.')[1])).contains('" + uuid + "')");
198+
assertTrue(f.test(msg().content(msg)));
199+
}
200+
191201
private TopicMessageDTO msg() {
192-
return new TopicMessageDTO()
193-
.timestamp(OffsetDateTime.now())
194-
.offset(-1L)
195-
.partition(1);
202+
return new TopicMessageDTO(1, -1L, OffsetDateTime.now());
196203
}
197204
}

api/src/test/java/io/kafbat/ui/service/acl/AclCsvTest.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.exception.ValidationException;
77
import java.util.Collection;
88
import java.util.List;
9+
import java.util.stream.Stream;
910
import org.apache.kafka.common.acl.AccessControlEntry;
1011
import org.apache.kafka.common.acl.AclBinding;
1112
import org.apache.kafka.common.acl.AclOperation;
@@ -15,6 +16,8 @@
1516
import org.apache.kafka.common.resource.ResourceType;
1617
import org.junit.jupiter.api.Test;
1718
import org.junit.jupiter.params.ParameterizedTest;
19+
import org.junit.jupiter.params.provider.Arguments;
20+
import org.junit.jupiter.params.provider.MethodSource;
1821
import org.junit.jupiter.params.provider.ValueSource;
1922

2023
class AclCsvTest {
@@ -29,22 +32,26 @@ class AclCsvTest {
2932
);
3033

3134
@ParameterizedTest
32-
@ValueSource(strings = {
33-
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
34-
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
35-
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost",
36-
37-
//without header
38-
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
39-
+ "\n"
40-
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
41-
+ "\n"
42-
})
35+
@MethodSource
4336
void parsesValidInputCsv(String csvString) {
4437
Collection<AclBinding> parsed = AclCsv.parseCsv(csvString);
4538
assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
4639
}
4740

41+
private static Stream<Arguments> parsesValidInputCsv() {
42+
return Stream.of(
43+
Arguments.of(
44+
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host" + System.lineSeparator()
45+
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
46+
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"),
47+
Arguments.of(
48+
//without header
49+
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
50+
+ System.lineSeparator()
51+
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
52+
+ System.lineSeparator()));
53+
}
54+
4855
@ParameterizedTest
4956
@ValueSource(strings = {
5057
// columns > 7

api/src/test/java/io/kafbat/ui/service/acl/AclsServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ void testSyncAclWithAclCsv() {
6868

6969
aclsService.syncAclWithAclCsv(
7070
CLUSTER,
71-
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
72-
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
71+
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host" + System.lineSeparator()
72+
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
7373
+ "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
7474
).block();
7575

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3472,6 +3472,7 @@ components:
34723472
- UNASSIGNED
34733473
- TASK_FAILED
34743474
- RESTARTING
3475+
- STOPPED
34753476

34763477
ConnectorAction:
34773478
type: string

contract/src/main/resources/swagger/kafka-connect-api.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,7 @@ components:
448448
- PAUSED
449449
- UNASSIGNED
450450
- RESTARTING
451+
- STOPPED
451452
worker_id:
452453
type: string
453454
trace:

documentation/compose/DOCKER_COMPOSE.md

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22

33
1. [kafka-ui.yaml](./kafbat-ui.yaml) - Default configuration with 2 kafka clusters with two nodes of Schema Registry, one kafka-connect and a few dummy topics.
44
2. [kafka-ui-arm64.yaml](../../.dev/dev_arm64.yaml) - Default configuration for ARM64(Mac M1) architecture with 1 kafka cluster without zookeeper with one node of Schema Registry, one kafka-connect and a few dummy topics.
5-
3. [kafka-clusters-only.yaml](./kafka-clusters-only.yaml) - A configuration for development purposes, everything besides `kafka-ui` itself (to be run locally).
6-
4. [kafka-ui-ssl.yml](./kafka-ssl.yml) - Connect to Kafka via TLS/SSL
7-
5. [kafka-cluster-sr-auth.yaml](./cluster-sr-auth.yaml) - Schema registry with authentication.
8-
6. [kafka-ui-auth-context.yaml](./auth-context.yaml) - Basic (username/password) authentication with custom path (URL) (issue 861).
9-
7. [e2e-tests.yaml](./e2e-tests.yaml) - Configuration with different connectors (github-source, s3, sink-activities, source-activities) and Ksql functionality.
10-
8. [kafka-ui-jmx-secured.yml](./ui-jmx-secured.yml) - Kafka’s JMX with SSL and authentication.
11-
9. [kafka-ui-reverse-proxy.yaml](./nginx-proxy.yaml) - An example for using the app behind a proxy (like nginx).
12-
10. [kafka-ui-sasl.yaml](./ui-sasl.yaml) - SASL auth for Kafka.
13-
11. [kafka-ui-traefik-proxy.yaml](./traefik-proxy.yaml) - Traefik specific proxy configuration.
14-
12. [oauth-cognito.yaml](./oauth-cognito.yaml) - OAuth2 with Cognito
15-
13. [kafka-ui-with-jmx-exporter.yaml](./ui-with-jmx-exporter.yaml) - A configuration with 2 kafka clusters with enabled prometheus jmx exporters instead of jmx.
16-
14. [kafka-with-zookeeper.yaml](./kafka-zookeeper.yaml) - An example for using kafka with zookeeper
5+
3. [kafka-ui-ssl.yml](./kafka-ssl.yml) - Connect to Kafka via TLS/SSL
6+
4. [kafka-cluster-sr-auth.yaml](./cluster-sr-auth.yaml) - Schema registry with authentication.
7+
5. [kafka-ui-auth-context.yaml](./auth-context.yaml) - Basic (username/password) authentication with custom path (URL) (issue 861).
8+
6. [e2e-tests.yaml](./e2e-tests.yaml) - Configuration with different connectors (github-source, s3, sink-activities, source-activities) and Ksql functionality.
9+
7. [kafka-ui-jmx-secured.yml](./ui-jmx-secured.yml) - Kafka’s JMX with SSL and authentication.
10+
8. [kafka-ui-reverse-proxy.yaml](./nginx-proxy.yaml) - An example for using the app behind a proxy (like nginx).
11+
9. [kafka-ui-sasl.yaml](./ui-sasl.yaml) - SASL auth for Kafka.
12+
10. [kafka-ui-traefik-proxy.yaml](./traefik-proxy.yaml) - Traefik specific proxy configuration.
13+
11. [kafka-ui-with-jmx-exporter.yaml](./ui-with-jmx-exporter.yaml) - A configuration with 2 kafka clusters with enabled prometheus jmx exporters instead of jmx.
14+
12. [kafka-with-zookeeper.yaml](./kafka-zookeeper.yaml) - An example for using kafka with zookeeper

documentation/compose/ui-acl-with-zk.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ services:
1818
KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
1919

2020
zookeeper:
21-
image: wurstmeister/zookeeper:3.4.6
21+
image: zookeeper:3.8
2222
environment:
2323
JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
2424
volumes:
Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import styled from 'styled-components';
22

33
interface Props {
4-
isCheckedIcon?: boolean;
4+
checked?: boolean;
55
}
66

7-
export const StyledLabel = styled.label<Props>`
7+
export const StyledLabel = styled.label`
88
position: relative;
99
display: inline-block;
10-
width: ${({ isCheckedIcon }) => (isCheckedIcon ? '40px' : '34px')};
10+
width: 34px;
1111
height: 20px;
1212
margin-right: 8px;
1313
`;
@@ -32,14 +32,12 @@ export const StyledSlider = styled.span<Props>`
3232
left: 0;
3333
right: 0;
3434
bottom: 0;
35-
background-color: ${({ isCheckedIcon, theme }) =>
36-
isCheckedIcon
37-
? theme.switch.checkedIcon.backgroundColor
38-
: theme.switch.unchecked};
35+
background-color: ${({ checked, theme }) =>
36+
checked ? theme.switch.checked : theme.switch.unchecked};
3937
transition: 0.4s;
4038
border-radius: 20px;
4139
42-
:hover {
40+
&:hover {
4341
background-color: ${({ theme }) => theme.switch.hover};
4442
}
4543
@@ -48,7 +46,7 @@ export const StyledSlider = styled.span<Props>`
4846
content: '';
4947
height: 14px;
5048
width: 14px;
51-
left: 3px;
49+
left: ${({ checked }) => (checked ? '17px' : '3px')};
5250
bottom: 3px;
5351
background-color: ${({ theme }) => theme.switch.circle};
5452
transition: 0.4s;
@@ -57,25 +55,8 @@ export const StyledSlider = styled.span<Props>`
5755
}
5856
`;
5957

60-
export const StyledInput = styled.input<Props>`
58+
export const StyledInput = styled.input`
6159
opacity: 0;
6260
width: 0;
6361
height: 0;
64-
65-
&:checked + ${StyledSlider} {
66-
background-color: ${({ isCheckedIcon, theme }) =>
67-
isCheckedIcon
68-
? theme.switch.checkedIcon.backgroundColor
69-
: theme.switch.checked};
70-
}
71-
72-
&:focus + ${StyledSlider} {
73-
box-shadow: 0 0 1px ${({ theme }) => theme.switch.checked};
74-
}
75-
76-
:checked + ${StyledSlider}:before {
77-
transform: translateX(
78-
${({ isCheckedIcon }) => (isCheckedIcon ? '20px' : '14px')}
79-
);
80-
}
8162
`;

0 commit comments

Comments
 (0)