Skip to content

Commit f644d9f

Browse files
authored
Merge pull request rails#55201 from OuYangJinTing/fix/actioncable/channel
Prevent the `stream_from` method throwing RuntimeError
2 parents b8d22ce + c8f71d5 commit f644d9f

File tree

5 files changed

+84
-0
lines changed

5 files changed

+84
-0
lines changed

actioncable/lib/action_cable/channel/base.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def initialize(connection, identifier, params = {})
165165

166166
@reject_subscription = nil
167167
@subscription_confirmation_sent = nil
168+
@unsubscribed = false
168169

169170
delegate_connection_identifiers
170171
end
@@ -200,11 +201,16 @@ def subscribe_to_channel
200201
# cleanup with callbacks. This method is not intended to be called directly by
201202
# the user. Instead, override the #unsubscribed callback.
202203
def unsubscribe_from_channel # :nodoc:
204+
@unsubscribed = true
203205
run_callbacks :unsubscribe do
204206
unsubscribed
205207
end
206208
end
207209

210+
def unsubscribed? # :nodoc:
211+
@unsubscribed
212+
end
213+
208214
private
209215
# Called once a consumer has become a subscriber of the channel. Usually the
210216
# place to set up any streams you want this channel to be sending to the

actioncable/lib/action_cable/channel/streams.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ module Streams
8888
# callback. Defaults to `coder: nil` which does no decoding, passes raw
8989
# messages.
9090
def stream_from(broadcasting, callback = nil, coder: nil, &block)
91+
return if unsubscribed?
92+
9193
broadcasting = String(broadcasting)
9294

9395
# Don't send the confirmation until pubsub#subscribe is successful

actioncable/test/channel/base_test.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,16 @@ def error_handler
126126
assert_not_predicate @channel, :subscribed?
127127
end
128128

129+
test "unsubscribed? method returns correct status" do
130+
assert_not @channel.unsubscribed?
131+
132+
@channel.subscribe_to_channel
133+
assert_not @channel.unsubscribed?
134+
135+
@channel.unsubscribe_from_channel
136+
assert @channel.unsubscribed?
137+
end
138+
129139
test "connection identifiers" do
130140
assert_equal @user.name, @channel.current_user.name
131141
end

actioncable/test/channel/stream_test.rb

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require "minitest/mock"
55
require "stubs/test_connection"
66
require "stubs/room"
7+
require "concurrent/atomic/cyclic_barrier"
78

89
module ActionCable::StreamTests
910
class Connection < ActionCable::Connection::Base
@@ -280,6 +281,63 @@ class StreamTest < ActionCable::TestCase
280281
end
281282
end
282283

284+
test "concurrent unsubscribe_from_channel and stream_from do not raise RuntimeError" do
285+
threads = []
286+
run_in_eventmachine do
287+
connection = TestConnection.new
288+
connection.pubsub.unsubscribe_latency = 0.1
289+
290+
channel = ChatChannel.new connection, "{id: 1}", id: 1
291+
channel.subscribe_to_channel
292+
293+
# Set up initial streams
294+
channel.stream_from "room_one"
295+
channel.stream_from "room_two"
296+
wait_for_async
297+
298+
# Create barriers to synchronize thread execution
299+
barrier = Concurrent::CyclicBarrier.new(2)
300+
301+
exception_caught = nil
302+
303+
# Thread 1: calls unsubscribe_from_channel
304+
thread1 = Thread.new do
305+
barrier.wait
306+
# Add a small delay to increase the chance of concurrent execution
307+
sleep 0.001
308+
channel.unsubscribe_from_channel
309+
rescue => e
310+
exception_caught = e
311+
ensure
312+
barrier.wait
313+
end
314+
threads << thread1
315+
316+
# Thread 2: calls stream_from during unsubscribe_from_channel iteration
317+
thread2 = Thread.new do
318+
barrier.wait
319+
# Try to add streams while unsubscribe_from_channel is potentially iterating
320+
10.times do |i|
321+
channel.stream_from "concurrent_room_#{i}"
322+
sleep 0.0001 # Small delay to interleave with unsubscribe_from_channel
323+
end
324+
rescue => e
325+
exception_caught = e
326+
ensure
327+
barrier.wait
328+
end
329+
threads << thread2
330+
331+
thread1.join
332+
thread2.join
333+
334+
# Ensure no RuntimeError was raised during concurrent access
335+
assert_nil exception_caught, "Concurrent unsubscribe_from_channel and stream_from should not raise RuntimeError: #{exception_caught}"
336+
end
337+
ensure
338+
threads.each(&:kill)
339+
end
340+
283341
private
284342
def subscribers_of(connection)
285343
connection

actioncable/test/stubs/test_adapter.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
# frozen_string_literal: true
22

33
class SuccessAdapter < ActionCable::SubscriptionAdapter::Base
4+
attr_accessor :unsubscribe_latency
5+
6+
def initialize(...)
7+
super
8+
@unsubscribe_latency = nil
9+
end
10+
411
def broadcast(channel, payload)
512
end
613

@@ -10,6 +17,7 @@ def subscribe(channel, callback, success_callback = nil)
1017
end
1118

1219
def unsubscribe(channel, callback)
20+
sleep @unsubscribe_latency if @unsubscribe_latency
1321
subscriber_map[channel].delete(callback)
1422
subscriber_map.delete(channel) if subscriber_map[channel].empty?
1523
@@unsubscribe_called = { channel: channel, callback: callback }

0 commit comments

Comments
 (0)