Skip to content

Commit 1564a03

Browse files
authored
Merge pull request #812 from Icinga/use-sync-atomic
Use `sync/atomic#Pointer` instead of our own wrapper
2 parents 7214b28 + 95f2763 commit 1564a03

File tree

3 files changed

+27
-18
lines changed

3 files changed

+27
-18
lines changed

cmd/icingadb/main.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,7 @@ func run() int {
101101
// the heartbeat is not read while HA gets stuck when updating the instance table.
102102
var heartbeat *icingaredis.Heartbeat
103103
var ha *icingadb.HA
104+
var telemetrySyncStats *atomic.Pointer[telemetry.SuccessfulSync]
104105
{
105106
rc, err := cmd.Redis(logs.GetChildLogger("redis"))
106107
if err != nil {
@@ -116,7 +117,7 @@ func run() int {
116117
ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability"))
117118

118119
telemetryLogger := logs.GetChildLogger("telemetry")
119-
telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat)
120+
telemetrySyncStats = telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat)
120121
telemetry.WriteStats(ctx, rc, telemetryLogger)
121122
}
122123
// Closing ha on exit ensures that this instance retracts its heartbeat
@@ -250,7 +251,7 @@ func run() int {
250251
logger := logs.GetChildLogger("config-sync")
251252

252253
if synctx.Err() == nil {
253-
telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{
254+
telemetrySyncStats.Store(&telemetry.SuccessfulSync{
254255
FinishMilli: syncEnd.UnixMilli(),
255256
DurationMilli: elapsed.Milliseconds(),
256257
})

pkg/icingadb/ha.go

Lines changed: 14 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@ import (
77
"encoding/hex"
88
"github.com/google/uuid"
99
"github.com/icinga/icinga-go-library/backoff"
10-
"github.com/icinga/icinga-go-library/com"
1110
"github.com/icinga/icinga-go-library/database"
1211
"github.com/icinga/icinga-go-library/logging"
1312
"github.com/icinga/icinga-go-library/retry"
@@ -19,6 +18,7 @@ import (
1918
"github.com/pkg/errors"
2019
"go.uber.org/zap"
2120
"sync"
21+
"sync/atomic"
2222
"time"
2323
)
2424

@@ -35,7 +35,7 @@ type haState struct {
3535

3636
// HA provides high availability and indicates whether a Takeover or Handover must be made.
3737
type HA struct {
38-
state com.Atomic[haState]
38+
state atomic.Pointer[haState]
3939
ctx context.Context
4040
cancelCtx context.CancelFunc
4141
instanceId types.Binary
@@ -71,6 +71,8 @@ func NewHA(ctx context.Context, db *database.DB, heartbeat *icingaredis.Heartbea
7171
done: make(chan struct{}),
7272
}
7373

74+
ha.state.Store(&haState{})
75+
7476
go ha.controller()
7577

7678
return ha
@@ -121,7 +123,8 @@ func (h *HA) Takeover() chan string {
121123

122124
// State returns the status quo.
123125
func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) {
124-
state, _ := h.state.Load()
126+
state := h.state.Load()
127+
125128
return state.responsibleTsMilli, state.responsible, state.otherResponsible
126129
}
127130

@@ -428,9 +431,12 @@ func (h *HA) realize(
428431

429432
h.signalTakeover(takeover)
430433
} else if otherResponsible {
431-
if state, _ := h.state.Load(); !state.otherResponsible {
432-
state.otherResponsible = true
433-
h.state.Store(state)
434+
if state := h.state.Load(); !state.otherResponsible {
435+
// Dereference pointer to create a copy of the value it points to.
436+
// Ensures that any modifications do not directly affect the original data unless explicitly stored back.
437+
newState := *state
438+
newState.otherResponsible = true
439+
h.state.Store(&newState)
434440
}
435441
}
436442

@@ -496,7 +502,7 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
496502
// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
497503
func (h *HA) signalHandover(reason string) {
498504
if h.responsible {
499-
h.state.Store(haState{
505+
h.state.Store(&haState{
500506
responsibleTsMilli: time.Now().UnixMilli(),
501507
responsible: false,
502508
otherResponsible: false,
@@ -514,7 +520,7 @@ func (h *HA) signalHandover(reason string) {
514520
// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
515521
func (h *HA) signalTakeover(reason string) {
516522
if !h.responsible {
517-
h.state.Store(haState{
523+
h.state.Store(&haState{
518524
responsibleTsMilli: time.Now().UnixMilli(),
519525
responsible: true,
520526
otherResponsible: false,

pkg/icingaredis/telemetry/heartbeat.go

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@ package telemetry
33
import (
44
"context"
55
"fmt"
6-
"github.com/icinga/icinga-go-library/com"
76
"github.com/icinga/icinga-go-library/logging"
87
"github.com/icinga/icinga-go-library/periodic"
98
"github.com/icinga/icinga-go-library/redis"
@@ -80,16 +79,17 @@ func GetCurrentDbConnErr() (string, int64) {
8079
// OngoingSyncStartMilli is to be updated by the main() function.
8180
var OngoingSyncStartMilli int64
8281

83-
// LastSuccessfulSync is to be updated by the main() function.
84-
var LastSuccessfulSync com.Atomic[SuccessfulSync]
85-
8682
var boolToStr = map[bool]string{false: "0", true: "1"}
8783
var startTime = time.Now().UnixMilli()
8884

8985
// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2.
86+
// It returns an atomic pointer to SuccessfulSync,
87+
// which contains synchronisation statistics that the caller should update.
9088
func StartHeartbeat(
9189
ctx context.Context, client *redis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat,
92-
) {
90+
) *atomic.Pointer[SuccessfulSync] {
91+
var syncStats atomic.Pointer[SuccessfulSync]
92+
syncStats.Store(&SuccessfulSync{})
9393
goMetrics := NewGoMetrics()
9494

9595
const interval = time.Second
@@ -101,7 +101,7 @@ func StartHeartbeat(
101101
heartbeat := heartbeat.LastReceived()
102102
responsibleTsMilli, responsible, otherResponsible := ha.State()
103103
ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli)
104-
sync, _ := LastSuccessfulSync.Load()
104+
lastSync := syncStats.Load()
105105
dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr()
106106
now := time.Now()
107107

@@ -117,8 +117,8 @@ func StartHeartbeat(
117117
"ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10),
118118
"ha-other-responsible": boolToStr[otherResponsible],
119119
"sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10),
120-
"sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10),
121-
"sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10),
120+
"sync-success-finish": strconv.FormatInt(lastSync.FinishMilli, 10),
121+
"sync-success-duration": strconv.FormatInt(lastSync.DurationMilli, 10),
122122
}
123123

124124
ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval))
@@ -145,6 +145,8 @@ func StartHeartbeat(
145145
silenceUntil = time.Time{}
146146
}
147147
})
148+
149+
return &syncStats
148150
}
149151

150152
type goMetrics struct {

0 commit comments

Comments
 (0)