Skip to content

Commit ed562f9

Browse files
authored
Merge pull request #618 from didi/dev
DB中Group信息的更新方式,由replace调整为insert或update
2 parents ad0c16a + b4d44ef commit ed562f9

File tree

3 files changed

+60
-17
lines changed

3 files changed

+60
-17
lines changed

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/GroupService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,14 @@ public interface GroupService {
3030

3131
int replaceDBData(GroupMemberPO groupMemberPO);
3232

33+
void batchReplace(List<GroupMemberPO> newGroupMemberList);
34+
3335
GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName);
3436

3537
List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName);
3638

39+
List<GroupMemberPO> listGroup(Long clusterPhyId);
40+
3741
PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
3842
String topicName,
3943
String groupName,

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.stereotype.Service;
3333

3434
import java.util.*;
35+
import java.util.function.Function;
3536
import java.util.stream.Collectors;
3637

3738
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_SEARCH_GROUP;
@@ -120,6 +121,38 @@ public int replaceDBData(GroupMemberPO groupMemberPO) {
120121
return groupMemberDAO.replace(groupMemberPO);
121122
}
122123

124+
@Override
125+
public void batchReplace(List<GroupMemberPO> newGroupMemberList) {
126+
if (newGroupMemberList == null || newGroupMemberList.isEmpty()) {
127+
return;
128+
}
129+
130+
Long clusterPhyId = newGroupMemberList.get(0).getClusterPhyId();
131+
if (clusterPhyId == null) {
132+
return;
133+
}
134+
135+
List<GroupMemberPO> dbGroupMemberList = listGroup(clusterPhyId);
136+
137+
138+
Map<String, GroupMemberPO> dbGroupMemberMap = dbGroupMemberList.stream().collect(Collectors.toMap(elem -> elem.getGroupName() + elem.getTopicName(), Function.identity()));
139+
for (GroupMemberPO groupMemberPO : newGroupMemberList) {
140+
GroupMemberPO po = dbGroupMemberMap.remove(groupMemberPO.getGroupName() + groupMemberPO.getTopicName());
141+
try {
142+
if (po != null) {
143+
groupMemberPO.setId(po.getId());
144+
groupMemberDAO.updateById(groupMemberPO);
145+
} else {
146+
groupMemberDAO.insert(groupMemberPO);
147+
}
148+
} catch (Exception e) {
149+
log.error("method=batchReplace||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhyId, groupMemberPO.getGroupName(), e);
150+
}
151+
152+
}
153+
154+
}
155+
123156
@Override
124157
public GroupStateEnum getGroupStateFromDB(Long clusterPhyId, String groupName) {
125158
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
@@ -143,6 +176,14 @@ public List<GroupMemberPO> listGroupByTopic(Long clusterPhyId, String topicName)
143176
return groupMemberDAO.selectList(lambdaQueryWrapper);
144177
}
145178

179+
@Override
180+
public List<GroupMemberPO> listGroup(Long clusterPhyId) {
181+
LambdaQueryWrapper<GroupMemberPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
182+
lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId);
183+
184+
return groupMemberDAO.selectList(lambdaQueryWrapper);
185+
}
186+
146187
@Override
147188
public PaginationResult<GroupMemberPO> pagingGroupMembers(Long clusterPhyId,
148189
String topicName,

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/metadata/SyncKafkaGroupTask.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.kafka.clients.admin.*;
1616
import org.apache.kafka.common.TopicPartition;
1717
import org.springframework.beans.factory.annotation.Autowired;
18+
import org.springframework.transaction.annotation.Transactional;
1819

1920
import java.util.*;
2021
import java.util.stream.Collectors;
@@ -34,14 +35,9 @@ public class SyncKafkaGroupTask extends AbstractAsyncMetadataDispatchTask {
3435

3536
@Override
3637
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
37-
TaskResult tr = TaskResult.SUCCESS;
3838

3939
List<String> groupNameList = groupService.listGroupsFromKafka(clusterPhy.getId());
40-
for (String groupName: groupNameList) {
41-
if (!TaskResult.SUCCESS.equals(this.updateGroupMembersTask(clusterPhy, groupName, triggerTimeUnitMs))) {
42-
tr = TaskResult.FAIL;
43-
}
44-
}
40+
TaskResult tr = updateGroupMembersTask(clusterPhy, groupNameList, triggerTimeUnitMs);
4541

4642
if (!TaskResult.SUCCESS.equals(tr)) {
4743
return tr;
@@ -53,19 +49,21 @@ public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnit
5349
return tr;
5450
}
5551

56-
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, String groupName, long triggerTimeUnitMs) {
57-
try {
58-
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
59-
for (GroupMemberPO po: poList) {
60-
groupService.replaceDBData(po);
61-
}
62-
} catch (Exception e) {
63-
log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg={}", clusterPhy.getId(), groupName, e.getMessage());
6452

65-
return TaskResult.FAIL;
53+
private TaskResult updateGroupMembersTask(ClusterPhy clusterPhy, List<String> groupNameList, long triggerTimeUnitMs) {
54+
List<GroupMemberPO> groupMemberPOList = new ArrayList<>();
55+
TaskResult tr = TaskResult.SUCCESS;
56+
for (String groupName : groupNameList) {
57+
try {
58+
List<GroupMemberPO> poList = this.getGroupMembers(clusterPhy.getId(), groupName, new Date(triggerTimeUnitMs));
59+
groupMemberPOList.addAll(poList);
60+
} catch (Exception e) {
61+
log.error("method=updateGroupMembersTask||clusterPhyId={}||groupName={}||errMsg=exception", clusterPhy.getId(), groupName, e);
62+
tr = TaskResult.FAIL;
63+
}
6664
}
67-
68-
return TaskResult.SUCCESS;
65+
groupService.batchReplace(groupMemberPOList);
66+
return tr;
6967
}
7068

7169
private List<GroupMemberPO> getGroupMembers(Long clusterPhyId, String groupName, Date updateTime) throws NotExistException, AdminOperateException {

0 commit comments

Comments
 (0)