Skip to content

Commit d6c9cf3

Browse files
committed
tokens from file would be ignored if instance was not in the ring when starting
Signed-off-by: Alex Le <[email protected]>
1 parent d87cab9 commit d6c9cf3

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

pkg/ingester/ingester.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
730730
}, i.getOldestUnshippedBlockMetric)
731731
}
732732

733-
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, true, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
733+
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, false, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
734734
if err != nil {
735735
return nil, err
736736
}
@@ -817,13 +817,6 @@ func (i *Ingester) startingV2ForFlusher(ctx context.Context) error {
817817
}
818818

819819
func (i *Ingester) starting(ctx context.Context) error {
820-
if err := i.openExistingTSDB(ctx); err != nil {
821-
// Try to rollback and close opened TSDBs before halting the ingester.
822-
i.closeAllTSDB()
823-
824-
return errors.Wrap(err, "opening existing TSDBs")
825-
}
826-
827820
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
828821
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
829822
return errors.Wrap(err, "failed to start lifecycler")
@@ -832,6 +825,13 @@ func (i *Ingester) starting(ctx context.Context) error {
832825
return errors.Wrap(err, "failed to start lifecycler")
833826
}
834827

828+
if err := i.openExistingTSDB(ctx); err != nil {
829+
// Try to rollback and close opened TSDBs before halting the ingester.
830+
i.closeAllTSDB()
831+
832+
return errors.Wrap(err, "opening existing TSDBs")
833+
}
834+
835835
i.lifecycler.Join()
836836

837837
// let's start the rest of subservices via manager

pkg/ring/lifecycler.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -495,8 +495,6 @@ func (i *Lifecycler) loop(ctx context.Context) error {
495495
return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
496496
}
497497

498-
level.Info(i.logger).Log("msg", "finished init ring", "ring", i.RingName, "state", i.GetState())
499-
500498
// We do various period tasks
501499
var autoJoinAfter <-chan time.Time
502500
var observeChan <-chan time.Time
@@ -524,7 +522,6 @@ func (i *Lifecycler) loop(ctx context.Context) error {
524522
select {
525523
case <-i.autojoinChan:
526524
autoJoinAfter = time.After(i.cfg.JoinAfter)
527-
level.Info(i.logger).Log("msg", "will do auto-joining after timeout", "timeout", i.cfg.JoinAfter, "state", i.GetState())
528525
case <-autoJoinAfter:
529526
if joined {
530527
continue
@@ -692,13 +689,16 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
692689

693690
// We use the tokens from the file only if it does not exist in the ring yet.
694691
if len(tokensFromFile) > 0 {
695-
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
696692
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
693+
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
697694
i.setState(i.getPreviousState())
695+
state := i.GetState()
696+
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, state, registeredAt)
697+
level.Info(i.logger).Log("msg", "auto join on startup, adding with token and state", "ring", i.RingName, "state", state)
698+
i.setTokens(tokensFromFile)
699+
return ringDesc, true, nil
698700
}
699-
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
700-
i.setTokens(tokensFromFile)
701-
return ringDesc, true, nil
701+
level.Info(i.logger).Log("msg", "ignore tokens from file since autoJoinOnStartup set to false")
702702
}
703703

704704
// Either we are a new ingester, or consul must have restarted
@@ -894,7 +894,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
894894

895895
if needTokens == 0 && myTokens.Equals(i.getTokens()) {
896896
// Tokens have been verified. No need to change them.
897-
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
897+
state := i.GetState()
898+
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
899+
level.Info(i.logger).Log("msg", "auto joined with existing tokens", "ring", i.RingName, "state", state)
898900
return ringDesc, true, nil
899901
}
900902

@@ -908,7 +910,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
908910
sort.Sort(myTokens)
909911
i.setTokens(myTokens)
910912

911-
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
913+
state := i.GetState()
914+
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
915+
level.Info(i.logger).Log("msg", "auto joined with new tokens", "ring", i.RingName, "state", state)
912916

913917
return ringDesc, true, nil
914918
})

0 commit comments

Comments
 (0)