Skip to content

Commit 2796763

Browse files
committed
Extract common function to drain handler
1 parent 8e845f8 commit 2796763

File tree

3 files changed

+51
-24
lines changed

3 files changed

+51
-24
lines changed

internal/component/loki/source/docker/docker.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,14 @@ func (c *Component) Run(ctx context.Context) error {
156156
defer c.posFile.Stop()
157157

158158
defer func() {
159-
c.mut.Lock()
160-
defer c.mut.Unlock()
161-
162-
// FIXME: We need to drain here
163-
c.scheduler.Stop()
159+
c.posFile.Stop()
160+
161+
// Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop().
162+
source.Drain(c.handler, func() {
163+
c.mut.Lock()
164+
defer c.mut.Unlock()
165+
c.scheduler.Stop()
166+
})
164167
}()
165168

166169
for {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package source
2+
3+
import (
4+
"context"
5+
6+
"github.com/grafana/alloy/internal/component/common/loki"
7+
)
8+
9+
// Drain consumes log entries from recv in a background goroutine while f executes.
10+
// This prevents deadlocks that can occur when stopping components that may still be
11+
// sending entries to the receiver channel. The draining goroutine will continue
12+
// consuming entries until f returns, at which point the context is cancelled and
13+
// the goroutine exits.
14+
//
15+
// This is typically used during component shutdown to drain any remaining entries
16+
// from a receiver channel while performing cleanup operations (e.g., stopping
17+
// sources, closing channels).
18+
func Drain(recv loki.LogsReceiver, f func()) {
19+
ctx, cancel := context.WithCancel(context.Background())
20+
defer cancel()
21+
go func() {
22+
for {
23+
select {
24+
case <-ctx.Done():
25+
return
26+
case <-recv.Chan():
27+
// Consume and discard entries to prevent channel blocking
28+
}
29+
}
30+
}()
31+
32+
f()
33+
}

internal/component/loki/source/file/file.go

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -203,27 +203,18 @@ func New(o component.Options, args Arguments) (*Component, error) {
203203
func (c *Component) Run(ctx context.Context) error {
204204
defer func() {
205205
level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping sources and positions file")
206-
207206
// We need to stop posFile first so we don't record entries we are draining
208207
c.posFile.Stop()
209-
// Start black hole drain routine to prevent deadlock when we call c.t.Stop().
210-
drainCtx, cancelDrain := context.WithCancel(context.Background())
211-
defer cancelDrain()
212-
go func() {
213-
for {
214-
select {
215-
case <-drainCtx.Done():
216-
return
217-
case <-c.handler.Chan(): // Ignore the remaining entries
218-
}
219-
}
220-
}()
221-
c.mut.Lock()
222-
c.stopping.Store(true)
223-
c.watcher.Stop()
224-
c.scheduler.Stop()
225-
close(c.handler.Chan())
226-
c.mut.Unlock()
208+
209+
// Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop().
210+
source.Drain(c.handler, func() {
211+
c.mut.Lock()
212+
c.stopping.Store(true)
213+
c.watcher.Stop()
214+
c.scheduler.Stop()
215+
close(c.handler.Chan())
216+
c.mut.Unlock()
217+
})
227218
}()
228219

229220
var wg sync.WaitGroup

0 commit comments

Comments
 (0)