Skip to content

Commit baddd63

Browse files
committed
Modify readme.
1 parent dcf9dbe commit baddd63

File tree

4 files changed

+45
-30
lines changed

4 files changed

+45
-30
lines changed

README.adoc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ Kafka/ZK REST API is to provide the production-ready endpoints to perform some a
88

99
// tag::base-t[]
1010
.Following features are provided:
11+
* Cluster/nodes/controller info describe
1112
* Broker List
12-
* Topic create/delete/describe
13-
* Topic config create/update/delete/list
14-
* Topic partition add and reassign
15-
* Consumer group(old zookeeper based/new kafka based) list/describe
16-
* Offset check/reset
13+
* Broker config get/update, dynamic config get/update/delete
14+
* Log dirs describe, filtered by brokers/topic/partition/logdirs
15+
* Topic create/delete/describe/list
16+
* Topic config create/update/list
17+
* Topic partition add and reassign, replicas movement between log directories is supported
18+
* Consumer group list/describe/delete
1719
* Consumer Group Lag check
20+
* Offset reset by earliest/latest/timestamp
21+
* View message contents with different decoders; Avro is also supported
1822
* Collect JMX metrics from brokers that expose JMX metrics +
1923
More details refer to https://github.com/gnuhpc/Kafka-zk-restapi/blob/master/docs/JMXCollector.adoc[JMXCollector API Specification]
2024
* Secure the REST API with Spring Security
@@ -82,7 +86,7 @@ Step 3: Use user controller API to add user to security file security/security.y
8286
* No need to restart server after adding new user or update user info. Timing thread introduced in Step 1 will refresh the user list according to your settings.
8387

8488
=== Support Kafka Version Information
85-
Currently, this rest api (master branch) supports Kafka 0.10.x brokers. The master branch is the most active branch. We're going to get down to the work of supporting the Kafka 1.x version.
89+
Currently, this REST API (master branch) supports Kafka 1.1.1 brokers. The master branch is the most active branch.
8690

8791
*For 0.11.x, please checkout the branch 0.11.x by calling the command:*
8892

@@ -96,8 +100,7 @@ __BasePath__ : /
96100
You can access Swagger-UI by accessing http://127.0.0.1:8121/api
97101

98102

99-
=== https://github.com/gnuhpc/Kafka-zk-restapi/blob/master/docs/paths.adoc[API LIST for 0.10]
100-
=== https://github.com/gnuhpc/Kafka-zk-restapi/blob/0.11.x/docs/paths.adoc[API LIST for 0.11.x]
103+
=== https://github.com/gnuhpc/Kafka-zk-restapi/blob/master/docs/paths.adoc[API LIST for 1.1.1]
101104

102105

103106
* kafka-controller : Kafka Api
@@ -106,13 +109,12 @@ You can access Swagger-UI by accessing http://127.0.0.1:8121/api
106109
* user-controller : User management Api
107110

108111

109-
=== https://github.com/gnuhpc/Kafka-zk-restapi/blob/master/docs/definitions.adoc[Data Model Definitions for 0.10]
110-
=== https://github.com/gnuhpc/Kafka-zk-restapi/blob/0.11.x/docs/definitions.adoc[Data Model Definitions for 0.11.x]
112+
=== https://github.com/gnuhpc/Kafka-zk-restapi/blob/master/docs/definitions.adoc[Data Model Definitions for 1.1.1]
111113

112114

113115
=== Version information
114116
[%hardbreaks]
115-
__Version__ : 0.1.0
117+
__Version__ : 1.1.1
116118

117119

118120
=== Contact information

pics/ShowApi.png

-153 KB
Loading

src/main/java/org/gnuhpc/bigdata/controller/KafkaController.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,16 @@
5050
import org.springframework.web.bind.annotation.ResponseStatus;
5151
import org.springframework.web.bind.annotation.RestController;
5252

53-
/** Created by gnuhpc on 2017/7/16. */
53+
/**
54+
* Created by gnuhpc on 2017/7/16.
55+
*/
5456
@Log4j
5557
@RequestMapping("/kafka")
5658
@RestController
5759
public class KafkaController {
5860

59-
@Autowired private KafkaAdminService kafkaAdminService;
61+
@Autowired
62+
private KafkaAdminService kafkaAdminService;
6063

6164
// @Autowired private KafkaProducerService kafkaProducerService;
6265

@@ -91,15 +94,16 @@ public Map<Integer, Map<String, LogDirInfo>> describeLogDirs(
9194
@RequestParam(required = false) List<Integer> brokerList,
9295
@RequestParam(required = false) List<String> logDirList,
9396
@RequestBody(required = false) Map<String, List<Integer>> topicPartitionMap) {
94-
return kafkaAdminService.describeLogDirsByBrokerAndTopic(brokerList, logDirList, topicPartitionMap);
97+
return kafkaAdminService
98+
.describeLogDirsByBrokerAndTopic(brokerList, logDirList, topicPartitionMap);
9599
}
96100

97-
98-
@GetMapping(value = "/brokers/replicalogdirs")
99-
@ApiOperation(value = "Describe replica log dirs.")
100-
public Map<TopicPartitionReplica, ReplicaLogDirInfo> describeReplicaLogDirs(
101-
@RequestParam List<TopicPartitionReplica> replicas) {
102-
return kafkaAdminService.describeReplicaLogDirs(replicas);
101+
@GetMapping(value = "/brokers/replicalogdir/{brokerId}/{topic}/{partition}")
102+
@ApiOperation(value = "Describe replica log dir.")
103+
public ReplicaLogDirInfo describeReplicaLogDirs(@PathVariable int brokerId,
104+
@PathVariable String topic, @PathVariable int partition) {
105+
TopicPartitionReplica replica = new TopicPartitionReplica(topic, partition, brokerId);
106+
return kafkaAdminService.describeReplicaLogDir(replica);
103107
}
104108

105109
@GetMapping(value = "/brokers/{brokerId}/conf")
@@ -179,7 +183,8 @@ public List<Record> getMessage(
179183
@RequestParam(required = false, defaultValue = "StringDeserializer") String keyDecoder,
180184
@RequestParam(required = false, defaultValue = "StringDeserializer") String valueDecoder,
181185
@RequestParam(required = false) String avroSchema,
182-
@RequestParam(required = false, defaultValue = "30000") long fetchTimeoutMs) throws ApiException {
186+
@RequestParam(required = false, defaultValue = "30000") long fetchTimeoutMs)
187+
throws ApiException {
183188
return kafkaAdminService.getRecordsByOffset(topic, partition, offset, maxRecords, keyDecoder,
184189
valueDecoder, avroSchema, fetchTimeoutMs);
185190
}
@@ -241,7 +246,8 @@ public Map<String, GeneralResponse> addPartition(@RequestBody List<AddPartition>
241246

242247
@PostMapping(value = "/partitions/reassign/generate")
243248
@ApiOperation(value = "Generate plan for the partition reassignment")
244-
public List<ReassignModel> generateReassignPartitions(@RequestBody ReassignWrapper reassignWrapper) {
249+
public List<ReassignModel> generateReassignPartitions(
250+
@RequestBody ReassignWrapper reassignWrapper) {
245251
return kafkaAdminService.generateReassignPartition(reassignWrapper);
246252
}
247253

@@ -260,9 +266,9 @@ public ReassignStatus executeReassignPartitions(
260266
@ApiOperation(value = "Check the partition reassignment process")
261267
@ApiResponses(
262268
value = {
263-
@ApiResponse(code = 1, message = "Reassignment Completed"),
264-
@ApiResponse(code = 0, message = "Reassignment In Progress"),
265-
@ApiResponse(code = -1, message = "Reassignment Failed")
269+
@ApiResponse(code = 1, message = "Reassignment Completed"),
270+
@ApiResponse(code = 0, message = "Reassignment In Progress"),
271+
@ApiResponse(code = -1, message = "Reassignment Failed")
266272
})
267273
public ReassignStatus checkReassignPartitions(@RequestBody ReassignModel reassign) {
268274
return kafkaAdminService.checkReassignStatus(reassign);
@@ -314,7 +320,7 @@ public ConsumerGroupMeta getConsumerGroupMeta(@PathVariable String consumerGroup
314320
public List<ConsumerGroupMeta> getConsumerGroupsMeta() {
315321
Set<String> consumerGroupList = kafkaAdminService.listAllNewConsumerGroups();
316322
List<ConsumerGroupMeta> consumerGroupMetaList = new ArrayList<>();
317-
for (String consumerGroup:consumerGroupList) {
323+
for (String consumerGroup : consumerGroupList) {
318324
if (kafkaAdminService.isNewConsumerGroup(consumerGroup)) {
319325
consumerGroupMetaList.add(kafkaAdminService.getConsumerGroupMeta(consumerGroup));
320326
} else {
@@ -369,10 +375,10 @@ public GeneralResponse resetOffset(
369375
@PathVariable int partition,
370376
@PathVariable String consumergroup,
371377
@PathVariable
372-
@ApiParam(
373-
value =
374-
"[earliest/latest/{long}/yyyy-MM-dd HH:mm:ss.SSS] can be supported. "
375-
+ "The date type is only valid for new consumer group.")
378+
@ApiParam(
379+
value =
380+
"[earliest/latest/{long}/yyyy-MM-dd HH:mm:ss.SSS] can be supported. "
381+
+ "The date type is only valid for new consumer group.")
376382
String offset,
377383
@PathVariable ConsumerType type) {
378384
return kafkaAdminService.resetOffset(topic, partition, consumergroup, type, offset);

src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,13 @@ public Map<Integer, Map<String, LogDirInfo>> describeLogDirsByBrokerAndTopic(
536536
return logDirInfosByBroker;
537537
}
538538

539+
public ReplicaLogDirInfo describeReplicaLogDir(TopicPartitionReplica topicPartitionReplica) {
540+
Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfoMap =
541+
describeReplicaLogDirs(Collections.singletonList(topicPartitionReplica));
542+
543+
return replicaLogDirInfoMap.get(topicPartitionReplica);
544+
}
545+
539546
public Map<TopicPartitionReplica, ReplicaLogDirInfo> describeReplicaLogDirs(
540547
List<TopicPartitionReplica> replicas) {
541548
org.apache.kafka.clients.admin.AdminClient kafkaAdminClient = createKafkaAdminClient();

0 commit comments

Comments
 (0)