Skip to content

Commit 465f98c

Browse files
author
zengqiao
committed
日志错误信息中补充Topic名称信息
1 parent a0312be commit 465f98c

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/partition/PartitionOffsetParam.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
22

3-
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
44
import lombok.Data;
55
import lombok.NoArgsConstructor;
66
import org.apache.kafka.clients.admin.OffsetSpec;
@@ -10,13 +10,13 @@
1010

1111
@Data
1212
@NoArgsConstructor
13-
public class PartitionOffsetParam extends ClusterPhyParam {
13+
public class PartitionOffsetParam extends TopicParam {
1414
private Map<TopicPartition, OffsetSpec> topicPartitionOffsets;
1515

1616
private Long timestamp;
1717

18-
public PartitionOffsetParam(Long clusterPhyId, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
19-
super(clusterPhyId);
18+
public PartitionOffsetParam(Long clusterPhyId, String topicName, Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Long timestamp) {
19+
super(clusterPhyId, topicName);
2020
this.topicPartitionOffsets = topicPartitionOffsets;
2121
this.timestamp = timestamp;
2222
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicParam.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@ public TopicParam(Long clusterPhyId, String topicName) {
1515
super(clusterPhyId);
1616
this.topicName = topicName;
1717
}
18+
19+
@Override
20+
public String toString() {
21+
return "TopicParam{" +
22+
"clusterPhyId=" + clusterPhyId +
23+
", topicName='" + topicName + '\'' +
24+
'}';
25+
}
1826
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long cluste
207207
.forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec));
208208

209209
try {
210-
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp));
210+
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp));
211211
} catch (VCHandlerNotExistException e) {
212212
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
213213
}
@@ -226,7 +226,7 @@ public Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafka(Long cluste
226226
.forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec));
227227

228228
try {
229-
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicPartitionOffsets, timestamp));
229+
return (Result<Map<TopicPartition, Long>>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp));
230230
} catch (VCHandlerNotExistException e) {
231231
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
232232
}
@@ -300,7 +300,10 @@ private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaAdminClient
300300
} catch (NotExistException nee) {
301301
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
302302
} catch (Exception e) {
303-
log.error("method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e);
303+
log.error(
304+
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaAdminClient||clusterPhyId={}||topicName={}||errMsg=exception!",
305+
offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
306+
);
304307

305308
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
306309
}
@@ -355,7 +358,10 @@ private Result<Map<TopicPartition, Long>> getPartitionOffsetFromKafkaConsumerCli
355358
} catch (NotExistException nee) {
356359
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(offsetParam.getClusterPhyId()));
357360
} catch (Exception e) {
358-
log.error("method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||errMsg=exception!", offsetParam.getClusterPhyId(), e);
361+
log.error(
362+
"class=PartitionServiceImpl||method=getPartitionOffsetFromKafkaConsumerClient||clusterPhyId={}||topicName={}||errMsg=exception!",
363+
offsetParam.getClusterPhyId(), offsetParam.getTopicName(), e
364+
);
359365

360366
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
361367
} finally {

0 commit comments

Comments
 (0)