Skip to content

Commit a5888b7

Browse files
authored
Merge pull request #4 from ChenglongLiu/dev0415
Merge upstream master
2 parents 59c9f2a + 62aa397 commit a5888b7

File tree

8 files changed

+107
-42
lines changed

8 files changed

+107
-42
lines changed

Makefile

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
.PHONY: build
2+
build: tidy ## Build the CLI
3+
go build
4+
5+
build-image: ## Build the Docker image
6+
docker build -t kubernetes-event-exporter .
7+
8+
.PHONY: fmt
9+
fmt: ## Run go fmt against code
10+
gofmt -s -l -w .
11+
12+
.PHONY: vet
13+
vet: ## Run go vet against code
14+
go vet ./...
15+
16+
tidy: ## Run go mod tidy
17+
go mod tidy
18+
19+
test: tidy ## Run tests
20+
go test -cover -mod=mod -v ./...
21+
22+
clean: ## Delete go.sum and clean mod cache
23+
go clean -modcache
24+
rm go.sum
25+
26+
.PHONY: help
27+
help: ## Display this help.
28+
@cat $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m<target>\033[0m\n"} /^[a-zA-Z_0-9-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } '

deploy/00-roles.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ rules:
2525
- apiGroups: ["*"]
2626
resources: ["*"]
2727
verbs: ["get", "watch", "list"]
28+
- apiGroups: ["coordination.k8s.io"]
29+
resources: ["leases"]
30+
verbs: ["*"]

event-exporter.dockerignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Ignore everything
2+
*
3+
4+
# Include pkg directory
5+
!pkg/

main.go

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func main() {
3434

3535
configBytes = []byte(os.ExpandEnv(string(configBytes)))
3636

37-
cfg, err := setup.ParseConfigFromBites(configBytes)
37+
cfg, err := setup.ParseConfigFromBytes(configBytes)
3838
if err != nil {
3939
log.Fatal().Msg(err.Error())
4040
}
@@ -64,6 +64,8 @@ func main() {
6464

6565
cfg.SetDefaults()
6666

67+
log.Info().Msgf("Starting with config: %#v", cfg)
68+
6769
if err := cfg.Validate(); err != nil {
6870
log.Fatal().Err(err).Msg("config validation failed")
6971
}
@@ -91,45 +93,61 @@ func main() {
9193

9294
w := kube.NewEventWatcher(kubecfg, cfg.Namespace, cfg.MaxEventAgeSeconds, metricsStore, onEvent, cfg.OmitLookup, cfg.CacheSize)
9395

94-
ctx, cancel := context.WithCancel(context.Background())
95-
leaderLost := make(chan bool)
96+
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
97+
defer cancel()
98+
9699
if cfg.LeaderElection.Enabled {
100+
var wasLeader bool
101+
log.Info().Msg("leader election enabled")
102+
103+
onStoppedLeading := func(ctx context.Context) {
104+
select {
105+
case <-ctx.Done():
106+
log.Info().Msg("Context was cancelled, stopping leader election loop")
107+
default:
108+
log.Info().Msg("Lost the leader lease, stopping leader election loop")
109+
}
110+
}
111+
97112
l, err := kube.NewLeaderElector(cfg.LeaderElection.LeaderElectionID, kubecfg,
113+
// this method gets called when this instance becomes the leader
98114
func(_ context.Context) {
99-
log.Info().Msg("leader election got")
115+
wasLeader = true
116+
log.Info().Msg("leader election won")
100117
w.Start()
101118
},
119+
// this method gets called when the leader election loop is closed
120+
// either due to context cancellation or due to losing the leader lease
102121
func() {
103-
log.Error().Msg("leader election lost")
104-
leaderLost <- true
122+
onStoppedLeading(ctx)
123+
},
124+
func(identity string) {
125+
log.Info().Msg("new leader observed: " + identity)
105126
},
106127
)
107128
if err != nil {
108129
log.Fatal().Err(err).Msg("create leaderelector failed")
109130
}
110-
go l.Run(ctx)
131+
132+
// Run returns if either the context is canceled or client stopped holding the leader lease
133+
l.Run(ctx)
134+
135+
// We get here either because we lost the leader lease or the context was canceled.
136+
// In either case we want to stop the event watcher and exit.
137+
// However, if we were the leader, we wait leaseDuration seconds before stopping
138+
// so that we don't lose events until the next leader is elected. The new leader
139+
// will only be elected after leaseDuration seconds.
140+
if wasLeader {
141+
log.Info().Msgf("waiting leaseDuration seconds before stopping: %s", kube.GetLeaseDuration())
142+
time.Sleep(kube.GetLeaseDuration())
143+
}
111144
} else {
145+
log.Info().Msg("leader election disabled")
112146
w.Start()
147+
<-ctx.Done()
113148
}
114149

115-
c := make(chan os.Signal, 1)
116-
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
117-
118-
gracefulExit := func() {
119-
defer close(c)
120-
defer close(leaderLost)
121-
cancel()
122-
w.Stop()
123-
engine.Stop()
124-
log.Info().Msg("Exiting")
125-
}
126-
127-
select {
128-
case sig := <-c:
129-
log.Info().Str("signal", sig.String()).Msg("Received signal to exit")
130-
gracefulExit()
131-
case <-leaderLost:
132-
log.Warn().Msg("Leader election lost")
133-
gracefulExit()
134-
}
150+
log.Info().Msg("Received signal to exit. Stopping.")
151+
w.Stop()
152+
engine.Stop()
135153
}

pkg/kube/leaderelection.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ const (
2828
defaultRetryPeriod = 2 * time.Second
2929
)
3030

31+
func GetLeaseDuration() time.Duration {
32+
return defaultLeaseDuration
33+
}
34+
3135
// NewResourceLock creates a new config map resource lock for use in a leader
3236
// election loop
3337
func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock.Interface, error) {
@@ -53,7 +57,7 @@ func newResourceLock(config *rest.Config, leaderElectionID string) (resourcelock
5357
return nil, err
5458
}
5559

56-
return resourcelock.New(resourcelock.ConfigMapsLeasesResourceLock,
60+
return resourcelock.New(resourcelock.LeasesResourceLock,
5761
leaderElectionNamespace,
5862
leaderElectionID,
5963
client.CoreV1(),
@@ -82,7 +86,7 @@ func getInClusterNamespace() (string, error) {
8286
}
8387

8488
// NewLeaderElector return a leader elector object using client-go
85-
func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func()) (*leaderelection.LeaderElector, error) {
89+
func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc func(context.Context), stopFunc func(), newLeaderFunc func(string)) (*leaderelection.LeaderElector, error) {
8690
resourceLock, err := newResourceLock(config, leaderElectionID)
8791
if err != nil {
8892
return &leaderelection.LeaderElector{}, err
@@ -96,6 +100,7 @@ func NewLeaderElector(leaderElectionID string, config *rest.Config, startFunc fu
96100
Callbacks: leaderelection.LeaderCallbacks{
97101
OnStartedLeading: startFunc,
98102
OnStoppedLeading: stopFunc,
103+
OnNewLeader: newLeaderFunc,
99104
},
100105
})
101106
return l, err

pkg/kube/watcher.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kube
22

33
import (
4+
"sync"
45
"time"
56

67
"github.com/resmoio/kubernetes-event-exporter/pkg/metrics"
@@ -19,6 +20,7 @@ var startUpTime = time.Now()
1920
type EventHandler func(event *EnhancedEvent)
2021

2122
type EventWatcher struct {
23+
wg sync.WaitGroup
2224
informer cache.SharedInformer
2325
stopper chan struct{}
2426
objectMetadataCache ObjectMetadataProvider
@@ -135,12 +137,16 @@ func (e *EventWatcher) OnDelete(obj interface{}) {
135137
}
136138

137139
func (e *EventWatcher) Start() {
138-
go e.informer.Run(e.stopper)
140+
e.wg.Add(1)
141+
go func() {
142+
defer e.wg.Done()
143+
e.informer.Run(e.stopper)
144+
}()
139145
}
140146

141147
func (e *EventWatcher) Stop() {
142-
e.stopper <- struct{}{}
143148
close(e.stopper)
149+
e.wg.Wait()
144150
}
145151

146152
func (e *EventWatcher) setStartUpTime(time time.Time) {

pkg/setup/setup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/resmoio/kubernetes-event-exporter/pkg/exporter"
99
)
1010

11-
func ParseConfigFromBites(configBytes []byte) (exporter.Config, error) {
11+
func ParseConfigFromBytes(configBytes []byte) (exporter.Config, error) {
1212
var config exporter.Config
1313
err := yaml.Unmarshal(configBytes, &config)
1414
if err != nil {

pkg/setup/setup_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ import (
77
"github.com/stretchr/testify/assert"
88
)
99

10-
func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
10+
func Test_ParseConfigFromBytes_ExampleConfigIsCorrect(t *testing.T) {
1111
configBytes, err := os.ReadFile("../../config.example.yaml")
1212
if err != nil {
1313
assert.NoError(t, err, "cannot read config file: "+err.Error())
1414
return
1515
}
1616

17-
config, err := ParseConfigFromBites(configBytes)
17+
config, err := ParseConfigFromBytes(configBytes)
1818

1919
assert.NoError(t, err)
2020
assert.NotEmpty(t, config.LogLevel)
@@ -26,26 +26,26 @@ func Test_ParseConfigFromBites_ExampleConfigIsCorrect(t *testing.T) {
2626
assert.Equal(t, 10, len(config.Receivers))
2727
}
2828

29-
func Test_ParseConfigFromBites_NoErrors(t *testing.T) {
29+
func Test_ParseConfigFromBytes_NoErrors(t *testing.T) {
3030
configBytes := []byte(`
3131
logLevel: info
3232
logFormat: json
3333
`)
3434

35-
config, err := ParseConfigFromBites(configBytes)
35+
config, err := ParseConfigFromBytes(configBytes)
3636

3737
assert.NoError(t, err)
3838
assert.Equal(t, "info", config.LogLevel)
3939
assert.Equal(t, "json", config.LogFormat)
4040
}
4141

42-
func Test_ParseConfigFromBites_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
42+
func Test_ParseConfigFromBytes_ErrorWhenCurlyBracesNotEscaped(t *testing.T) {
4343
configBytes := []byte(`
4444
logLevel: {{info}}
4545
logFormat: json
4646
`)
4747

48-
config, err := ParseConfigFromBites(configBytes)
48+
config, err := ParseConfigFromBytes(configBytes)
4949

5050
expectedErrorLine := "> 2 | logLevel: {{info}}"
5151
expectedErrorSuggestion := "Need to wrap values with special characters in quotes"
@@ -56,26 +56,26 @@ logFormat: json
5656
assert.Equal(t, "", config.LogFormat)
5757
}
5858

59-
func Test_ParseConfigFromBites_OkWhenCurlyBracesEscaped(t *testing.T) {
59+
func Test_ParseConfigFromBytes_OkWhenCurlyBracesEscaped(t *testing.T) {
6060
configBytes := []byte(`
6161
logLevel: "{{info}}"
6262
logFormat: json
6363
`)
6464

65-
config, err := ParseConfigFromBites(configBytes)
65+
config, err := ParseConfigFromBytes(configBytes)
6666

6767
assert.Nil(t, err)
6868
assert.Equal(t, "{{info}}", config.LogLevel)
6969
assert.Equal(t, "json", config.LogFormat)
7070
}
7171

72-
func Test_ParseConfigFromBites_ErrorErrorNotWithCurlyBraces(t *testing.T) {
72+
func Test_ParseConfigFromBytes_ErrorErrorNotWithCurlyBraces(t *testing.T) {
7373
configBytes := []byte(`
7474
logLevelNotYAMLErrorError
7575
logFormat: json
7676
`)
7777

78-
config, err := ParseConfigFromBites(configBytes)
78+
config, err := ParseConfigFromBytes(configBytes)
7979

8080
expectedErrorLine := "> 2 | logLevelNotYAMLErrorError"
8181
expectedErrorSuggestion := "Need to wrap values with special characters in quotes"

0 commit comments

Comments
 (0)