Skip to content

Commit 3e13388

Browse files
authored
Merge pull request rails#46562 from palkan/feat/action-cable-redis-resubscribe
Restore Action Cable Redis pub/sub listener on connection failure
2 parents 3bb8cb9 + ae649b1 commit 3e13388

File tree

3 files changed

+107
-5
lines changed

3 files changed

+107
-5
lines changed

actioncable/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Redis pub/sub adapter now automatically reconnects when Redis connection is lost.
2+
3+
*Vladimir Dementyev*
4+
15
* The `connected()` callback can now take a `{reconnected}` parameter to differentiate
26
connections from reconnections.
37

actioncable/lib/action_cable/subscription_adapter/redis.rb

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def redis_connection_for_subscriptions
4444

4545
private
4646
def listener
47-
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
47+
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, config_options, @server.event_loop) }
4848
end
4949

5050
def redis_connection_for_broadcasts
@@ -54,11 +54,15 @@ def redis_connection_for_broadcasts
5454
end
5555

5656
def redis_connection
57-
self.class.redis_connector.call(@server.config.cable.symbolize_keys.merge(id: identifier))
57+
self.class.redis_connector.call(config_options)
58+
end
59+
60+
def config_options
61+
@config_options ||= @server.config.cable.symbolize_keys.merge(id: identifier)
5862
end
5963

6064
class Listener < SubscriberMap
61-
def initialize(adapter, event_loop)
65+
def initialize(adapter, config_options, event_loop)
6266
super()
6367

6468
@adapter = adapter
@@ -67,6 +71,11 @@ def initialize(adapter, event_loop)
6771
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
6872
@subscription_lock = Mutex.new
6973

74+
@reconnect_attempt = 0
75+
# Use the same config as used by Redis conn
76+
@reconnect_attempts = config_options.fetch(:reconnect_attempts, 1)
77+
@reconnect_attempts = Array.new(@reconnect_attempts, 0) if @reconnect_attempts.is_a?(Integer)
78+
7079
@subscribed_client = nil
7180

7281
@when_connected = []
@@ -82,6 +91,7 @@ def listen(conn)
8291
on.subscribe do |chan, count|
8392
@subscription_lock.synchronize do
8493
if count == 1
94+
@reconnect_attempt = 0
8595
@subscribed_client = original_client
8696

8797
until @when_connected.empty?
@@ -148,8 +158,16 @@ def ensure_listener_running
148158
@thread ||= Thread.new do
149159
Thread.current.abort_on_exception = true
150160

151-
conn = @adapter.redis_connection_for_subscriptions
152-
listen conn
161+
begin
162+
conn = @adapter.redis_connection_for_subscriptions
163+
listen conn
164+
rescue ConnectionError
165+
reset
166+
if retry_connecting?
167+
when_connected { resubscribe }
168+
retry
169+
end
170+
end
153171
end
154172
end
155173

@@ -161,7 +179,36 @@ def when_connected(&block)
161179
end
162180
end
163181

182+
def retry_connecting?
183+
@reconnect_attempt += 1
184+
185+
return false if @reconnect_attempt > @reconnect_attempts.size
186+
187+
sleep_t = @reconnect_attempts[@reconnect_attempt - 1]
188+
189+
sleep(sleep_t) if sleep_t > 0
190+
191+
true
192+
end
193+
194+
def resubscribe
195+
channels = @sync.synchronize do
196+
@subscribers.keys
197+
end
198+
@subscribed_client.subscribe(*channels) unless channels.empty?
199+
end
200+
201+
def reset
202+
@subscription_lock.synchronize do
203+
@subscribed_client = nil
204+
@subscribe_callbacks.clear
205+
@when_connected.clear
206+
end
207+
end
208+
164209
if ::Redis::VERSION < "5"
210+
ConnectionError = ::Redis::ConnectionError
211+
165212
class SubscribedClient
166213
def initialize(raw_client)
167214
@raw_client = raw_client
@@ -194,6 +241,8 @@ def extract_subscribed_client(conn)
194241
SubscribedClient.new(conn._client)
195242
end
196243
else
244+
ConnectionError = RedisClient::ConnectionError
245+
197246
def extract_subscribed_client(conn)
198247
conn
199248
end

actioncable/test/subscription_adapter/redis_test.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,55 @@ def cable_config
1515
end
1616
end
1717
end
18+
19+
def test_reconnections
20+
subscribe_as_queue("channel") do |queue|
21+
subscribe_as_queue("other channel") do |queue_2|
22+
@tx_adapter.broadcast("channel", "hello world")
23+
24+
assert_equal "hello world", queue.pop
25+
26+
drop_pubsub_connections
27+
wait_pubsub_connection(redis_conn, "channel")
28+
29+
@tx_adapter.broadcast("channel", "hallo welt")
30+
31+
assert_equal "hallo welt", queue.pop
32+
33+
drop_pubsub_connections
34+
wait_pubsub_connection(redis_conn, "channel")
35+
wait_pubsub_connection(redis_conn, "other channel")
36+
37+
@tx_adapter.broadcast("channel", "hola mundo")
38+
@tx_adapter.broadcast("other channel", "other message")
39+
40+
assert_equal "hola mundo", queue.pop
41+
assert_equal "other message", queue_2.pop
42+
end
43+
end
44+
end
45+
46+
private
47+
def redis_conn
48+
@redis_conn ||= ::Redis.new(cable_config.except(:adapter))
49+
end
50+
51+
def drop_pubsub_connections
52+
# Emulate connection failure by dropping all connections
53+
redis_conn.client("kill", "type", "pubsub")
54+
end
55+
56+
def wait_pubsub_connection(redis_conn, channel, timeout: 2)
57+
wait = timeout
58+
loop do
59+
break if redis_conn.pubsub("numsub", channel).last > 0
60+
61+
sleep 0.1
62+
wait -= 0.1
63+
64+
raise "Timed out to subscribe to #{channel}" if wait <= 0
65+
end
66+
end
1867
end
1968

2069
class RedisAdapterTest::AlternateConfiguration < RedisAdapterTest

0 commit comments

Comments
 (0)