Skip to content

Commit 07a3073

Browse files
authored
Wrap the Connection::CHANELS in a mutex to help thread safety (#98)
1 parent ef8dc8a commit 07a3073

File tree

3 files changed

+41
-20
lines changed

3 files changed

+41
-20
lines changed

src/cable.cr

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
require "mutex"
2+
require "set"
3+
require "uuid"
14
require "habitat"
25
require "json"
36
require "./cable/**"

src/cable/connection.cr

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
require "uuid"
2-
31
module Cable
42
abstract class Connection
53
class UnauthorizedConnectionException < Exception; end
@@ -12,7 +10,8 @@ module Cable
1210
getter socket
1311
getter started_at : Time = Time.utc
1412

15-
CHANNELS = {} of String => Hash(String, Cable::Channel)
13+
CHANNELS = {} of String => Hash(String, Cable::Channel)
14+
CHANNELS_MUTEX = Mutex.new
1615

1716
def identifier
1817
internal_identifier
@@ -53,25 +52,33 @@ module Cable
5352
end
5453

5554
def channels : Array(Cable::Channel)
56-
return Array(Cable::Channel).new unless Connection::CHANNELS.has_key?(connection_identifier)
57-
Connection::CHANNELS.[connection_identifier].values
55+
CHANNELS_MUTEX.synchronize do
56+
return Array(Cable::Channel).new unless CHANNELS.has_key?(connection_identifier)
57+
CHANNELS[connection_identifier].values
58+
end
5859
end
5960

6061
def closed? : Bool
6162
socket.closed?
6263
end
6364

6465
def close
65-
if Connection::CHANNELS.has_key?(connection_identifier)
66-
Connection::CHANNELS[connection_identifier].each do |identifier, channel|
67-
# the ordering here is important
68-
Connection::CHANNELS[connection_identifier].delete(identifier)
66+
channels_to_close = CHANNELS_MUTEX.synchronize do
67+
if CHANNELS.has_key?(connection_identifier)
68+
channels_copy = CHANNELS[connection_identifier].dup
69+
CHANNELS.delete(connection_identifier)
70+
channels_copy
71+
else
72+
nil
73+
end
74+
end
75+
76+
if channels_to_close
77+
channels_to_close.each do |identifier, channel|
6978
channel.close
7079
rescue e : IO::Error
7180
Cable.settings.on_error.call(e, "IO::Error: #{e.message} -> #{self.class.name}#close")
7281
end
73-
74-
Connection::CHANNELS.delete(connection_identifier)
7582
unsubscribe_from_internal_channel
7683
end
7784

@@ -111,8 +118,10 @@ module Cable
111118
identifier: payload.identifier.key,
112119
params: payload.channel_params
113120
)
114-
Connection::CHANNELS[connection_identifier] ||= {} of String => Cable::Channel
115-
Connection::CHANNELS[connection_identifier][payload.identifier.key] = channel
121+
CHANNELS_MUTEX.synchronize do
122+
CHANNELS[connection_identifier] ||= {} of String => Cable::Channel
123+
CHANNELS[connection_identifier][payload.identifier.key] = channel
124+
end
116125
channel.subscribed
117126

118127
if channel.subscription_rejected?
@@ -133,29 +142,41 @@ module Cable
133142

134143
# ensure we only allow subscribing to the same channel once from a connection
135144
def connection_requesting_duplicate_channel_subscription?(payload)
136-
return unless connection_key = Connection::CHANNELS.dig?(connection_identifier, payload.identifier.key)
145+
connection_key = CHANNELS_MUTEX.synchronize do
146+
CHANNELS.dig?(connection_identifier, payload.identifier.key)
147+
end
148+
return unless connection_key
137149

138150
connection_key.class.to_s == payload.channel
139151
end
140152

141153
def unsubscribe(payload : Cable::Payload)
142-
if channel = Connection::CHANNELS[connection_identifier].delete(payload.identifier.key)
154+
channel = CHANNELS_MUTEX.synchronize do
155+
CHANNELS[connection_identifier]?.try(&.delete(payload.identifier.key))
156+
end
157+
if channel
143158
channel.close
144159
Cable::Logger.info { "#{payload.channel} is transmitting the unsubscribe confirmation" }
145160
send_message({type: Cable.message(:unsubscribe), identifier: payload.identifier.key}.to_json)
146161
end
147162
end
148163

149164
def reject(payload : Cable::Payload)
150-
if channel = Connection::CHANNELS[connection_identifier].delete(payload.identifier.key)
165+
channel = CHANNELS_MUTEX.synchronize do
166+
CHANNELS[connection_identifier]?.try(&.delete(payload.identifier.key))
167+
end
168+
if channel
151169
channel.unsubscribed
152170
Cable::Logger.info { "#{channel.class} is transmitting the subscription rejection" }
153171
send_message({type: Cable.message(:rejection), identifier: payload.identifier.key}.to_json)
154172
end
155173
end
156174

157175
def message(payload : Cable::Payload)
158-
if channel = Connection::CHANNELS.dig?(connection_identifier, payload.identifier.key)
176+
channel = CHANNELS_MUTEX.synchronize do
177+
CHANNELS.dig?(connection_identifier, payload.identifier.key)
178+
end
179+
if channel
159180
if payload.action?
160181
Cable::Logger.info { "#{channel.class}#perform(\"#{payload.action}\", #{payload.data})" }
161182
channel.perform(payload.action, payload.data)

src/cable/server.cr

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
require "mutex"
2-
require "set"
3-
41
module Cable
52
alias Channels = Set(Cable::Channel)
63

0 commit comments

Comments
 (0)