Skip to content

Commit 2fe6254

Browse files
OuYangJinTingbyroot
authored andcommitted
Prevent the stream_from method throwing RuntimeError(can't add a new key into hash during iteration)
1 parent b8d22ce commit 2fe6254

File tree

5 files changed

+73
-0
lines changed

5 files changed

+73
-0
lines changed

actioncable/lib/action_cable/channel/base.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def initialize(connection, identifier, params = {})
165165

166166
@reject_subscription = nil
167167
@subscription_confirmation_sent = nil
168+
@unsubscribed = false
168169

169170
delegate_connection_identifiers
170171
end
@@ -200,11 +201,16 @@ def subscribe_to_channel
200201
# cleanup with callbacks. This method is not intended to be called directly by
201202
# the user. Instead, override the #unsubscribed callback.
202203
def unsubscribe_from_channel # :nodoc:
204+
@unsubscribed = true
203205
run_callbacks :unsubscribe do
204206
unsubscribed
205207
end
206208
end
207209

210+
def unsubscribed? # :nodoc:
211+
@unsubscribed
212+
end
213+
208214
private
209215
# Called once a consumer has become a subscriber of the channel. Usually the
210216
# place to set up any streams you want this channel to be sending to the

actioncable/lib/action_cable/channel/streams.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ module Streams
8888
# callback. Defaults to `coder: nil` which does no decoding, passes raw
8989
# messages.
9090
def stream_from(broadcasting, callback = nil, coder: nil, &block)
91+
return if unsubscribed?
92+
9193
broadcasting = String(broadcasting)
9294

9395
# Don't send the confirmation until pubsub#subscribe is successful

actioncable/test/channel/base_test.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,16 @@ def error_handler
126126
assert_not_predicate @channel, :subscribed?
127127
end
128128

129+
test "unsubscribed? method returns correct status" do
130+
assert_not @channel.unsubscribed?
131+
132+
@channel.subscribe_to_channel
133+
assert_not @channel.unsubscribed?
134+
135+
@channel.unsubscribe_from_channel
136+
assert @channel.unsubscribed?
137+
end
138+
129139
test "connection identifiers" do
130140
assert_equal @user.name, @channel.current_user.name
131141
end

actioncable/test/channel/stream_test.rb

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require "minitest/mock"
55
require "stubs/test_connection"
66
require "stubs/room"
7+
require "concurrent/atomic/cyclic_barrier"
78

89
module ActionCable::StreamTests
910
class Connection < ActionCable::Connection::Base
@@ -280,6 +281,59 @@ class StreamTest < ActionCable::TestCase
280281
end
281282
end
282283

284+
test "concurrent unsubscribe_from_channel and stream_from do not raise RuntimeError" do
285+
ENV["UNSUBSCRIBE_SLEEP_TIME"] = "0.0001" # Set a delay to increase the chance of concurrent execution
286+
run_in_eventmachine do
287+
connection = TestConnection.new
288+
channel = ChatChannel.new connection, "{id: 1}", id: 1
289+
channel.subscribe_to_channel
290+
291+
# Set up initial streams
292+
channel.stream_from "room_one"
293+
channel.stream_from "room_two"
294+
wait_for_async
295+
296+
# Create barriers to synchronize thread execution
297+
barrier = Concurrent::CyclicBarrier.new(2)
298+
299+
exception_caught = nil
300+
301+
# Thread 1: calls unsubscribe_from_channel
302+
thread1 = Thread.new do
303+
barrier.wait
304+
# Add a small delay to increase the chance of concurrent execution
305+
sleep 0.001
306+
channel.unsubscribe_from_channel
307+
rescue => e
308+
exception_caught = e
309+
ensure
310+
barrier.wait
311+
end
312+
313+
# Thread 2: calls stream_from during unsubscribe_from_channel iteration
314+
thread2 = Thread.new do
315+
barrier.wait
316+
# Try to add streams while unsubscribe_from_channel is potentially iterating
317+
10.times do |i|
318+
channel.stream_from "concurrent_room_#{i}"
319+
sleep 0.0001 # Small delay to interleave with unsubscribe_from_channel
320+
end
321+
rescue => e
322+
exception_caught = e
323+
ensure
324+
barrier.wait
325+
end
326+
327+
thread1.join
328+
thread2.join
329+
330+
# Ensure no RuntimeError was raised during concurrent access
331+
assert_nil exception_caught, "Concurrent unsubscribe_from_channel and stream_from should not raise RuntimeError: #{exception_caught}"
332+
end
333+
ensure
334+
ENV.delete("UNSUBSCRIBE_SLEEP_TIME")
335+
end
336+
283337
private
284338
def subscribers_of(connection)
285339
connection

actioncable/test/stubs/test_adapter.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ def subscribe(channel, callback, success_callback = nil)
1010
end
1111

1212
def unsubscribe(channel, callback)
13+
sleep ENV["UNSUBSCRIBE_SLEEP_TIME"].to_f if ENV["UNSUBSCRIBE_SLEEP_TIME"]
1314
subscriber_map[channel].delete(callback)
1415
subscriber_map.delete(channel) if subscriber_map[channel].empty?
1516
@@unsubscribe_called = { channel: channel, callback: callback }

0 commit comments

Comments
 (0)