Skip to content

Commit 148b9f4

Browse files
author
Leshe4ka
committed
Merge branch 'issues/1370_be_csv' of github.com:kafbat/kafka-ui into issue/1370
2 parents ce2bd83 + a4dc3b5 commit 148b9f4

22 files changed

+347
-70
lines changed

api/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ dependencies {
5858
implementation libs.lucene.queryparser
5959
implementation libs.lucene.analysis.common
6060

61+
implementation libs.fastcsv
62+
6163
implementation libs.opendatadiscovery.oddrn
6264
implementation(libs.opendatadiscovery.client) {
6365
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ public class ClustersProperties {
4545

4646
AdminClient adminClient = new AdminClient();
4747

48+
Csv csv = new Csv();
49+
50+
@Data
51+
public static class Csv {
52+
String lineDelimeter = "crlf";
53+
char quoteCharacter = '"';
54+
String quoteStrategy = "required";
55+
char fieldSeparator = ',';
56+
}
57+
4858
@Data
4959
public static class AdminClient {
5060
Integer timeout;

api/src/main/java/io/kafbat/ui/controller/AbstractController.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,16 @@
44
import io.kafbat.ui.model.KafkaCluster;
55
import io.kafbat.ui.model.rbac.AccessContext;
66
import io.kafbat.ui.service.ClustersStorage;
7+
import io.kafbat.ui.service.CsvWriterService;
78
import io.kafbat.ui.service.audit.AuditService;
89
import io.kafbat.ui.service.rbac.AccessControlService;
10+
import java.util.List;
11+
import java.util.Objects;
12+
import java.util.Optional;
13+
import java.util.function.Function;
914
import org.springframework.beans.factory.annotation.Autowired;
15+
import org.springframework.http.ResponseEntity;
16+
import reactor.core.publisher.Flux;
1017
import reactor.core.publisher.Mono;
1118
import reactor.core.publisher.Signal;
1219

@@ -15,6 +22,7 @@ public abstract class AbstractController {
1522
protected ClustersStorage clustersStorage;
1623
protected AccessControlService accessControlService;
1724
protected AuditService auditService;
25+
protected CsvWriterService csvWriterService;
1826

1927
protected KafkaCluster getCluster(String name) {
2028
return clustersStorage.getClusterByName(name)
@@ -44,4 +52,22 @@ public void setAccessControlService(AccessControlService accessControlService) {
4452
public void setAuditService(AuditService auditService) {
4553
this.auditService = auditService;
4654
}
55+
56+
public <T extends Flux<R>, R> Mono<ResponseEntity<String>> responseToCsv(ResponseEntity<T> response) {
57+
return responseToCsv(response, (t) -> t);
58+
}
59+
60+
public <T, R> Mono<ResponseEntity<String>> responseToCsv(ResponseEntity<T> response, Function<T, Flux<R>> extract) {
61+
if (response.getStatusCode().is2xxSuccessful()) {
62+
return mapToCsv(extract.apply(response.getBody())).map(ResponseEntity::ok);
63+
} else {
64+
return Mono.just(ResponseEntity.status(response.getStatusCode()).body(
65+
Optional.ofNullable(response.getBody()).map(Object::toString).orElse("")
66+
));
67+
}
68+
}
69+
70+
protected <T> Mono<String> mapToCsv(Flux<T> body) {
71+
return csvWriterService.write(body);
72+
}
4773
}

api/src/main/java/io/kafbat/ui/controller/AclsController.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO
6363
.thenReturn(ResponseEntity.ok().build());
6464
}
6565

66+
67+
6668
@Override
6769
public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
6870
KafkaAclResourceTypeDTO resourceTypeDto,
@@ -96,19 +98,14 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
9698
}
9799

98100
@Override
99-
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
100-
AccessContext context = AccessContext.builder()
101-
.cluster(clusterName)
102-
.aclActions(AclAction.VIEW)
103-
.operationName("getAclAsCsv")
104-
.build();
105-
106-
return validateAccess(context).then(
107-
aclsService.getAclAsCsvString(getCluster(clusterName))
108-
.map(ResponseEntity::ok)
109-
.flatMap(Mono::just)
110-
.doOnEach(sig -> audit(context, sig))
111-
);
101+
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName,
102+
KafkaAclResourceTypeDTO resourceType,
103+
String resourceName,
104+
KafkaAclNamePatternTypeDTO namePatternType,
105+
String search, Boolean fts,
106+
ServerWebExchange exchange) {
107+
return listAcls(clusterName, resourceType, resourceName, namePatternType, search, fts, exchange)
108+
.flatMap(this::responseToCsv);
112109
}
113110

114111
@Override

api/src/main/java/io/kafbat/ui/controller/BrokersController.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
4646
.doOnEach(sig -> audit(context, sig));
4747
}
4848

49+
@Override
50+
public Mono<ResponseEntity<String>> getBrokersCsv(String clusterName,
51+
ServerWebExchange exchange) {
52+
return getBrokers(clusterName, exchange).flatMap(this::responseToCsv);
53+
}
54+
4955
@Override
5056
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
5157
ServerWebExchange exchange) {

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.kafbat.ui.service.mcp.McpTool;
2323
import java.util.Map;
2424
import java.util.Optional;
25+
import java.util.OptionalInt;
2526
import java.util.function.Supplier;
2627
import lombok.RequiredArgsConstructor;
2728
import lombok.extern.slf4j.Slf4j;
@@ -95,6 +96,8 @@ public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clu
9596
.doOnEach(sig -> audit(context, sig));
9697
}
9798

99+
100+
98101
@Override
99102
public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
100103
String topicName,
@@ -120,6 +123,8 @@ public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(Strin
120123
.doOnEach(sig -> audit(context, sig));
121124
}
122125

126+
127+
123128
@Override
124129
public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage(
125130
String clusterName,
@@ -138,17 +143,51 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage
138143
.build();
139144

140145
return validateAccess(context).then(
141-
consumerGroupService.getConsumerGroupsPage(
146+
consumerGroupService.getConsumerGroups(
147+
getCluster(clusterName),
148+
OptionalInt.of(
149+
Optional.ofNullable(page).filter(i -> i > 0).orElse(1)
150+
),
151+
OptionalInt.of(
152+
Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize)
153+
),
154+
search,
155+
fts,
156+
Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
157+
Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC)
158+
)
159+
.map(this::convertPage)
160+
.map(ResponseEntity::ok)
161+
).doOnEach(sig -> audit(context, sig));
162+
}
163+
164+
165+
@Override
166+
public Mono<ResponseEntity<String>> getConsumerGroupsCsv(String clusterName, Integer page,
167+
Integer perPage, String search,
168+
ConsumerGroupOrderingDTO orderBy,
169+
SortOrderDTO sortOrderDto, Boolean fts,
170+
ServerWebExchange exchange) {
171+
172+
var context = AccessContext.builder()
173+
.cluster(clusterName)
174+
// consumer group access validation is within the service
175+
.operationName("getConsumerGroupsPage")
176+
.build();
177+
178+
return validateAccess(context).then(
179+
consumerGroupService.getConsumerGroups(
142180
getCluster(clusterName),
143-
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
144-
Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize),
181+
OptionalInt.empty(),
182+
OptionalInt.empty(),
145183
search,
146184
fts,
147185
Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
148186
Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC)
149187
)
150188
.map(this::convertPage)
151189
.map(ResponseEntity::ok)
190+
.flatMap(r -> responseToCsv(r, (g) -> Flux.fromIterable(g.getConsumerGroups())))
152191
).doOnEach(sig -> audit(context, sig));
153192
}
154193

@@ -194,7 +233,12 @@ public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName,
194233
);
195234
}
196235
Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
197-
.collect(toMap(PartitionOffsetDTO::getPartition, PartitionOffsetDTO::getOffset));
236+
.collect(
237+
toMap(
238+
PartitionOffsetDTO::getPartition,
239+
d -> Optional.ofNullable(d.getOffset()).orElse(0L)
240+
)
241+
);
198242
return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
199243
default:
200244
return Mono.error(

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.kafbat.ui.model.NewConnectorDTO;
1818
import io.kafbat.ui.model.SortOrderDTO;
1919
import io.kafbat.ui.model.TaskDTO;
20+
import io.kafbat.ui.model.TopicsResponseDTO;
2021
import io.kafbat.ui.model.rbac.AccessContext;
2122
import io.kafbat.ui.model.rbac.permission.ConnectAction;
2223
import io.kafbat.ui.service.KafkaConnectService;
@@ -56,6 +57,13 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
5657
return Mono.just(ResponseEntity.ok(availableConnects));
5758
}
5859

60+
@Override
61+
public Mono<ResponseEntity<String>> getConnectsCsv(String clusterName, Boolean withStats,
62+
ServerWebExchange exchange) {
63+
return getConnects(clusterName, withStats, exchange)
64+
.flatMap(this::responseToCsv);
65+
}
66+
5967
@Override
6068
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
6169
ServerWebExchange exchange) {
@@ -157,6 +165,15 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
157165
.doOnEach(sig -> audit(context, sig));
158166
}
159167

168+
@Override
169+
public Mono<ResponseEntity<String>> getAllConnectorsCsv(String clusterName, String search,
170+
ConnectorColumnsToSortDTO orderBy,
171+
SortOrderDTO sortOrder, Boolean fts,
172+
ServerWebExchange exchange) {
173+
return getAllConnectors(clusterName, search, orderBy, sortOrder, fts, exchange)
174+
.flatMap(this::responseToCsv);
175+
}
176+
160177
@Override
161178
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
162179
String connectName,

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.kafbat.ui.model.TopicUpdateDTO;
3030
import io.kafbat.ui.model.TopicsResponseDTO;
3131
import io.kafbat.ui.model.rbac.AccessContext;
32-
import io.kafbat.ui.model.rbac.permission.ConnectAction;
3332
import io.kafbat.ui.service.KafkaConnectService;
3433
import io.kafbat.ui.service.TopicsService;
3534
import io.kafbat.ui.service.analyze.TopicAnalysisService;
@@ -186,7 +185,7 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
186185
.operationName("getTopics")
187186
.build();
188187

189-
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal, fts)
188+
return topicsService.getTopics(getCluster(clusterName), search, showInternal, fts)
190189
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
191190
.flatMap(topics -> {
192191
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
@@ -219,6 +218,28 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
219218
.doOnEach(sig -> audit(context, sig));
220219
}
221220

221+
@Override
222+
public Mono<ResponseEntity<String>> getTopicsCsv(String clusterName, Boolean showInternal,
223+
String search, TopicColumnsToSortDTO orderBy,
224+
SortOrderDTO sortOrder, Boolean fts,
225+
ServerWebExchange exchange) {
226+
227+
AccessContext context = AccessContext.builder()
228+
.cluster(clusterName)
229+
.operationName("getTopicsCsv")
230+
.build();
231+
232+
ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
233+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, ftsProperties.use(fts));
234+
235+
return topicsService
236+
.getTopics(getCluster(clusterName), search, showInternal, fts)
237+
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
238+
.map(topics -> topics.stream().sorted(comparatorForTopic).toList())
239+
.flatMap(topics -> responseToCsv(ResponseEntity.ok(Flux.fromIterable(topics))))
240+
.doOnEach(sig -> audit(context, sig));
241+
}
242+
222243
@Override
223244
public Mono<ResponseEntity<TopicDTO>> updateTopic(
224245
String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,

0 commit comments

Comments
 (0)