Skip to content

Commit 08c69ec

Browse files
authored
loki.source.journal: fix deadlock (#4571)
* loki.source.journal: fix deadlock * changelog * remove redundant field * rephrase changelog
1 parent b65fbbd commit 08c69ec

File tree

3 files changed

+99
-47
lines changed

3 files changed

+99
-47
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ Main (unreleased)
5050

5151
- Fix `prometheus.exporter.cloudwatch` to not always emit debug logs but respect debug property. (@kalleep)
5252

53+
- Fix potential deadlock in `loki.source.journal` when stopping or reloading the component. (@thampiotr)
54+
5355
v1.11.0
5456
-----------------
5557

internal/component/loki/source/journal/journal.go

Lines changed: 91 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,15 @@ var _ component.Component = (*Component)(nil)
3838

3939
// Component represents reading from a journal
4040
type Component struct {
41-
mut sync.RWMutex
42-
t *target.JournalTarget
43-
metrics *target.Metrics
44-
o component.Options
45-
handler chan loki.Entry
46-
positions positions.Positions
47-
receivers []loki.LogsReceiver
48-
argsUpdated chan struct{}
49-
args Arguments
50-
healthErr error
41+
mut sync.RWMutex
42+
t *target.JournalTarget
43+
metrics *target.Metrics
44+
o component.Options
45+
handler chan loki.Entry
46+
positions positions.Positions
47+
targetsUpdated chan struct{}
48+
args Arguments
49+
healthErr error
5150
}
5251

5352
// New creates a new component.
@@ -73,13 +72,12 @@ func New(o component.Options, args Arguments) (*Component, error) {
7372
}
7473

7574
c := &Component{
76-
metrics: target.NewMetrics(o.Registerer),
77-
o: o,
78-
handler: make(chan loki.Entry),
79-
positions: positionsFile,
80-
receivers: args.Receivers,
81-
argsUpdated: make(chan struct{}, 1),
82-
args: args,
75+
metrics: target.NewMetrics(o.Registerer),
76+
o: o,
77+
handler: make(chan loki.Entry),
78+
positions: positionsFile,
79+
targetsUpdated: make(chan struct{}, 1),
80+
args: args,
8381
}
8482
err = c.Update(args)
8583
return c, err
@@ -88,18 +86,25 @@ func New(o component.Options, args Arguments) (*Component, error) {
8886
// Run starts the component.
8987
func (c *Component) Run(ctx context.Context) error {
9088
defer func() {
89+
// Start draining routine to prevent potential deadlock if target attempts to send during Stop().
90+
cancel := c.startDrainingRoutine()
91+
defer cancel()
92+
93+
// Stop existing target
9194
c.mut.RLock()
95+
defer c.mut.RUnlock()
9296
if c.t != nil {
97+
level.Info(c.o.Logger).Log("msg", "loki.source.journal component shutting down, stopping journal target")
9398
err := c.t.Stop()
9499
if err != nil {
95100
level.Warn(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
96101
}
97102
}
98-
c.mut.RUnlock()
99-
100103
}()
101104
for {
102105
select {
106+
case <-c.targetsUpdated:
107+
c.reloadTargets()
103108
case <-ctx.Done():
104109
return nil
105110
case entry := <-c.handler:
@@ -108,31 +113,10 @@ func (c *Component) Run(ctx context.Context) error {
108113
Labels: entry.Labels,
109114
Entry: entry.Entry,
110115
}
111-
for _, r := range c.receivers {
116+
for _, r := range c.args.Receivers {
112117
r.Chan() <- lokiEntry
113118
}
114119
c.mut.RUnlock()
115-
case <-c.argsUpdated:
116-
c.mut.Lock()
117-
if c.t != nil {
118-
err := c.t.Stop()
119-
if err != nil {
120-
level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
121-
}
122-
c.t = nil
123-
}
124-
rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules)
125-
entryHandler := loki.NewEntryHandler(c.handler, func() {})
126-
127-
newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args))
128-
if err != nil {
129-
level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path)
130-
c.healthErr = fmt.Errorf("error creating journal target: %w", err)
131-
} else {
132-
c.t = newTarget
133-
c.healthErr = nil
134-
}
135-
c.mut.Unlock()
136120
}
137121
}
138122
}
@@ -144,7 +128,7 @@ func (c *Component) Update(args component.Arguments) error {
144128
defer c.mut.Unlock()
145129
c.args = newArgs
146130
select {
147-
case c.argsUpdated <- struct{}{}:
131+
case c.targetsUpdated <- struct{}{}:
148132
default: // Update notification already sent
149133
}
150134
return nil
@@ -169,6 +153,71 @@ func (c *Component) CurrentHealth() component.Health {
169153
}
170154
}
171155

156+
func (c *Component) startDrainingRoutine() func() {
157+
readCtx, cancel := context.WithCancel(context.Background())
158+
c.mut.RLock()
159+
defer c.mut.RUnlock()
160+
receiversCopy := make([]loki.LogsReceiver, len(c.args.Receivers))
161+
copy(receiversCopy, c.args.Receivers)
162+
go func() {
163+
for {
164+
select {
165+
case <-readCtx.Done():
166+
return
167+
case entry := <-c.handler:
168+
lokiEntry := loki.Entry{
169+
Labels: entry.Labels,
170+
Entry: entry.Entry,
171+
}
172+
for _, r := range receiversCopy {
173+
r.Chan() <- lokiEntry
174+
}
175+
}
176+
}
177+
}()
178+
return cancel
179+
}
180+
181+
func (c *Component) reloadTargets() {
182+
// Start draining routine to prevent potential deadlock if target attempts to send during Stop().
183+
cancel := c.startDrainingRoutine()
184+
185+
// Grab current state
186+
c.mut.RLock()
187+
var targetToStop *target.JournalTarget
188+
if c.t != nil {
189+
targetToStop = c.t
190+
}
191+
rcs := alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules)
192+
c.mut.RUnlock()
193+
194+
// Stop existing target
195+
if targetToStop != nil {
196+
err := targetToStop.Stop()
197+
if err != nil {
198+
level.Error(c.o.Logger).Log("msg", "error stopping journal target", "err", err)
199+
}
200+
}
201+
202+
// Stop draining routine
203+
cancel()
204+
205+
// Create new target
206+
c.mut.Lock()
207+
defer c.mut.Unlock()
208+
c.t = nil
209+
entryHandler := loki.NewEntryHandler(c.handler, func() {})
210+
211+
newTarget, err := target.NewJournalTarget(c.metrics, c.o.Logger, entryHandler, c.positions, c.o.ID, rcs, convertArgs(c.o.ID, c.args))
212+
if err != nil {
213+
level.Error(c.o.Logger).Log("msg", "error creating journal target", "err", err, "path", c.args.Path)
214+
c.healthErr = fmt.Errorf("error creating journal target: %w", err)
215+
} else {
216+
c.t = newTarget
217+
c.healthErr = nil
218+
}
219+
}
220+
172221
func convertArgs(job string, a Arguments) *scrapeconfig.JournalTargetConfig {
173222
labels := model.LabelSet{
174223
model.LabelName("job"): model.LabelValue(job),

internal/component/loki/source/syslog/syslog.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,9 @@ func (c *Component) startDrainingRoutine() func() {
137137
case <-readCtx.Done():
138138
return
139139
case entry := <-c.handler.Chan():
140-
c.mut.RLock()
141140
for _, receiver := range fanoutCopy {
142141
receiver.Chan() <- entry
143142
}
144-
c.mut.RUnlock()
145143
}
146144
}
147145
}()
@@ -152,20 +150,23 @@ func (c *Component) reloadTargets() {
152150
// Start draining routine to prevent potential deadlock if targets attempt to send during Stop().
153151
cancel := c.startDrainingRoutine()
154152

155-
// Stop all targets
153+
// Grab current state
156154
c.mut.RLock()
157155
var rcs []*relabel.Config
158156
if len(c.args.RelabelRules) > 0 {
159157
rcs = alloy_relabel.ComponentToPromRelabelConfigs(c.args.RelabelRules)
160158
}
159+
targetsToStop := make([]*st.SyslogTarget, len(c.targets))
160+
copy(targetsToStop, c.targets)
161+
c.mut.RUnlock()
161162

162-
for _, l := range c.targets {
163+
// Stop existing targets
164+
for _, l := range targetsToStop {
163165
err := l.Stop()
164166
if err != nil {
165167
level.Error(c.opts.Logger).Log("msg", "error while stopping syslog listener", "err", err)
166168
}
167169
}
168-
c.mut.RUnlock()
169170

170171
// Stop draining routine
171172
cancel()

0 commit comments

Comments
 (0)