@@ -54,6 +54,7 @@ import (
54
54
"vitess.io/vitess/go/netutil"
55
55
"vitess.io/vitess/go/stats"
56
56
"vitess.io/vitess/go/vt/log"
57
+ "vitess.io/vitess/go/vt/logutil"
57
58
"vitess.io/vitess/go/vt/proto/query"
58
59
"vitess.io/vitess/go/vt/proto/topodata"
59
60
"vitess.io/vitess/go/vt/proto/vtrpc"
@@ -190,8 +191,10 @@ func FilteringKeyspaces() bool {
190
191
return len (KeyspacesToWatch ) > 0
191
192
}
192
193
193
- type KeyspaceShardTabletType string
194
- type tabletAliasString string
194
+ type (
195
+ KeyspaceShardTabletType string
196
+ tabletAliasString string
197
+ )
195
198
196
199
// HealthCheck declares what the TabletGateway needs from the HealthCheck
197
200
type HealthCheck interface {
@@ -301,6 +304,9 @@ type HealthCheckImpl struct {
301
304
subscribers map [chan * TabletHealth ]string
302
305
// loadTabletsTrigger is used to immediately load information about tablets of a specific shard.
303
306
loadTabletsTrigger chan topo.KeyspaceShard
307
+ // options contains optional settings used to modify HealthCheckImpl
308
+ // behavior.
309
+ options Options
304
310
}
305
311
306
312
// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
@@ -352,9 +358,9 @@ func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
352
358
// filters.
353
359
//
354
360
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
355
- func NewHealthCheck (ctx context. Context , retryDelay , healthCheckTimeout time. Duration , topoServer * topo. Server , localCell , cellsToWatch string , filters TabletFilter ) * HealthCheckImpl {
356
- log . Infof ( "loading tablets for cells: %v" , cellsToWatch )
357
-
361
+ func NewHealthCheck (
362
+ ctx context. Context , retryDelay , healthCheckTimeout time. Duration , topoServer * topo. Server , localCell , cellsToWatch string , filters TabletFilter , opts ... Option ,
363
+ ) * HealthCheckImpl {
358
364
hc := & HealthCheckImpl {
359
365
ts : topoServer ,
360
366
cell : localCell ,
@@ -366,19 +372,23 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
366
372
subscribers : make (map [chan * TabletHealth ]string ),
367
373
cellAliases : make (map [string ]string ),
368
374
loadTabletsTrigger : make (chan topo.KeyspaceShard , 1024 ),
375
+ options : withOptions (opts ... ),
369
376
}
377
+
378
+ hc .logger ().Infof ("loading tablets for cells: %v" , cellsToWatch )
379
+
370
380
var topoWatchers []* TopologyWatcher
371
381
cells := strings .Split (cellsToWatch , "," )
372
382
if cellsToWatch == "" {
373
383
cells = append (cells , localCell )
374
384
}
375
385
376
386
for _ , c := range cells {
377
- log .Infof ("Setting up healthcheck for cell: %v" , c )
387
+ hc . logger () .Infof ("Setting up healthcheck for cell: %v" , c )
378
388
if c == "" {
379
389
continue
380
390
}
381
- topoWatchers = append (topoWatchers , NewTopologyWatcher (ctx , topoServer , hc , filters , c , refreshInterval , refreshKnownTablets ))
391
+ topoWatchers = append (topoWatchers , NewTopologyWatcher (ctx , topoServer , hc , filters , c , refreshInterval , refreshKnownTablets , opts ... ))
382
392
}
383
393
384
394
hc .topoWatchers = topoWatchers
@@ -403,7 +413,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
403
413
return
404
414
}
405
415
406
- log .Infof ("Adding tablet to healthcheck: %v" , tablet )
416
+ hc . logger () .Infof ("Adding tablet to healthcheck: %v" , tablet )
407
417
hc .mu .Lock ()
408
418
defer hc .mu .Unlock ()
409
419
if hc .healthByAlias == nil {
@@ -421,14 +431,15 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
421
431
cancelFunc : cancelFunc ,
422
432
Tablet : tablet ,
423
433
Target : target ,
434
+ logger : hc .logger (),
424
435
}
425
436
426
437
// add to our datastore
427
438
key := KeyFromTarget (target )
428
439
tabletAlias := topoproto .TabletAliasString (tablet .Alias )
429
440
if _ , ok := hc .healthByAlias [tabletAliasString (tabletAlias )]; ok {
430
441
// We should not add a tablet that we already have
431
- log .Errorf ("Program bug: tried to add existing tablet: %v to healthcheck" , tabletAlias )
442
+ hc . logger () .Errorf ("Program bug: tried to add existing tablet: %v to healthcheck" , tabletAlias )
432
443
return
433
444
}
434
445
hc .healthByAlias [tabletAliasString (tabletAlias )] = thc
@@ -456,7 +467,7 @@ func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) {
456
467
}
457
468
458
469
func (hc * HealthCheckImpl ) deleteTablet (tablet * topodata.Tablet ) {
459
- log .Infof ("Removing tablet from healthcheck: %v" , tablet )
470
+ hc . logger () .Infof ("Removing tablet from healthcheck: %v" , tablet )
460
471
hc .mu .Lock ()
461
472
defer hc .mu .Unlock ()
462
473
@@ -499,7 +510,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
499
510
// delete from authoritative map
500
511
th , ok := hc .healthByAlias [tabletAlias ]
501
512
if ! ok {
502
- log .Infof ("We have no health data for tablet: %v, it might have been deleted already" , tablet )
513
+ hc . logger () .Infof ("We have no health data for tablet: %v, it might have been deleted already" , tablet )
503
514
return
504
515
}
505
516
// Calling this will end the context associated with th.checkConn,
@@ -518,7 +529,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
518
529
// so that we're not racing to update it and in effect re-adding a copy of the
519
530
// tablet record that was deleted
520
531
if _ , ok := hc .healthByAlias [tabletAlias ]; ! ok {
521
- log .Infof ("Tablet %v has been deleted, skipping health update" , th .Tablet )
532
+ hc . logger () .Infof ("Tablet %v has been deleted, skipping health update" , th .Tablet )
522
533
return
523
534
}
524
535
@@ -543,7 +554,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
543
554
// causing an interruption where no primary is assigned to the shard.
544
555
if prevTarget .TabletType == topodata .TabletType_PRIMARY {
545
556
if primaries := hc .healthData [oldTargetKey ]; len (primaries ) == 0 {
546
- log .Infof ("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now" , topotools .TabletIdent (th .Tablet ))
557
+ hc . logger () .Infof ("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now" , topotools .TabletIdent (th .Tablet ))
547
558
// We want to trigger a call to load tablets for this keyspace-shard,
548
559
// but we want this to be non-blocking to prevent the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
549
560
// If the buffer is exhausted, then we'll just receive the update when all the tablets are loaded on the ticker.
@@ -572,7 +583,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
572
583
// We already have one up server, see if we
573
584
// need to replace it.
574
585
if th .PrimaryTermStartTime < hc .healthy [targetKey ][0 ].PrimaryTermStartTime {
575
- log .Warningf ("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d " ,
586
+ hc . logger () .Warningf ("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d " ,
576
587
topoproto .TabletAliasString (th .Tablet .Alias ),
577
588
topoproto .KeyspaceShardString (th .Target .Keyspace , th .Target .Shard ),
578
589
topoproto .TabletAliasString (hc .healthy [targetKey ][0 ].Tablet .Alias ),
@@ -609,7 +620,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
609
620
610
621
isNewPrimary := isPrimary && prevTarget .TabletType != topodata .TabletType_PRIMARY
611
622
if isNewPrimary {
612
- log .Errorf ("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v" , prevTarget , topoproto .TabletAliasString (th .Tablet .Alias ), th .Target .TabletType )
623
+ hc . logger () .Errorf ("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v" , prevTarget , topoproto .TabletAliasString (th .Tablet .Alias ), th .Target .TabletType )
613
624
hcPrimaryPromotedCounters .Add ([]string {th .Target .Keyspace , th .Target .Shard }, 1 )
614
625
}
615
626
@@ -666,7 +677,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
666
677
default :
667
678
// If the channel is full, we drop the message.
668
679
hcChannelFullCounter .Add (1 )
669
- log .Warningf ("HealthCheck broadcast channel is full for %v, dropping message for %s" , subscriber , topotools .TabletIdent (th .Tablet ))
680
+ hc . logger () .Warningf ("HealthCheck broadcast channel is full for %v, dropping message for %s" , subscriber , topotools .TabletIdent (th .Tablet ))
670
681
// Print the stack trace only once.
671
682
printStack ()
672
683
}
@@ -849,7 +860,7 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
849
860
timer .Stop ()
850
861
for _ , target := range targets {
851
862
if target != nil {
852
- log .Infof ("couldn't find tablets for target: %v" , target )
863
+ hc . logger () .Infof ("couldn't find tablets for target: %v" , target )
853
864
}
854
865
}
855
866
return ctx .Err ()
@@ -977,7 +988,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
977
988
if err != nil {
978
989
// Error logged
979
990
if _ , err := w .Write ([]byte (err .Error ())); err != nil {
980
- log .Errorf ("write to buffer error failed: %v" , err )
991
+ hc . logger () .Errorf ("write to buffer error failed: %v" , err )
981
992
}
982
993
983
994
return
@@ -988,7 +999,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
988
999
989
1000
// Error logged
990
1001
if _ , err := w .Write (buf .Bytes ()); err != nil {
991
- log .Errorf ("write to buffer bytes failed: %v" , err )
1002
+ hc . logger () .Errorf ("write to buffer bytes failed: %v" , err )
992
1003
}
993
1004
}
994
1005
@@ -1029,6 +1040,11 @@ func (hc *HealthCheckImpl) stateChecksum() int64 {
1029
1040
return int64 (crc32 .ChecksumIEEE (buf .Bytes ()))
1030
1041
}
1031
1042
1043
+ // logger returns the logutil.Logger used by the healthcheck.
1044
+ func (hc * HealthCheckImpl ) logger () logutil.Logger {
1045
+ return hc .options .logger
1046
+ }
1047
+
1032
1048
// TabletToMapKey creates a key to the map from tablet's host and ports.
1033
1049
// It should only be used in discovery and related module.
1034
1050
func TabletToMapKey (tablet * topodata.Tablet ) string {
0 commit comments