diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 5f9be7b7..0e643889 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -9,7 +9,11 @@ class Base CONNECTION_ERRORS = [ ::Redis::BaseConnectionError, ::SocketError, # https://github.com/redis/redis-rb/pull/631 - ].freeze + ] + + # https://github.com/redis/redis-rb/pull/1312 + CONNECTION_ERRORS << RedisClient::ConnectionError if defined?(RedisClient::ConnectionError) + CONNECTION_ERRORS.freeze module RedisInstrumentation def call(command, redis_config) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 00013b5d..bd8f28f0 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -201,36 +201,35 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size - if @master = redis.setnx(key('master-status'), 'setup') - puts "Worker electected as leader, pushing #{@total} tests to the queue." - puts - - attempts = 0 - duration = measure do - with_redis_timeout(5) do - redis.without_reconnect do - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) + with_redis_timeout(5) do + redis.without_reconnect do + @master = leader_election do + puts "Worker elected as leader, pushing #{@total} tests to the queue." + puts + + attempts = 0 + duration = measure do + begin + redis.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total, ex: config.redis_ttl) + transaction.set(key('master-status'), 'ready', ex: config.redis_ttl) + + transaction.expire(key('queue'), config.redis_ttl) + end + rescue ::Redis::BaseError, RedisClient::Error => error + if !queue_initialized? && attempts < 3 + puts "Retrying pushing #{@total} tests to the queue... (#{error})" + attempts += 1 + retry + end + + raise if !queue_initialized? end end - rescue ::Redis::BaseError => error - if !queue_initialized? && attempts < 3 - puts "Retrying pushing #{@total} tests to the queue... (#{error})" - attempts += 1 - retry - end - - raise if !queue_initialized? + puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s." end end - - puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s." end register redis.expire(key('workers'), config.redis_ttl) @@ -238,6 +237,31 @@ def push(tests) raise if @master end + def leader_election + attempts = 0 + value = key('setup', worker_id) + + begin + if master = redis.setnx(key('master-status'), value) + yield + end + rescue ::Redis::BaseError, RedisClient::Error => error + puts "Error during leader election: #{error}" + if redis.get(key('master-status')) == value + master = true + yield + elsif attempts < 3 + puts "Retrying leader election... (#{error})" + attempts += 1 + retry + else + raise + end + end + + master + end + def register redis.sadd(key('workers'), [worker_id]) end diff --git a/ruby/test/integration/rspec_redis_test.rb b/ruby/test/integration/rspec_redis_test.rb index a459e688..c6f11924 100644 --- a/ruby/test/integration/rspec_redis_test.rb +++ b/ruby/test/integration/rspec_redis_test.rb @@ -34,7 +34,7 @@ def test_redis_runner assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 3 tests to the queue. + Worker elected as leader, pushing 3 tests to the queue. Finished pushing 3 tests to the queue in X.XXs. @@ -91,7 +91,7 @@ def test_redis_runner_retry assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 3 tests to the queue. + Worker elected as leader, pushing 3 tests to the queue. Finished pushing 3 tests to the queue in X.XXs. @@ -273,7 +273,7 @@ def test_before_suite_errors assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 2 tests to the queue. + Worker elected as leader, pushing 2 tests to the queue. Finished pushing 2 tests to the queue in X.XXs. @@ -317,7 +317,7 @@ def test_report assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 3 tests to the queue. + Worker elected as leader, pushing 3 tests to the queue. Finished pushing 3 tests to the queue in X.XXs. @@ -418,7 +418,7 @@ def test_world_wants_to_quit assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 1 tests to the queue. + Worker elected as leader, pushing 1 tests to the queue. Finished pushing 1 tests to the queue in X.XXs.