
Commit 5983739

Integration with ODD (#3289)
ODD integration. Exporting statistics to ODD:
- topics as DataSets (metadata + schemas)
- connectors as DataTransformers (metadata + inputs & outputs)
1 parent ffa49eb commit 5983739

25 files changed: +2437 −52 lines

kafka-ui-api/pom.xml

Lines changed: 25 additions & 0 deletions
@@ -199,6 +199,31 @@
             <version>${antlr4-maven-plugin.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.opendatadiscovery</groupId>
+            <artifactId>oddrn-generator-java</artifactId>
+            <version>${odd-oddrn-generator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendatadiscovery</groupId>
+            <artifactId>ingestion-contract-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-webflux</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.projectreactor</groupId>
+                    <artifactId>reactor-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.projectreactor.ipc</groupId>
+                    <artifactId>reactor-netty</artifactId>
+                </exclusion>
+            </exclusions>
+            <version>${odd-oddrn-client.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.security</groupId>
             <artifactId>spring-security-ldap</artifactId>
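Note on the exclusions above: ingestion-contract-client pulls in its own WebFlux/Reactor stack, and excluding it presumably lets the versions already managed by the kafka-ui Spring Boot parent win. A minimal, hypothetical sanity check (not part of this commit) for which reactor-core actually lands on the classpath:

// Hypothetical classpath check: prints the reactor-core version that was resolved.
// Assumes reactor-core's jar manifest carries Implementation-Version (it normally does).
import reactor.core.publisher.Flux;

public class ReactorVersionCheck {
  public static void main(String[] args) {
    String version = Flux.class.getPackage().getImplementationVersion();
    System.out.println("reactor-core on classpath: " + (version != null ? version : "unknown"));
  }
}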

kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

Lines changed: 3 additions & 3 deletions
@@ -37,10 +37,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
   public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
                                                             ServerWebExchange exchange) {

-    Flux<ConnectDTO> flux = Flux.fromIterable(kafkaConnectService.getConnects(getCluster(clusterName)))
+    Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
         .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));

-    return Mono.just(ResponseEntity.ok(flux));
+    return Mono.just(ResponseEntity.ok(availableConnects));
   }

   @Override
@@ -54,7 +54,7 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
         .build());

     return validateAccess.thenReturn(
-        ResponseEntity.ok(kafkaConnectService.getConnectors(getCluster(clusterName), connectName))
+        ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
     );
   }
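The rewritten endpoint filters connects through the access-control check reactively via Reactor's filterWhen. A minimal self-contained sketch of the same pattern, with an illustrative isAccessible stub standing in for the real AccessControlService call:

import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterWhenDemo {
  // Illustrative async permission check; the real code delegates to AccessControlService.
  static Mono<Boolean> isAccessible(String connectName) {
    return Mono.just(!connectName.startsWith("restricted-"));
  }

  public static void main(String[] args) {
    Flux.fromIterable(List.of("connect-1", "restricted-2", "connect-3"))
        // filterWhen keeps an element only if its async predicate emits true
        .filterWhen(FilterWhenDemo::isAccessible)
        .subscribe(System.out::println); // prints connect-1, connect-3
  }
}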

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java

Lines changed: 2 additions & 1 deletion
@@ -17,7 +17,8 @@
 class KafkaConfigSanitizer extends Sanitizer {
   private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = Arrays.asList(
       "basic.auth.user.info",  /* For Schema Registry credentials */
-      "password", "secret", "token", "key", ".*credentials.*"  /* General credential patterns */
+      "password", "secret", "token", "key", ".*credentials.*",  /* General credential patterns */
+      "aws.access.*", "aws.secret.*", "aws.session.*"  /* AWS-related credential patterns */
   );

   KafkaConfigSanitizer(
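For context, the sanitizer masks the value of any config key matching one of these patterns. A minimal sketch of the idea, assuming case-insensitive regex matching against the key (the real class builds on Spring Boot's Sanitizer, whose matching rules may differ in detail):

import java.util.List;
import java.util.regex.Pattern;

public class ConfigMaskingSketch {
  private static final List<Pattern> PATTERNS = List.of(
          "basic.auth.user.info", "password", "secret", "token", "key",
          ".*credentials.*", "aws.access.*", "aws.secret.*", "aws.session.*")
      .stream()
      .map(p -> Pattern.compile(p, Pattern.CASE_INSENSITIVE))
      .toList();

  static Object sanitize(String key, Object value) {
    // Mask the value if any pattern matches somewhere in the key.
    return PATTERNS.stream().anyMatch(p -> p.matcher(key).find()) ? "******" : value;
  }

  public static void main(String[] args) {
    System.out.println(sanitize("aws.access.key.id", "AKIA..."));    // ******
    System.out.println(sanitize("bootstrap.servers", "kafka:9092")); // kafka:9092
  }
}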

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

Lines changed: 31 additions & 44 deletions
@@ -28,10 +28,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -40,7 +40,6 @@
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;

 @Service
@@ -52,18 +51,18 @@ public class KafkaConnectService {
   private final ObjectMapper objectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;

-  public List<ConnectDTO> getConnects(KafkaCluster cluster) {
-    return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
-        .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
-        .orElse(List.of());
+  public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
+    return Flux.fromIterable(
+        Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
+            .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
+            .orElse(List.of())
+    );
   }

   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
-                                                     final String search) {
-    Mono<Flux<ConnectDTO>> clusters = Mono.just(Flux.fromIterable(getConnects(cluster))); // TODO get rid
-    return clusters
-        .flatMapMany(Function.identity())
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()))
+                                                     @Nullable final String search) {
+    return getConnects(cluster)
+        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
         .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
         .flatMap(connector ->
             getConnectorConfig(cluster, connector.getConnect(), connector.getName())
@@ -99,56 +98,46 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
         .filter(matchesSearchTerm(search));
   }

-  private Predicate<FullConnectorInfoDTO> matchesSearchTerm(final String search) {
-    return connector -> getSearchValues(connector)
-        .anyMatch(value -> value.contains(
-            StringUtils.defaultString(
-                search,
-                StringUtils.EMPTY)
-                .toUpperCase()));
+  private Predicate<FullConnectorInfoDTO> matchesSearchTerm(@Nullable final String search) {
+    if (search == null) {
+      return c -> true;
+    }
+    return connector -> getStringsForSearch(connector)
+        .anyMatch(string -> StringUtils.containsIgnoreCase(string, search));
   }

-  private Stream<String> getSearchValues(FullConnectorInfoDTO fullConnectorInfo) {
+  private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
     return Stream.of(
-        fullConnectorInfo.getName(),
-        fullConnectorInfo.getStatus().getState().getValue(),
-        fullConnectorInfo.getType().getValue())
-        .map(String::toUpperCase);
+        fullConnectorInfo.getName(),
+        fullConnectorInfo.getStatus().getState().getValue(),
+        fullConnectorInfo.getType().getValue());
   }

-  private Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
-                                                   String connectorName) {
+  public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
+                                                  String connectorName) {
     return api(cluster, connectClusterName)
         .mono(c -> c.getConnectorTopics(connectorName))
         .map(result -> result.get(connectorName))
-        // old connectors don't have this api, setting empty list for
+        // old Connect API versions don't have this endpoint, setting empty list for
         // backward-compatibility
         .onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
   }

-  private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster cluster, String connectName) {
-    return getConnectors(cluster, connectName)
-        .collectList().map(e -> e.get(0))
+  public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
+    return api(cluster, connectName)
+        .flux(client -> client.getConnectors(null))
         // for some reason `getConnectors` method returns the response as a single string
-        .map(this::parseToList)
-        .flatMapMany(Flux::fromIterable)
-        .map(connector -> Tuples.of(connectName, connector));
+        .collectList().map(e -> e.get(0))
+        .map(this::parseConnectorsNamesStringToList)
+        .flatMapMany(Flux::fromIterable);
   }

   @SneakyThrows
-  private List<String> parseToList(String json) {
+  private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {
     });
   }

-  public Flux<String> getConnectors(KafkaCluster cluster, String connectName) {
-    return api(cluster, connectName)
-        .flux(client ->
-            client.getConnectors(null)
-                .doOnError(e -> log.error("Unexpected error upon getting connectors", e))
-        );
-  }
-
   public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
                                             Mono<NewConnectorDTO> connector) {
     return api(cluster, connectName)
@@ -171,9 +160,7 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
   private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
                                         String connectorName) {
     return getConnectorNames(cluster, connectName)
-        .map(Tuple2::getT2)
-        .collectList()
-        .map(connectorNames -> connectorNames.contains(connectorName));
+        .any(name -> name.equals(connectorName));
   }

   public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
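One quirk worth noting: the generated Connect client returns the connector-name array as a single JSON string, which is why getConnectorNames collects it and runs it through parseConnectorsNamesStringToList. A minimal standalone reproduction of that parsing step with Jackson:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class ConnectorNamesParsing {
  public static void main(String[] args) throws Exception {
    // The client yields the whole JSON array as one string element,
    // e.g. "[\"sink-1\",\"source-2\"]"; Jackson turns it back into a list.
    String raw = "[\"sink-1\",\"source-2\"]";
    List<String> names = new ObjectMapper().readValue(raw, new TypeReference<List<String>>() {});
    System.out.println(names); // [sink-1, source-2]
  }
}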
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorInfo.java

Lines changed: 167 additions & 0 deletions
@@ -0,0 +1,167 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.provectus.kafka.ui.model.ConnectorTypeDTO;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.opendatadiscovery.oddrn.JdbcUrlParser;
+import org.opendatadiscovery.oddrn.model.HivePath;
+import org.opendatadiscovery.oddrn.model.MysqlPath;
+import org.opendatadiscovery.oddrn.model.PostgreSqlPath;
+import org.opendatadiscovery.oddrn.model.SnowflakePath;
+
+record ConnectorInfo(List<String> inputs,
+                     List<String> outputs) {
+
+  static ConnectorInfo extract(String className,
+                               ConnectorTypeDTO type,
+                               Map<String, Object> config,
+                               List<String> topicsFromApi, // can be empty for old Connect API versions
+                               Function<String, String> topicOddrnBuilder) {
+    return switch (className) {
+      case "org.apache.kafka.connect.file.FileStreamSinkConnector",
+          "org.apache.kafka.connect.file.FileStreamSourceConnector",
+          "FileStreamSource",
+          "FileStreamSink" -> extractFileIoConnector(type, topicsFromApi, config, topicOddrnBuilder);
+      case "io.confluent.connect.s3.S3SinkConnector" -> extractS3Sink(type, topicsFromApi, config, topicOddrnBuilder);
+      case "io.confluent.connect.jdbc.JdbcSinkConnector" ->
+          extractJdbcSink(type, topicsFromApi, config, topicOddrnBuilder);
+      case "io.debezium.connector.postgresql.PostgresConnector" -> extractDebeziumPg(config);
+      case "io.debezium.connector.mysql.MySqlConnector" -> extractDebeziumMysql(config);
+      default -> new ConnectorInfo(
+          extractInputs(type, topicsFromApi, config, topicOddrnBuilder),
+          extractOutputs(type, topicsFromApi, config, topicOddrnBuilder)
+      );
+    };
+  }
+
+  private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type,
+                                                      List<String> topics,
+                                                      Map<String, Object> config,
+                                                      Function<String, String> topicOddrnBuilder) {
+    return new ConnectorInfo(
+        extractInputs(type, topics, config, topicOddrnBuilder),
+        extractOutputs(type, topics, config, topicOddrnBuilder)
+    );
+  }
+
+  private static ConnectorInfo extractJdbcSink(ConnectorTypeDTO type,
+                                               List<String> topics,
+                                               Map<String, Object> config,
+                                               Function<String, String> topicOddrnBuilder) {
+    String tableNameFormat = (String) config.getOrDefault("table.name.format", "${topic}");
+    List<String> targetTables = extractTopicNamesBestEffort(topics, config)
+        .map(topic -> tableNameFormat.replace("${topic}", topic))
+        .toList();
+
+    String connectionUrl = (String) config.get("connection.url");
+    List<String> outputs = new ArrayList<>();
+    @Nullable var knownJdbcPath = new JdbcUrlParser().parse(connectionUrl);
+    if (knownJdbcPath instanceof PostgreSqlPath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    if (knownJdbcPath instanceof MysqlPath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    if (knownJdbcPath instanceof HivePath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    if (knownJdbcPath instanceof SnowflakePath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    return new ConnectorInfo(
+        extractInputs(type, topics, config, topicOddrnBuilder),
+        outputs
+    );
+  }
+
+  private static ConnectorInfo extractDebeziumPg(Map<String, Object> config) {
+    String host = (String) config.get("database.hostname");
+    String dbName = (String) config.get("database.dbname");
+    var inputs = List.of(
+        PostgreSqlPath.builder()
+            .host(host)
+            .database(dbName)
+            .build().oddrn()
+    );
+    return new ConnectorInfo(inputs, List.of());
+  }
+
+  private static ConnectorInfo extractDebeziumMysql(Map<String, Object> config) {
+    String host = (String) config.get("database.hostname");
+    var inputs = List.of(
+        MysqlPath.builder()
+            .host(host)
+            .build()
+            .oddrn()
+    );
+    return new ConnectorInfo(inputs, List.of());
+  }
+
+  private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type,
+                                             List<String> topics,
+                                             Map<String, Object> config,
+                                             Function<String, String> topicOddrnBuilder) {
+    String bucketName = (String) config.get("s3.bucket.name");
+    String topicsDir = (String) config.getOrDefault("topics.dir", "topics");
+    String directoryDelim = (String) config.getOrDefault("directory.delim", "/");
+    List<String> outputs = extractTopicNamesBestEffort(topics, config)
+        .map(topic -> Oddrn.awsS3Oddrn(bucketName, topicsDir + directoryDelim + topic))
+        .toList();
+    return new ConnectorInfo(
+        extractInputs(type, topics, config, topicOddrnBuilder),
+        outputs
+    );
+  }
+
+  private static List<String> extractInputs(ConnectorTypeDTO type,
+                                            List<String> topicsFromApi,
+                                            Map<String, Object> config,
+                                            Function<String, String> topicOddrnBuilder) {
+    return type == ConnectorTypeDTO.SINK
+        ? extractTopicsOddrns(config, topicsFromApi, topicOddrnBuilder)
+        : List.of();
+  }
+
+  private static List<String> extractOutputs(ConnectorTypeDTO type,
+                                             List<String> topicsFromApi,
+                                             Map<String, Object> config,
+                                             Function<String, String> topicOddrnBuilder) {
+    return type == ConnectorTypeDTO.SOURCE
+        ? extractTopicsOddrns(config, topicsFromApi, topicOddrnBuilder)
+        : List.of();
+  }
+
+  private static Stream<String> extractTopicNamesBestEffort(
+      // topic list can be empty for old Connect API versions
+      List<String> topicsFromApi,
+      Map<String, Object> config
+  ) {
+    if (CollectionUtils.isNotEmpty(topicsFromApi)) {
+      return topicsFromApi.stream();
+    }
+
+    // trying to extract topic names from config
+    String topicsString = (String) config.get("topics");
+    String topicString = (String) config.get("topic");
+    return Stream.of(topicsString, topicString)
+        .filter(Objects::nonNull)
+        .flatMap(str -> Stream.of(str.split(",")))
+        .map(String::trim)
+        .filter(s -> !s.isBlank());
+  }
+
+  private static List<String> extractTopicsOddrns(Map<String, Object> config,
+                                                  List<String> topicsFromApi,
+                                                  Function<String, String> topicOddrnBuilder) {
+    return extractTopicNamesBestEffort(topicsFromApi, config)
+        .map(topicOddrnBuilder)
+        .toList();
+  }
+}
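For orientation, a hypothetical usage sketch of ConnectorInfo.extract for a Debezium Postgres source (it would have to live in the same package, since the record is package-private; the oddrn-builder lambda below is illustrative, not the project's real builder):

// Illustrative only: shows how the Debezium Postgres branch maps config to inputs/outputs.
package com.provectus.kafka.ui.service.integration.odd;

import com.provectus.kafka.ui.model.ConnectorTypeDTO;
import java.util.List;
import java.util.Map;

class ConnectorInfoUsageSketch {
  public static void main(String[] args) {
    ConnectorInfo info = ConnectorInfo.extract(
        "io.debezium.connector.postgresql.PostgresConnector",
        ConnectorTypeDTO.SOURCE,
        Map.of("database.hostname", "pg-host", "database.dbname", "inventory"),
        List.of(),                                      // old Connect APIs may return no topics
        topic -> "//kafka/cluster/local/topics/" + topic // hypothetical oddrn builder
    );
    System.out.println(info.inputs());  // PostgreSql oddrn built from pg-host/inventory
    System.out.println(info.outputs()); // [] — the Debezium branch reports no outputs
  }
}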
