Skip to content

Commit 081280d

Browse files
committed
Handle blocking commands inside pipelines and transactions
1 parent d837a0b commit 081280d

File tree

5 files changed

+70
-16
lines changed

5 files changed

+70
-16
lines changed

lib/redis.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,12 @@ def queue(*command)
106106
def commit
107107
synchronize do |client|
108108
begin
109-
client.call_pipelined(@queue[Thread.current.object_id])
109+
pipeline = Pipeline.new(client)
110+
@queue[Thread.current.object_id].each do |command|
111+
pipeline.call(command)
112+
end
113+
114+
client.call_pipelined(pipeline)
110115
ensure
111116
@queue.delete(Thread.current.object_id)
112117
end
@@ -2405,7 +2410,8 @@ def unwatch
24052410
def pipelined
24062411
synchronize do |client|
24072412
begin
2408-
original, @client = @client, Pipeline.new
2413+
pipeline = Pipeline.new(@client)
2414+
original, @client = @client, pipeline
24092415
yield(self)
24102416
original.call_pipeline(@client)
24112417
ensure
@@ -2450,7 +2456,7 @@ def multi
24502456
client.call([:multi])
24512457
else
24522458
begin
2453-
pipeline = Pipeline::Multi.new
2459+
pipeline = Pipeline::Multi.new(@client)
24542460
original, @client = @client, pipeline
24552461
yield(self)
24562462
original.call_pipeline(pipeline)

lib/redis/client.rb

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,11 @@ def call_loop(command, timeout = 0)
157157
end
158158

159159
def call_pipeline(pipeline)
160-
commands = pipeline.commands
161-
return [] if commands.empty?
160+
return [] if pipeline.futures.empty?
162161

163162
with_reconnect pipeline.with_reconnect? do
164163
begin
165-
pipeline.finish(call_pipelined(commands)).tap do
164+
pipeline.finish(call_pipelined(pipeline)).tap do
166165
self.db = pipeline.db if pipeline.db
167166
end
168167
rescue ConnectionError => e
@@ -175,8 +174,8 @@ def call_pipeline(pipeline)
175174
end
176175
end
177176

178-
def call_pipelined(commands)
179-
return [] if commands.empty?
177+
def call_pipelined(pipeline)
178+
return [] if pipeline.futures.empty?
180179

181180
# The method #ensure_connected (called from #process) reconnects once on
182181
# I/O errors. To make an effort in making sure that commands are not
@@ -186,15 +185,21 @@ def call_pipelined(commands)
186185
# already successfully executed commands. To circumvent this, don't retry
187186
# after the first reply has been read successfully.
188187

188+
commands = pipeline.commands
189+
189190
result = Array.new(commands.size)
190191
reconnect = @reconnect
191192

192193
begin
193194
exception = nil
194195

195196
process(commands) do
196-
commands.size.times do |i|
197-
reply = read
197+
pipeline.timeouts.each_with_index do |timeout, i|
198+
reply = if timeout
199+
with_socket_timeout(timeout) { read }
200+
else
201+
read
202+
end
198203
result[i] = reply
199204
@reconnect = false
200205
exception = reply if exception.nil? && reply.is_a?(CommandError)

lib/redis/pipeline.rb

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
class Redis
22
class Pipeline
33
attr_accessor :db
4+
attr_reader :client
45

56
attr :futures
67

7-
def initialize
8+
def initialize(client)
9+
@client = client.is_a?(Pipeline) ? client.client : client
810
@with_reconnect = true
911
@shutdown = false
1012
@futures = []
1113
end
1214

15+
def timeout
16+
client.timeout
17+
end
18+
1319
def with_reconnect?
1420
@with_reconnect
1521
end
@@ -26,15 +32,19 @@ def empty?
2632
@futures.empty?
2733
end
2834

29-
def call(command, &block)
35+
def call(command, timeout: nil, &block)
3036
# A pipeline that contains a shutdown should not raise ECONNRESET when
3137
# the connection is gone.
3238
@shutdown = true if command.first == :shutdown
33-
future = Future.new(command, block)
39+
future = Future.new(command, block, timeout)
3440
@futures << future
3541
future
3642
end
3743

44+
def call_with_timeout(command, timeout, &block)
45+
call(command, timeout: timeout, &block)
46+
end
47+
3848
def call_pipeline(pipeline)
3949
@shutdown = true if pipeline.shutdown?
4050
@futures.concat(pipeline.futures)
@@ -43,7 +53,11 @@ def call_pipeline(pipeline)
4353
end
4454

4555
def commands
46-
@futures.map { |f| f._command }
56+
@futures.map(&:_command)
57+
end
58+
59+
def timeouts
60+
@futures.map(&:timeout)
4761
end
4862

4963
def with_reconnect(val=true)
@@ -89,6 +103,14 @@ def finish(replies)
89103
end
90104
end
91105

106+
def timeouts
107+
if empty?
108+
[]
109+
else
110+
[nil, *super, nil]
111+
end
112+
end
113+
92114
def commands
93115
if empty?
94116
[]
@@ -108,9 +130,12 @@ def initialize
108130
class Future < BasicObject
109131
FutureNotReady = ::Redis::FutureNotReady.new
110132

111-
def initialize(command, transformation)
133+
attr_reader :timeout
134+
135+
def initialize(command, transformation, timeout)
112136
@command = command
113137
@transformation = transformation
138+
@timeout = timeout
114139
@object = FutureNotReady
115140
end
116141

test/blocking_commands_test.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,22 @@ def test_brpoplpush_disable_client_timeout
3636
assert_equal '0', r.brpoplpush('foo', 'bar')
3737
end
3838
end
39+
40+
def test_brpoplpush_in_transaction
41+
results = r.multi do
42+
r.brpoplpush('foo', 'bar')
43+
r.brpoplpush('foo', 'bar', timeout: 2)
44+
end
45+
assert_equal [nil, nil], results
46+
end
47+
48+
def test_brpoplpush_in_pipeline
49+
mock do |r|
50+
results = r.pipelined do
51+
r.brpoplpush('foo', 'bar')
52+
r.brpoplpush('foo', 'bar', timeout: 2)
53+
end
54+
assert_equal ['0', '2'], results
55+
end
56+
end
3957
end

test/lint/blocking_commands.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def test_brpoplpush_timeout_with_old_prototype
152152
def test_bzpopmin
153153
target_version('5.0.0') do
154154
assert_equal ['{szap}foo', 'a', 0.0], r.bzpopmin('{szap}foo', '{szap}bar', 1)
155-
assert_equal nil, r.bzpopmin('{szap}aaa', '{szap}bbb', 1)
155+
assert_equal nil, r.bzpopmin('{szap}aaa', '{szap}bbb', 2)
156156
end
157157
end
158158

0 commit comments

Comments
 (0)