Skip to content

Commit 637837b

Browse files
authored
Address goroutine leak with dynamically determined log destinations (#1848)
1 parent b1a5bba commit 637837b

File tree

12 files changed

+299
-303
lines changed

12 files changed

+299
-303
lines changed

logs/logs.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ type LogBackend interface {
6565
// e.g. a particular log stream in cloudwatchlogs.
6666
type LogDest interface {
6767
Publish(events []LogEvent) error
68+
NotifySourceStopped()
6869
}
6970

7071
// LogAgent is the agent handles pure log pipelines
7172
type LogAgent struct {
7273
Config *config.Config
7374
backends map[string]LogBackend
74-
destNames map[LogDest]string
7575
collections []LogCollection
7676
retentionAlreadyAttempted map[string]bool
7777
}
@@ -80,7 +80,6 @@ func NewLogAgent(c *config.Config) *LogAgent {
8080
return &LogAgent{
8181
Config: c,
8282
backends: make(map[string]LogBackend),
83-
destNames: make(map[LogDest]string),
8483
retentionAlreadyAttempted: make(map[string]bool),
8584
}
8685
}
@@ -136,7 +135,6 @@ func (l *LogAgent) Run(ctx context.Context) {
136135
}
137136
retention = l.checkRetentionAlreadyAttempted(retention, logGroup)
138137
dest := backend.CreateDest(logGroup, logStream, retention, logGroupClass, src)
139-
l.destNames[dest] = dname
140138
log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", logGroup, logStream, description, dname, retention)
141139
go l.runSrcToDest(src, dest)
142140
}
@@ -148,8 +146,10 @@ func (l *LogAgent) Run(ctx context.Context) {
148146
}
149147

150148
func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
149+
151150
eventsCh := make(chan LogEvent)
152151
defer src.Stop()
152+
defer dest.NotifySourceStopped()
153153

154154
closed := false
155155
src.SetOutput(func(e LogEvent) {
@@ -168,11 +168,11 @@ func (l *LogAgent) runSrcToDest(src LogSrc, dest LogDest) {
168168
for e := range eventsCh {
169169
err := dest.Publish([]LogEvent{e})
170170
if err == ErrOutputStopped {
171-
log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", l.destNames[dest], src.Group(), src.Stream())
171+
log.Printf("I! [logagent] Log destination %v has stopped, finalizing %v/%v", src.Destination(), src.Group(), src.Stream())
172172
return
173173
}
174174
if err != nil {
175-
log.Printf("E! [logagent] Failed to publish log to %v, error: %v", l.destNames[dest], err)
175+
log.Printf("E! [logagent] Failed to publish log to %v, error: %v", src.Destination(), err)
176176
return
177177
}
178178
}

plugins/inputs/logfile/logfile.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ type LogFile struct {
4141
started bool
4242
}
4343

44-
func NewLogFile() *LogFile {
44+
var _ logs.LogCollection = (*LogFile)(nil)
4545

46+
func NewLogFile() *LogFile {
4647
return &LogFile{
4748
configs: make(map[*FileConfig]map[string]*tailerSrc),
4849
done: make(chan struct{}),
@@ -251,11 +252,6 @@ func (t *LogFile) FindLogSrc() []logs.LogSrc {
251252
}
252253
}
253254

254-
destination := fileconfig.Destination
255-
if destination == "" {
256-
destination = t.Destination
257-
}
258-
259255
src := NewTailerSrc(
260256
groupName, streamName,
261257
t.Destination,

0 commit comments

Comments
 (0)