Skip to content

Commit 5f9aeec

Browse files
authored
chore: cleanup loki imports (#4930)
* vendor wal reader that is only used in tests. * vendor enc and dec wrappers * move to own file and add comment * add comment * move comment * fix * fix
1 parent fa11851 commit 5f9aeec

File tree

4 files changed

+154
-55
lines changed

4 files changed

+154
-55
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package wal
2+
3+
// KEEP IN SYNC WITH:
4+
// https://github.com/grafana/loki/blob/main/pkg/util/encoding/encoding.go
5+
// Local modifications should be minimized.
6+
7+
import (
8+
"encoding/binary"
9+
"hash/crc32"
10+
11+
"github.com/prometheus/prometheus/tsdb/encoding"
12+
)
13+
14+
func encWith(b []byte) (res encbuf) {
15+
res.B = b
16+
return res
17+
}
18+
19+
// encbuf extends encoding.Encbuf with support for multi byte encoding
20+
type encbuf struct {
21+
encoding.Encbuf
22+
}
23+
24+
func (e *encbuf) PutString(s string) { e.B = append(e.B, s...) }
25+
26+
func (e *encbuf) Skip(i int) {
27+
e.B = e.B[:len(e.B)+i]
28+
}
29+
30+
func decWith(b []byte) (res decbuf) {
31+
res.B = b
32+
return res
33+
}
34+
35+
// decbuf extends encoding.Decbuf with support for multi byte decoding
36+
type decbuf struct {
37+
encoding.Decbuf
38+
}
39+
40+
func (d *decbuf) Bytes(n int) []byte {
41+
if d.E != nil {
42+
return nil
43+
}
44+
if len(d.B) < n {
45+
d.E = encoding.ErrInvalidSize
46+
return nil
47+
}
48+
x := d.B[:n]
49+
d.B = d.B[n:]
50+
return x
51+
}
52+
53+
func (d *decbuf) CheckCrc(castagnoliTable *crc32.Table) error {
54+
if d.E != nil {
55+
return d.E
56+
}
57+
if len(d.B) < 4 {
58+
d.E = encoding.ErrInvalidSize
59+
return d.E
60+
}
61+
62+
offset := len(d.B) - 4
63+
expCRC := binary.BigEndian.Uint32(d.B[offset:])
64+
d.B = d.B[:offset]
65+
66+
if d.Crc32(castagnoliTable) != expCRC {
67+
d.E = encoding.ErrInvalidChecksum
68+
return d.E
69+
}
70+
return nil
71+
}

internal/component/common/loki/wal/encoding.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/grafana/loki/pkg/push"
89
"github.com/prometheus/prometheus/tsdb/chunks"
910
"github.com/prometheus/prometheus/tsdb/record"
10-
11-
"github.com/grafana/loki/pkg/push"
12-
"github.com/grafana/loki/v3/pkg/util/encoding"
1311
)
1412

1513
// RecordType represents the type of the WAL/Checkpoint record.
@@ -82,7 +80,7 @@ type RefEntries struct {
8280
}
8381

8482
func (r *Record) EncodeSeries(b []byte) []byte {
85-
buf := encoding.EncWith(b)
83+
buf := encWith(b)
8684
buf.PutByte(byte(WALRecordSeries))
8785
buf.PutUvarintStr(r.UserID)
8886

@@ -96,7 +94,7 @@ func (r *Record) EncodeSeries(b []byte) []byte {
9694
}
9795

9896
func (r *Record) EncodeEntries(version RecordType, b []byte) []byte {
99-
buf := encoding.EncWith(b)
97+
buf := encWith(b)
10098
buf.PutByte(byte(version))
10199
buf.PutUvarintStr(r.UserID)
102100

@@ -152,7 +150,7 @@ func DecodeEntries(b []byte, version RecordType, rec *Record) error {
152150
return nil
153151
}
154152

155-
dec := encoding.DecWith(b)
153+
dec := decWith(b)
156154
baseTime := dec.Be64int64()
157155

158156
for len(dec.B) > 0 && dec.Err() == nil {
@@ -220,7 +218,7 @@ func DecodeRecord(b []byte, walRec *Record) (err error) {
220218
dec record.Decoder
221219
rSeries []record.RefSeries
222220

223-
decbuf = encoding.DecWith(b)
221+
decbuf = decWith(b)
224222
t = RecordType(decbuf.Byte())
225223
)
226224

Lines changed: 33 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,45 @@
11
package wal
22

3-
import (
4-
"fmt"
5-
6-
"github.com/prometheus/common/model"
3+
// KEEP IN SYNC WITH:
4+
// https://github.com/grafana/loki/blob/main/pkg/util/wal/reader.go
5+
// Local modifications should be minimized.
76

8-
"github.com/grafana/alloy/internal/loki/util"
9-
walUtils "github.com/grafana/loki/v3/pkg/util/wal"
7+
import (
8+
"errors"
9+
"io"
1010

11-
"github.com/grafana/alloy/internal/component/common/loki"
11+
"github.com/prometheus/prometheus/tsdb/wlog"
1212
)
1313

14-
// ReadWAL will read all entries in the WAL located under dir. Mainly used for testing
15-
func ReadWAL(dir string) ([]loki.Entry, error) {
16-
reader, closeFn, err := walUtils.NewWalReader(dir, -1)
17-
if err != nil {
18-
return nil, err
19-
}
20-
defer func() { closeFn.Close() }()
21-
22-
seenSeries := make(map[uint64]model.LabelSet)
23-
seenEntries := []loki.Entry{}
24-
25-
for reader.Next() {
26-
var walRec = Record{}
27-
bytes := reader.Record()
28-
err = DecodeRecord(bytes, &walRec)
14+
func newWalReader(dir string, startSegment int) (*wlog.Reader, io.Closer, error) {
15+
var (
16+
segmentReader io.ReadCloser
17+
err error
18+
)
19+
if startSegment < 0 {
20+
segmentReader, err = wlog.NewSegmentsReader(dir)
2921
if err != nil {
30-
return nil, fmt.Errorf("error decoding wal record: %w", err)
22+
return nil, nil, err
3123
}
32-
33-
// first read series
34-
for _, series := range walRec.Series {
35-
if _, ok := seenSeries[uint64(series.Ref)]; !ok {
36-
seenSeries[uint64(series.Ref)] = util.MapToModelLabelSet(series.Labels.Map())
37-
}
24+
} else {
25+
first, last, err := wlog.Segments(dir)
26+
if err != nil {
27+
return nil, nil, err
3828
}
39-
40-
for _, entries := range walRec.RefEntries {
41-
for _, entry := range entries.Entries {
42-
labels, ok := seenSeries[uint64(entries.Ref)]
43-
if !ok {
44-
return nil, fmt.Errorf("found entry without matching series")
45-
}
46-
seenEntries = append(seenEntries, loki.Entry{
47-
Labels: labels,
48-
Entry: entry,
49-
})
50-
}
29+
if startSegment > last {
30+
return nil, nil, errors.New("start segment is beyond the last WAL segment")
31+
}
32+
if first > startSegment {
33+
startSegment = first
34+
}
35+
segmentReader, err = wlog.NewSegmentsRangeReader(wlog.SegmentRange{
36+
Dir: dir,
37+
First: startSegment,
38+
Last: -1, // Till the end.
39+
})
40+
if err != nil {
41+
return nil, nil, err
5142
}
52-
53-
// reset entry
54-
walRec.Series = walRec.Series[:]
55-
walRec.RefEntries = walRec.RefEntries[:]
5643
}
57-
58-
return seenEntries, nil
44+
return wlog.NewReader(segmentReader), segmentReader, nil
5945
}

internal/component/common/loki/wal/writer_test.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/stretchr/testify/require"
1616

1717
"github.com/grafana/alloy/internal/component/common/loki"
18+
"github.com/grafana/alloy/internal/loki/util"
1819
"github.com/grafana/alloy/internal/runtime/logging/level"
1920
)
2021

@@ -251,7 +252,7 @@ func eventuallyReadWAL(t *testing.T, expectedEntries int, dir string) []loki.Ent
251252
var readEntries []loki.Entry
252253
require.Eventually(t, func() bool {
253254
// read WAL and assert over read entries
254-
readEntries, err = ReadWAL(dir)
255+
readEntries, err = readWAL(dir)
255256
if err != nil {
256257
return false
257258
}
@@ -260,6 +261,49 @@ func eventuallyReadWAL(t *testing.T, expectedEntries int, dir string) []loki.Ent
260261
return readEntries
261262
}
262263

264+
// readWAL will read all entries in the WAL located under dir.
265+
func readWAL(dir string) ([]loki.Entry, error) {
266+
reader, closeFn, err := newWalReader(dir, -1)
267+
if err != nil {
268+
return nil, err
269+
}
270+
defer func() { closeFn.Close() }()
271+
272+
seenSeries := make(map[uint64]model.LabelSet)
273+
seenEntries := []loki.Entry{}
274+
275+
for reader.Next() {
276+
var walRec = Record{}
277+
bytes := reader.Record()
278+
err = DecodeRecord(bytes, &walRec)
279+
if err != nil {
280+
return nil, fmt.Errorf("error decoding wal record: %w", err)
281+
}
282+
283+
// first read series
284+
for _, series := range walRec.Series {
285+
if _, ok := seenSeries[uint64(series.Ref)]; !ok {
286+
seenSeries[uint64(series.Ref)] = util.MapToModelLabelSet(series.Labels.Map())
287+
}
288+
}
289+
290+
for _, entries := range walRec.RefEntries {
291+
for _, entry := range entries.Entries {
292+
labels, ok := seenSeries[uint64(entries.Ref)]
293+
if !ok {
294+
return nil, fmt.Errorf("found entry without matching series")
295+
}
296+
seenEntries = append(seenEntries, loki.Entry{
297+
Labels: labels,
298+
Entry: entry,
299+
})
300+
}
301+
}
302+
}
303+
304+
return seenEntries, nil
305+
}
306+
263307
func BenchmarkWriter_WriteEntries(b *testing.B) {
264308
type testCase struct {
265309
lines int

0 commit comments

Comments
 (0)