Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions test/test_concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class TestConcurrency < TestingWrapper
MAX_THREADS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', 5))
ATTEMPTS = 200
ATTEMPTS = 1000
WANT = '1'

def setup
Expand All @@ -21,9 +21,13 @@ def test_forking

pids = Array.new(MAX_THREADS) do
Process.fork do
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('INCR', "key#{i}") } }
sleep 0.1
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('DECR', "key#{i}") } }
ATTEMPTS.times { |i| @client.call('INCR', "key#{i}") }
end
end

pids += Array.new(MAX_THREADS) do
Process.fork do
ATTEMPTS.times { |i| @client.call('DECR', "key#{i}") }
end
end

Expand All @@ -40,9 +44,13 @@ def test_forking_with_pipelining

pids = Array.new(MAX_THREADS) do
Process.fork do
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('INCR', "key#{i}") } } }
sleep 0.1
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('DECR', "key#{i}") } } }
@client.pipelined { |pi| ATTEMPTS.times { |i| pi.call('INCR', "key#{i}") } }
end
end

pids += Array.new(MAX_THREADS) do
Process.fork do
@client.pipelined { |pi| ATTEMPTS.times { |i| pi.call('DECR', "key#{i}") } }
end
end

Expand All @@ -63,10 +71,8 @@ def test_forking_with_transaction
Process.fork do
@client.multi(watch: %w[{key}1]) do |tx|
ATTEMPTS.times do
MAX_THREADS.times do
tx.call('INCR', '{key}1')
tx.call('DECR', '{key}1')
end
tx.call('INCR', '{key}1')
tx.call('DECR', '{key}1')
end
end
end
Expand All @@ -83,8 +89,16 @@ def test_forking_with_transaction
def test_threading
threads = Array.new(MAX_THREADS) do
Thread.new do
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('INCR', "key#{i}") } }
ATTEMPTS.times { MAX_THREADS.times { |i| @client.call('DECR', "key#{i}") } }
ATTEMPTS.times { |i| @client.call('INCR', "key#{i}") }
nil
rescue StandardError => e
e
end
end

threads += Array.new(MAX_THREADS) do
Thread.new do
ATTEMPTS.times { |i| @client.call('DECR', "key#{i}") }
nil
rescue StandardError => e
e
Expand All @@ -98,8 +112,16 @@ def test_threading
def test_threading_with_pipelining
threads = Array.new(MAX_THREADS) do
Thread.new do
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('INCR', "key#{i}") } } }
@client.pipelined { |pi| ATTEMPTS.times { MAX_THREADS.times { |i| pi.call('DECR', "key#{i}") } } }
@client.pipelined { |pi| ATTEMPTS.times { |i| pi.call('INCR', "key#{i}") } }
nil
rescue StandardError => e
e
end
end

threads += Array.new(MAX_THREADS) do
Thread.new do
@client.pipelined { |pi| ATTEMPTS.times { |i| pi.call('DECR', "key#{i}") } }
nil
rescue StandardError => e
e
Expand All @@ -117,10 +139,8 @@ def test_threading_with_transaction
Thread.new do
@client.multi(watch: %w[{key}1]) do |tx|
ATTEMPTS.times do
MAX_THREADS.times do
tx.call('INCR', '{key}1')
tx.call('DECR', '{key}1')
end
tx.call('INCR', '{key}1')
tx.call('DECR', '{key}1')
end
end
rescue StandardError => e
Expand Down