Skip to content

Commit 3c1fd1f

Browse files
authored
BE: Fixed connector topics result (#1457)
1 parent c7b7b72 commit 3c1fd1f

File tree

2 files changed

+44
-35
lines changed

2 files changed

+44
-35
lines changed

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import io.kafbat.ui.config.ClustersProperties;
44
import io.kafbat.ui.connect.model.ClusterInfo;
55
import io.kafbat.ui.connect.model.Connector;
6+
import io.kafbat.ui.connect.model.ConnectorStatus;
67
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
78
import io.kafbat.ui.connect.model.ConnectorTask;
89
import io.kafbat.ui.connect.model.ConnectorTopics;
910
import io.kafbat.ui.connect.model.ExpandedConnector;
1011
import io.kafbat.ui.connect.model.NewConnector;
12+
import io.kafbat.ui.connect.model.TaskStatus;
1113
import io.kafbat.ui.model.ConnectDTO;
1214
import io.kafbat.ui.model.ConnectorDTO;
1315
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
@@ -42,14 +44,36 @@ default ClusterInfo toClient(KafkaConnectState state) {
4244

4345
@Mapping(target = "status", ignore = true)
4446
@Mapping(target = "connect", ignore = true)
45-
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
46-
47-
default ConnectorDTO fromClient(Connector connector, ConnectorTopics topics) {
48-
ConnectorDTO connectorDto = this.fromClient(connector);
47+
ConnectorDTO fromClient(Connector connector);
48+
49+
default ConnectorDTO fromClient(Connector connector,
50+
String connect,
51+
ConnectorTopics topics,
52+
Map<String, Object> sanitizedConfigs,
53+
ConnectorStatus status) {
54+
ConnectorDTO result = this.fromClient(connector);
55+
result.connect(connect);
4956
if (topics != null) {
50-
return connectorDto.topics(topics.getTopics());
57+
result = result.topics(topics.getTopics());
58+
}
59+
if (sanitizedConfigs != null) {
60+
result = result.config(sanitizedConfigs);
61+
}
62+
if (status != null && status.getConnector() != null) {
63+
result = result.status(fromClient(status.getConnector()));
64+
65+
if (status.getTasks() != null) {
66+
boolean isAnyTaskFailed = status.getTasks().stream()
67+
.map(TaskStatus::getState)
68+
.anyMatch(TaskStatus.StateEnum.FAILED::equals);
69+
70+
if (isAnyTaskFailed) {
71+
result.getStatus().state(ConnectorStateDTO.TASK_FAILED);
72+
}
73+
}
5174
}
52-
return connectorDto;
75+
76+
return result;
5377
}
5478

5579
ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus);

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -227,35 +227,20 @@ public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
227227
String connectorName) {
228228
return api(cluster, connectName)
229229
.mono(client ->
230-
Mono.zip(client.getConnector(connectorName), getConnectorTopics(cluster, connectName, connectorName))
231-
.map(t -> kafkaConnectMapper.fromClient(t.getT1(), t.getT2()))
232-
.flatMap(connector ->
233-
client.getConnectorStatus(connector.getName())
234-
// status request can return 404 if tasks not assigned yet
235-
.onErrorResume(WebClientResponseException.NotFound.class,
230+
Mono.zip(
231+
client.getConnector(connectorName),
232+
getConnectorTopics(cluster, connectName, connectorName),
233+
client.getConnectorStatus(connectorName).onErrorResume(WebClientResponseException.NotFound.class,
236234
e -> emptyStatus(connectorName))
237-
.map(connectorStatus -> {
238-
var status = connectorStatus.getConnector();
239-
var sanitizedConfig = kafkaConfigSanitizer.sanitizeConnectorConfig(connector.getConfig());
240-
ConnectorDTO result = new ConnectorDTO()
241-
.connect(connectName)
242-
.status(kafkaConnectMapper.fromClient(status))
243-
.type(connector.getType())
244-
.tasks(connector.getTasks())
245-
.name(connector.getName())
246-
.config(sanitizedConfig);
247-
248-
if (connectorStatus.getTasks() != null) {
249-
boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
250-
.map(TaskStatus::getState)
251-
.anyMatch(TaskStatus.StateEnum.FAILED::equals);
252-
253-
if (isAnyTaskFailed) {
254-
result.getStatus().state(ConnectorStateDTO.TASK_FAILED);
255-
}
256-
}
257-
return result;
258-
})
235+
)
236+
.map(t ->
237+
kafkaConnectMapper.fromClient(
238+
t.getT1(),
239+
connectName,
240+
t.getT2(),
241+
kafkaConfigSanitizer.sanitizeConnectorConfig(t.getT1().getConfig()),
242+
t.getT3()
243+
)
259244
)
260245
);
261246
}
@@ -281,7 +266,7 @@ public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connec
281266
.mono(c ->
282267
requestBody
283268
.flatMap(body -> c.setConnectorConfig(connectorName, body))
284-
.map(kafkaConnectMapper::fromClient));
269+
.map(connector -> kafkaConnectMapper.fromClient(connector)));
285270
}
286271

287272
public Mono<Void> deleteConnector(

0 commit comments

Comments
 (0)