@@ -201,43 +201,68 @@ def try_to_reserve_lost_test
201201 def push ( tests )
202202 @total = tests . size
203203
204- if @master = redis . setnx ( key ( 'master-status' ) , 'setup' )
205- puts "Worker electected as leader, pushing #{ @total } tests to the queue."
206- puts
207-
208- attempts = 0
209- duration = measure do
210- with_redis_timeout ( 5 ) do
211- redis . without_reconnect do
212- redis . multi do |transaction |
213- transaction . lpush ( key ( 'queue' ) , tests ) unless tests . empty?
214- transaction . set ( key ( 'total' ) , @total )
215- transaction . set ( key ( 'master-status' ) , 'ready' )
216-
217- transaction . expire ( key ( 'queue' ) , config . redis_ttl )
218- transaction . expire ( key ( 'total' ) , config . redis_ttl )
219- transaction . expire ( key ( 'master-status' ) , config . redis_ttl )
204+ with_redis_timeout ( 5 ) do
205+ redis . without_reconnect do
206+ @master = leader_election do
207+ puts "Worker electected as leader, pushing #{ @total } tests to the queue."
208+ puts
209+
210+ attempts = 0
211+ duration = measure do
212+ begin
213+ redis . multi do |transaction |
214+ transaction . lpush ( key ( 'queue' ) , tests ) unless tests . empty?
215+ transaction . set ( key ( 'total' ) , @total )
216+ transaction . set ( key ( 'master-status' ) , 'ready' )
217+
218+ transaction . expire ( key ( 'queue' ) , config . redis_ttl )
219+ transaction . expire ( key ( 'total' ) , config . redis_ttl )
220+ transaction . expire ( key ( 'master-status' ) , config . redis_ttl )
221+ end
222+ rescue ::Redis ::BaseError => error
223+ if !queue_initialized? && attempts < 3
224+ puts "Retrying pushing #{ @total } tests to the queue... (#{ error } )"
225+ attempts += 1
226+ retry
227+ end
228+
229+ raise if !queue_initialized?
220230 end
221231 end
222- rescue ::Redis ::BaseError => error
223- if !queue_initialized? && attempts < 3
224- puts "Retrying pushing #{ @total } tests to the queue... (#{ error } )"
225- attempts += 1
226- retry
227- end
228-
229- raise if !queue_initialized?
232+ puts "Finished pushing #{ @total } tests to the queue in #{ duration . round ( 2 ) } s."
230233 end
231234 end
232-
233- puts "Finished pushing #{ @total } tests to the queue in #{ duration . round ( 2 ) } s."
234235 end
235236 register
236237 redis . expire ( key ( 'workers' ) , config . redis_ttl )
237238 rescue *CONNECTION_ERRORS
238239 raise if @master
239240 end
240241
242+ def leader_election
243+ attempts = 0
244+ value = key ( 'setup' )
245+
246+ begin
247+ if master = redis . setnx ( key ( 'master-status' ) , value )
248+ yield
249+ end
250+ rescue ::Redis ::BaseError => error
251+ puts "Error during leader election: #{ error } "
252+ if redis . get ( key ( 'master-status' ) ) == value
253+ master = true
254+ elsif attempts < 3
255+ puts "Retrying leader election... (#{ error } )"
256+ attempts += 1
257+ retry
258+ else
259+ raise error
260+ end
261+ end
262+
263+ master
264+ end
265+
241266 def register
242267 redis . sadd ( key ( 'workers' ) , [ worker_id ] )
243268 end
0 commit comments