Skip to content

Commit e224ada

Browse files
author
夜色
committed
升级到0.7.0
1 parent 5f40875 commit e224ada

File tree

2 files changed

+35
-37
lines changed

2 files changed

+35
-37
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.mpush</groupId>
88
<artifactId>alloc</artifactId>
9-
<version>0.6.1</version>
9+
<version>0.7.0</version>
1010
<name>mpush-alloc</name>
1111
<description>mpush Server Allocator</description>
1212
<url>https://github.com/mpusher/mpush</url>
@@ -45,7 +45,7 @@
4545
<dependency>
4646
<groupId>com.github.mpusher</groupId>
4747
<artifactId>mpush-client</artifactId>
48-
<version>0.6.1</version>
48+
<version>0.7.0</version>
4949
</dependency>
5050
</dependencies>
5151

src/main/java/com/shinemo/mpush/alloc/AllocHandler.java

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,18 @@
2020
package com.shinemo.mpush.alloc;
2121

2222
import com.mpush.api.Constants;
23-
import com.mpush.cache.redis.manager.RedisManager;
23+
import com.mpush.api.spi.common.CacheManagerFactory;
24+
import com.mpush.api.spi.common.ServiceDiscoveryFactory;
25+
import com.mpush.api.srd.ServiceDiscovery;
26+
import com.mpush.api.srd.ServiceListener;
27+
import com.mpush.api.srd.ServiceNames;
28+
import com.mpush.api.srd.ServiceNode;
2429
import com.mpush.common.user.UserManager;
25-
import com.mpush.zk.ZKClient;
26-
import com.mpush.zk.ZKPath;
27-
import com.mpush.zk.cache.ZKServerNodeCache;
28-
import com.mpush.zk.listener.ZKServerNodeWatcher;
29-
import com.mpush.zk.node.ZKServerNode;
3030
import com.sun.net.httpserver.HttpExchange;
3131
import com.sun.net.httpserver.HttpHandler;
3232

3333
import java.io.IOException;
3434
import java.io.OutputStream;
35-
import java.util.Collection;
3635
import java.util.Collections;
3736
import java.util.Iterator;
3837
import java.util.List;
@@ -49,22 +48,24 @@
4948
/*package*/ final class AllocHandler implements HttpHandler {
5049

5150
private ScheduledExecutorService scheduledExecutor;
52-
private ZKServerNodeWatcher watcher;
5351
private List<ServerNode> serverNodes = Collections.emptyList();
52+
private final ServiceDiscovery discovery = ServiceDiscoveryFactory.create();
5453

5554
public void start() {
56-
ZKClient.I.start();//启动ZK
57-
watcher = new ZKServerNodeWatcher(ZKPath.CONNECT_SERVER, new ConnServerZKNodeCache());//监听长链接服务器节点
58-
watcher.watch();
55+
CacheManagerFactory.create().init(); //启动缓冲服务
56+
57+
ServiceDiscovery discovery = ServiceDiscoveryFactory.create();// 启动发现服务
58+
discovery.syncStart();
59+
discovery.subscribe(ServiceNames.CONN_SERVER, new ConnServerNodeListener());
5960

60-
RedisManager.I.init();
6161

6262
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
6363
scheduledExecutor.scheduleAtFixedRate(this::refresh, 0, 5, TimeUnit.MINUTES);
6464
}
6565

6666
public void stop() {
67-
if (ZKClient.I.isRunning()) ZKClient.I.stop();
67+
discovery.syncStop();
68+
CacheManagerFactory.create().destroy();
6869
scheduledExecutor.shutdown();
6970
}
7071

@@ -73,13 +74,13 @@ public void handle(HttpExchange httpExchange) throws IOException {
7374
StringBuilder sb = new StringBuilder();
7475
Iterator<ServerNode> it = serverNodes.iterator();
7576
if (it.hasNext()) {
76-
ZKServerNode node = it.next();
77-
sb.append(node.getExtranetIp()).append(':').append(node.getPort());
77+
ServerNode node = it.next();
78+
sb.append(node.host).append(':').append(node.port);
7879
}
7980

8081
while (it.hasNext()) {
81-
ZKServerNode node = it.next();
82-
sb.append(',').append(node.getExtranetIp()).append(':').append(node.getPort());
82+
ServerNode node = it.next();
83+
sb.append(',').append(node.host).append(':').append(node.port);
8384
}
8485

8586
byte[] data = sb.toString().getBytes(Constants.UTF_8);
@@ -95,7 +96,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
9596
*/
9697
private void refresh() {
9798
//1.从缓存中拿取可用的长链接服务器节点
98-
Collection<ZKServerNode> nodes = watcher.getCache().values();
99+
List<ServiceNode> nodes = discovery.lookup(ServiceNames.CONN_SERVER);
99100
if (nodes.size() > 0) {
100101
//2.对serverNodes可以按某种规则排序,以便实现负载均衡,比如:随机,轮询,链接数量等
101102
this.serverNodes = nodes
@@ -110,42 +111,39 @@ private long getOnlineUserNum(String publicIP) {
110111
return UserManager.I.getOnlineUserNum(publicIP);
111112
}
112113

113-
private ServerNode convert(ZKServerNode node) {
114-
ServerNode serverNode = new ServerNode();
115-
serverNode.setExtranetIp(node.getExtranetIp());
116-
serverNode.setIp(node.getIp());
117-
serverNode.setPort(node.getPort());
118-
serverNode.setOnlineUserNum(getOnlineUserNum(node.getExtranetIp()));
119-
return serverNode;
114+
private ServerNode convert(ServiceNode node) {
115+
String public_ip = node.getAttr(ServiceNames.ATTR_PUBLIC_IP);
116+
long onlineUserNum = getOnlineUserNum(public_ip);
117+
return new ServerNode(node.getHost(), node.getPort(), onlineUserNum);
120118
}
121119

122-
private class ConnServerZKNodeCache extends ZKServerNodeCache {
120+
private class ConnServerNodeListener implements ServiceListener {
123121

124122
@Override
125-
public void put(String fullPath, ZKServerNode node) {
126-
super.put(fullPath, node);
123+
public void onServiceAdded(String s, ServiceNode serviceNode) {
127124
refresh();
128125
}
129126

130127
@Override
131-
public ZKServerNode remove(String fullPath) {
132-
ZKServerNode node = super.remove(fullPath);
128+
public void onServiceUpdated(String s, ServiceNode serviceNode) {
133129
refresh();
134-
return node;
135130
}
136131

137132
@Override
138-
public void clear() {
139-
super.clear();
133+
public void onServiceRemoved(String s, ServiceNode serviceNode) {
140134
refresh();
141135
}
142136
}
143137

144-
private static class ServerNode extends ZKServerNode implements Comparable<ServerNode> {
138+
private static class ServerNode implements Comparable<ServerNode> {
145139
long onlineUserNum = 0;
140+
String host;
141+
int port;
146142

147-
public void setOnlineUserNum(long onlineUserNum) {
143+
public ServerNode(String host, int port, long onlineUserNum) {
148144
this.onlineUserNum = onlineUserNum;
145+
this.host = host;
146+
this.port = port;
149147
}
150148

151149
@Override

0 commit comments

Comments
 (0)