Skip to content

Commit d91ed4d

Browse files
author
Mario Macias
committed
fix panic on close and goroutine leak on slow export
1 parent 6feb4ed commit d91ed4d

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

pkg/flow/account.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func NewAccounter(
3131
return &Accounter{
3232
maxEntries: maxEntries,
3333
evictTimeout: evictTimeout,
34-
entries: make(map[RecordKey]*RecordMetrics, maxEntries),
34+
entries: map[RecordKey]*RecordMetrics{},
3535
namer: ifaceNamer,
3636
clock: clock,
3737
monoClock: monoClock,
@@ -51,10 +51,10 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
5151
break
5252
}
5353
evictingEntries := c.entries
54-
c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries)
54+
c.entries = map[RecordKey]*RecordMetrics{}
5555
logrus.WithField("flows", len(evictingEntries)).
5656
Debug("evicting flows from userspace accounter on timeout")
57-
go c.evict(evictingEntries, out)
57+
c.evict(evictingEntries, out)
5858
case record, ok := <-in:
5959
if !ok {
6060
alog.Debug("input channel closed. Evicting entries")
@@ -70,10 +70,10 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
7070
} else {
7171
if len(c.entries) >= c.maxEntries {
7272
evictingEntries := c.entries
73-
c.entries = make(map[RecordKey]*RecordMetrics, c.maxEntries)
73+
c.entries = map[RecordKey]*RecordMetrics{}
7474
logrus.WithField("flows", len(evictingEntries)).
7575
Debug("evicting flows from userspace accounter after reaching cache max length")
76-
go c.evict(evictingEntries, out)
76+
c.evict(evictingEntries, out)
7777
}
7878
c.entries[record.RecordKey] = &record.RecordMetrics
7979
}

pkg/flow/tracer_map.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,16 @@ func (m *MapTracer) evictionSynchronization(ctx context.Context, out chan<- []*R
7373
// make sure we only evict once at a time, even if there are multiple eviction signals
7474
m.evictionCond.L.Lock()
7575
m.evictionCond.Wait()
76-
mtlog.Debug("evictionSynchronization signal received")
77-
m.evictFlows(out)
78-
m.evictionCond.L.Unlock()
79-
80-
// if context is canceled, stops the goroutine after evicting flows
8176
select {
8277
case <-ctx.Done():
78+
mtlog.Debug("context canceled. Stopping goroutine before evicting flows")
8379
return
8480
default:
81+
mtlog.Debug("evictionSynchronization signal received")
82+
m.evictFlows(out)
8583
}
84+
m.evictionCond.L.Unlock()
85+
8686
}
8787
}
8888

0 commit comments

Comments
 (0)