Skip to content

Commit f55aea0

Browse files
committed
Add ext interface and move info creation into sources
1 parent 05bfe8a commit f55aea0

File tree

5 files changed

+29
-22
lines changed

5 files changed

+29
-22
lines changed

internal/component/loki/source/file/decompresser.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,14 @@ func (d *decompressor) Key() positions.Entry {
339339
return d.key
340340
}
341341

342-
func (d *decompressor) IsRunning() bool {
343-
return d.running.Load()
342+
func (d *decompressor) DebugInfo() any {
343+
offset, _ := d.positions.Get(d.key.Path, d.key.Labels)
344+
return sourceDebugInfo{
345+
Path: d.key.Path,
346+
Labels: d.key.Labels,
347+
IsRunning: d.running.Load(),
348+
ReadOffset: offset,
349+
}
344350
}
345351

346352
// cleanupMetrics removes all metrics exported by this reader

internal/component/loki/source/file/file.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -386,10 +386,10 @@ func (c *Component) scheduleSources() {
386386
}
387387

388388
type debugInfo struct {
389-
TargetsInfo []targetInfo `alloy:"targets_info,block"`
389+
TargetsInfo []any `alloy:"targets_info,block"`
390390
}
391391

392-
type targetInfo struct {
392+
type sourceDebugInfo struct {
393393
Path string `alloy:"path,attr"`
394394
Labels string `alloy:"labels,attr"`
395395
IsRunning bool `alloy:"is_running,attr"`
@@ -404,13 +404,8 @@ func (c *Component) DebugInfo() any {
404404
defer c.mut.RUnlock()
405405
var res debugInfo
406406
for s := range c.scheduler.Sources() {
407-
offset, _ := c.posFile.Get(s.Key().Path, s.Key().Labels)
408-
res.TargetsInfo = append(res.TargetsInfo, targetInfo{
409-
Path: s.Key().Path,
410-
Labels: s.Key().Labels,
411-
IsRunning: s.IsRunning(),
412-
ReadOffset: offset,
413-
})
407+
ds := s.(source.DebugSource[positions.Entry])
408+
res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo())
414409
}
415410
return res
416411
}

internal/component/loki/source/file/tailer.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,10 +416,6 @@ func (t *tailer) Key() positions.Entry {
416416
return t.key
417417
}
418418

419-
func (t *tailer) IsRunning() bool {
420-
return t.running.Load()
421-
}
422-
423419
// cleanupMetrics removes all metrics exported by this tailer
424420
func (t *tailer) cleanupMetrics() {
425421
// When we stop tailing the file, also un-export metrics related to the file
@@ -428,3 +424,13 @@ func (t *tailer) cleanupMetrics() {
428424
t.metrics.readBytes.DeleteLabelValues(t.key.Path)
429425
t.metrics.totalBytes.DeleteLabelValues(t.key.Path)
430426
}
427+
428+
func (t *tailer) DebugInfo() any {
429+
offset, _ := t.positions.Get(t.key.Path, t.key.Labels)
430+
return sourceDebugInfo{
431+
Path: t.key.Path,
432+
Labels: t.key.Labels,
433+
IsRunning: t.running.Load(),
434+
ReadOffset: offset,
435+
}
436+
}

internal/component/loki/source/file/tailer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func TestTailerCorruptedPositions(t *testing.T) {
355355
}()
356356

357357
require.EventuallyWithT(t, func(c *assert.CollectT) {
358-
assert.True(c, tailer.IsRunning())
358+
assert.True(c, tailer.running.Load())
359359
assert.Equal(c, "16", positionsFile.GetString(logFile.Name(), labels.String()))
360360
}, time.Second, 50*time.Millisecond)
361361

internal/component/loki/source/scheduler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,12 @@ type Source[K comparable] interface {
9898
Run(ctx context.Context)
9999
// Key is used to uniquely identify the source.
100100
Key() K
101-
// IsRunning reports if source is still running.
102-
IsRunning() bool
101+
}
102+
103+
// DebugSource is an optional interface with debug information.
104+
type DebugSource[k comparable] interface {
105+
Source[k]
106+
DebugInfo() any
103107
}
104108

105109
func NewSourceWithRetry[K comparable](source Source[K], config backoff.Config) *SourceWithRetry[K] {
@@ -126,10 +130,6 @@ func (s *SourceWithRetry[K]) Key() K {
126130
return s.source.Key()
127131
}
128132

129-
func (s *SourceWithRetry[K]) IsRunning() bool {
130-
return s.source.IsRunning()
131-
}
132-
133133
// scheduledSource is a source that is already scheduled.
134134
// to stop the scheduledSource cancel needs to be called.
135135
type scheduledSource[K comparable] struct {

0 commit comments

Comments
 (0)