Skip to content

Commit c1111d4

Browse files
authored
Remove some loki/v3 dependencies (#4576)
* Remove loki/v3 dependencies * Fix lint error * Revert flagext vendoring
1 parent 00cb3ff commit c1111d4

File tree

65 files changed

+282
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+282
-299
lines changed

internal/component/common/loki/client/batch.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/golang/snappy"
1313
"github.com/prometheus/common/model"
1414

15-
"github.com/grafana/loki/v3/pkg/logproto"
15+
"github.com/grafana/loki/pkg/push"
1616

1717
"github.com/grafana/alloy/internal/component/common/loki"
1818
)
@@ -32,7 +32,7 @@ type SentDataMarkerHandler interface {
3232
// and entries in a single batch request. In case of multi-tenant Promtail, log
3333
// streams for each tenant are stored in a dedicated batch.
3434
type batch struct {
35-
streams map[string]*logproto.Stream
35+
streams map[string]*push.Stream
3636
// totalBytes holds the total amounts of bytes, across the log lines in this batch.
3737
totalBytes int
3838
createdAt time.Time
@@ -45,7 +45,7 @@ type batch struct {
4545

4646
func newBatch(maxStreams int, entries ...loki.Entry) *batch {
4747
b := &batch{
48-
streams: map[string]*logproto.Stream{},
48+
streams: map[string]*push.Stream{},
4949
totalBytes: 0,
5050
createdAt: time.Now(),
5151
maxStreams: maxStreams,
@@ -77,16 +77,16 @@ func (b *batch) add(entry loki.Entry) error {
7777
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
7878
}
7979
// Add the entry as a new stream
80-
b.streams[labels] = &logproto.Stream{
80+
b.streams[labels] = &push.Stream{
8181
Labels: labels,
82-
Entries: []logproto.Entry{entry.Entry},
82+
Entries: []push.Entry{entry.Entry},
8383
}
8484
return nil
8585
}
8686

8787
// addFromWAL adds an entry to the batch, tracking that the data being added comes from segment segmentNum read from the
8888
// WAL.
89-
func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum int) error {
89+
func (b *batch) addFromWAL(lbs model.LabelSet, entry push.Entry, segmentNum int) error {
9090
b.totalBytes += len(entry.Line)
9191

9292
// Append the entry to an already existing stream (if any)
@@ -103,9 +103,9 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry, segmentNum
103103
}
104104

105105
// Add the entry as a new stream
106-
b.streams[labels] = &logproto.Stream{
106+
b.streams[labels] = &push.Stream{
107107
Labels: labels,
108-
Entries: []logproto.Entry{entry},
108+
Entries: []push.Entry{entry},
109109
}
110110
b.countForSegment(segmentNum)
111111

@@ -152,7 +152,7 @@ func (b *batch) sizeBytes() int {
152152

153153
// sizeBytesAfter returns the size of the batch after the input entry
154154
// will be added to the batch itself
155-
func (b *batch) sizeBytesAfter(entry logproto.Entry) int {
155+
func (b *batch) sizeBytesAfter(entry push.Entry) int {
156156
return b.totalBytes + entrySize(entry)
157157
}
158158

@@ -174,9 +174,9 @@ func (b *batch) encode() ([]byte, int, error) {
174174
}
175175

176176
// creates push request and returns it, together with number of entries
177-
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
178-
req := logproto.PushRequest{
179-
Streams: make([]logproto.Stream, 0, len(b.streams)),
177+
func (b *batch) createPushRequest() (*push.PushRequest, int) {
178+
req := push.PushRequest{
179+
Streams: make([]push.Stream, 0, len(b.streams)),
180180
}
181181

182182
entriesCount := 0
@@ -204,7 +204,7 @@ func (b *batch) reportAsSentData(h SentDataMarkerHandler) {
204204
}
205205
}
206206

207-
func entrySize(entry logproto.Entry) int {
207+
func entrySize(entry push.Entry) int {
208208
structuredMetadataSize := 0
209209
for _, label := range entry.StructuredMetadata {
210210
structuredMetadataSize += label.Size()

internal/component/common/loki/client/batch_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,18 @@ import (
99
"github.com/stretchr/testify/assert"
1010
"github.com/stretchr/testify/require"
1111

12-
"github.com/grafana/loki/v3/pkg/logproto"
13-
1412
"github.com/grafana/alloy/internal/component/common/loki"
13+
"github.com/grafana/loki/pkg/push"
1514
)
1615

1716
func TestBatch_MaxStreams(t *testing.T) {
1817
maxStream := 2
1918

2019
var inputEntries = []loki.Entry{
21-
{Labels: model.LabelSet{"app": "app-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
22-
{Labels: model.LabelSet{"app": "app-2"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
23-
{Labels: model.LabelSet{"app": "app-3"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
24-
{Labels: model.LabelSet{"app": "app-4"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
20+
{Labels: model.LabelSet{"app": "app-1"}, Entry: push.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
21+
{Labels: model.LabelSet{"app": "app-2"}, Entry: push.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
22+
{Labels: model.LabelSet{"app": "app-3"}, Entry: push.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
23+
{Labels: model.LabelSet{"app": "app-4"}, Entry: push.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
2524
}
2625

2726
b := newBatch(maxStream)
@@ -147,9 +146,9 @@ func TestHashCollisions(t *testing.T) {
147146
const entriesPerLabel = 10
148147

149148
for i := 0; i < entriesPerLabel; i++ {
150-
_ = b.add(loki.Entry{Labels: ls1, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
149+
_ = b.add(loki.Entry{Labels: ls1, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
151150

152-
_ = b.add(loki.Entry{Labels: ls2, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
151+
_ = b.add(loki.Entry{Labels: ls2, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
153152
}
154153

155154
// make sure that colliding labels are stored properly as independent streams

internal/component/common/loki/client/client_test.go

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,19 @@ import (
2121
"github.com/grafana/loki/v3/clients/pkg/promtail/utils"
2222

2323
"github.com/grafana/alloy/internal/component/common/loki"
24-
25-
"github.com/grafana/loki/v3/pkg/logproto"
2624
)
2725

2826
var logEntries = []loki.Entry{
29-
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
30-
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
31-
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
32-
{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
33-
{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
34-
{Labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
35-
{Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line0123456789"}},
27+
{Labels: model.LabelSet{}, Entry: push.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
28+
{Labels: model.LabelSet{}, Entry: push.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
29+
{Labels: model.LabelSet{}, Entry: push.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
30+
{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: push.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}},
31+
{Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: push.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}},
32+
{Labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: push.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}},
33+
{Labels: model.LabelSet{}, Entry: push.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line0123456789"}},
3634
{
3735
Labels: model.LabelSet{},
38-
Entry: logproto.Entry{
36+
Entry: push.Entry{
3937
Timestamp: time.Unix(7, 0).UTC(),
4038
Line: "line7",
4139
StructuredMetadata: push.LabelsAdapter{
@@ -67,11 +65,11 @@ func TestClient_Handle(t *testing.T) {
6765
expectedReqs: []utils.RemoteWriteRequest{
6866
{
6967
TenantID: "",
70-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
68+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
7169
},
7270
{
7371
TenantID: "",
74-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
72+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[2].Entry}}}},
7573
},
7674
},
7775
expectedMetrics: `
@@ -108,11 +106,11 @@ func TestClient_Handle(t *testing.T) {
108106
expectedReqs: []utils.RemoteWriteRequest{
109107
{
110108
TenantID: "",
111-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
109+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
112110
},
113111
{
114112
TenantID: "",
115-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}}}},
113+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[1].Entry}}}},
116114
},
117115
},
118116
expectedMetrics: `
@@ -148,15 +146,15 @@ func TestClient_Handle(t *testing.T) {
148146
expectedReqs: []utils.RemoteWriteRequest{
149147
{
150148
TenantID: "",
151-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
149+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
152150
},
153151
{
154152
TenantID: "",
155-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
153+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
156154
},
157155
{
158156
TenantID: "",
159-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
157+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
160158
},
161159
},
162160
expectedMetrics: `
@@ -192,7 +190,7 @@ func TestClient_Handle(t *testing.T) {
192190
expectedReqs: []utils.RemoteWriteRequest{
193191
{
194192
TenantID: "",
195-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
193+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
196194
},
197195
},
198196
expectedMetrics: `
@@ -228,15 +226,15 @@ func TestClient_Handle(t *testing.T) {
228226
expectedReqs: []utils.RemoteWriteRequest{
229227
{
230228
TenantID: "",
231-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
229+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
232230
},
233231
{
234232
TenantID: "",
235-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
233+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
236234
},
237235
{
238236
TenantID: "",
239-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
237+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
240238
},
241239
},
242240
expectedMetrics: `
@@ -273,7 +271,7 @@ func TestClient_Handle(t *testing.T) {
273271
expectedReqs: []utils.RemoteWriteRequest{
274272
{
275273
TenantID: "",
276-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
274+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
277275
},
278276
},
279277
expectedMetrics: `
@@ -310,7 +308,7 @@ func TestClient_Handle(t *testing.T) {
310308
expectedReqs: []utils.RemoteWriteRequest{
311309
{
312310
TenantID: "tenant-default",
313-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
311+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
314312
},
315313
},
316314
expectedMetrics: `
@@ -347,15 +345,15 @@ func TestClient_Handle(t *testing.T) {
347345
expectedReqs: []utils.RemoteWriteRequest{
348346
{
349347
TenantID: "tenant-default",
350-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
348+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
351349
},
352350
{
353351
TenantID: "tenant-1",
354-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[3].Entry, logEntries[4].Entry}}}},
352+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[3].Entry, logEntries[4].Entry}}}},
355353
},
356354
{
357355
TenantID: "tenant-2",
358-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[5].Entry}}}},
356+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[5].Entry}}}},
359357
},
360358
},
361359
expectedMetrics: `
@@ -505,11 +503,11 @@ func TestClient_StopNow(t *testing.T) {
505503
expectedReqs: []utils.RemoteWriteRequest{
506504
{
507505
TenantID: "",
508-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
506+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
509507
},
510508
{
511509
TenantID: "",
512-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}}}},
510+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[2].Entry}}}},
513511
},
514512
},
515513
expectedMetrics: `
@@ -534,7 +532,7 @@ func TestClient_StopNow(t *testing.T) {
534532
expectedReqs: []utils.RemoteWriteRequest{
535533
{
536534
TenantID: "",
537-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}}}},
535+
Request: push.PushRequest{Streams: []push.Stream{{Labels: "{}", Entries: []push.Entry{logEntries[0].Entry}}}},
538536
},
539537
},
540538
expectedMetrics: `

internal/component/common/loki/client/manager_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/go-kit/log"
1313
"github.com/grafana/dskit/backoff"
1414
"github.com/grafana/dskit/flagext"
15-
"github.com/grafana/loki/v3/pkg/logproto"
15+
"github.com/grafana/loki/pkg/push"
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/prometheus/common/model"
1818
"github.com/stretchr/testify/require"
@@ -159,7 +159,7 @@ func TestManager_WALEnabled(t *testing.T) {
159159
for i := 0; i < totalLines; i++ {
160160
writer.Chan() <- loki.Entry{
161161
Labels: testLabels,
162-
Entry: logproto.Entry{
162+
Entry: push.Entry{
163163
Timestamp: time.Now(),
164164
Line: fmt.Sprintf("line%d", i),
165165
},
@@ -214,7 +214,7 @@ func TestManager_WALDisabled(t *testing.T) {
214214
for i := 0; i < totalLines; i++ {
215215
manager.Chan() <- loki.Entry{
216216
Labels: testLabels,
217-
Entry: logproto.Entry{
217+
Entry: push.Entry{
218218
Timestamp: time.Now(),
219219
Line: fmt.Sprintf("line%d", i),
220220
},
@@ -283,7 +283,7 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) {
283283
for i := range totalLines {
284284
manager.Chan() <- loki.Entry{
285285
Labels: testLabels,
286-
Entry: logproto.Entry{
286+
Entry: push.Entry{
287287
Timestamp: time.Now(),
288288
Line: fmt.Sprintf("line%d", i),
289289
},

internal/component/common/loki/client/queue_client_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
"github.com/go-kit/log"
1111
"github.com/grafana/dskit/backoff"
1212
"github.com/grafana/dskit/flagext"
13+
"github.com/grafana/loki/pkg/push"
1314
"github.com/grafana/loki/v3/pkg/ingester/wal"
14-
"github.com/grafana/loki/v3/pkg/logproto"
1515
"github.com/prometheus/client_golang/prometheus"
1616
"github.com/prometheus/common/config"
1717
"github.com/prometheus/common/model"
@@ -156,7 +156,7 @@ func TestQueueClient(t *testing.T) {
156156

157157
_ = qc.AppendEntries(wal.RefEntries{
158158
Ref: chunks.HeadSeriesRef(mod),
159-
Entries: []logproto.Entry{{
159+
Entries: []push.Entry{{
160160
Timestamp: time.Now(),
161161
Line: l,
162162
}},
@@ -302,7 +302,7 @@ func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin
302302

303303
_ = qc.AppendEntries(wal.RefEntries{
304304
Ref: chunks.HeadSeriesRef(seriesId),
305-
Entries: []logproto.Entry{{
305+
Entries: []push.Entry{{
306306
Timestamp: time.Now(),
307307
Line: l,
308308
}},
@@ -389,7 +389,7 @@ func runRegularClientBenchCase(b *testing.B, bc testCase) {
389389
// take j module bc.numSeries to evenly distribute those numSeries across all sent entries
390390
"app": model.LabelValue(fmt.Sprintf("series-%d", seriesId)),
391391
},
392-
Entry: logproto.Entry{
392+
Entry: push.Entry{
393393
Timestamp: time.Now(),
394394
Line: l,
395395
},

internal/component/common/loki/positions/positions_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ import (
1414
"github.com/go-kit/log"
1515
"github.com/stretchr/testify/require"
1616
"gopkg.in/yaml.v3"
17-
18-
util_log "github.com/grafana/loki/v3/pkg/util/log"
1917
)
2018

2119
func tempFilename(t *testing.T) string {
@@ -278,7 +276,7 @@ positions:
278276
if err != nil {
279277
t.Fatal(err)
280278
}
281-
p, err := New(util_log.Logger, Config{
279+
p, err := New(log.NewNopLogger(), Config{
282280
SyncPeriod: 20 * time.Second,
283281
PositionsFile: temp,
284282
ReadOnly: true,
@@ -322,7 +320,7 @@ positions:
322320
if err != nil {
323321
t.Fatal(err)
324322
}
325-
p, err := New(util_log.Logger, Config{
323+
p, err := New(log.NewNopLogger(), Config{
326324
SyncPeriod: 20 * time.Second,
327325
PositionsFile: temp,
328326
})

0 commit comments

Comments
 (0)