4848import com .alibaba .dubbo .registry .NotifyListener ;
4949import com .alibaba .dubbo .registry .support .FailbackRegistry ;
5050import com .alibaba .dubbo .rpc .RpcException ;
51+ import redis .clients .jedis .exceptions .JedisConnectionException ;
5152
5253/**
5354 * RedisRegistry
54- *
55+ *
5556 * @author william.liangf
5657 */
5758public class RedisRegistry extends FailbackRegistry {
@@ -65,26 +66,26 @@ public class RedisRegistry extends FailbackRegistry {
6566 private final ScheduledExecutorService expireExecutor = Executors .newScheduledThreadPool (1 , new NamedThreadFactory ("DubboRegistryExpireTimer" , true ));
6667
6768 private final ScheduledFuture <?> expireFuture ;
68-
69+
6970 private final String root ;
7071
7172 private final Map <String , JedisPool > jedisPools = new ConcurrentHashMap <String , JedisPool >();
7273
7374 private final ConcurrentMap <String , Notifier > notifiers = new ConcurrentHashMap <String , Notifier >();
74-
75+
7576 private final int reconnectPeriod ;
7677
7778 private final int expirePeriod ;
78-
79+
7980 private volatile boolean admin = false ;
80-
81+
8182 private boolean replicate ;
8283
8384 public RedisRegistry (URL url ) {
8485 super (url );
8586 if (url .isAnyHost ()) {
86- throw new IllegalStateException ("registry address == null" );
87- }
87+ throw new IllegalStateException ("registry address == null" );
88+ }
8889 GenericObjectPool .Config config = new GenericObjectPool .Config ();
8990 config .testOnBorrow = url .getParameter ("test.on.borrow" , true );
9091 config .testOnReturn = url .getParameter ("test.on.return" , false );
@@ -103,13 +104,13 @@ public RedisRegistry(URL url) {
103104 config .timeBetweenEvictionRunsMillis = url .getParameter ("time.between.eviction.runs.millis" , 0 );
104105 if (url .getParameter ("min.evictable.idle.time.millis" , 0 ) > 0 )
105106 config .minEvictableIdleTimeMillis = url .getParameter ("min.evictable.idle.time.millis" , 0 );
106-
107+
107108 String cluster = url .getParameter ("cluster" , "failover" );
108109 if (! "failover" .equals (cluster ) && ! "replicate" .equals (cluster )) {
109- throw new IllegalArgumentException ("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate." );
110+ throw new IllegalArgumentException ("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate." );
110111 }
111112 replicate = "replicate" .equals (cluster );
112-
113+
113114 List <String > addresses = new ArrayList <String >();
114115 addresses .add (url .getAddress ());
115116 String [] backups = url .getParameter (Constants .BACKUP_KEY , new String [0 ]);
@@ -127,10 +128,10 @@ public RedisRegistry(URL url) {
127128 host = address ;
128129 port = DEFAULT_REDIS_PORT ;
129130 }
130- this .jedisPools .put (address , new JedisPool (config , host , port ,
131+ this .jedisPools .put (address , new JedisPool (config , host , port ,
131132 url .getParameter (Constants .TIMEOUT_KEY , Constants .DEFAULT_TIMEOUT )));
132133 }
133-
134+
134135 this .reconnectPeriod = url .getParameter (Constants .REGISTRY_RECONNECT_PERIOD_KEY , Constants .DEFAULT_REGISTRY_RECONNECT_PERIOD );
135136 String group = url .getParameter (Constants .GROUP_KEY , DEFAULT_ROOT );
136137 if (! group .startsWith (Constants .PATH_SEPARATOR )) {
@@ -140,7 +141,7 @@ public RedisRegistry(URL url) {
140141 group = group + Constants .PATH_SEPARATOR ;
141142 }
142143 this .root = group ;
143-
144+
144145 this .expirePeriod = url .getParameter (Constants .SESSION_TIMEOUT_KEY , Constants .DEFAULT_SESSION_TIMEOUT );
145146 this .expireFuture = expireExecutor .scheduleWithFixedDelay (new Runnable () {
146147 public void run () {
@@ -152,10 +153,11 @@ public void run() {
152153 }
153154 }, expirePeriod / 2 , expirePeriod / 2 , TimeUnit .MILLISECONDS );
154155 }
155-
156+
156157 private void deferExpired () {
157158 for (Map .Entry <String , JedisPool > entry : jedisPools .entrySet ()) {
158159 JedisPool jedisPool = entry .getValue ();
160+ boolean isBroken = false ;
159161 try {
160162 Jedis jedis = jedisPool .getResource ();
161163 try {
@@ -170,18 +172,24 @@ private void deferExpired() {
170172 if (admin ) {
171173 clean (jedis );
172174 }
173- if (! replicate ) {
174- break ;// 如果服务器端已同步数据,只需写入单台机器
175+ if (!replicate ) {
176+ break ;// 如果服务器端已同步数据,只需写入单台机器
175177 }
178+ } catch (JedisConnectionException e ){
179+ isBroken = true ;
176180 } finally {
177- jedisPool .returnResource (jedis );
181+ if (isBroken ){
182+ jedisPool .returnBrokenResource (jedis );
183+ } else {
184+ jedisPool .returnResource (jedis );
185+ }
178186 }
179187 } catch (Throwable t ) {
180188 logger .warn ("Failed to write provider heartbeat to redis registry. registry: " + entry .getKey () + ", cause: " + t .getMessage (), t );
181189 }
182190 }
183191 }
184-
192+
185193 // 监控中心负责删除过期脏数据
186194 private void clean (Jedis jedis ) {
187195 Set <String > keys = jedis .keys (root + Constants .ANY_VALUE );
@@ -202,7 +210,7 @@ private void clean(Jedis jedis) {
202210 logger .warn ("Delete expired key: " + key + " -> value: " + entry .getKey () + ", expire: " + new Date (expire ) + ", now: " + new Date (now ));
203211 }
204212 }
205- }
213+ }
206214 }
207215 if (delete ) {
208216 jedis .publish (key , Constants .UNREGISTER );
@@ -214,16 +222,20 @@ private void clean(Jedis jedis) {
214222
215223 public boolean isAvailable () {
216224 for (JedisPool jedisPool : jedisPools .values ()) {
225+ Jedis jedis = jedisPool .getResource ();
226+ boolean isBroken = false ;
217227 try {
218- Jedis jedis = jedisPool .getResource ();
219- try {
220- if (jedis .isConnected ()) {
221- return true ; // 至少需单台机器可用
222- }
223- } finally {
228+ if (jedis .isConnected ()) {
229+ return true ; // 至少需单台机器可用
230+ }
231+ } catch (JedisConnectionException e ) {
232+ isBroken = true ;
233+ } finally {
234+ if (isBroken ) {
235+ jedisPool .returnBrokenResource (jedis );
236+ } else {
224237 jedisPool .returnResource (jedis );
225238 }
226- } catch (Throwable t ) {
227239 }
228240 }
229241 return false ;
@@ -265,15 +277,22 @@ public void doRegister(URL url) {
265277 JedisPool jedisPool = entry .getValue ();
266278 try {
267279 Jedis jedis = jedisPool .getResource ();
280+ boolean isBroken = false ;
268281 try {
269282 jedis .hset (key , value , expire );
270283 jedis .publish (key , Constants .REGISTER );
271284 success = true ;
272285 if (! replicate ) {
273- break ; // 如果服务器端已同步数据,只需写入单台机器
286+ break ; // 如果服务器端已同步数据,只需写入单台机器
274287 }
288+ } catch (JedisConnectionException e ){
289+ isBroken = true ;
275290 } finally {
276- jedisPool .returnResource (jedis );
291+ if (isBroken ){
292+ jedisPool .returnBrokenResource (jedis );
293+ } else {
294+ jedisPool .returnResource (jedis );
295+ }
277296 }
278297 } catch (Throwable t ) {
279298 exception = new RpcException ("Failed to register service to redis registry. registry: " + entry .getKey () + ", service: " + url + ", cause: " + t .getMessage (), t );
@@ -298,15 +317,22 @@ public void doUnregister(URL url) {
298317 JedisPool jedisPool = entry .getValue ();
299318 try {
300319 Jedis jedis = jedisPool .getResource ();
320+ boolean isBroken = false ;
301321 try {
302322 jedis .hdel (key , value );
303323 jedis .publish (key , Constants .UNREGISTER );
304324 success = true ;
305325 if (! replicate ) {
306- break ; // 如果服务器端已同步数据,只需写入单台机器
326+ break ; // 如果服务器端已同步数据,只需写入单台机器
307327 }
328+ } catch (JedisConnectionException e ){
329+ isBroken = true ;
308330 } finally {
309- jedisPool .returnResource (jedis );
331+ if (isBroken ){
332+ jedisPool .returnBrokenResource (jedis );
333+ } else {
334+ jedisPool .returnResource (jedis );
335+ }
310336 }
311337 } catch (Throwable t ) {
312338 exception = new RpcException ("Failed to unregister service to redis registry. registry: " + entry .getKey () + ", service: " + url + ", cause: " + t .getMessage (), t );
@@ -320,7 +346,7 @@ public void doUnregister(URL url) {
320346 }
321347 }
322348 }
323-
349+
324350 @ Override
325351 public void doSubscribe (final URL url , final NotifyListener listener ) {
326352 String service = toServicePath (url );
@@ -339,6 +365,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
339365 JedisPool jedisPool = entry .getValue ();
340366 try {
341367 Jedis jedis = jedisPool .getResource ();
368+ boolean isBroken = false ;
342369 try {
343370 if (service .endsWith (Constants .ANY_VALUE )) {
344371 admin = true ;
@@ -363,8 +390,14 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
363390 }
364391 success = true ;
365392 break ; // 只需读一个服务器的数据
393+ } catch (JedisConnectionException e ){
394+ isBroken = true ;
366395 } finally {
367- jedisPool .returnResource (jedis );
396+ if (isBroken ){
397+ jedisPool .returnBrokenResource (jedis );
398+ } else {
399+ jedisPool .returnResource (jedis );
400+ }
368401 }
369402 } catch (Throwable t ) { // 尝试下一个服务器
370403 exception = new RpcException ("Failed to subscribe service from redis registry. registry: " + entry .getKey () + ", service: " + url + ", cause: " + t .getMessage (), t );
@@ -470,7 +503,7 @@ private String toCategoryPath(URL url) {
470503 }
471504
472505 private class NotifySub extends JedisPubSub {
473-
506+
474507 private final JedisPool jedisPool ;
475508
476509 public NotifySub (JedisPool jedisPool ) {
@@ -482,14 +515,21 @@ public void onMessage(String key, String msg) {
482515 if (logger .isInfoEnabled ()) {
483516 logger .info ("redis event: " + key + " = " + msg );
484517 }
485- if (msg .equals (Constants .REGISTER )
518+ if (msg .equals (Constants .REGISTER )
486519 || msg .equals (Constants .UNREGISTER )) {
487520 try {
488521 Jedis jedis = jedisPool .getResource ();
522+ boolean isBroken = false ;
489523 try {
490524 doNotify (jedis , key );
525+ } catch (JedisConnectionException e ){
526+ isBroken = true ;
491527 } finally {
492- jedisPool .returnResource (jedis );
528+ if (isBroken ){
529+ jedisPool .returnBrokenResource (jedis );
530+ } else {
531+ jedisPool .returnResource (jedis );
532+ }
493533 }
494534 } catch (Throwable t ) { // TODO 通知失败没有恢复机制保障
495535 logger .error (t .getMessage (), t );
@@ -527,23 +567,23 @@ private class Notifier extends Thread {
527567 private volatile Jedis jedis ;
528568
529569 private volatile boolean first = true ;
530-
570+
531571 private volatile boolean running = true ;
532-
572+
533573 private final AtomicInteger connectSkip = new AtomicInteger ();
534574
535575 private final AtomicInteger connectSkiped = new AtomicInteger ();
536576
537577 private final Random random = new Random ();
538-
578+
539579 private volatile int connectRandom ;
540580
541581 private void resetSkip () {
542582 connectSkip .set (0 );
543583 connectSkiped .set (0 );
544584 connectRandom = 0 ;
545585 }
546-
586+
547587 private boolean isSkip () {
548588 int skip = connectSkip .get (); // 跳过次数增长
549589 if (skip >= 10 ) { // 如果跳过次数增长超过10,取随机数
@@ -560,13 +600,13 @@ private boolean isSkip() {
560600 connectRandom = 0 ;
561601 return false ;
562602 }
563-
603+
564604 public Notifier (String service ) {
565605 super .setDaemon (true );
566606 super .setName ("DubboRedisSubscribe" );
567607 this .service = service ;
568608 }
569-
609+
570610 @ Override
571611 public void run () {
572612 while (running ) {
@@ -618,7 +658,7 @@ public void run() {
618658 }
619659 }
620660 }
621-
661+
622662 public void shutdown () {
623663 try {
624664 running = false ;
@@ -627,7 +667,7 @@ public void shutdown() {
627667 logger .warn (t .getMessage (), t );
628668 }
629669 }
630-
670+
631671 }
632672
633- }
673+ }
0 commit comments