Skip to content

Commit bd58eda

Browse files
authored
Merge pull request #95 from JuliaDatabases/tan/pubsub
fix subscription callback invoke and pubsub tests
2 parents 8adc065 + 1c519e4 commit bd58eda

File tree

4 files changed

+33
-9
lines changed

4 files changed

+33
-9
lines changed

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,13 @@ publish(conn, "channel", "hello, world!")
116116

117117
Subscriptions are handled using the `SubscriptionConnection`. Similar to the `TransactionConnection`, the `SubscriptionConnection` is constructed from an existing `RedisConnection`. Once created, the `SubscriptionConnection` maintains a simple event loop that will call the user's defined function whenever a message is received on the specified channel.
118118

119+
If the `subscribe_data` method is used for subscription then the callback function will be passed the `message` field of `SubscriptionMessage` instance. If the `subscribe` method is used for subscription, the callback will be passed a `SubscriptionMessage` directly, which contains the channel, message type and key as well.
120+
119121
```julia
120122
x = Any[]
121123
f(y) = push!(x, y)
122124
sub = open_subscription(conn)
123-
subscribe(sub, "baz", f)
125+
subscribe_data(sub, "baz", f)
124126
publish(conn, "baz", "foobar")
125127
x # Returns ["foobar"]
126128
```
@@ -129,9 +131,9 @@ Multiple channels can be subscribed together by providing a `Dict{String, Functi
129131

130132
```julia
131133
x = Any[]
132-
f(y::SubscriptionMessage) = push!(x, y)
134+
f(y::SubscriptionMessage) = push!(x, y.message)
133135
sub = open_subscription(conn)
134-
d = Dict{String, Function}({"baz" => f, "bar" => println})
136+
d = Dict{String, Function}({"baz" => f, "bar" => y->println(y.message)})
135137
subscribe(sub, d)
136138
publish(conn, "baz", "foobar")
137139
x # Returns ["foobar"]

src/Redis.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export discard, exec, multi, unwatch, watch
4545
# Scripting commands
4646
export evalscript, evalsha, script_exists, script_flush, script_kill, script_load
4747
# PubSub commands
48-
export subscribe, publish, psubscribe, punsubscribe, unsubscribe
48+
export subscribe, subscribe_data, publish, psubscribe, punsubscribe, unsubscribe
4949
# Server commands
5050
export bgrewriteaof, bgsave, client_list, client_id, client_pause, client_setname, cluster_slots,
5151
command, command_count, command_info, config_get, config_resetstat, config_rewrite,

src/commands.jl

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,14 +286,36 @@ function _subscribe(conn::SubscriptionConnection, channels::Array)
286286
execute_command_without_reply(conn, pushfirst!(channels, "subscribe"))
287287
end
288288

289+
@enum CallbackType CallbackMsgStruct = 1 CallbackMsgData = 2
290+
291+
function _register_callback(conn::SubscriptionConnection, channel::AbstractString, callback::Function, callback_type::CallbackType)
292+
if callback_type == CallbackMsgStruct
293+
conn.callbacks[channel] = callback
294+
elseif callback_type == CallbackMsgData
295+
conn.callbacks[channel] = (msg_struct)->callback(msg_struct.message)
296+
end
297+
end
298+
289299
function subscribe(conn::SubscriptionConnection, channel::AbstractString, callback::Function)
290-
conn.callbacks[channel] = callback
300+
_register_callback(conn, channel, callback, CallbackMsgStruct)
291301
_subscribe(conn, [channel])
292302
end
293303

294304
function subscribe(conn::SubscriptionConnection, subs::Dict{AbstractString, Function})
295305
for (channel, callback) in subs
296-
conn.callbacks[channel] = callback
306+
_register_callback(conn, channel, callback, CallbackMsgStruct)
307+
end
308+
_subscribe(conn, collect(keys(subs)))
309+
end
310+
311+
function subscribe_data(conn::SubscriptionConnection, channel::AbstractString, callback::Function)
312+
_register_callback(conn, channel, callback, CallbackMsgData)
313+
_subscribe(conn, [channel])
314+
end
315+
316+
function subscribe_data(conn::SubscriptionConnection, subs::Dict{AbstractString, Function})
317+
for (channel, callback) in subs
318+
_register_callback(conn, channel, callback, CallbackMsgData)
297319
end
298320
_subscribe(conn, collect(keys(subs)))
299321
end

test/redis_tests.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,12 +396,12 @@ end
396396
return nothing
397397
end
398398
x = Any[]
399-
function f(y::String)
399+
function f(y::AbstractString)
400400
push!(x, y)
401401
end
402402
subs = open_subscription(conn, handleException) #handleException is called when an exception occurs
403-
subscribe(subs, "channel", f)
404-
subscribe(subs, "duplicate", f)
403+
subscribe_data(subs, "channel", f)
404+
subscribe(subs, "duplicate", y->f(y.message))
405405
@test publish(conn, "channel", "hello, world!") > 0 #Number of connected clients returned
406406
@test publish(conn, "channel", "Okay, bye!") > 0 #Number of connected clients returned
407407
sleep(2)

0 commit comments

Comments
 (0)