Skip to content

Commit fcd0353

Browse files
authored
Merge pull request #102 from JuliaDatabases/tan/psub
fix: psubscribe method issues
2 parents b76a05f + 91ded87 commit fcd0353

File tree

3 files changed

+73
-24
lines changed

3 files changed

+73
-24
lines changed

src/Redis.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ export discard, exec, multi, unwatch, watch
4545
# Scripting commands
4646
export evalscript, evalsha, script_exists, script_flush, script_kill, script_load
4747
# PubSub commands
48-
export subscribe, subscribe_data, publish, psubscribe, punsubscribe, unsubscribe
48+
export subscribe, subscribe_data, publish, psubscribe, psubscribe_data, punsubscribe, unsubscribe
4949
# Server commands
5050
export bgrewriteaof, bgsave, client_list, client_id, client_pause, client_setname, cluster_slots,
5151
command, command_count, command_info, config_get, config_resetstat, config_rewrite,

src/commands.jl

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,35 +287,46 @@ function _subscribe(conn::SubscriptionConnection, channels::Array)
287287
end
288288

289289
@enum CallbackType CallbackMsgStruct = 1 CallbackMsgData = 2
290+
@enum SubscriptionType SubscriptionPlain = 1 SubscriptionPattern = 2
291+
292+
function _register_callback(
293+
conn::SubscriptionConnection,
294+
channel::AbstractString,
295+
subscription_type::SubscriptionType,
296+
callback::Function,
297+
callback_type::CallbackType,
298+
)
299+
callbacks = subscription_type == SubscriptionPlain ? conn.callbacks :
300+
subscription_type == SubscriptionPattern ? conn.pcallbacks :
301+
error("Invalid callback type")
290302

291-
function _register_callback(conn::SubscriptionConnection, channel::AbstractString, callback::Function, callback_type::CallbackType)
292303
if callback_type == CallbackMsgStruct
293-
conn.callbacks[channel] = callback
304+
callbacks[channel] = callback
294305
elseif callback_type == CallbackMsgData
295-
conn.callbacks[channel] = (msg_struct)->callback(msg_struct.message)
306+
callbacks[channel] = (msg_struct)->callback(msg_struct.message)
296307
end
297308
end
298309

299310
function subscribe(conn::SubscriptionConnection, channel::AbstractString, callback::Function)
300-
_register_callback(conn, channel, callback, CallbackMsgStruct)
311+
_register_callback(conn, channel, SubscriptionPlain, callback, CallbackMsgStruct)
301312
_subscribe(conn, [channel])
302313
end
303314

304-
function subscribe(conn::SubscriptionConnection, subs::Dict{AbstractString, Function})
315+
function subscribe(conn::SubscriptionConnection, subs::Dict)
305316
for (channel, callback) in subs
306-
_register_callback(conn, channel, callback, CallbackMsgStruct)
317+
_register_callback(conn, channel, SubscriptionPlain, callback, CallbackMsgStruct)
307318
end
308319
_subscribe(conn, collect(keys(subs)))
309320
end
310321

311322
function subscribe_data(conn::SubscriptionConnection, channel::AbstractString, callback::Function)
312-
_register_callback(conn, channel, callback, CallbackMsgData)
323+
_register_callback(conn, channel, SubscriptionPlain, callback, CallbackMsgData)
313324
_subscribe(conn, [channel])
314325
end
315326

316-
function subscribe_data(conn::SubscriptionConnection, subs::Dict{AbstractString, Function})
327+
function subscribe_data(conn::SubscriptionConnection, subs::Dict)
317328
for (channel, callback) in subs
318-
_register_callback(conn, channel, callback, CallbackMsgData)
329+
_register_callback(conn, channel, SubscriptionPlain, callback, CallbackMsgData)
319330
end
320331
_subscribe(conn, collect(keys(subs)))
321332
end
@@ -341,15 +352,27 @@ function _psubscribe(conn::SubscriptionConnection, patterns::Array)
341352
end
342353

343354
function psubscribe(conn::SubscriptionConnection, pattern::AbstractString, callback::Function)
344-
conn.pcallbacks[pattern] = callback
355+
_register_callback(conn, pattern, SubscriptionPattern, callback, CallbackMsgStruct)
345356
_psubscribe(conn, [pattern])
346357
end
347358

348-
function psubscribe(conn::SubscriptionConnection, subs::Dict{AbstractString, Function})
359+
function psubscribe(conn::SubscriptionConnection, subs::Dict)
349360
for (pattern, callback) in subs
350-
conn.pcallbacks[pattern] = callback
361+
_register_callback(conn, pattern, SubscriptionPattern, callback, CallbackMsgStruct)
351362
end
352-
_psubscribe(conn, collect(values(subs)))
363+
_psubscribe(conn, collect(keys(subs)))
364+
end
365+
366+
function psubscribe_data(conn::SubscriptionConnection, pattern::AbstractString, callback::Function)
367+
_register_callback(conn, pattern, SubscriptionPattern, callback, CallbackMsgData)
368+
_psubscribe(conn, [pattern])
369+
end
370+
371+
function psubscribe_data(conn::SubscriptionConnection, subs::Dict)
372+
for (pattern, callback) in subs
373+
_register_callback(conn, pattern, SubscriptionPattern, callback, CallbackMsgData)
374+
end
375+
_psubscribe(conn, collect(keys(subs)))
353376
end
354377

355378
function punsubscribe(conn::SubscriptionConnection, patterns...)

test/redis_tests.jl

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -395,18 +395,44 @@ end
395395
@warn "Error while processing subscription: $err"
396396
return nothing
397397
end
398-
x = Any[]
399-
function f(y::AbstractString)
400-
push!(x, y)
398+
399+
string_matched_results = Any[]
400+
pattern_matched_results = Any[]
401+
402+
function string_matched(y::AbstractString)
403+
push!(string_matched_results, y)
404+
end
405+
406+
function pattern_matched(y::AbstractString)
407+
push!(pattern_matched_results, y)
401408
end
409+
402410
subs = open_subscription(conn, handleException) #handleException is called when an exception occurs
403-
subscribe_data(subs, "channel", f)
404-
subscribe(subs, "duplicate", y->f(y.message))
405-
@test publish(conn, "channel", "hello, world!") > 0 #Number of connected clients returned
406-
@test publish(conn, "channel", "Okay, bye!") > 0 #Number of connected clients returned
407-
@test publish(conn, "duplicate", "hello world 2") > 0 #Number of connected clients returned
408-
sleep(2)
409-
@test x == ["hello, world!", "Okay, bye!", "hello world 2"]
411+
412+
subscribe_data(subs, "1channel", string_matched)
413+
psubscribe_data(subs, "1cha??el", pattern_matched)
414+
415+
subscribe(subs, "2channel", y->string_matched(y.message))
416+
psubscribe(subs, "2chan*", y->pattern_matched(y.message))
417+
418+
subscribe_data(subs, Dict("3channel" => string_matched))
419+
psubscribe_data(subs, Dict("3cha??el" => pattern_matched))
420+
421+
subscribe(subs, Dict("4channel" => y->string_matched(y.message)))
422+
psubscribe(subs, Dict("4chan*" => y->pattern_matched(y.message)))
423+
424+
published_messages = ["hello, world1!", "hello, world 2!", "hello, world 3!", "hello, world 4!"]
425+
426+
for idx in 1:4
427+
@test publish(conn, string(idx)*"channel", published_messages[idx]) > 0 #Number of connected clients returned
428+
end
429+
430+
timedwait(5.0; pollint=1.0) do # wait for the messages to be received
431+
length(string_matched_results) == 4 && length(pattern_matched_results) == 4
432+
end
433+
434+
@test string_matched_results == published_messages
435+
@test pattern_matched_results == published_messages
410436

411437
# following command prints ("Invalid response received: ")
412438
disconnect(subs)

0 commit comments

Comments
 (0)