diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 9aac5b74e..6bb82bf7e 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -2,8 +2,10 @@ import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.model.ClusterInfo; +import io.kafbat.ui.connect.model.Connector; import io.kafbat.ui.connect.model.ConnectorStatusConnector; import io.kafbat.ui.connect.model.ConnectorTask; +import io.kafbat.ui.connect.model.ConnectorTopics; import io.kafbat.ui.connect.model.ExpandedConnector; import io.kafbat.ui.connect.model.NewConnector; import io.kafbat.ui.model.ConnectDTO; @@ -42,6 +44,14 @@ default ClusterInfo toClient(KafkaConnectState state) { @Mapping(target = "connect", ignore = true) ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector); + default ConnectorDTO fromClient(Connector connector, ConnectorTopics topics) { + ConnectorDTO connectorDto = this.fromClient(connector); + if (topics != null) { + return connectorDto.topics(topics.getTopics()); + } + return connectorDto; + } + ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus); @Mapping(target = "status", ignore = true) @@ -177,6 +187,4 @@ default KafkaConnectState.ConnectorState toScrapeState(InternalConnectorInfo con connector.getTopics() ); } - - } diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index e58da714c..8b6bcabe4 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -226,8 +226,9 @@ private Mono connectorExists(KafkaCluster cluster, String connectName, public Mono getConnector(KafkaCluster cluster, String connectName, String connectorName) { return api(cluster, connectName) - .mono(client -> client.getConnector(connectorName) - .map(kafkaConnectMapper::fromClient) + .mono(client -> + Mono.zip(client.getConnector(connectorName), getConnectorTopics(cluster, connectName, connectorName)) + .map(t -> kafkaConnectMapper.fromClient(t.getT1(), t.getT2())) .flatMap(connector -> client.getConnectorStatus(connector.getName()) // status request can return 404 if tasks not assigned yet diff --git a/contract-typespec/api/kafka-connect.tsp b/contract-typespec/api/kafka-connect.tsp index 697e4bfd4..dc131c384 100644 --- a/contract-typespec/api/kafka-connect.tsp +++ b/contract-typespec/api/kafka-connect.tsp @@ -225,6 +225,7 @@ model Connector { type: ConnectorType; status: ConnectorStatus; connect: string; + topics?: string[]; } enum ConnectorAction { diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index de8f08b8a..c8e6f66de 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -3674,6 +3674,10 @@ components: $ref: '#/components/schemas/ConnectorStatus' connect: type: string + topics: + type: array + items: + type: string required: - type - status