@@ -321,6 +321,106 @@ type noopFlushTransferer struct {
321
321
func (f * noopFlushTransferer ) Flush () {}
322
322
func (f * noopFlushTransferer ) TransferOut (ctx context.Context ) error { return nil }
323
323
324
+ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false (t * testing.T ) {
325
+ var ringConfig Config
326
+ flagext .DefaultValues (& ringConfig )
327
+ ringConfig .KVStore .Mock = consul .NewInMemoryClient (GetCodec ())
328
+
329
+ r , err := New (ringConfig , "ingester" , IngesterRingKey , nil )
330
+ require .NoError (t , err )
331
+ require .NoError (t , services .StartAndAwaitRunning (context .Background (), r ))
332
+
333
+ // poll function waits for a condition and returning actual state of the ingesters after the condition succeed.
334
+ poll := func (condition func (* Desc ) bool ) map [string ]InstanceDesc {
335
+ var ingesters map [string ]InstanceDesc
336
+ test .Poll (t , 5 * time .Second , true , func () interface {} {
337
+ d , err := r .KVClient .Get (context .Background (), IngesterRingKey )
338
+ require .NoError (t , err )
339
+
340
+ desc , ok := d .(* Desc )
341
+
342
+ if ok {
343
+ ingesters = desc .Ingesters
344
+ }
345
+ return ok && condition (desc )
346
+ })
347
+
348
+ return ingesters
349
+ }
350
+
351
+ // Starts Ingester and wait it to became active
352
+ startIngesterAndWaitActive := func (ingId string ) * Lifecycler {
353
+ lifecyclerConfig := testLifecyclerConfig (ringConfig , ingId )
354
+ // Disabling heartBeat and unregister_on_shutdown
355
+ lifecyclerConfig .UnregisterOnShutdown = false
356
+ lifecyclerConfig .HeartbeatPeriod = 0
357
+ lifecycler , err := NewLifecycler (lifecyclerConfig , & noopFlushTransferer {}, "lifecycler" , IngesterRingKey , true , nil )
358
+ require .NoError (t , err )
359
+ require .NoError (t , services .StartAndAwaitRunning (context .Background (), lifecycler ))
360
+ poll (func (desc * Desc ) bool {
361
+ return desc .Ingesters [ingId ].State == ACTIVE
362
+ })
363
+ return lifecycler
364
+ }
365
+
366
+ // We are going to create 2 fake ingester with disabled heart beat and `unregister_on_shutdown=false` then
367
+ // test if the ingester 2 became active after:
368
+ // * Clean Shutdown (LEAVING after shutdown)
369
+ // * Crashes while in the PENDING or JOINING state
370
+ l1 := startIngesterAndWaitActive ("ing1" )
371
+ defer services .StopAndAwaitTerminated (context .Background (), l1 ) //nolint:errcheck
372
+
373
+ l2 := startIngesterAndWaitActive ("ing2" )
374
+
375
+ ingesters := poll (func (desc * Desc ) bool {
376
+ return len (desc .Ingesters ) == 2 && desc .Ingesters ["ing1" ].State == ACTIVE && desc .Ingesters ["ing2" ].State == ACTIVE
377
+ })
378
+
379
+ // Both Ingester should be active and running
380
+ assert .Equal (t , ACTIVE , ingesters ["ing1" ].State )
381
+ assert .Equal (t , ACTIVE , ingesters ["ing2" ].State )
382
+
383
+ // Stop One ingester gracefully should leave it on LEAVING STATE on the ring
384
+ require .NoError (t , services .StopAndAwaitTerminated (context .Background (), l2 ))
385
+
386
+ ingesters = poll (func (desc * Desc ) bool {
387
+ return len (desc .Ingesters ) == 2 && desc .Ingesters ["ing2" ].State == LEAVING
388
+ })
389
+ assert .Equal (t , LEAVING , ingesters ["ing2" ].State )
390
+
391
+ // Start Ingester2 again - Should flip back to ACTIVE in the ring
392
+ l2 = startIngesterAndWaitActive ("ing2" )
393
+ require .NoError (t , services .StopAndAwaitTerminated (context .Background (), l2 ))
394
+
395
+ // Simulate ingester2 crash on startup and left the ring with JOINING state
396
+ err = r .KVClient .CAS (context .Background (), IngesterRingKey , func (in interface {}) (out interface {}, retry bool , err error ) {
397
+ desc , ok := in .(* Desc )
398
+ require .Equal (t , true , ok )
399
+ ingester2Desc := desc .Ingesters ["ing2" ]
400
+ ingester2Desc .State = JOINING
401
+ desc .Ingesters ["ing2" ] = ingester2Desc
402
+ return desc , true , nil
403
+ })
404
+ require .NoError (t , err )
405
+
406
+ l2 = startIngesterAndWaitActive ("ing2" )
407
+ require .NoError (t , services .StopAndAwaitTerminated (context .Background (), l2 ))
408
+
409
+ // Simulate ingester2 crash on startup and left the ring with PENDING state
410
+ err = r .KVClient .CAS (context .Background (), IngesterRingKey , func (in interface {}) (out interface {}, retry bool , err error ) {
411
+ desc , ok := in .(* Desc )
412
+ require .Equal (t , true , ok )
413
+ ingester2Desc := desc .Ingesters ["ing2" ]
414
+ ingester2Desc .State = PENDING
415
+ desc .Ingesters ["ing2" ] = ingester2Desc
416
+ return desc , true , nil
417
+ })
418
+ require .NoError (t , err )
419
+
420
+ l2 = startIngesterAndWaitActive ("ing2" )
421
+ require .NoError (t , services .StopAndAwaitTerminated (context .Background (), l2 ))
422
+ }
423
+
324
424
func TestTokensOnDisk (t * testing.T ) {
325
425
var ringConfig Config
326
426
flagext .DefaultValues (& ringConfig )
0 commit comments