Skip to content

Commit 1b76368

Browse files
authored
fix loki.source.file and loki.source.journal deadlocks on shutdown (#4585)
* fix loki.source.file and loki.source.journal deadlocks on shutdown * fix
1 parent d422669 commit 1b76368

File tree

2 files changed

+33
-12
lines changed

2 files changed

+33
-12
lines changed

internal/component/loki/source/file/file.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,11 @@ func (c *Component) Run(ctx context.Context) error {
160160
case entry := <-c.handler.Chan():
161161
c.receiversMut.RLock()
162162
for _, receiver := range c.receivers {
163-
receiver.Chan() <- entry
163+
select {
164+
case <-ctx.Done():
165+
return nil
166+
case receiver.Chan() <- entry:
167+
}
164168
}
165169
c.receiversMut.RUnlock()
166170
case <-c.updateReaders:
@@ -178,7 +182,11 @@ func (c *Component) Run(ctx context.Context) error {
178182
select {
179183
case entry := <-c.handler.Chan():
180184
for _, receiver := range c.receivers {
181-
receiver.Chan() <- entry
185+
select {
186+
case <-readCtx.Done():
187+
return
188+
case receiver.Chan() <- entry:
189+
}
182190
}
183191
case <-readCtx.Done():
184192
return

internal/component/loki/source/journal/journal.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,24 @@ func New(o component.Options, args Arguments) (*Component, error) {
8686
// Run starts the component.
8787
func (c *Component) Run(ctx context.Context) error {
8888
defer func() {
89-
// Start draining routine to prevent potential deadlock if target attempts to send during Stop().
90-
cancel := c.startDrainingRoutine()
91-
defer cancel()
89+
level.Info(c.o.Logger).Log("msg", "loki.source.journal component shutting down")
90+
// Start black hole drain routine to prevent deadlock when we call c.t.Stop().
91+
drainCtx, cancelDrain := context.WithCancel(context.Background())
92+
defer cancelDrain()
93+
go func() {
94+
for {
95+
select {
96+
case <-drainCtx.Done():
97+
return
98+
case _ = <-c.handler: // Ignore the remaining entries
99+
}
100+
}
101+
}()
92102

93103
// Stop existing target
94104
c.mut.RLock()
95105
defer c.mut.RUnlock()
96106
if c.t != nil {
97-
level.Info(c.o.Logger).Log("msg", "loki.source.journal component shutting down, stopping journal target")
98107
err := c.t.Stop()
99108
if err != nil {
100109
level.Warn(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
@@ -104,7 +113,7 @@ func (c *Component) Run(ctx context.Context) error {
104113
for {
105114
select {
106115
case <-c.targetsUpdated:
107-
c.reloadTargets()
116+
c.reloadTargets(ctx)
108117
case <-ctx.Done():
109118
return nil
110119
case entry := <-c.handler:
@@ -114,7 +123,11 @@ func (c *Component) Run(ctx context.Context) error {
114123
Entry: entry.Entry,
115124
}
116125
for _, r := range c.args.Receivers {
117-
r.Chan() <- lokiEntry
126+
select {
127+
case <-ctx.Done():
128+
return nil
129+
case r.Chan() <- lokiEntry:
130+
}
118131
}
119132
c.mut.RUnlock()
120133
}
@@ -153,8 +166,8 @@ func (c *Component) CurrentHealth() component.Health {
153166
}
154167
}
155168

156-
func (c *Component) startDrainingRoutine() func() {
157-
readCtx, cancel := context.WithCancel(context.Background())
169+
func (c *Component) startDrainingRoutine(parentCtx context.Context) func() {
170+
readCtx, cancel := context.WithCancel(parentCtx)
158171
c.mut.RLock()
159172
defer c.mut.RUnlock()
160173
receiversCopy := make([]loki.LogsReceiver, len(c.args.Receivers))
@@ -178,9 +191,9 @@ func (c *Component) startDrainingRoutine() func() {
178191
return cancel
179192
}
180193

181-
func (c *Component) reloadTargets() {
194+
func (c *Component) reloadTargets(parentCtx context.Context) {
182195
// Start draining routine to prevent potential deadlock if target attempts to send during Stop().
183-
cancel := c.startDrainingRoutine()
196+
cancel := c.startDrainingRoutine(parentCtx)
184197

185198
// Grab current state
186199
c.mut.RLock()

0 commit comments

Comments
 (0)