Skip to content

Commit 43ca3a2

Browse files
author
Mario Macias
authored
Merge pull request #188 from mariomac/shutdown
Fix shutdown time
2 parents d36b66c + 6ac9c48 commit 43ca3a2

File tree

11 files changed

+29
-21
lines changed

11 files changed

+29
-21
lines changed

pkg/pipeline/encode/encode_prom.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type encodeProm struct {
8282
expiryTime int64
8383
mList *list.List
8484
mCache metricCache
85-
exitChan chan bool
85+
exitChan chan struct{}
8686
}
8787

8888
var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
@@ -336,7 +336,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
336336
}
337337
}
338338

339-
ch := make(chan bool, 1)
339+
ch := make(chan struct{})
340340
utils.RegisterExitChannel(ch)
341341

342342
log.Debugf("metrics = %v", metrics)

pkg/pipeline/extract/aggregate/aggregates.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi
7575
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {
7676

7777
ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
78-
done := make(chan bool)
78+
done := make(chan struct{})
7979
utils.RegisterExitChannel(done)
8080
go func() {
8181
for {

pkg/pipeline/ingest/ingest_collector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type ingestCollector struct {
5151
in chan map[string]interface{}
5252
batchFlushTime time.Duration
5353
batchMaxLength int
54-
exitChan chan bool
54+
exitChan chan struct{}
5555
}
5656

5757
// TransportWrapper is an implementation of the goflow2 transport interface
@@ -200,7 +200,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
200200
log.Infof("hostname = %s", jsonIngestCollector.HostName)
201201
log.Infof("port = %d", jsonIngestCollector.Port)
202202

203-
ch := make(chan bool, 1)
203+
ch := make(chan struct{})
204204
pUtils.RegisterExitChannel(ch)
205205

206206
bml := defaultBatchMaxLength

pkg/pipeline/ingest/ingest_collector_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestIngest(t *testing.T) {
1919
hostname: "0.0.0.0",
2020
port: collectorPort,
2121
batchFlushTime: 10 * time.Millisecond,
22-
exitChan: make(chan bool),
22+
exitChan: make(chan struct{}),
2323
}
2424
forwarded := make(chan []interface{})
2525
//defer close(forwarded)

pkg/pipeline/ingest/ingest_file.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
type IngestFile struct {
3232
params config.Ingest
33-
exitChan chan bool
33+
exitChan chan struct{}
3434
PrevRecords []interface{}
3535
TotalRecords int
3636
}
@@ -104,7 +104,7 @@ func NewIngestFile(params config.StageParam) (Ingester, error) {
104104

105105
log.Debugf("input file name = %s", params.Ingest.File.Filename)
106106

107-
ch := make(chan bool, 1)
107+
ch := make(chan struct{})
108108
utils.RegisterExitChannel(ch)
109109
return &IngestFile{
110110
params: params.Ingest,

pkg/pipeline/ingest/ingest_grpc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
"github.com/netobserv/flowlogs-pipeline/pkg/config"
7+
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
78
"github.com/netobserv/netobserv-agent/pkg/grpc"
89
"github.com/netobserv/netobserv-agent/pkg/pbflow"
910
)
@@ -37,6 +38,13 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
3738
}
3839

3940
func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
41+
exitCh := make(chan struct{})
42+
utils.RegisterExitChannel(exitCh)
43+
go func() {
44+
<-exitCh
45+
close(no.flowPackets)
46+
no.collector.Close()
47+
}()
4048
for fp := range no.flowPackets {
4149
out <- []interface{}{fp}
4250
}

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type ingestKafka struct {
3838
kafkaParams api.IngestKafka
3939
kafkaReader kafkaReadMessage
4040
in chan string
41-
exitChan chan bool
41+
exitChan chan struct{}
4242
prevRecords []interface{} // copy of most recently sent records; for testing and debugging
4343
}
4444

@@ -153,7 +153,7 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
153153
}
154154
log.Debugf("kafkaReader = %v", kafkaReader)
155155

156-
ch := make(chan bool, 1)
156+
ch := make(chan struct{})
157157
utils.RegisterExitChannel(ch)
158158

159159
return &ingestKafka{

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func Test_IngestKafka(t *testing.T) {
126126
require.Equal(t, record3, receivedEntries[2])
127127

128128
// make the ingest thread exit
129-
ingestKafka.exitChan <- true
129+
close(ingestKafka.exitChan)
130130
time.Sleep(time.Second)
131131
}
132132

@@ -179,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
179179
require.Equal(t, string(fakeRecord), receivedEntries[0])
180180

181181
// make the ingest thread exit
182-
ingestKafka.exitChan <- true
182+
close(ingestKafka.exitChan)
183183
time.Sleep(time.Second)
184184

185185
}

pkg/pipeline/utils/exit.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ import (
2727
)
2828

2929
var (
30-
registeredChannels []chan bool
30+
registeredChannels []chan struct{}
3131
chanMutex sync.Mutex
3232
)
3333

34-
func RegisterExitChannel(ch chan bool) {
34+
func RegisterExitChannel(ch chan struct{}) {
3535
chanMutex.Lock()
3636
defer chanMutex.Unlock()
3737
registeredChannels = append(registeredChannels, ch)
@@ -40,7 +40,7 @@ func RegisterExitChannel(ch chan bool) {
4040
func SetupElegantExit() {
4141
log.Debugf("entering SetupElegantExit")
4242
// handle elegant exit; create support for channels of go routines that want to exit cleanly
43-
registeredChannels = make([]chan bool, 0)
43+
registeredChannels = make([]chan struct{}, 0)
4444
exitSigChan := make(chan os.Signal, 1)
4545
log.Debugf("registered exit signal channel")
4646
signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM)
@@ -52,7 +52,7 @@ func SetupElegantExit() {
5252
defer chanMutex.Unlock()
5353
// exit signal received; stop other go functions
5454
for _, ch := range registeredChannels {
55-
ch <- true
55+
close(ch)
5656
}
5757
log.Debugf("exiting SetupElegantExit go function")
5858
}()

pkg/pipeline/utils/exit_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111
func Test_SetupElegantExit(t *testing.T) {
1212
SetupElegantExit()
1313
require.Equal(t, 0, len(registeredChannels))
14-
ch1 := make(chan bool, 1)
15-
ch2 := make(chan bool, 1)
16-
ch3 := make(chan bool, 1)
14+
ch1 := make(chan struct{})
15+
ch2 := make(chan struct{})
16+
ch3 := make(chan struct{})
1717
RegisterExitChannel(ch1)
1818
require.Equal(t, 1, len(registeredChannels))
1919
RegisterExitChannel(ch2)

0 commit comments

Comments
 (0)