Skip to content

Commit dcf9dbe

Browse files
committed
Remove empty result in describeLogDirsByBrokerAndTopic
1 parent 8b1a9bd commit dcf9dbe

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

src/main/java/org/gnuhpc/bigdata/controller/KafkaController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import io.swagger.annotations.ApiResponse;
66
import io.swagger.annotations.ApiResponses;
77
import java.util.ArrayList;
8-
import java.util.Arrays;
98
import java.util.Collection;
109
import java.util.HashMap;
1110
import java.util.List;
@@ -86,15 +85,16 @@ public Map<Integer, List<String>> listLogDirs(
8685
return kafkaAdminService.listLogDirsByBroker(brokerList);
8786
}
8887

89-
@GetMapping(value = "/brokers/logdirs/detail")
88+
@PostMapping(value = "/brokers/logdirs/detail")
9089
@ApiOperation(value = "Describe log dirs by broker list and topic list")
9190
public Map<Integer, Map<String, LogDirInfo>> describeLogDirs(
9291
@RequestParam(required = false) List<Integer> brokerList,
9392
@RequestParam(required = false) List<String> logDirList,
94-
@RequestParam(required = false) Map<String, List<Integer>> topicPartitionMap) {
93+
@RequestBody(required = false) Map<String, List<Integer>> topicPartitionMap) {
9594
return kafkaAdminService.describeLogDirsByBrokerAndTopic(brokerList, logDirList, topicPartitionMap);
9695
}
9796

97+
9898
@GetMapping(value = "/brokers/replicalogdirs")
9999
@ApiOperation(value = "Describe replica log dirs.")
100100
public Map<TopicPartitionReplica, ReplicaLogDirInfo> describeReplicaLogDirs(

src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import org.apache.kafka.common.internals.Topic;
102102
import org.apache.kafka.common.protocol.Errors;
103103
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
104-
import org.apache.kafka.common.serialization.Serdes;
105104
import org.gnuhpc.bigdata.CollectionConvertor;
106105
import org.gnuhpc.bigdata.componet.OffsetStorage;
107106
import org.gnuhpc.bigdata.config.KafkaConfig;
@@ -502,11 +501,13 @@ public Map<Integer, Map<String, LogDirInfo>> describeLogDirsByBrokerAndTopic(
502501
log.warn("Describe log dirs exception:" + exception);
503502
throw new ApiException("Describe log dirs exception:" + exception);
504503
} finally {
504+
log.info("After describe log dir, result is:" + logDirInfosByBroker);
505505
if (logDirList != null && !logDirList.isEmpty()) {
506506
logDirInfosByBroker.entrySet().forEach(e -> {
507507
e.getValue().entrySet().removeIf(m -> !logDirList.contains(m.getKey()));
508508
});
509509
}
510+
log.info("After describe log dir filtered by logdirList, result is:" + logDirInfosByBroker);
510511
if (topicPartitionMap != null && !topicPartitionMap.isEmpty()) {
511512
logDirInfosByBroker
512513
.entrySet()
@@ -528,6 +529,10 @@ public Map<Integer, Map<String, LogDirInfo>> describeLogDirsByBrokerAndTopic(
528529
}
529530
}
530531

532+
log.info("After describe log dir filtered by topicPartitionMap, result is:" + logDirInfosByBroker);
533+
logDirInfosByBroker.entrySet().forEach(e->e.getValue().entrySet().removeIf(m->m.getValue().replicaInfos.isEmpty()));
534+
logDirInfosByBroker.entrySet().removeIf(e->e.getValue().isEmpty());
535+
531536
return logDirInfosByBroker;
532537
}
533538

0 commit comments

Comments
 (0)