Skip to content

Commit 1ddd053

Browse files
committed
Add keyDecoder,maxRecords,timeoutMs parameters when getting records by offset. Add api to stop reassign partitions.
1 parent f04e90e commit 1ddd053

File tree

6 files changed

+358
-179
lines changed

6 files changed

+358
-179
lines changed

src/main/java/org/gnuhpc/bigdata/controller/KafkaController.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,18 @@ public boolean existTopic(@PathVariable String topic) {
172172
@GetMapping(value = "/consumer/{topic}/{partition}/{offset}")
173173
@ApiOperation(
174174
value =
175-
"Get the message from the offset of the partition in the topic"
176-
+ ", decoder is not supported yet")
177-
public Record getMessage(
175+
"Get the message from the offset of the partition in the topic")
176+
public List<Record> getMessage(
178177
@PathVariable String topic,
179178
@PathVariable int partition,
180179
@PathVariable long offset,
181-
@RequestParam(required = false) String decoder) {
182-
return kafkaAdminService.getRecordByOffset(topic, partition, offset, decoder, "");
180+
@RequestParam(required = false) int maxRecords,
181+
@RequestParam(required = false, defaultValue = "StringDeserializer") String keyDecoder,
182+
@RequestParam(required = false, defaultValue = "StringDeserializer") String valueDecoder,
183+
@RequestParam(required = false) String avroSchema,
184+
@RequestParam(required = false, defaultValue = "30000") long fetchTimeoutMs) throws ApiException {
185+
return kafkaAdminService.getRecordsByOffset(topic, partition, offset, maxRecords, keyDecoder,
186+
valueDecoder, avroSchema, fetchTimeoutMs);
183187
}
184188

185189
@GetMapping(value = "/topics/{topic}")
@@ -266,6 +270,12 @@ public ReassignStatus checkReassignPartitions(@RequestBody ReassignModel reassig
266270
return kafkaAdminService.checkReassignStatus(reassign);
267271
}
268272

273+
@PutMapping(value = "/partitions/reassign/stop")
274+
@ApiOperation(value = "Stop the partition reassignment process")
275+
public GeneralResponse stopReassignPartitions() {
276+
return kafkaAdminService.stopReassignPartitions();
277+
}
278+
269279
@GetMapping(value = "/consumergroups")
270280
@ApiOperation(value = "List all consumer groups from zk and kafka")
271281
public Map<String, Set<String>> listAllConsumerGroups(

src/main/java/org/gnuhpc/bigdata/model/Record.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,28 @@
55
import lombok.Data;
66
import lombok.Getter;
77
import lombok.Setter;
8+
import lombok.extern.log4j.Log4j;
89
import org.apache.kafka.common.errors.ApiException;
910
import org.apache.kafka.common.utils.Bytes;
11+
import org.gnuhpc.bigdata.utils.KafkaUtils;
1012

1113
@Data
1214
@Getter
1315
@Setter
1416
@Builder
17+
@Log4j
1518
public class Record {
1619
public String topic;
1720
public long offset;
1821
public Object key = new Object();
1922
public Object value = new Object();
2023
public long timestamp;
21-
public Class<?> type;
22-
String decoder;
24+
String keyDecoder;
25+
String valueDecoder;
2326

24-
public String getValue() {
27+
public String getValueByDecoder(String decoder, Object value) {
28+
if (value == null) return null;
29+
Class<?> type = KafkaUtils.DESERIALIZER_TYPE_MAP.get(decoder);
2530
try {
2631
if (String.class.isAssignableFrom(type)) {
2732
return value.toString();
@@ -53,7 +58,7 @@ public String getValue() {
5358
}
5459

5560
if (byte[].class.isAssignableFrom(type)) {
56-
if (decoder.equals("AvorDeserializer")) {
61+
if (decoder.equals("AvroDeserializer")) {
5762
return value.toString();
5863
} else {
5964
byte[] byteArray = (byte[]) value;
@@ -74,6 +79,16 @@ public String getValue() {
7479
+ "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes");
7580
}
7681

82+
public String getValue() {
83+
log.info("getValue for value:" + value + " by decoder:" + valueDecoder);
84+
return getValueByDecoder(valueDecoder, value);
85+
}
86+
87+
public String getKey() {
88+
log.info("getKeyValue for key:" + key + " by decoder:" + keyDecoder);
89+
return getValueByDecoder(keyDecoder, key);
90+
}
91+
7792
@Override
7893
public String toString() {
7994
if (value != null) {
@@ -82,13 +97,15 @@ public String toString() {
8297
+ ", offset:"
8398
+ offset
8499
+ ", key:"
85-
+ key
100+
+ getKey()
86101
+ ", value:"
87102
+ getValue()
88103
+ ", timestamp:"
89104
+ timestamp
90-
+ ", type:"
91-
+ type;
105+
+ ", keyDecoder:"
106+
+ keyDecoder
107+
+ ", valueDecoder:"
108+
+ valueDecoder;
92109
} else {
93110
return "topic:"
94111
+ topic
@@ -100,8 +117,10 @@ public String toString() {
100117
+ value
101118
+ ", timestamp:"
102119
+ timestamp
103-
+ ", type:"
104-
+ type;
120+
+ ", keyDecoder:"
121+
+ keyDecoder
122+
+ ", valueDecoder:"
123+
+ valueDecoder;
105124
}
106125
}
107126
}

0 commit comments

Comments
 (0)