@@ -48,24 +48,42 @@ def getInformationBeforeDispose(self):
4848 return [shard .getInformationBeforeDispose () for shard in self .shards ]
4949
5050 def getInformationAfterDispose (self ):
51- return [shard .getInformationAfterDispose () for shard in self .shards ]
51+ return [shard .getInformationAfterDispose () for shard in self .shards ]
52+
53+ def _agreeOk (self ):
54+ ok = 0
55+ for shard in self .shards :
56+ con = shard .getConnection ()
57+ try :
58+ status = con .execute_command ('CLUSTER' , 'INFO' )
59+ except Exception as e :
60+ print ('got error on cluster info, will try again, %s' % str (e ))
61+ continue
62+ if 'cluster_state:ok' in str (status ):
63+ ok += 1
64+ return ok == len (self .shards )
65+
66+ def _agreeSlots (self ):
67+ ok = 0
68+ first_view = None
69+ for shard in self .shards :
70+ con = shard .getConnection ()
71+ try :
72+ slots_view = con .execute_command ('CLUSTER' , 'SLOTS' )
73+ except Exception as e :
74+ print ('got error on cluster slots, will try again, %s' % str (e ))
75+ continue
76+ if first_view is None :
77+ first_view = slots_view
78+ if slots_view == first_view :
79+ ok += 1
80+ return ok == len (self .shards )
5281
5382 def waitCluster (self , timeout_sec = 40 ):
5483 st = time .time ()
55- ok = 0
5684
5785 while st + timeout_sec > time .time ():
58- ok = 0
59- for shard in self .shards :
60- con = shard .getConnection ()
61- try :
62- status = con .execute_command ('CLUSTER' , 'INFO' )
63- except Exception as e :
64- print ('got error on cluster info, will try again, %s' % str (e ))
65- continue
66- if 'cluster_state:ok' in str (status ):
67- ok += 1
68- if ok == len (self .shards ):
86+ if self ._agreeOk () and self ._agreeSlots ():
6987 for shard in self .shards :
7088 try :
7189 shard .getConnection ().execute_command ('SEARCH.CLUSTERREFRESH' )
0 commit comments