Skip to content

Commit 65f8886

Browse files
somebody32djanowski
authored andcommitted
added subscribe and psubscribe with timeout
1 parent 6e09149 commit 65f8886

File tree

4 files changed

+72
-8
lines changed

4 files changed

+72
-8
lines changed

lib/redis.rb

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2134,7 +2134,14 @@ def subscribed?
21342134
# Listen for messages published to the given channels.
21352135
def subscribe(*channels, &block)
21362136
synchronize do |client|
2137-
_subscription(:subscribe, channels, block)
2137+
_subscription(:subscribe, 0, channels, block)
2138+
end
2139+
end
2140+
2141+
# Listen for messages published to the given channels. Throw a timeout error if there is no messages for a timeout period.
2142+
def subscribe_with_timeout(timeout, *channels, &block)
2143+
synchronize do |client|
2144+
_subscription(:subscribe_with_timeout, timeout, channels, block)
21382145
end
21392146
end
21402147

@@ -2149,7 +2156,14 @@ def unsubscribe(*channels)
21492156
# Listen for messages published to channels matching the given patterns.
21502157
def psubscribe(*channels, &block)
21512158
synchronize do |client|
2152-
_subscription(:psubscribe, channels, block)
2159+
_subscription(:psubscribe, 0, channels, block)
2160+
end
2161+
end
2162+
2163+
# Listen for messages published to channels matching the given patterns. Throw a timeout error if there is no messages for a timeout period.
2164+
def psubscribe_with_timeout(timeout, *channels, &block)
2165+
synchronize do |client|
2166+
_subscription(:psubscribe_with_timeout, timeout, channels, block)
21532167
end
21542168
end
21552169

@@ -2749,12 +2763,16 @@ def _identity
27492763
}
27502764
end
27512765

2752-
def _subscription(method, channels, block)
2766+
def _subscription(method, timeout, channels, block)
27532767
return @client.call([method] + channels) if subscribed?
27542768

27552769
begin
27562770
original, @client = @client, SubscribedClient.new(@client)
2757-
@client.send(method, *channels, &block)
2771+
if timeout > 0
2772+
@client.send(method, timeout, *channels, &block)
2773+
else
2774+
@client.send(method, *channels, &block)
2775+
end
27582776
ensure
27592777
@client = original
27602778
end

lib/redis/client.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,10 @@ def call(command, &block)
127127
end
128128
end
129129

130-
def call_loop(command)
130+
def call_loop(command, timeout = 0)
131131
error = nil
132132

133-
result = without_socket_timeout do
133+
result = with_socket_timeout(timeout) do
134134
process([command]) do
135135
loop do
136136
reply = read

lib/redis/subscribe.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,18 @@ def subscribe(*channels, &block)
1212
subscription("subscribe", "unsubscribe", channels, block)
1313
end
1414

15+
def subscribe_with_timeout(timeout, *channels, &block)
16+
subscription("subscribe", "unsubscribe", channels, block, timeout)
17+
end
18+
1519
def psubscribe(*channels, &block)
1620
subscription("psubscribe", "punsubscribe", channels, block)
1721
end
1822

23+
def psubscribe_with_timeout(timeout, *channels, &block)
24+
subscription("psubscribe", "punsubscribe", channels, block, timeout)
25+
end
26+
1927
def unsubscribe(*channels)
2028
call([:unsubscribe, *channels])
2129
end
@@ -26,13 +34,13 @@ def punsubscribe(*channels)
2634

2735
protected
2836

29-
def subscription(start, stop, channels, block)
37+
def subscription(start, stop, channels, block, timeout = 0)
3038
sub = Subscription.new(&block)
3139

3240
unsubscribed = false
3341

3442
begin
35-
@client.call_loop([start, *channels]) do |line|
43+
@client.call_loop([start, *channels], timeout) do |line|
3644
type, *rest = line
3745
sub.callbacks[type].call(*rest)
3846
unsubscribed = type == stop && rest.last == 0

test/publish_subscribe_test.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,42 @@ def test_subscribe_past_a_timeout
251251
assert received
252252
end
253253
end
254+
255+
def test_subscribe_with_timeout
256+
assert_raise Redis::TimeoutError do
257+
sleep = %{sleep #{OPTIONS[:timeout] * 2}}
258+
publish = %{echo "publish foo bar\r\n" | nc localhost #{OPTIONS[:port]}}
259+
cmd = [sleep, publish].join("; ")
260+
261+
IO.popen(cmd, "r+") do |pipe|
262+
received = false
263+
264+
r.subscribe_with_timeout(OPTIONS[:timeout], "foo") do |on|
265+
on.message do |channel, message|
266+
received = true
267+
r.unsubscribe
268+
end
269+
end
270+
end
271+
end
272+
end
273+
274+
def test_psubscribe_with_timeout
275+
assert_raise Redis::TimeoutError do
276+
sleep = %{sleep #{OPTIONS[:timeout] * 2}}
277+
publish = %{echo "publish foo bar\r\n" | nc localhost #{OPTIONS[:port]}}
278+
cmd = [sleep, publish].join("; ")
279+
280+
IO.popen(cmd, "r+") do |pipe|
281+
received = false
282+
283+
r.psubscribe_with_timeout(OPTIONS[:timeout], "f*") do |on|
284+
on.message do |channel, message|
285+
received = true
286+
r.unsubscribe
287+
end
288+
end
289+
end
290+
end
291+
end
254292
end

0 commit comments

Comments
 (0)