Skip to content

Commit 13ee48d

Browse files
authored
Refactor remote connections (#76)
* Refactor remote connections to use the conneciton identifier to save memory. Fixes #75 * Refactor how remote connections are handled. Fixes #75
1 parent 6ed5f65 commit 13ee48d

File tree

4 files changed

+43
-26
lines changed

4 files changed

+43
-26
lines changed

spec/cable/server_spec.cr

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
require "../spec_helper"
2+
3+
include RequestHelpers
4+
5+
describe Cable::Server do
6+
describe "#remote_connections" do
7+
it "finds the connection and disconnects it" do
8+
Cable.reset_server
9+
Cable.temp_config(backend_class: Cable::DevBackend) do
10+
socket = DummySocket.new(IO::Memory.new)
11+
request = builds_request("abc123")
12+
connection = ApplicationCable::Connection.new(request, socket)
13+
Cable.server.add_connection(connection)
14+
connection.connection_identifier.should contain("abc123")
15+
16+
Cable.server.remote_connections.find(connection.connection_identifier).disconnect
17+
Cable::DevBackend.published_messages.should contain({"cable_internal/#{connection.connection_identifier}", "disconnect"})
18+
connection.close
19+
end
20+
end
21+
end
22+
end

src/cable/connection.cr

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module Cable
4040
subscribe_to_internal_channel
4141
rescue e : UnathorizedConnectionException
4242
reject_connection!
43+
unsubscribe_from_internal_channel
4344
socket.close(HTTP::WebSocket::CloseCode::NormalClosure, "Farewell")
4445
Cable::Logger.info { ("An unauthorized connection attempt was rejected") }
4546
end
@@ -168,14 +169,14 @@ module Cable
168169
server = Cable.server
169170
channel = self
170171
spawn(name: "Cable::Connection - subscribe_to_internal_channel") do
171-
server.add_internal_subscription(internal_channel, channel)
172-
server.backend.open_subscribe_connection(internal_channel)
172+
if !channel.connection_rejected?
173+
server.backend.open_subscribe_connection(internal_channel)
174+
end
173175
end
174176
end
175177

176178
private def unsubscribe_from_internal_channel
177179
Cable.server.backend.unsubscribe(internal_channel)
178-
Cable.server.remove_internal_subscription(internal_channel)
179180
end
180181
end
181182
end

src/cable/remote_connections.cr

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,22 @@ module Cable
1111
#
1212
# find("1234")
1313
# ```
14-
def find(identifier : String)
14+
# NOTE: This code may run on a different machine than where the `@server.connections`
15+
# is actually sitting in memory. For this reason, we just pass the value right through
16+
# the backend (i.e. redis), and let that broadcast out to all running instances.
17+
def find(identifier : String) : RemoteConnection
1518
RemoteConnection.new(@server, identifier)
1619
end
1720

1821
private class RemoteConnection
1922
def initialize(@server : Cable::Server, @value : String)
2023
end
2124

22-
def disconnect
25+
def disconnect : Nil
2326
@server.backend.publish_message(internal_channel, Cable.message(:disconnect))
2427
end
2528

26-
private def internal_channel
29+
private def internal_channel : String
2730
"cable_internal/#{@value}"
2831
end
2932
end

src/cable/server.cr

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ module Cable
2323
class Server
2424
include Debug
2525

26+
# The String key is the `connection_identifier` value for `Cable::Connection`
27+
getter connections = {} of String => Cable::Connection
2628
getter errors = 0
27-
getter connections = {} of String => Connection
2829
getter fiber_channel = ::Channel({String, String}).new
2930
getter pinger : Cable::RedisPinger do
3031
Cable::RedisPinger.new(self)
@@ -40,7 +41,6 @@ module Cable
4041
end
4142

4243
@channels = {} of String => Channels
43-
@_internal_subscriptions = {} of String => Cable::Connection
4444
@channel_mutex = Mutex.new
4545

4646
def initialize
@@ -65,18 +65,6 @@ module Cable
6565
connections.delete(connection_id).try(&.close)
6666
end
6767

68-
def add_internal_subscription(internal_channel : String, connection : Cable::Connection)
69-
if internal_channel.presence && !connection.closed?
70-
@_internal_subscriptions[internal_channel] = connection
71-
end
72-
end
73-
74-
def remove_internal_subscription(internal_channel : String)
75-
if @_internal_subscriptions.has_key?(internal_channel)
76-
@_internal_subscriptions.delete(internal_channel)
77-
end
78-
end
79-
8068
def subscribe_channel(channel : Channel, identifier : String)
8169
@channel_mutex.synchronize do
8270
if !@channels.has_key?(identifier)
@@ -132,12 +120,13 @@ module Cable
132120
Cable.settings.on_error.call(e, "IO::Error Exception: #{e.message} -> #{self.class.name}#send_to_channels(channel, message)")
133121
end
134122

135-
def send_to_internal_channels(channel_identifier : String, message : String)
136-
if internal_channel = @_internal_subscriptions[channel_identifier]?
123+
def send_to_internal_connections(connection_identifier : String, message : String)
124+
if internal_connection = connections[connection_identifier]?
137125
case message
138126
when Cable.message(:disconnect)
139-
Cable::Logger.info { "Removing connection (#{channel_identifier})" }
140-
internal_channel.close
127+
Cable::Logger.info { "Removing connection (#{connection_identifier})" }
128+
internal_connection.close
129+
remove_connection(connection_identifier)
141130
end
142131
end
143132
end
@@ -182,8 +171,10 @@ module Cable
182171
spawn(name: "Cable::Server - process_subscribed_messages") do
183172
while received = fiber_channel.receive
184173
channel, message = received
185-
if channel.includes?("cable_internal")
186-
server.send_to_internal_channels(channel, message)
174+
if channel.starts_with?("cable_internal")
175+
identifier = channel.split('/').last
176+
connection_identifier = server.connections.keys.find!(&.starts_with?(identifier))
177+
server.send_to_internal_connections(connection_identifier, message)
187178
else
188179
server.send_to_channels(channel, message)
189180
end

0 commit comments

Comments
 (0)