@@ -44,7 +44,7 @@ def redis_connection_for_subscriptions
44
44
45
45
private
46
46
def listener
47
- @listener || @server . mutex . synchronize { @listener ||= Listener . new ( self , @server . event_loop ) }
47
+ @listener || @server . mutex . synchronize { @listener ||= Listener . new ( self , config_options , @server . event_loop ) }
48
48
end
49
49
50
50
def redis_connection_for_broadcasts
@@ -54,11 +54,15 @@ def redis_connection_for_broadcasts
54
54
end
55
55
56
56
def redis_connection
57
- self . class . redis_connector . call ( @server . config . cable . symbolize_keys . merge ( id : identifier ) )
57
+ self . class . redis_connector . call ( config_options )
58
+ end
59
+
60
+ def config_options
61
+ @config_options ||= @server . config . cable . symbolize_keys . merge ( id : identifier )
58
62
end
59
63
60
64
class Listener < SubscriberMap
61
- def initialize ( adapter , event_loop )
65
+ def initialize ( adapter , config_options , event_loop )
62
66
super ( )
63
67
64
68
@adapter = adapter
@@ -67,6 +71,11 @@ def initialize(adapter, event_loop)
67
71
@subscribe_callbacks = Hash . new { |h , k | h [ k ] = [ ] }
68
72
@subscription_lock = Mutex . new
69
73
74
+ @reconnect_attempt = 0
75
+ # Use the same config as used by Redis conn
76
+ @reconnect_attempts = config_options . fetch ( :reconnect_attempts , 1 )
77
+ @reconnect_attempts = Array . new ( @reconnect_attempts , 0 ) if @reconnect_attempts . is_a? ( Integer )
78
+
70
79
@subscribed_client = nil
71
80
72
81
@when_connected = [ ]
@@ -82,6 +91,7 @@ def listen(conn)
82
91
on . subscribe do |chan , count |
83
92
@subscription_lock . synchronize do
84
93
if count == 1
94
+ @reconnect_attempt = 0
85
95
@subscribed_client = original_client
86
96
87
97
until @when_connected . empty?
@@ -148,8 +158,16 @@ def ensure_listener_running
148
158
@thread ||= Thread . new do
149
159
Thread . current . abort_on_exception = true
150
160
151
- conn = @adapter . redis_connection_for_subscriptions
152
- listen conn
161
+ begin
162
+ conn = @adapter . redis_connection_for_subscriptions
163
+ listen conn
164
+ rescue ConnectionError
165
+ reset
166
+ if retry_connecting?
167
+ when_connected { resubscribe }
168
+ retry
169
+ end
170
+ end
153
171
end
154
172
end
155
173
@@ -161,7 +179,36 @@ def when_connected(&block)
161
179
end
162
180
end
163
181
182
+ def retry_connecting?
183
+ @reconnect_attempt += 1
184
+
185
+ return false if @reconnect_attempt > @reconnect_attempts . size
186
+
187
+ sleep_t = @reconnect_attempts [ @reconnect_attempt - 1 ]
188
+
189
+ sleep ( sleep_t ) if sleep_t > 0
190
+
191
+ true
192
+ end
193
+
194
+ def resubscribe
195
+ channels = @sync . synchronize do
196
+ @subscribers . keys
197
+ end
198
+ @subscribed_client . subscribe ( *channels ) unless channels . empty?
199
+ end
200
+
201
+ def reset
202
+ @subscription_lock . synchronize do
203
+ @subscribed_client = nil
204
+ @subscribe_callbacks . clear
205
+ @when_connected . clear
206
+ end
207
+ end
208
+
164
209
if ::Redis ::VERSION < "5"
210
+ ConnectionError = ::Redis ::ConnectionError
211
+
165
212
class SubscribedClient
166
213
def initialize ( raw_client )
167
214
@raw_client = raw_client
@@ -194,6 +241,8 @@ def extract_subscribed_client(conn)
194
241
SubscribedClient . new ( conn . _client )
195
242
end
196
243
else
244
+ ConnectionError = RedisClient ::ConnectionError
245
+
197
246
def extract_subscribed_client ( conn )
198
247
conn
199
248
end
0 commit comments