From 8bbee9cfec35b19dab5f9870738f9488c1cdbd72 Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Wed, 14 May 2025 09:55:37 +0100 Subject: [PATCH 1/2] Make leader election idempotent --- ruby/lib/ci/queue/redis/base.rb | 6 +- ruby/lib/ci/queue/redis/worker.rb | 78 +++++++++++++++-------- ruby/test/integration/rspec_redis_test.rb | 10 +-- 3 files changed, 62 insertions(+), 32 deletions(-) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 5f9be7b7..0e643889 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -9,7 +9,11 @@ class Base CONNECTION_ERRORS = [ ::Redis::BaseConnectionError, ::SocketError, # https://github.com/redis/redis-rb/pull/631 - ].freeze + ] + + # https://github.com/redis/redis-rb/pull/1312 + CONNECTION_ERRORS << RedisClient::ConnectionError if defined?(RedisClient::ConnectionError) + CONNECTION_ERRORS.freeze module RedisInstrumentation def call(command, redis_config) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 00013b5d..9ded87ce 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -201,36 +201,37 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size - if @master = redis.setnx(key('master-status'), 'setup') - puts "Worker electected as leader, pushing #{@total} tests to the queue." - puts - - attempts = 0 - duration = measure do - with_redis_timeout(5) do - redis.without_reconnect do - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) + with_redis_timeout(5) do + redis.without_reconnect do + @master = leader_election do + puts "Worker elected as leader, pushing #{@total} tests to the queue." + puts + + attempts = 0 + duration = measure do + begin + redis.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end + rescue ::Redis::BaseError, RedisClient::Error => error + if !queue_initialized? && attempts < 3 + puts "Retrying pushing #{@total} tests to the queue... (#{error})" + attempts += 1 + retry + end + + raise if !queue_initialized? end end - rescue ::Redis::BaseError => error - if !queue_initialized? && attempts < 3 - puts "Retrying pushing #{@total} tests to the queue... (#{error})" - attempts += 1 - retry - end - - raise if !queue_initialized? + puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s." end end - - puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s." end register redis.expire(key('workers'), config.redis_ttl) @@ -238,6 +239,31 @@ def push(tests) raise if @master end + def leader_election + attempts = 0 + value = key('setup', worker_id) + + begin + if master = redis.setnx(key('master-status'), value) + yield + end + rescue ::Redis::BaseError, RedisClient::Error => error + puts "Error during leader election: #{error}" + if redis.get(key('master-status')) == value + master = true + yield + elsif attempts < 3 + puts "Retrying leader election... (#{error})" + attempts += 1 + retry + else + raise + end + end + + master + end + def register redis.sadd(key('workers'), [worker_id]) end diff --git a/ruby/test/integration/rspec_redis_test.rb b/ruby/test/integration/rspec_redis_test.rb index a459e688..c6f11924 100644 --- a/ruby/test/integration/rspec_redis_test.rb +++ b/ruby/test/integration/rspec_redis_test.rb @@ -34,7 +34,7 @@ def test_redis_runner assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 3 tests to the queue. + Worker elected as leader, pushing 3 tests to the queue. Finished pushing 3 tests to the queue in X.XXs. @@ -91,7 +91,7 @@ def test_redis_runner_retry assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 3 tests to the queue. + Worker elected as leader, pushing 3 tests to the queue. Finished pushing 3 tests to the queue in X.XXs. @@ -273,7 +273,7 @@ def test_before_suite_errors assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 2 tests to the queue. + Worker elected as leader, pushing 2 tests to the queue. Finished pushing 2 tests to the queue in X.XXs. @@ -317,7 +317,7 @@ def test_report assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 3 tests to the queue. + Worker elected as leader, pushing 3 tests to the queue. Finished pushing 3 tests to the queue in X.XXs. @@ -418,7 +418,7 @@ def test_world_wants_to_quit assert_empty err expected_output = strip_heredoc <<-EOS - Worker electected as leader, pushing 1 tests to the queue. + Worker elected as leader, pushing 1 tests to the queue. Finished pushing 1 tests to the queue in X.XXs. From 9038dc1e77a12afbd771bc6d5bccc6b2fee6daad Mon Sep 17 00:00:00 2001 From: Christian Bruckmayer Date: Wed, 14 May 2025 17:12:23 +0100 Subject: [PATCH 2/2] Set expire on set operations --- ruby/lib/ci/queue/redis/worker.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 9ded87ce..bd8f28f0 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -212,12 +212,10 @@ def push(tests) begin redis.multi do |transaction| transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') + transaction.set(key('total'), @total, ex: config.redis_ttl) + transaction.set(key('master-status'), 'ready', ex: config.redis_ttl) transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) end rescue ::Redis::BaseError, RedisClient::Error => error if !queue_initialized? && attempts < 3