Skip to content

Commit f13dc86

Browse files
author
Mario Macias
authored
Merge pull request #190 from mariomac/simple-shutdown
simplify elegant shutdown
2 parents 43ca3a2 + 10a217c commit f13dc86

File tree

12 files changed

+19
-68
lines changed

12 files changed

+19
-68
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ require (
8585
gopkg.in/inf.v0 v0.9.1 // indirect
8686
gopkg.in/ini.v1 v1.66.2 // indirect
8787
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
88-
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d // indirect
8988
k8s.io/klog/v2 v2.30.0 // indirect
9089
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
9190
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,8 +1523,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
15231523
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
15241524
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
15251525
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
1526-
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d h1:yjDpoTxoYVpCt04OYp8zlZsKtrEOK1O4U7l2aWbn3D8=
1527-
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d/go.mod h1:rbNo0ST5hSazCG4rGfpHrwnwvzP1QX62WbhzD+ghGzs=
15281526
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
15291527
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
15301528
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

pkg/pipeline/encode/encode_prom.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type encodeProm struct {
8282
expiryTime int64
8383
mList *list.List
8484
mCache metricCache
85-
exitChan chan struct{}
85+
exitChan <-chan struct{}
8686
}
8787

8888
var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
@@ -336,9 +336,6 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
336336
}
337337
}
338338

339-
ch := make(chan struct{})
340-
utils.RegisterExitChannel(ch)
341-
342339
log.Debugf("metrics = %v", metrics)
343340
w := &encodeProm{
344341
port: fmt.Sprintf(":%v", portNum),
@@ -347,7 +344,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
347344
expiryTime: expiryTime,
348345
mList: list.New(),
349346
mCache: make(metricCache),
350-
exitChan: ch,
347+
exitChan: utils.ExitChannel(),
351348
}
352349
go startPrometheusInterface(w)
353350
go w.cleanupExpiredEntriesLoop()

pkg/pipeline/extract/aggregate/aggregates.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,10 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi
7575
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {
7676

7777
ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
78-
done := make(chan struct{})
79-
utils.RegisterExitChannel(done)
8078
go func() {
8179
for {
8280
select {
83-
case <-done:
81+
case <-utils.ExitChannel():
8482
return
8583
case <-ticker.C:
8684
aggregates.cleanupExpiredEntries()

pkg/pipeline/ingest/ingest_collector.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type ingestCollector struct {
5151
in chan map[string]interface{}
5252
batchFlushTime time.Duration
5353
batchMaxLength int
54-
exitChan chan struct{}
54+
exitChan <-chan struct{}
5555
}
5656

5757
// TransportWrapper is an implementation of the goflow2 transport interface
@@ -200,9 +200,6 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
200200
log.Infof("hostname = %s", jsonIngestCollector.HostName)
201201
log.Infof("port = %d", jsonIngestCollector.Port)
202202

203-
ch := make(chan struct{})
204-
pUtils.RegisterExitChannel(ch)
205-
206203
bml := defaultBatchMaxLength
207204
if jsonIngestCollector.BatchMaxLen != 0 {
208205
bml = jsonIngestCollector.BatchMaxLen
@@ -211,7 +208,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
211208
return &ingestCollector{
212209
hostname: jsonIngestCollector.HostName,
213210
port: jsonIngestCollector.Port,
214-
exitChan: ch,
211+
exitChan: pUtils.ExitChannel(),
215212
batchFlushTime: defaultBatchFlushTime,
216213
batchMaxLength: bml,
217214
}, nil

pkg/pipeline/ingest/ingest_file.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
type IngestFile struct {
3232
params config.Ingest
33-
exitChan chan struct{}
33+
exitChan <-chan struct{}
3434
PrevRecords []interface{}
3535
TotalRecords int
3636
}
@@ -104,10 +104,8 @@ func NewIngestFile(params config.StageParam) (Ingester, error) {
104104

105105
log.Debugf("input file name = %s", params.Ingest.File.Filename)
106106

107-
ch := make(chan struct{})
108-
utils.RegisterExitChannel(ch)
109107
return &IngestFile{
110108
params: params.Ingest,
111-
exitChan: ch,
109+
exitChan: utils.ExitChannel(),
112110
}, nil
113111
}

pkg/pipeline/ingest/ingest_grpc.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,8 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
3838
}
3939

4040
func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
41-
exitCh := make(chan struct{})
42-
utils.RegisterExitChannel(exitCh)
4341
go func() {
44-
<-exitCh
42+
<-utils.ExitChannel()
4543
close(no.flowPackets)
4644
no.collector.Close()
4745
}()

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type ingestKafka struct {
3838
kafkaParams api.IngestKafka
3939
kafkaReader kafkaReadMessage
4040
in chan string
41-
exitChan chan struct{}
41+
exitChan <-chan struct{}
4242
prevRecords []interface{} // copy of most recently sent records; for testing and debugging
4343
}
4444

@@ -153,13 +153,10 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
153153
}
154154
log.Debugf("kafkaReader = %v", kafkaReader)
155155

156-
ch := make(chan struct{})
157-
utils.RegisterExitChannel(ch)
158-
159156
return &ingestKafka{
160157
kafkaParams: jsonIngestKafka,
161158
kafkaReader: kafkaReader,
162-
exitChan: ch,
159+
exitChan: utils.ExitChannel(),
163160
in: make(chan string, channelSizeKafka),
164161
prevRecords: make([]interface{}, 0),
165162
}, nil

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,6 @@ func Test_IngestKafka(t *testing.T) {
124124
require.Equal(t, record1, receivedEntries[0])
125125
require.Equal(t, record2, receivedEntries[1])
126126
require.Equal(t, record3, receivedEntries[2])
127-
128-
// make the ingest thread exit
129-
close(ingestKafka.exitChan)
130-
time.Sleep(time.Second)
131127
}
132128

133129
type fakeKafkaReader struct {
@@ -177,9 +173,4 @@ func Test_KafkaListener(t *testing.T) {
177173

178174
require.Equal(t, 1, len(receivedEntries))
179175
require.Equal(t, string(fakeRecord), receivedEntries[0])
180-
181-
// make the ingest thread exit
182-
close(ingestKafka.exitChan)
183-
time.Sleep(time.Second)
184-
185176
}

pkg/pipeline/utils/exit.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,31 @@ package utils
2020
import (
2121
"os"
2222
"os/signal"
23-
"sync"
2423
"syscall"
2524

2625
log "github.com/sirupsen/logrus"
2726
)
2827

2928
var (
30-
registeredChannels []chan struct{}
31-
chanMutex sync.Mutex
29+
exitChannel chan struct{}
3230
)
3331

34-
func RegisterExitChannel(ch chan struct{}) {
35-
chanMutex.Lock()
36-
defer chanMutex.Unlock()
37-
registeredChannels = append(registeredChannels, ch)
32+
func ExitChannel() <-chan struct{} {
33+
return exitChannel
3834
}
3935

4036
func SetupElegantExit() {
4137
log.Debugf("entering SetupElegantExit")
4238
// handle elegant exit; create support for channels of go routines that want to exit cleanly
43-
registeredChannels = make([]chan struct{}, 0)
39+
exitChannel = make(chan struct{})
4440
exitSigChan := make(chan os.Signal, 1)
4541
log.Debugf("registered exit signal channel")
4642
signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM)
4743
go func() {
4844
// wait for exit signal; then stop all the other go functions
4945
sig := <-exitSigChan
5046
log.Debugf("received exit signal = %v", sig)
51-
chanMutex.Lock()
52-
defer chanMutex.Unlock()
53-
// exit signal received; stop other go functions
54-
for _, ch := range registeredChannels {
55-
close(ch)
56-
}
47+
close(exitChannel)
5748
log.Debugf("exiting SetupElegantExit go function")
5849
}()
5950
log.Debugf("exiting SetupElegantExit")

0 commit comments

Comments
 (0)