@@ -43,7 +43,8 @@ def subscribe(client, timeout)
43
43
def initialize ( router , command_builder )
44
44
@router = router
45
45
@command_builder = command_builder
46
- @states = { }
46
+ @state_list = [ ]
47
+ @state_dict = { }
47
48
end
48
49
49
50
def call ( *args , **kwargs )
@@ -55,21 +56,21 @@ def call_v(command)
55
56
end
56
57
57
58
def close
58
- @states . each_value ( &:close )
59
- @states . clear
59
+ @state_list . each ( &:close )
60
+ @state_list . clear
61
+ @state_dict . clear
60
62
end
61
63
62
64
def next_event ( timeout = nil )
63
- return if @states . empty?
65
+ return if @state_list . empty?
64
66
65
67
max_duration = calc_max_duration ( timeout )
66
68
starting = obtain_current_time
67
- clients = @states . values
68
69
loop do
69
70
break if max_duration > 0 && obtain_current_time - starting > max_duration
70
71
71
- clients . shuffle!
72
- clients . each do |pubsub |
72
+ @state_list . shuffle!
73
+ @state_list . each do |pubsub |
73
74
message = pubsub . take_message ( timeout )
74
75
return message if message
75
76
end
@@ -80,26 +81,26 @@ def next_event(timeout = nil)
80
81
81
82
def _call ( command )
82
83
node_key = @router . find_node_key ( command )
83
- add_state ( node_key )
84
84
try_call ( node_key , command )
85
85
end
86
86
87
87
def try_call ( node_key , command , retry_count : 1 )
88
- @states [ node_key ] . call ( command )
88
+ add_state ( node_key ) . call ( command )
89
89
rescue ::RedisClient ::CommandError => e
90
90
raise if !e . message . start_with? ( 'MOVED' ) || retry_count <= 0
91
91
92
92
# for sharded pub/sub
93
93
node_key = e . message . split [ 2 ]
94
- add_state ( node_key )
95
94
retry_count -= 1
96
95
retry
97
96
end
98
97
99
98
def add_state ( node_key )
100
- return @states [ node_key ] if @states . key? ( node_key )
99
+ return @state_dict [ node_key ] if @state_dict . key? ( node_key )
101
100
102
- @states [ node_key ] = State . new ( @router . find_node ( node_key ) . pubsub )
101
+ state = State . new ( @router . find_node ( node_key ) . pubsub )
102
+ @state_list << state
103
+ @state_dict [ node_key ] = state
103
104
end
104
105
105
106
def obtain_current_time
0 commit comments