
Commit 8d7194e

loki.source.docker: refactor to use shared scheduler

1 parent f55aea0 commit 8d7194e

9 files changed: +563 −822 lines
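Of the nine files, only docker.go is reproduced below. The refactor replaces the component's bespoke manager/options machinery with the shared, generic scheduler from internal/component/loki/source. From the call sites in this diff, the scheduler's surface can be inferred roughly as the sketch below; it is reconstructed from usage, not copied from the shared package, and the Run method in particular is an assumption about how the scheduler drives a source.

package sketch

import (
	"context"
	"iter" // requires Go >= 1.23 for range-over-func iterators
)

// Source is the unit of work the scheduler manages. Key must be a stable
// identity; this component uses the Docker container ID.
type Source[K comparable] interface {
	Key() K
	Run(ctx context.Context) // assumption: how the scheduler drives a source
}

// DebugSource is asserted in DebugInfo below to collect per-source details.
type DebugSource[K comparable] interface {
	Source[K]
	DebugInfo() any
}

// SchedulerAPI captures the methods of source.Scheduler[K] exercised by this
// commit. The real type is a struct created with source.NewScheduler[K]().
type SchedulerAPI[K comparable] interface {
	Contains(key K) bool          // is a source with this key scheduled?
	ScheduleSource(src Source[K]) // register and start a source
	StopSource(src Source[K])     // stop a single source without blocking
	Sources() iter.Seq[Source[K]] // iterate over scheduled sources
	Stop()                        // stop every scheduled source
}

Keying sources by container ID is what lets Update skip containers that already have a running tailer.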

internal/component/loki/source/docker/docker.go

Lines changed: 71 additions & 75 deletions
@@ -25,7 +25,7 @@ import (
 	"github.com/grafana/alloy/internal/component/common/loki"
 	alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
 	"github.com/grafana/alloy/internal/component/discovery"
-	dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget"
+	"github.com/grafana/alloy/internal/component/loki/source"
 	"github.com/grafana/alloy/internal/component/loki/source/internal/positions"
 	"github.com/grafana/alloy/internal/featuregate"
 	"github.com/grafana/alloy/internal/runtime/logging/level"
@@ -47,9 +47,11 @@ func init() {
 var userAgent = useragent.Get()
 
 const (
+	// See github.com/prometheus/prometheus/discovery/moby
 	dockerLabel                = model.MetaLabelPrefix + "docker_"
 	dockerLabelContainerPrefix = dockerLabel + "container_"
 	dockerLabelContainerID     = dockerLabelContainerPrefix + "id"
+	dockerLabelLogStream       = dockerLabelContainerPrefix + "log_stream"
 )
 
 // Arguments holds values which are used to configure the loki.source.docker
@@ -102,16 +104,15 @@ var (
 // Component implements the loki.source.file component.
 type Component struct {
 	opts    component.Options
-	metrics *dt.Metrics
+	metrics *metrics
 
-	mut           sync.RWMutex
-	args          Arguments
-	manager       *manager
-	lastOptions   *options
-	handler       loki.LogsReceiver
-	posFile       positions.Positions
-	rcs           []*relabel.Config
-	defaultLabels model.LabelSet
+	mut       sync.RWMutex
+	args      Arguments
+	scheduler *source.Scheduler[string]
+	client    client.APIClient
+	handler   loki.LogsReceiver
+	posFile   positions.Positions
+	rcs       []*relabel.Config
 
 	receiversMut sync.RWMutex
 	receivers    []loki.LogsReceiver
@@ -134,11 +135,10 @@ func New(o component.Options, args Arguments) (*Component, error) {
 	}
 
 	c := &Component{
-		opts:    o,
-		metrics: dt.NewMetrics(o.Registerer),
-
+		opts:      o,
+		metrics:   newMetrics(o.Registerer),
 		handler:   loki.NewLogsReceiver(),
-		manager:   newManager(o.Logger, nil),
+		scheduler: source.NewScheduler[string](),
 		receivers: args.ForwardTo,
 		posFile:   positionsFile,
 	}
@@ -159,11 +159,8 @@ func (c *Component) Run(ctx context.Context) error {
 		c.mut.Lock()
 		defer c.mut.Unlock()
 
-		// Guard for safety, but it's not possible for Run to be called without
-		// c.tailer being initialized.
-		if c.manager != nil {
-			c.manager.stop()
-		}
+		// FIXME: We need to drain here
+		c.scheduler.Stop()
 	}()
 
 	for {
@@ -175,7 +172,11 @@ func (c *Component) Run(ctx context.Context) error {
 			receivers := c.receivers
 			c.receiversMut.RUnlock()
 			for _, receiver := range receivers {
-				receiver.Chan() <- entry
+				select {
+				case <-ctx.Done():
+					return nil
+				case receiver.Chan() <- entry:
+				}
 			}
 		}
 	}
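The hunk above fixes a shutdown hazard: the old unconditional receiver.Chan() <- entry could block forever on a full or abandoned channel, so Run might never observe context cancellation. A minimal standalone sketch of the pattern, with illustrative names that are not part of the commit:

package sketch

import "context"

// forward tries to deliver entry to out, giving up when ctx is canceled.
// It reports whether the entry was actually sent.
func forward[T any](ctx context.Context, out chan<- T, entry T) bool {
	select {
	case <-ctx.Done():
		return false // shutdown requested; abandon the send
	case out <- entry:
		return true
	}
}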
@@ -198,33 +199,22 @@ func (c *Component) Update(args component.Arguments) error {
 	c.mut.Lock()
 	defer c.mut.Unlock()
 
-	managerOpts, err := c.getManagerOptions(newArgs)
+	opts, err := c.getClient(newArgs)
 	if err != nil {
 		return err
 	}
 
-	if managerOpts != c.lastOptions {
-		// Options changed; pass it to the tailer.
-		// This will never fail because it only fails if the context gets canceled.
-		_ = c.manager.updateOptions(context.Background(), managerOpts)
-		c.lastOptions = managerOpts
+	if opts != c.client {
+		// Stop all tailers so all will be restarted
+		c.scheduler.Stop()
 	}
 
 	defaultLabels := make(model.LabelSet, len(newArgs.Labels))
 	for k, v := range newArgs.Labels {
 		defaultLabels[model.LabelName(k)] = model.LabelValue(v)
 	}
-	c.defaultLabels = defaultLabels
-
-	if len(newArgs.RelabelRules) > 0 {
-		c.rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
-	} else {
-		c.rcs = []*relabel.Config{}
-	}
 
-	// Convert input targets into targets to give to tailer.
-	targets := make([]*dt.Target, 0, len(newArgs.Targets))
-	seenTargets := make(map[string]struct{}, len(newArgs.Targets))
+	c.rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
 
 	promTargets := make([]promTarget, len(newArgs.Targets))
 	for i, target := range newArgs.Targets {
@@ -238,53 +228,67 @@ func (c *Component) Update(args component.Arguments) error {
 		return promTargets[i].fingerPrint < promTargets[j].fingerPrint
 	})
 
+	shouldRun := make(map[string]struct{}, len(newArgs.Targets))
 	for _, markedTarget := range promTargets {
 		containerID, ok := markedTarget.labels[dockerLabelContainerID]
 		if !ok {
 			level.Debug(c.opts.Logger).Log("msg", "docker target did not include container ID label:"+dockerLabelContainerID)
 			continue
 		}
-		if _, seen := seenTargets[string(containerID)]; seen {
+
+		if c.scheduler.Contains(string(containerID)) {
 			continue
 		}
-		seenTargets[string(containerID)] = struct{}{}
 
-		tgt, err := dt.NewTarget(
+		tailer, err := newTailer(
 			c.metrics,
 			log.With(c.opts.Logger, "target", fmt.Sprintf("docker/%s", containerID)),
-			c.manager.opts.handler,
-			c.manager.opts.positions,
+			c.handler,
+			c.posFile,
 			string(containerID),
-			markedTarget.labels.Merge(c.defaultLabels),
+			markedTarget.labels.Merge(defaultLabels),
 			c.rcs,
-			c.manager.opts.client,
+			opts,
+			5*time.Second,
 		)
+
 		if err != nil {
 			return err
 		}
-		targets = append(targets, tgt)
+
+		shouldRun[tailer.Key()] = struct{}{}
+		c.scheduler.ScheduleSource(tailer)
 	}
 
-	// This will never fail because it only fails if the context gets canceled.
-	_ = c.manager.syncTargets(context.Background(), targets)
+	// Avoid mutating the scheduler state during iteration. Collect sources to
+	// remove and stop them in a separate loop.
+	var toDelete []source.Source[string]
+	for source := range c.scheduler.Sources() {
+		if _, ok := shouldRun[source.Key()]; ok {
+			continue
+		}
+		toDelete = append(toDelete, source)
+	}
+
+	for _, s := range toDelete {
+		c.scheduler.StopSource(s) // stops without blocking
+	}
 
 	c.args = newArgs
 	return nil
 }
 
-// getTailerOptions gets tailer options from arguments. If args hasn't changed
-// from the last call to getTailerOptions, c.lastOptions is returned.
-// c.lastOptions must be updated by the caller.
-//
-// getTailerOptions must only be called when c.mut is held.
-func (c *Component) getManagerOptions(args Arguments) (*options, error) {
-	if reflect.DeepEqual(c.args.Host, args.Host) && c.lastOptions != nil {
-		return c.lastOptions, nil
+// getClient creates a client from args. If args hasn't changed
+// from the last call to getClient, c.client is returned.
+// getClient must only be called when c.mut is held.
+func (c *Component) getClient(args Arguments) (client.APIClient, error) {
+	if reflect.DeepEqual(c.args.Host, args.Host) && c.client != nil {
+		return c.client, nil
 	}
 
 	hostURL, err := url.Parse(args.Host)
 	if err != nil {
-		return c.lastOptions, err
+		return c.client, err
 	}
 
 	opts := []client.Opt{
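Update now reconciles instead of rebuilding: containers already scheduled are skipped via Contains, new ones get a tailer, and whatever remains scheduled but absent from shouldRun is stopped in a second pass, so the Sources() iterator is never mutated mid-iteration. (Note the loop variable source shadows the imported source package inside that loop.) The same pattern in isolation, assuming the SchedulerAPI/Source sketch given earlier:

// reconcile drives a scheduler toward the desired set of sources.
// SchedulerAPI and Source follow the sketched surface above (an assumption).
func reconcile[K comparable](sched SchedulerAPI[K], desired map[K]Source[K]) {
	for key, src := range desired {
		if !sched.Contains(key) {
			sched.ScheduleSource(src) // start anything new
		}
	}

	var stale []Source[K]
	for src := range sched.Sources() { // collect first: don't mutate while iterating
		if _, ok := desired[src.Key()]; !ok {
			stale = append(stale, src)
		}
	}
	for _, src := range stale {
		sched.StopSource(src) // then stop the leftovers
	}
}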
@@ -298,7 +302,7 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) {
 	if hostURL.Scheme == "http" || hostURL.Scheme == "https" {
 		rt, err := config.NewRoundTripperFromConfig(*args.HTTPClientConfig.Convert(), "docker_sd")
 		if err != nil {
-			return c.lastOptions, err
+			return c.client, err
 		}
 		opts = append(opts,
 			client.WithHTTPClient(&http.Client{
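For reference, the client package used here is the Docker Engine SDK, github.com/docker/docker/client. A minimal standalone construction of an APIClient looks like the following; the socket path is illustrative:

package main

import (
	"fmt"

	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewClientWithOpts(
		client.WithHost("unix:///var/run/docker.sock"), // illustrative host
		client.WithAPIVersionNegotiation(),             // agree on an API version with the daemon
	)
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	fmt.Println("talking to", cli.DaemonHost())
}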
@@ -315,41 +319,33 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) {
 	client, err := client.NewClientWithOpts(opts...)
 	if err != nil {
 		level.Error(c.opts.Logger).Log("msg", "could not create new Docker client", "err", err)
-		return c.lastOptions, fmt.Errorf("failed to build docker client: %w", err)
+		return c.client, fmt.Errorf("failed to build docker client: %w", err)
 	}
 
-	return &options{
-		client:                client,
-		handler:               loki.NewEntryHandler(c.handler.Chan(), func() {}),
-		positions:             c.posFile,
-		targetRestartInterval: 5 * time.Second,
-	}, nil
+	return client, nil
 }
 
 // DebugInfo returns information about the status of tailed targets.
 func (c *Component) DebugInfo() interface{} {
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+
 	var res readerDebugInfo
-	for _, tgt := range c.manager.targets() {
-		details := tgt.Details()
-		res.TargetsInfo = append(res.TargetsInfo, targetInfo{
-			Labels:     tgt.LabelsStr(),
-			ID:         details["id"],
-			LastError:  details["error"],
-			IsRunning:  details["running"],
-			ReadOffset: details["position"],
-		})
+	for s := range c.scheduler.Sources() {
+		ds := s.(source.DebugSource[string])
+		res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo())
 	}
 	return res
 }
 
 type readerDebugInfo struct {
-	TargetsInfo []targetInfo `alloy:"targets_info,block"`
+	TargetsInfo []any `alloy:"targets_info,block"`
 }
 
-type targetInfo struct {
+type sourceInfo struct {
 	ID         string `alloy:"id,attr"`
 	LastError  string `alloy:"last_error,attr"`
 	Labels     string `alloy:"labels,attr"`
-	IsRunning  string `alloy:"is_running,attr"`
+	IsRunning  bool   `alloy:"is_running,attr"`
 	ReadOffset string `alloy:"read_offset,attr"`
 }
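One caveat in the new DebugInfo: s.(source.DebugSource[string]) is an unchecked type assertion and panics if a scheduled source does not implement the interface. That holds here by construction, since every scheduled source is a tailer, but a comma-ok assertion would degrade gracefully instead (a suggested variant, not the commit's code):

	for s := range c.scheduler.Sources() {
		if ds, ok := s.(source.DebugSource[string]); ok {
			res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo())
		}
	}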
