Skip to content

Commit 9abe346

Browse files
committed
Modify consumer poll timeout in KafkaAdminService.getMessage and add some log info.Add lspath and getnode path in zookeeper utils.
1 parent ebba453 commit 9abe346

File tree

6 files changed

+111
-12
lines changed

6 files changed

+111
-12
lines changed

docs/index.pdf

0 Bytes
Binary file not shown.

src/main/java/org/gnuhpc/bigdata/controller/ZookeeperController.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77
import org.gnuhpc.bigdata.model.ZkServerStat;
88
import org.gnuhpc.bigdata.service.ZookeeperService;
99
import org.gnuhpc.bigdata.utils.ZookeeperUtils;
10+
import org.gnuhpc.bigdata.validator.ZKNodePathExistConstraint;
1011
import org.springframework.beans.factory.annotation.Autowired;
11-
import org.springframework.web.bind.annotation.*;
12+
import org.springframework.web.bind.annotation.GetMapping;
13+
import org.springframework.web.bind.annotation.RequestMapping;
14+
import org.springframework.web.bind.annotation.RequestParam;
15+
import org.springframework.web.bind.annotation.RestController;
1216

1317
import java.util.List;
1418
import java.util.Map;
@@ -26,16 +30,25 @@ public class ZookeeperController {
2630
@Autowired
2731
private ZookeeperService zookeeperService;
2832

29-
@GetMapping("/ls/{path}")
33+
@GetMapping("/ls/path")
3034
@ApiOperation(value = "List a zookeeper path")
31-
public List<String> ls(@PathVariable("path") String path){
35+
public List<String> ls(@RequestParam String path){
36+
return zookeeperUtils.lsPath(path);
37+
/*
3238
try {
33-
return zookeeperUtils.getCuratorClient().getChildren().forPath("/"+path);
39+
return zookeeperUtils.getCuratorClient().getChildren().forPath(path);
3440
} catch (Exception e) {
3541
e.printStackTrace();
3642
}
3743
3844
return null;
45+
*/
46+
}
47+
48+
@GetMapping("/get/path")
49+
@ApiOperation(value = "Get data of a zookeeper path")
50+
public Map<String, String> get(@RequestParam String path){
51+
return zookeeperUtils.getNodeData(path);
3952
}
4053

4154
@GetMapping("/connstate")

src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ private List<ConsumerGroupDesc> setNewCGD(String consumerGroup, String topic) {
626626

627627

628628
public String getMessage(@TopicExistConstraint String topic, int partition, long offset, String decoder, String avroSchema) {
629-
KafkaConsumer consumer = kafkaUtils.createNewConsumer();
629+
KafkaConsumer consumer = kafkaUtils.createNewConsumer(String.valueOf(System.currentTimeMillis()));
630630
TopicPartition tp = new TopicPartition(topic, partition);
631631
long beginningOffset = getBeginningOffset(topic, partition);
632632
long endOffset = getEndOffset(topic, partition);
@@ -647,18 +647,21 @@ public String getMessage(@TopicExistConstraint String topic, int partition, long
647647

648648
String last = null;
649649

650-
ConsumerRecords<String, String> crs = consumer.poll(channelRetryBackoffMs);
650+
//ConsumerRecords<String, String> crs = consumer.poll(channelRetryBackoffMs);
651+
ConsumerRecords<String, String> crs = consumer.poll(3000);
652+
log.info("Seek to offset:" + offset + ", topic:" + topic + ", partition:" + partition + ", crs.count:" + crs.count());
651653
if (crs.count() != 0) {
652654
Iterator<ConsumerRecord<String, String>> it = crs.iterator();
653655
while (it.hasNext()) {
654656
ConsumerRecord<String, String> initCr = it.next();
655657
last = "Value: " + initCr.value() + ", Offset: " + String.valueOf(initCr.offset());
658+
log.info("Value: " + initCr.value() + ", initCr.Offset: " + String.valueOf(initCr.offset()));
656659
if (last != null && initCr.offset() == offset) {
657660
break;
658661
}
659662
}
660663
}
661-
664+
log.info("last:" + last);
662665
consumer.close();
663666
return last;
664667
}

src/main/java/org/gnuhpc/bigdata/utils/ZookeeperUtils.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.gnuhpc.bigdata.utils;
22

3+
import com.google.common.base.Charsets;
34
import kafka.utils.ZKStringSerializer$;
45
import kafka.utils.ZkUtils;
56
import lombok.Getter;
@@ -12,23 +13,24 @@
1213
import org.apache.curator.RetryPolicy;
1314
import org.apache.curator.framework.CuratorFramework;
1415
import org.apache.curator.framework.CuratorFrameworkFactory;
16+
import org.apache.curator.framework.api.GetDataBuilder;
1517
import org.apache.curator.retry.ExponentialBackoffRetry;
18+
import org.apache.curator.utils.ZKPaths;
19+
import org.apache.zookeeper.data.Stat;
1620
import org.gnuhpc.bigdata.config.ZookeeperConfig;
1721
import org.gnuhpc.bigdata.constant.ZkServerMode;
1822
import org.gnuhpc.bigdata.exception.ServiceNotAvailableException;
1923
import org.gnuhpc.bigdata.model.ZkServerClient;
2024
import org.gnuhpc.bigdata.model.ZkServerEnvironment;
2125
import org.gnuhpc.bigdata.model.ZkServerStat;
26+
import org.gnuhpc.bigdata.validator.ZKNodePathExistConstraint;
2227
import org.springframework.beans.factory.annotation.Autowired;
23-
import org.springframework.context.annotation.Configuration;
28+
import org.springframework.validation.annotation.Validated;
2429

2530
import java.io.IOException;
2631
import java.net.InetAddress;
2732
import java.net.Socket;
28-
import java.net.UnknownHostException;
29-
import java.util.Iterator;
30-
import java.util.LinkedList;
31-
import java.util.List;
33+
import java.util.*;
3234
import java.util.regex.Matcher;
3335
import java.util.regex.Pattern;
3436

@@ -39,6 +41,7 @@
3941
@Log4j
4042
@Setter
4143
@Getter
44+
@Validated
4245
public class ZookeeperUtils {
4346

4447
//For Stat Command parse
@@ -233,5 +236,47 @@ public ZkServerEnvironment parseEnvResult(final List<String> result) {
233236
return environment;
234237
}
235238

239+
public List<String> lsPath(@ZKNodePathExistConstraint String path) {
240+
try {
241+
return curatorClient.getChildren().forPath(path);
242+
} catch (Exception e) {
243+
log.error("ls path fail! path: " + path + ", error: {}" + e);
244+
return null;
245+
}
246+
}
247+
248+
public Map<String, String> getNodeData(@ZKNodePathExistConstraint String path) {
249+
Map<String, String> map = new HashMap<>();
250+
251+
try {
252+
List<String> childrens = curatorClient.getChildren().forPath(path);
253+
GetDataBuilder dataBuilder = curatorClient.getData();
254+
if (childrens != null && childrens.size() > 0) {
255+
for (int i = 0; i < childrens.size(); i++) {
256+
String child = childrens.get(i);
257+
String childPath = ZKPaths.makePath(path, child);
258+
byte[] bytes = dataBuilder.forPath(childPath);
259+
map.put(childPath, (bytes!=null)?(new String(bytes, Charsets.UTF_8)):(null));
260+
}
261+
} else {
262+
byte[] bytes = dataBuilder.forPath(path);
263+
map.put(path, (bytes!=null)?(new String(bytes, Charsets.UTF_8)):(null));
264+
}
265+
266+
} catch (Exception e) {
267+
log.error("get node data fail! path: " + path + ", error: {}" + e);
268+
}
269+
270+
return map;
271+
}
272+
273+
public Stat getNodePathStat(String path) {
274+
try {
275+
return curatorClient.checkExists().forPath(path);
276+
} catch (Exception e) {
277+
log.error("get node data fail! path: " + path + ", error: {}" + e);
278+
}
279+
return null;
280+
}
236281
}
237282

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.gnuhpc.bigdata.validator;
2+
3+
import javax.validation.Constraint;
4+
import javax.validation.Payload;
5+
import java.lang.annotation.ElementType;
6+
import java.lang.annotation.Retention;
7+
import java.lang.annotation.RetentionPolicy;
8+
import java.lang.annotation.Target;
9+
10+
@Constraint(validatedBy = ZKNodePathExistValidator.class)
11+
@Target( { ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
12+
@Retention(RetentionPolicy.RUNTIME)
13+
public @interface ZKNodePathExistConstraint {
14+
String message() default "Non-exist ZooKeeper Node path!";
15+
16+
Class<?>[] groups() default {};
17+
18+
Class<? extends Payload>[] payload() default {};
19+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.gnuhpc.bigdata.validator;
2+
3+
import org.gnuhpc.bigdata.utils.ZookeeperUtils;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
6+
import javax.validation.ConstraintValidator;
7+
import javax.validation.ConstraintValidatorContext;
8+
9+
public class ZKNodePathExistValidator implements ConstraintValidator<ZKNodePathExistConstraint, String> {
10+
@Autowired
11+
private ZookeeperUtils zookeeperUtils;
12+
13+
public void initialize(ZKNodePathExistConstraint constraint) {
14+
}
15+
16+
public boolean isValid(String path, ConstraintValidatorContext context) {
17+
return (zookeeperUtils.getNodePathStat(path)!=null);
18+
}
19+
}

0 commit comments

Comments
 (0)