Skip to content

Commit c92461e

Browse files
authored
Merge pull request #565 from didi/dev
合并开发分支
2 parents be60ae8 + 2e01680 commit c92461e

File tree

39 files changed

+637
-193
lines changed

39 files changed

+637
-193
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
2+
3+
![Logo](https://user-images.githubusercontent.com/71620349/185368586-aed82d30-1534-453d-86ff-ecfa9d0f35bd.png)
4+
5+
## 登录系统对接
6+
7+
[KnowStreaming](https://github.com/didi/KnowStreaming)(以下简称KS) 除了实现基于本地MySQL的用户登录认证方式外,还已经实现了基于Ldap的登录认证。
8+
9+
但是,登录认证系统并非仅此两种。因此,为了具有更好的拓展性,KS具有自定义登陆认证逻辑,快速对接已有系统的特性。
10+
11+
在KS中,我们将登陆认证相关的一些文件放在[km-extends](https://github.com/didi/KnowStreaming/tree/master/km-extends)模块下的[km-account](https://github.com/didi/KnowStreaming/tree/master/km-extends/km-account)模块里。
12+
13+
本文将介绍KS如何快速对接自有的用户登录认证系统。
14+
15+
### 对接步骤
16+
17+
- 创建一个登陆认证类,实现[LogiCommon](https://github.com/didi/LogiCommon)的LoginExtend接口;
18+
-[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中的spring.logi-security.login-extend-bean-name字段改为登陆认证类的bean名称;
19+
20+
```Java
21+
//LoginExtend 接口
22+
public interface LoginExtend {
23+
24+
/**
25+
* 验证登录信息,同时记住登录状态
26+
*/
27+
UserBriefVO verifyLogin(AccountLoginDTO var1, HttpServletRequest var2, HttpServletResponse var3) throws LogiSecurityException;
28+
29+
/**
30+
* 登出接口,清楚登录状态
31+
*/
32+
Result<Boolean> logout(HttpServletRequest var1, HttpServletResponse var2);
33+
34+
/**
35+
* 检查是否已经登录
36+
*/
37+
boolean interceptorCheck(HttpServletRequest var1, HttpServletResponse var2, String var3, List<String> var4) throws IOException;
38+
39+
}
40+
41+
```
42+
43+
44+
45+
### 对接例子
46+
47+
我们以Ldap对接为例,说明KS如何对接登录认证系统。
48+
49+
+ 编写[LdapLoginServiceImpl](https://github.com/didi/KnowStreaming/blob/master/km-extends/km-account/src/main/java/com/xiaojukeji/know/streaming/km/account/login/ldap/LdapLoginServiceImpl.java)类,实现LoginExtend接口。
50+
+ 设置[application.yml](https://github.com/didi/KnowStreaming/blob/master/km-rest/src/main/resources/application.yml)中的spring.logi-security.login-extend-bean-name=ksLdapLoginService。
51+
52+
完成上述两步即可实现KS对接Ldap认证登陆。
53+
54+
```Java
55+
@Service("ksLdapLoginService")
56+
public class LdapLoginServiceImpl implements LoginExtend {
57+
58+
59+
@Override
60+
public UserBriefVO verifyLogin(AccountLoginDTO loginDTO,
61+
HttpServletRequest request,
62+
HttpServletResponse response) throws LogiSecurityException {
63+
String decodePasswd = AESUtils.decrypt(loginDTO.getPw());
64+
65+
// 去LDAP验证账密
66+
LdapPrincipal ldapAttrsInfo = ldapAuthentication.authenticate(loginDTO.getUserName(), decodePasswd);
67+
if (ldapAttrsInfo == null) {
68+
// 用户不存在,正常来说上如果有问题,上一步会直接抛出异常
69+
throw new LogiSecurityException(ResultCode.USER_NOT_EXISTS);
70+
}
71+
72+
// 进行业务相关操作
73+
74+
// 记录登录状态,Ldap因为无法记录登录状态,因此有KnowStreaming进行记录
75+
initLoginContext(request, response, loginDTO.getUserName(), user.getId());
76+
return CopyBeanUtil.copy(user, UserBriefVO.class);
77+
}
78+
79+
@Override
80+
public Result<Boolean> logout(HttpServletRequest request, HttpServletResponse response) {
81+
82+
//清理cookie和session
83+
84+
return Result.buildSucc(Boolean.TRUE);
85+
}
86+
87+
@Override
88+
public boolean interceptorCheck(HttpServletRequest request, HttpServletResponse response, String requestMappingValue, List<String> whiteMappingValues) throws IOException {
89+
90+
// 检查是否已经登录
91+
String userName = HttpRequestUtil.getOperator(request);
92+
if (StringUtils.isEmpty(userName)) {
93+
// 未登录,则进行登出
94+
logout(request, response);
95+
return Boolean.FALSE;
96+
}
97+
98+
return Boolean.TRUE;
99+
}
100+
}
101+
102+
```
103+
104+
105+
106+
### 实现原理
107+
108+
因为登陆和登出整体实现逻辑是一致的,所以我们以登陆逻辑为例进行介绍。
109+
110+
+ 登陆原理
111+
112+
登陆走的是[LogiCommon](https://github.com/didi/LogiCommon)自带的LoginController。
113+
114+
```java
115+
@RestController
116+
public class LoginController {
117+
118+
119+
//登陆接口
120+
@PostMapping({"/login"})
121+
public Result<UserBriefVO> login(HttpServletRequest request, HttpServletResponse response, @RequestBody AccountLoginDTO loginDTO) {
122+
try {
123+
//登陆认证
124+
UserBriefVO userBriefVO = this.loginService.verifyLogin(loginDTO, request, response);
125+
return Result.success(userBriefVO);
126+
127+
} catch (LogiSecurityException var5) {
128+
return Result.fail(var5);
129+
}
130+
}
131+
132+
}
133+
```
134+
135+
而登陆操作是调用LoginServiceImpl类来实现,但是具体由哪个登陆认证类来执行登陆操作却由loginExtendBeanTool来指定。
136+
137+
```java
138+
//LoginServiceImpl类
139+
@Service
140+
public class LoginServiceImpl implements LoginService {
141+
142+
//实现登陆操作,但是具体哪个登陆类由loginExtendBeanTool来管理
143+
public UserBriefVO verifyLogin(AccountLoginDTO loginDTO, HttpServletRequest request, HttpServletResponse response) throws LogiSecurityException {
144+
145+
return this.loginExtendBeanTool.getLoginExtendImpl().verifyLogin(loginDTO, request, response);
146+
}
147+
148+
149+
}
150+
```
151+
152+
而loginExtendBeanTool类会优先去查找用户指定的登陆认证类,如果失败则调用默认的登陆认证函数。
153+
154+
```java
155+
//LoginExtendBeanTool类
156+
@Component("logiSecurityLoginExtendBeanTool")
157+
public class LoginExtendBeanTool {
158+
159+
public LoginExtend getLoginExtendImpl() {
160+
LoginExtend loginExtend;
161+
//先调用用户指定登陆类,如果失败则调用系统默认登陆认证
162+
try {
163+
//调用的类由spring.logi-security.login-extend-bean-name指定
164+
loginExtend = this.getCustomLoginExtendImplBean();
165+
} catch (UnsupportedOperationException var3) {
166+
loginExtend = this.getDefaultLoginExtendImplBean();
167+
}
168+
169+
return loginExtend;
170+
}
171+
}
172+
```
173+
174+
+ 认证原理
175+
176+
认证的实现则比较简单,向Spring中注册我们的拦截器PermissionInterceptor。
177+
178+
拦截器会调用LoginServiceImpl类的拦截方法,LoginServiceImpl后续处理逻辑就和前面登陆是一致的。
179+
180+
```java
181+
public class PermissionInterceptor implements HandlerInterceptor {
182+
183+
184+
/**
185+
* 拦截预处理
186+
* @return boolean false:拦截, 不向下执行, true:放行
187+
*/
188+
@Override
189+
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
190+
191+
//免登录相关校验,如果验证通过,提前返回
192+
193+
//走拦截函数,进行普通用户验证
194+
return loginService.interceptorCheck(request, response, classRequestMappingValue, whiteMappingValues);
195+
}
196+
197+
}
198+
```
199+

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
2121
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
2222
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
23+
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
2324
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
2425
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
2526
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
@@ -171,7 +172,7 @@ public Result<Void> resetGroupOffsets(GroupOffsetResetDTO dto, String operator)
171172
}
172173

173174
if (!ConsumerGroupState.EMPTY.equals(description.state()) && !ConsumerGroupState.DEAD.equals(description.state())) {
174-
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", description.state().name()));
175+
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, String.format("group处于%s, 重置失败(仅Empty情况可重置)", GroupStateEnum.getByRawState(description.state()).getState()));
175176
}
176177

177178
// 获取offset
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster;
2+
3+
import lombok.Getter;
4+
5+
/**
6+
* 集群新增事件
7+
* @author zengqiao
8+
* @date 22/02/25
9+
*/
10+
@Getter
11+
public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent {
12+
public ClusterPhyAddedEvent(Object source, Long clusterPhyId) {
13+
super(source, clusterPhyId);
14+
}
15+
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/BaseKafkaZKEvent.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/kafka/zk/ControllerChangeEvent.java

Lines changed: 0 additions & 10 deletions
This file was deleted.

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ReassignConverter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ private static ReassignJobDetailDataGroupByTopic convert2ReassignJobDetailDataGr
170170
detail.setOriginalBrokerIdList(CommonUtils.string2IntList(subJobPO.getOriginalBrokerIds()));
171171
detail.setReassignBrokerIdList(CommonUtils.string2IntList(subJobPO.getReassignBrokerIds()));
172172
detail.setStatus(subJobPO.getStatus());
173+
detail.setOldReplicaNum(detail.getOriginalBrokerIdList().size());
173174

174175
ReassignSubJobExtendData extendData = ConvertUtil.str2ObjByJson(subJobPO.getExtendData(), ReassignSubJobExtendData.class);
175176
if (extendData != null) {
@@ -217,6 +218,7 @@ private static ReassignJobDetailDataGroupByTopic convert2ReassignJobDetailDataGr
217218

218219
topicDetail.setPresentReplicaNum(partitionDetailList.get(0).getPresentReplicaNum());
219220
topicDetail.setNewReplicaNum(partitionDetailList.get(0).getNewReplicaNum());
221+
topicDetail.setOldReplicaNum(partitionDetailList.get(0).getOldReplicaNum());
220222
topicDetail.setOriginalRetentionTimeUnitMs(partitionDetailList.get(0).getOriginalRetentionTimeUnitMs());
221223
topicDetail.setReassignRetentionTimeUnitMs(partitionDetailList.get(0).getReassignRetentionTimeUnitMs());
222224

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
88
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
99
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
10+
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
1011
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
12+
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
1113
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
1214
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
1315
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
@@ -106,6 +108,8 @@ public Long addClusterPhy(ClusterPhyPO clusterPhyPO, String operator) throws Par
106108

107109
log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator);
108110

111+
// 发布添加集群事件
112+
SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId()));
109113
return clusterPhyPO.getId();
110114
} catch (DuplicateKeyException dke) {
111115
log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator);

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/reassign/ReassignJobService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,7 @@ public interface ReassignJobService {
6060
* 依据任务状态或者其中一个任务ID
6161
*/
6262
Long getOneRunningJobId(Long clusterPhyId);
63+
64+
65+
Result<Void> preferredReplicaElection(Long jobId);
6366
}

0 commit comments

Comments
 (0)