Skip to content

Commit f1af8ed

Browse files
authored
Merge pull request #33 from crossoverJie/cim-1.0.4
cim 1.0.4
2 parents 052505e + 273e7c5 commit f1af8ed

File tree

19 files changed

+848
-11
lines changed

19 files changed

+848
-11
lines changed

cim-client/src/main/java/com/crossoverjie/cim/client/handle/CIMClientHandle.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.crossoverjie.cim.client.handle;
22

3+
import com.crossoverjie.cim.client.service.ShutDownMsg;
34
import com.crossoverjie.cim.client.thread.ReConnectJob;
45
import com.crossoverjie.cim.client.util.SpringBeanFactory;
56
import com.crossoverjie.cim.common.constant.Constants;
@@ -37,6 +38,8 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
3738

3839
private ScheduledExecutorService scheduledExecutorService ;
3940

41+
private ShutDownMsg shutDownMsg ;
42+
4043

4144
@Override
4245
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@@ -71,11 +74,20 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
7174

7275
@Override
7376
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
74-
LOGGER.info("客户端断开了,重新连接!");
77+
78+
if (shutDownMsg == null){
79+
shutDownMsg = SpringBeanFactory.getBean(ShutDownMsg.class) ;
80+
}
81+
82+
//用户主动退出,不执行重连逻辑
83+
if (shutDownMsg.checkStatus()){
84+
return;
85+
}
7586

7687
if (scheduledExecutorService == null){
7788
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
7889
}
90+
LOGGER.info("客户端断开了,重新连接!");
7991
// TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
8092
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
8193
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.crossoverjie.cim.client.service;
2+
3+
import org.springframework.stereotype.Component;
4+
5+
/**
6+
* Function:
7+
*
8+
* @author crossoverJie
9+
* Date: 2019-02-27 16:17
10+
* @since JDK 1.8
11+
*/
12+
@Component
13+
public class ShutDownMsg {
14+
private boolean isCommand ;
15+
16+
/**
17+
* 置为用户主动退出状态
18+
*/
19+
public void shutdown(){
20+
isCommand = true ;
21+
}
22+
23+
public boolean checkStatus(){
24+
return isCommand ;
25+
}
26+
}

cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/command/QueryHistoryCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public class QueryHistoryCommand implements InnerCommand {
2525
@Override
2626
public void process(String msg) {
2727
String[] split = msg.split(" ");
28+
if (split.length < 2){
29+
return;
30+
}
2831
String res = msgLogger.query(split[1]);
2932
System.out.println(res);
3033
}

cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/command/ShutDownCommand.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.crossoverjie.cim.client.service.InnerCommand;
55
import com.crossoverjie.cim.client.service.MsgLogger;
66
import com.crossoverjie.cim.client.service.RouteRequest;
7+
import com.crossoverjie.cim.client.service.ShutDownMsg;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
910
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,9 +37,14 @@ public class ShutDownCommand implements InnerCommand {
3637
@Resource(name = "callBackThreadPool")
3738
private ThreadPoolExecutor executor;
3839

40+
41+
@Autowired
42+
private ShutDownMsg shutDownMsg ;
43+
3944
@Override
4045
public void process(String msg) {
4146
LOGGER.info("系统关闭中。。。。");
47+
shutDownMsg.shutdown();
4248
routeRequest.offLine();
4349
msgLogger.stop();
4450
executor.shutdown();

cim-client/src/main/resources/application.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ cim.clear.route.request.url=http://45.78.28.220:8083/offLine
4545
#cim.clear.route.request.url=http://localhost:8083/offLine
4646

4747
# 客户端唯一ID
48-
cim.user.id=1545574841528
49-
cim.user.userName=zhangsan
48+
cim.user.id=1551267098213
49+
cim.user.userName=test3
5050

5151
# 回调线程队列大小
5252
cim.callback.thread.queue.size = 2
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package com.crossoverjie.cim.common.data.construct;
2+
3+
import java.util.Arrays;
4+
import java.util.Comparator;
5+
6+
/**
7+
* Function:根据 key 排序的 Map
8+
*
9+
* @author crossoverJie
10+
* Date: 2019-02-25 18:17
11+
* @since JDK 1.8
12+
*/
13+
public class SortArrayMap {
14+
15+
/**
16+
* 核心数组
17+
*/
18+
private Node[] buckets;
19+
20+
private static final int DEFAULT_SIZE = 10;
21+
22+
/**
23+
* 数组大小
24+
*/
25+
private int size = 0;
26+
27+
public SortArrayMap() {
28+
buckets = new Node[DEFAULT_SIZE];
29+
}
30+
31+
/**
32+
* 写入数据
33+
* @param key
34+
* @param value
35+
*/
36+
public void add(Long key, String value) {
37+
checkSize(size + 1);
38+
Node node = new Node(key, value);
39+
buckets[size++] = node;
40+
}
41+
42+
/**
43+
* 校验是否需要扩容
44+
* @param size
45+
*/
46+
private void checkSize(int size) {
47+
if (size >= buckets.length) {
48+
//扩容自身的 3/2
49+
int oldLen = buckets.length;
50+
int newLen = oldLen + (oldLen >> 1);
51+
buckets = Arrays.copyOf(buckets, newLen);
52+
}
53+
}
54+
55+
/**
56+
* 顺时针取出数据
57+
* @param key
58+
* @return
59+
*/
60+
public String firstNodeValue(long key) {
61+
if (size == 0){
62+
return null ;
63+
}
64+
for (Node bucket : buckets) {
65+
if (bucket == null){
66+
continue;
67+
}
68+
if (bucket.key >= key) {
69+
return bucket.value;
70+
}
71+
}
72+
73+
return buckets[0].value;
74+
75+
}
76+
77+
/**
78+
* 排序
79+
*/
80+
public void sort() {
81+
Arrays.sort(buckets, 0, size, new Comparator<Node>() {
82+
@Override
83+
public int compare(Node o1, Node o2) {
84+
if (o1.key > o2.key) {
85+
return 1;
86+
} else {
87+
return -1;
88+
}
89+
}
90+
});
91+
}
92+
93+
public void print() {
94+
for (Node bucket : buckets) {
95+
if (bucket == null) {
96+
continue;
97+
}
98+
System.out.println(bucket.toString());
99+
}
100+
}
101+
102+
public int size() {
103+
return size;
104+
}
105+
106+
/**
107+
* 数据节点
108+
*/
109+
private class Node {
110+
public Long key;
111+
public String value;
112+
113+
public Node(Long key, String value) {
114+
this.key = key;
115+
this.value = value;
116+
}
117+
118+
@Override
119+
public String toString() {
120+
return "Node{" +
121+
"key=" + key +
122+
", value='" + value + '\'' +
123+
'}';
124+
}
125+
126+
}
127+
128+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.crossoverjie.cim.common.route.algorithm;
2+
3+
import java.util.List;
4+
5+
/**
6+
* Function:
7+
*
8+
* @author crossoverJie
9+
* Date: 2019-02-27 00:31
10+
* @since JDK 1.8
11+
*/
12+
public interface RouteHandle {
13+
14+
/**
15+
* 再一批服务器里进行路由
16+
* @param values
17+
* @param key
18+
* @return
19+
*/
20+
String routeServer(List<String> values,String key) ;
21+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.crossoverjie.cim.common.route.algorithm.consistenthash;
2+
3+
import java.io.UnsupportedEncodingException;
4+
import java.security.MessageDigest;
5+
import java.security.NoSuchAlgorithmException;
6+
import java.util.List;
7+
8+
/**
9+
* Function:一致性 hash 算法抽象类
10+
*
11+
* @author crossoverJie
12+
* Date: 2019-02-27 00:35
13+
* @since JDK 1.8
14+
*/
15+
public abstract class AbstractConsistentHash {
16+
17+
/**
18+
* 新增节点
19+
* @param key
20+
* @param value
21+
*/
22+
protected abstract void add(long key,String value);
23+
24+
/**
25+
* 排序节点,数据结构自身支持排序可以不用重写
26+
*/
27+
protected void sort(){}
28+
29+
/**
30+
* 根据当前的 key 通过一致性 hash 算法的规则取出一个节点
31+
* @param value
32+
* @return
33+
*/
34+
protected abstract String getFirstNodeValue(String value);
35+
36+
/**
37+
* 传入节点列表以及客户端信息获取一个服务节点
38+
* @param values
39+
* @param key
40+
* @return
41+
*/
42+
public String process(List<String> values,String key){
43+
44+
for (String value : values) {
45+
add(hash(value), value);
46+
}
47+
sort();
48+
49+
return getFirstNodeValue(key) ;
50+
}
51+
52+
/**
53+
* hash 运算
54+
* @param value
55+
* @return
56+
*/
57+
public Long hash(String value){
58+
MessageDigest md5;
59+
try {
60+
md5 = MessageDigest.getInstance("MD5");
61+
} catch (NoSuchAlgorithmException e) {
62+
throw new RuntimeException("MD5 not supported", e);
63+
}
64+
md5.reset();
65+
byte[] keyBytes = null;
66+
try {
67+
keyBytes = value.getBytes("UTF-8");
68+
} catch (UnsupportedEncodingException e) {
69+
throw new RuntimeException("Unknown string :" + value, e);
70+
}
71+
72+
md5.update(keyBytes);
73+
byte[] digest = md5.digest();
74+
75+
// hash code, Truncate to 32-bits
76+
long hashCode = ((long) (digest[3] & 0xFF) << 24)
77+
| ((long) (digest[2] & 0xFF) << 16)
78+
| ((long) (digest[1] & 0xFF) << 8)
79+
| (digest[0] & 0xFF);
80+
81+
long truncateHashCode = hashCode & 0xffffffffL;
82+
return truncateHashCode;
83+
}
84+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.crossoverjie.cim.common.route.algorithm.consistenthash;
2+
3+
import com.crossoverjie.cim.common.route.algorithm.RouteHandle;
4+
5+
import java.util.List;
6+
7+
/**
8+
* Function:
9+
*
10+
* @author crossoverJie
11+
* Date: 2019-02-27 00:33
12+
* @since JDK 1.8
13+
*/
14+
public class ConsistentHashHandle implements RouteHandle {
15+
private AbstractConsistentHash hash ;
16+
17+
public void setHash(AbstractConsistentHash hash) {
18+
this.hash = hash;
19+
}
20+
21+
@Override
22+
public String routeServer(List<String> values, String key) {
23+
return hash.process(values, key);
24+
}
25+
}

0 commit comments

Comments
 (0)