@@ -75,6 +75,12 @@ def initialize(options = {})
75
75
@logger = @options [ :logger ]
76
76
@connection = nil
77
77
@command_map = { }
78
+
79
+ if options . include? ( :sentinels )
80
+ @connector = Connector ::Sentinel . new ( @options )
81
+ else
82
+ @connector = Connector . new ( @options )
83
+ end
78
84
end
79
85
80
86
def connect
@@ -85,25 +91,7 @@ def connect
85
91
establish_connection
86
92
call [ :auth , password ] if password
87
93
call [ :select , db ] if db != 0
88
- if @options [ :sentinels ]
89
- # Check the instance is really of the role we are looking for.
90
- # We can't assume the command is supported since it was
91
- # introduced recently and this client should work with old stuff.
92
- role = nil
93
- begin
94
- role = call [ :role ]
95
- rescue
96
- # Assume the test is passed if we can't get a reply from ROLE...
97
- role = [ @options [ :role ] . to_s ]
98
- end
99
-
100
- # Raise an error on role mismatch. TODO: we could do better, wait
101
- # some time and connect again...
102
- if role [ 0 ] != @options [ :role ] . to_s
103
- disconnect
104
- raise ConnectionError , "Instance role mismatch, try again."
105
- end
106
- end
94
+ @connector . check ( self )
107
95
end
108
96
109
97
self
@@ -319,60 +307,8 @@ def logging(commands)
319
307
end
320
308
end
321
309
322
- def set_addr_via_sentinel
323
- responder = nil # The Sentinel that was able to reply
324
-
325
- # Try one Sentinel after the other, using the list provided
326
- # by the user.
327
- @options [ :sentinels ] . each { |sentinel |
328
- begin
329
- if !sentinel [ :link ]
330
- sentinel [ :link ] = Redis . new ( :host => sentinel [ :host ] ,
331
- :port => sentinel [ :port ] ,
332
- :timeout => 0.300 )
333
- end
334
- if @options [ :role ] == :master
335
- reply = sentinel [ :link ] . client . call ( [ "sentinel" , "get-master-addr-by-name" , @options [ :mastername ] ] )
336
- next if !reply
337
- # Got it, set :host and :port
338
- @options [ :host ] = reply [ 0 ]
339
- @options [ :port ] = reply [ 1 ]
340
- responder = sentinel
341
- break
342
- elsif @options [ :role ] == :slave
343
- reply = sentinel [ :link ] . client . call ( [ "sentinel" , "slaves" , @options [ :mastername ] ] )
344
- slaves = [ ]
345
- reply . each { |slave |
346
- slaves << Hash [ *slave ]
347
- }
348
- random_slave = slaves [ rand ( slaves . length ) ]
349
- @options [ :host ] = random_slave [ 'ip' ]
350
- @options [ :port ] = random_slave [ 'port' ]
351
- responder = sentinel
352
- break
353
- else
354
- raise ArgumentError , "Unknown instance role #{ @options [ :role ] } "
355
- end
356
- rescue
357
- next ; # Try the next one on error
358
- end
359
- }
360
-
361
- if responder
362
- # If we were able to obtain the address, make sure to put the Sentinel
363
- # that was able to reply as the first in the list.
364
- @options [ :sentinels ] . delete ( responder )
365
- @options [ :sentinels ] . unshift ( responder )
366
- else
367
- raise ConnectionError , "Unable to fetch #{ @options [ :role ] } via Sentinel."
368
- end
369
- end
370
-
371
310
def establish_connection
372
- if @options [ :sentinels ]
373
- set_addr_via_sentinel
374
- end
375
- @connection = @options [ :driver ] . connect ( @options . dup )
311
+ @connection = @options [ :driver ] . connect ( @connector . resolve . dup )
376
312
rescue TimeoutError
377
313
raise CannotConnectError , "Timed out connecting to Redis on #{ location } "
378
314
rescue Errno ::ECONNREFUSED
@@ -444,12 +380,7 @@ def _parse_options(options)
444
380
defaults [ :port ] = uri . port if uri . port
445
381
defaults [ :password ] = CGI . unescape ( uri . password ) if uri . password
446
382
defaults [ :db ] = uri . path [ 1 ..-1 ] . to_i if uri . path
447
- elsif uri . scheme == "sentinel"
448
- defaults [ :scheme ] = uri . scheme
449
- defaults [ :mastername ] = uri . host
450
383
defaults [ :role ] = :master
451
- defaults [ :password ] = CGI . unescape ( uri . password ) if uri . password
452
- defaults [ :db ] = uri . path [ 1 ..-1 ] . to_i if uri . path
453
384
else
454
385
raise ArgumentError , "invalid uri scheme '#{ uri . scheme } '"
455
386
end
@@ -465,13 +396,6 @@ def _parse_options(options)
465
396
options [ :scheme ] = "unix"
466
397
options . delete ( :host )
467
398
options . delete ( :port )
468
- elsif options [ :mastername ]
469
- # Sentinel
470
- options . delete ( :host )
471
- options . delete ( :port )
472
- if options [ :sentinels ] . nil?
473
- raise ArgumentError , "list of Sentinels required"
474
- end
475
399
else
476
400
# TCP socket
477
401
options [ :host ] = options [ :host ] . to_s
@@ -519,5 +443,94 @@ def _parse_driver(driver)
519
443
520
444
driver
521
445
end
446
+
447
+ class Connector
448
+ def initialize ( options )
449
+ @options = options
450
+ end
451
+
452
+ def resolve
453
+ @options
454
+ end
455
+
456
+ def check ( client )
457
+ end
458
+
459
+ class Sentinel < Connector
460
+ def initialize ( options )
461
+ super ( options )
462
+
463
+ @sentinels = options . fetch ( :sentinels ) . dup
464
+ @role = options [ :role ] . to_s
465
+ @master = options [ :host ]
466
+ end
467
+
468
+ def check ( client )
469
+ # Check the instance is really of the role we are looking for.
470
+ # We can't assume the command is supported since it was introduced
471
+ # recently and this client should work with old stuff.
472
+ begin
473
+ role = client . call ( [ :role ] ) [ 0 ]
474
+ rescue
475
+ # Assume the test is passed if we can't get a reply from ROLE...
476
+ role = @role
477
+ end
478
+
479
+ if role != @role
480
+ disconnect
481
+ raise ConnectionError , "Instance role mismatch. Expected #{ @role } , got #{ role } ."
482
+ end
483
+ end
484
+
485
+ def resolve
486
+ result = case @role
487
+ when "master"
488
+ resolve_master
489
+ when "slave"
490
+ resolve_slave
491
+ else
492
+ raise ArgumentError , "Unknown instance role #{ @role } "
493
+ end
494
+
495
+ result || ( raise ConnectionError , "Unable to fetch #{ @role } via Sentinel." )
496
+ end
497
+
498
+ def sentinel_detect
499
+ @sentinels . each do |sentinel |
500
+ client = Client . new ( :host => sentinel [ :host ] , :port => sentinel [ :port ] , :timeout => 0.3 )
501
+
502
+ begin
503
+ if result = yield ( client )
504
+ # This sentinel responded. Make sure we ask it first next time.
505
+ @sentinels . delete ( sentinel )
506
+ @sentinels . unshift ( sentinel )
507
+
508
+ return result
509
+ end
510
+ ensure
511
+ client . disconnect
512
+ end
513
+ end
514
+ end
515
+
516
+ def resolve_master
517
+ sentinel_detect do |client |
518
+ if reply = client . call ( [ "sentinel" , "get-master-addr-by-name" , @master ] )
519
+ { host : reply [ 0 ] , port : reply [ 1 ] }
520
+ end
521
+ end
522
+ end
523
+
524
+ def resolve_slave
525
+ sentinel_detect do |client |
526
+ if reply = client . call ( [ "sentinel" , "slaves" , @master ] )
527
+ slave = Hash [ *reply . sample ]
528
+
529
+ { host : slave . fetch ( "ip" ) , port : slave . fetch ( "port" ) }
530
+ end
531
+ end
532
+ end
533
+ end
534
+ end
522
535
end
523
536
end
0 commit comments