Skip to content

Commit 5ba0e5c

Browse files
committed
Fix psubscribe, unsubscribe, punsubscribe
- Spawn subscription loop with Threads.@Spawn instead of @async - Add 'key' field to psubscribe struct - Make callback functions in subscription accept 1 argument: SubscriptionMessage, instead of just SubscriptionMessage.message - Lock pack_command to prevent race conditions writing to the same socket
1 parent d0f4bb9 commit 5ba0e5c

File tree

5 files changed

+19
-15
lines changed

5 files changed

+19
-15
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Redis"
22
uuid = "0cf705f9-a9e2-50d1-a699-2b372a39b750"
3-
version = "1.0.0"
3+
version = "1.1.0"
44

55
[deps]
66
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ Multiple channels can be subscribed together by providing a `Dict{String, Functi
129129

130130
```julia
131131
x = Any[]
132-
f(y) = push!(x, y)
132+
f(y::SubscriptionMessage) = push!(x, y)
133133
sub = open_subscription(conn)
134134
d = Dict{String, Function}({"baz" => f, "bar" => println})
135135
subscribe(sub, d)
@@ -140,7 +140,7 @@ publish(conn, "bar", "anything") # "anything" written to stdout
140140

141141
Pattern subscription works in the same way through use of the `psubscribe` function. Channels can be unsubscribed through `unsubscribe` and `punsubscribe`.
142142

143-
Note that the async event loop currently runs until the `SubscriptionConnection` is disconnected, regardless of how many subscriptions the client has active. Event loop error handling should be improved in an update to the API.
143+
Note that the event loop spawned with Threads.@spawn currently runs until the `SubscriptionConnection` is disconnected, regardless of how many subscriptions the client has active. Event loop error handling should be improved in an update to the API.
144144

145145
### Subscription error handling
146146

src/client.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ end
145145
nullcb(err) = @debug err
146146
function open_subscription(conn::RedisConnection, err_callback=nullcb)
147147
s = SubscriptionConnection(conn)
148-
@async subscription_loop(s, err_callback)
148+
Threads.@spawn subscription_loop(s, err_callback)
149149
s
150150
end
151151

@@ -157,9 +157,9 @@ function subscription_loop(conn::SubscriptionConnection, err_callback::Function)
157157
reply = convert_reply(reply)
158158
message = SubscriptionMessage(reply)
159159
if message.message_type == SubscriptionMessageType.Message
160-
conn.callbacks[message.channel](message.message)
160+
conn.callbacks[message.channel](message)
161161
elseif message.message_type == SubscriptionMessageType.Pmessage
162-
conn.pcallbacks[message.channel](message.message)
162+
conn.pcallbacks[message.channel](message)
163163
end
164164
catch err
165165
err_callback(err)

src/commands.jl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ function unsubscribe(conn::SubscriptionConnection, channels...)
304304
delete!(conn.callbacks, channel)
305305
end
306306
for channel in channels
307-
execute_command_without_reply(conn, unshift!([channel], "unsubscribe"))
307+
execute_command_without_reply(conn, pushfirst!([channel], "unsubscribe"))
308308
end
309309
end
310310
# function unsubscribe(conn::SubscriptionConnection, channels...)
@@ -319,13 +319,13 @@ function _psubscribe(conn::SubscriptionConnection, patterns::Array)
319319
end
320320

321321
function psubscribe(conn::SubscriptionConnection, pattern::AbstractString, callback::Function)
322-
conn.callbacks[pattern] = callback
322+
conn.pcallbacks[pattern] = callback
323323
_psubscribe(conn, [pattern])
324324
end
325325

326326
function psubscribe(conn::SubscriptionConnection, subs::Dict{AbstractString, Function})
327327
for (pattern, callback) in subs
328-
conn.callbacks[pattern] = callback
328+
conn.pcallbacks[pattern] = callback
329329
end
330330
_psubscribe(conn, collect(values(subs)))
331331
end
@@ -334,7 +334,9 @@ function punsubscribe(conn::SubscriptionConnection, patterns...)
334334
for pattern in patterns
335335
delete!(conn.pcallbacks, pattern)
336336
end
337-
execute_command(conn, pushfirst!(patterns, "punsubscribe"))
337+
for pattern in patterns
338+
execute_command_without_reply(conn, pushfirst!([pattern], "unsubscribe"))
339+
end
338340
end
339341

340342
#Need a specialized version of execute to keep the connection in the transaction state

src/parser.jl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ end
8888

8989
function execute_command_without_reply(conn::RedisConnectionBase, command)
9090
is_connected(conn) || throw(ConnectionException("Socket is disconnected"))
91-
pack_command(conn.socket, command)
91+
lock(conn.socket.lock) do
92+
pack_command(conn.socket, command)
93+
end
9294
end
9395

9496
function execute_command(conn::RedisConnectionBase, command::Vector)
@@ -105,15 +107,15 @@ end
105107
struct SubscriptionMessage
106108
message_type
107109
channel::AbstractString
110+
key::Union{AbstractString,Nothing}
108111
message::AbstractString
109112

110113
function SubscriptionMessage(reply::AbstractArray)
111-
notification = reply
112-
message_type = notification[1]
114+
message_type = reply[1]
113115
if message_type == "message"
114-
new(SubscriptionMessageType.Message, notification[2], notification[3])
116+
new(SubscriptionMessageType.Message, reply[2], nothing, reply[3])
115117
elseif message_type == "pmessage"
116-
new(SubscriptionMessageType.Pmessage, notification[2], notification[4])
118+
new(SubscriptionMessageType.Pmessage, reply[2], reply[3], reply[4])
117119
else
118120
new(SubscriptionMessageType.Other, "", "")
119121
end

0 commit comments

Comments
 (0)