Skip to content

Commit 40d03b2

Browse files
authored
Merge branch 'main' into issues/1333-optional-fts-fe
2 parents 362ff3c + 109f7c0 commit 40d03b2

File tree

15 files changed

+271
-141
lines changed

15 files changed

+271
-141
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import io.kafbat.ui.connect.ApiClient;
88
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
99
import io.kafbat.ui.connect.model.Connector;
10+
import io.kafbat.ui.connect.model.ConnectorExpand;
1011
import io.kafbat.ui.connect.model.ConnectorPlugin;
1112
import io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse;
1213
import io.kafbat.ui.connect.model.ConnectorStatus;
1314
import io.kafbat.ui.connect.model.ConnectorTask;
1415
import io.kafbat.ui.connect.model.ConnectorTopics;
16+
import io.kafbat.ui.connect.model.ExpandedConnector;
1517
import io.kafbat.ui.connect.model.NewConnector;
1618
import io.kafbat.ui.connect.model.TaskStatus;
1719
import io.kafbat.ui.exception.KafkaConnectConflictResponseException;
@@ -221,13 +223,17 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
221223
}
222224

223225
@Override
224-
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
225-
return withRetryOnConflictOrRebalance(super.getConnectors(search));
226+
public Mono<Map<String, ExpandedConnector>> getConnectors(
227+
String search, List<ConnectorExpand> expand
228+
) throws WebClientResponseException {
229+
return withRetryOnConflictOrRebalance(super.getConnectors(search, expand));
226230
}
227231

228232
@Override
229-
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
230-
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
233+
public Mono<ResponseEntity<Map<String, ExpandedConnector>>> getConnectorsWithHttpInfo(
234+
String search, List<ConnectorExpand> expand
235+
) throws WebClientResponseException {
236+
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search, expand));
231237
}
232238

233239
@Override

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ public enum LogLevel {
227227
@AllArgsConstructor
228228
public static class CacheProperties {
229229
boolean enabled = true;
230-
Duration connectCacheExpiry = Duration.ofMinutes(1);
231230
Duration connectClusterCacheExpiry = Duration.ofHours(24);
232231
}
233232

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,12 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
6666
.build();
6767

6868
return validateAccess(context)
69-
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
70-
.doOnEach(sig -> audit(context, sig));
69+
.thenReturn(
70+
ResponseEntity.ok(
71+
kafkaConnectService.getConnectors(getCluster(clusterName), connectName)
72+
.flatMapMany(m -> Flux.fromIterable(m.keySet()))
73+
)
74+
).doOnEach(sig -> audit(context, sig));
7175
}
7276

7377
@Override

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kafbat.ui.connect.model.ClusterInfo;
55
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
66
import io.kafbat.ui.connect.model.ConnectorTask;
7+
import io.kafbat.ui.connect.model.ExpandedConnector;
78
import io.kafbat.ui.connect.model.NewConnector;
89
import io.kafbat.ui.model.ConnectDTO;
910
import io.kafbat.ui.model.ConnectorDTO;
@@ -14,10 +15,15 @@
1415
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
1516
import io.kafbat.ui.model.FullConnectorInfoDTO;
1617
import io.kafbat.ui.model.TaskDTO;
18+
import io.kafbat.ui.model.TaskIdDTO;
1719
import io.kafbat.ui.model.TaskStatusDTO;
1820
import io.kafbat.ui.model.connect.InternalConnectorInfo;
1921
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Objects;
2024
import java.util.Optional;
25+
import java.util.stream.Collectors;
26+
import javax.annotation.Nullable;
2127
import org.mapstruct.Mapper;
2228
import org.mapstruct.Mapping;
2329

@@ -43,6 +49,39 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
4349
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
4450
connectorPluginConfigValidationResponse);
4551

52+
default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List<String> topics) {
53+
Objects.requireNonNull(connector.getInfo());
54+
Objects.requireNonNull(connector.getStatus());
55+
List<TaskDTO> tasks = List.of();
56+
57+
if (connector.getInfo().getTasks() != null
58+
&& connector.getStatus().getTasks() != null
59+
) {
60+
Map<Integer, TaskIdDTO> taskIds = connector.getInfo().getTasks()
61+
.stream().map(t -> new TaskIdDTO().task(t.getTask()).connector(t.getConnector()))
62+
.collect(Collectors.toMap(
63+
TaskIdDTO::getTask,
64+
t -> t
65+
));
66+
67+
tasks = connector.getStatus().getTasks().stream()
68+
.map(s ->
69+
new TaskDTO().status(fromClient(s)).id(taskIds.get(s.getId()))
70+
).toList();
71+
}
72+
73+
ConnectorDTO connectorDto = fromClient(connector.getInfo())
74+
.connect(connect)
75+
.status(fromClient(connector.getStatus().getConnector()));
76+
77+
return InternalConnectorInfo.builder()
78+
.connector(connectorDto)
79+
.config(connector.getInfo().getConfig())
80+
.tasks(tasks)
81+
.topics(topics)
82+
.build();
83+
}
84+
4685
default ConnectDTO toKafkaConnect(
4786
ClustersProperties.ConnectCluster connect,
4887
List<InternalConnectorInfo> connectors,

api/src/main/java/io/kafbat/ui/service/ApplicationInfoService.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static io.kafbat.ui.api.model.AuthType.DISABLED;
44
import static io.kafbat.ui.api.model.AuthType.OAUTH2;
55
import static io.kafbat.ui.model.ApplicationInfoDTO.EnabledFeaturesEnum;
6+
import static io.kafbat.ui.util.GithubReleaseInfo.GITHUB_RELEASE_INFO_ENABLED;
67
import static io.kafbat.ui.util.GithubReleaseInfo.GITHUB_RELEASE_INFO_TIMEOUT;
78

89
import com.google.common.annotations.VisibleForTesting;
@@ -15,12 +16,14 @@
1516
import io.kafbat.ui.model.OAuthProviderDTO;
1617
import io.kafbat.ui.util.DynamicConfigOperations;
1718
import io.kafbat.ui.util.GithubReleaseInfo;
19+
import jakarta.annotation.Nullable;
1820
import java.time.format.DateTimeFormatter;
1921
import java.util.ArrayList;
2022
import java.util.Collections;
2123
import java.util.List;
2224
import java.util.Optional;
2325
import java.util.Properties;
26+
import lombok.extern.slf4j.Slf4j;
2427
import org.springframework.beans.factory.annotation.Autowired;
2528
import org.springframework.beans.factory.annotation.Value;
2629
import org.springframework.boot.info.BuildProperties;
@@ -33,7 +36,9 @@
3336
import org.springframework.stereotype.Service;
3437

3538
@Service
39+
@Slf4j
3640
public class ApplicationInfoService {
41+
@Nullable
3742
private final GithubReleaseInfo githubReleaseInfo;
3843
private final ApplicationContext applicationContext;
3944
private final DynamicConfigOperations dynamicConfigOperations;
@@ -44,36 +49,52 @@ public ApplicationInfoService(DynamicConfigOperations dynamicConfigOperations,
4449
ApplicationContext applicationContext,
4550
@Autowired(required = false) BuildProperties buildProperties,
4651
@Autowired(required = false) GitProperties gitProperties,
52+
@Value("${" + GITHUB_RELEASE_INFO_ENABLED + ":true}") boolean githubInfoEnabled,
4753
@Value("${" + GITHUB_RELEASE_INFO_TIMEOUT + ":10}") int githubApiMaxWaitTime) {
4854
this.applicationContext = applicationContext;
4955
this.dynamicConfigOperations = dynamicConfigOperations;
5056
this.buildProperties = Optional.ofNullable(buildProperties).orElse(new BuildProperties(new Properties()));
5157
this.gitProperties = Optional.ofNullable(gitProperties).orElse(new GitProperties(new Properties()));
52-
githubReleaseInfo = new GithubReleaseInfo(githubApiMaxWaitTime);
58+
if (githubInfoEnabled) {
59+
this.githubReleaseInfo = new GithubReleaseInfo(githubApiMaxWaitTime);
60+
} else {
61+
this.githubReleaseInfo = null;
62+
log.warn("Check for latest release is disabled."
63+
+ " Note that old versions are not supported, please make sure that your system is up to date.");
64+
}
5365
}
5466

5567
public ApplicationInfoDTO getApplicationInfo() {
56-
var releaseInfo = githubReleaseInfo.get();
68+
var releaseInfo = githubReleaseInfo != null ? githubReleaseInfo.get() : null;
5769
return new ApplicationInfoDTO()
5870
.build(getBuildInfo(releaseInfo))
5971
.enabledFeatures(getEnabledFeatures())
6072
.latestRelease(convert(releaseInfo));
6173
}
6274

75+
@Nullable
6376
private ApplicationInfoLatestReleaseDTO convert(GithubReleaseInfo.GithubReleaseDto releaseInfo) {
77+
if (releaseInfo == null) {
78+
return null;
79+
}
6480
return new ApplicationInfoLatestReleaseDTO()
6581
.htmlUrl(releaseInfo.html_url())
6682
.publishedAt(releaseInfo.published_at())
6783
.versionTag(releaseInfo.tag_name());
6884
}
6985

7086
private ApplicationInfoBuildDTO getBuildInfo(GithubReleaseInfo.GithubReleaseDto release) {
71-
return new ApplicationInfoBuildDTO()
72-
.isLatestRelease(release.tag_name() != null && release.tag_name().equals(buildProperties.getVersion()))
87+
var buildInfo = new ApplicationInfoBuildDTO()
7388
.commitId(gitProperties.getShortCommitId())
7489
.version(buildProperties.getVersion())
7590
.buildTime(buildProperties.getTime() != null
7691
? DateTimeFormatter.ISO_INSTANT.format(buildProperties.getTime()) : null);
92+
if (release != null) {
93+
buildInfo = buildInfo.isLatestRelease(
94+
release.tag_name() != null && release.tag_name().equals(buildProperties.getVersion())
95+
);
96+
}
97+
return buildInfo;
7798
}
7899

79100
private List<EnabledFeaturesEnum> getEnabledFeatures() {
@@ -119,10 +140,13 @@ private List<OAuthProviderDTO> getOAuthProviders() {
119140
// updating on startup and every hour
120141
@Scheduled(fixedRateString = "${github-release-info-update-rate:3600000}")
121142
public void updateGithubReleaseInfo() {
122-
githubReleaseInfo.refresh().subscribe();
143+
if (githubReleaseInfo != null) {
144+
githubReleaseInfo.refresh().subscribe();
145+
}
123146
}
124147

125148
@VisibleForTesting
149+
@Nullable
126150
GithubReleaseInfo githubReleaseInfo() {
127151
return githubReleaseInfo;
128152
}

api/src/main/java/io/kafbat/ui/service/KafkaConfigSanitizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.springframework.stereotype.Component;
2020

2121
@Component
22-
class KafkaConfigSanitizer {
22+
public class KafkaConfigSanitizer {
2323

2424
private static final String SANITIZED_VALUE = "******";
2525

0 commit comments

Comments
 (0)