Skip to content

Commit 72b9c94

Browse files
authored
chore: refactor loki client code (#4840)
* move metrics to its own file * remove unused function StopNow * remove unused Name() function * move MaxStreams to config * move WAL writer into client manager * refactor batch to no longer have two different methods.
1 parent 4b36248 commit 72b9c94

File tree

16 files changed

+229
-528
lines changed

16 files changed

+229
-528
lines changed

internal/component/common/loki/client/batch.go

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,44 +54,21 @@ func newBatch(maxStreams int, entries ...loki.Entry) *batch {
5454
// Add entries to the batch
5555
for _, entry := range entries {
5656
//never error here
57-
_ = b.add(entry)
57+
_ = b.add(entry, 0)
5858
}
5959

6060
return b
6161
}
6262

63-
// add an entry to the batch
64-
func (b *batch) add(entry loki.Entry) error {
63+
// add an entry to the batch. segmentNum is used to associate batch with a segment from WAL.
64+
// If the entry is added by a client that is not backed by the WAL, it can be any value and is unused.
65+
func (b *batch) add(entry loki.Entry, segmentNum int) error {
6566
b.totalBytes += entrySize(entry.Entry)
6667

6768
// Append the entry to an already existing stream (if any)
6869
labels := labelsMapToString(entry.Labels)
6970
if stream, ok := b.streams[labels]; ok {
7071
stream.Entries = append(stream.Entries, entry.Entry)
71-
return nil
72-
}
73-
74-
streams := len(b.streams)
75-
if b.maxStreams > 0 && streams >= b.maxStreams {
76-
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
77-
}
78-
// Add the entry as a new stream
79-
b.streams[labels] = &push.Stream{
80-
Labels: labels,
81-
Entries: []push.Entry{entry.Entry},
82-
}
83-
return nil
84-
}
85-
86-
// addFromWAL adds an entry to the batch, tracking that the data being added comes from segment segmentNum read from the
87-
// WAL.
88-
func (b *batch) addFromWAL(lbs model.LabelSet, entry push.Entry, segmentNum int) error {
89-
b.totalBytes += len(entry.Line)
90-
91-
// Append the entry to an already existing stream (if any)
92-
labels := labelsMapToString(lbs)
93-
if stream, ok := b.streams[labels]; ok {
94-
stream.Entries = append(stream.Entries, entry)
9572
b.countForSegment(segmentNum)
9673
return nil
9774
}
@@ -100,14 +77,12 @@ func (b *batch) addFromWAL(lbs model.LabelSet, entry push.Entry, segmentNum int)
10077
if b.maxStreams > 0 && streams >= b.maxStreams {
10178
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
10279
}
103-
10480
// Add the entry as a new stream
10581
b.streams[labels] = &push.Stream{
10682
Labels: labels,
107-
Entries: []push.Entry{entry},
83+
Entries: []push.Entry{entry.Entry},
10884
}
10985
b.countForSegment(segmentNum)
110-
11186
return nil
11287
}
11388

internal/component/common/loki/client/batch_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestBatch_MaxStreams(t *testing.T) {
2727

2828
errCount := 0
2929
for _, entry := range inputEntries {
30-
err := b.add(entry)
30+
err := b.add(entry, 0)
3131
if err != nil {
3232
errCount++
3333
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
@@ -78,7 +78,7 @@ func TestBatch_add(t *testing.T) {
7878
b := newBatch(0)
7979

8080
for _, entry := range testData.inputEntries {
81-
err := b.add(entry)
81+
err := b.add(entry, 0)
8282
assert.NoError(t, err)
8383
}
8484

@@ -146,9 +146,9 @@ func TestHashCollisions(t *testing.T) {
146146
const entriesPerLabel = 10
147147

148148
for i := 0; i < entriesPerLabel; i++ {
149-
_ = b.add(loki.Entry{Labels: ls1, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
149+
_ = b.add(loki.Entry{Labels: ls1, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, 0)
150150

151-
_ = b.add(loki.Entry{Labels: ls2, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
151+
_ = b.add(loki.Entry{Labels: ls2, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, 0)
152152
}
153153

154154
// make sure that colliding labels are stored properly as independent streams

internal/component/common/loki/client/client.go

Lines changed: 17 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414

1515
"github.com/go-kit/log"
1616
"github.com/grafana/dskit/backoff"
17-
"github.com/prometheus/client_golang/prometheus"
1817
"github.com/prometheus/common/config"
1918
"github.com/prometheus/common/model"
2019

@@ -23,7 +22,6 @@ import (
2322
"github.com/grafana/alloy/internal/component/common/loki"
2423
"github.com/grafana/alloy/internal/runtime/logging/level"
2524
"github.com/grafana/alloy/internal/useragent"
26-
"github.com/grafana/alloy/internal/util"
2725
)
2826

2927
const (
@@ -33,111 +31,18 @@ const (
3331
// Label reserved to override the tenant ID while processing
3432
// pipeline stages
3533
ReservedLabelTenantID = "__tenant_id__"
36-
37-
LatencyLabel = "filename"
38-
HostLabel = "host"
39-
ClientLabel = "client"
40-
TenantLabel = "tenant"
41-
ReasonLabel = "reason"
42-
43-
ReasonGeneric = "ingester_error"
44-
ReasonRateLimited = "rate_limited"
45-
ReasonStreamLimited = "stream_limited"
46-
ReasonLineTooLong = "line_too_long"
4734
)
4835

49-
var Reasons = []string{ReasonGeneric, ReasonRateLimited, ReasonStreamLimited, ReasonLineTooLong}
50-
5136
var userAgent = useragent.Get()
5237

53-
type Metrics struct {
54-
encodedBytes *prometheus.CounterVec
55-
sentBytes *prometheus.CounterVec
56-
droppedBytes *prometheus.CounterVec
57-
sentEntries *prometheus.CounterVec
58-
droppedEntries *prometheus.CounterVec
59-
mutatedEntries *prometheus.CounterVec
60-
mutatedBytes *prometheus.CounterVec
61-
requestDuration *prometheus.HistogramVec
62-
batchRetries *prometheus.CounterVec
63-
countersWithHostTenant []*prometheus.CounterVec
64-
countersWithHostTenantReason []*prometheus.CounterVec
65-
}
66-
67-
func NewMetrics(reg prometheus.Registerer) *Metrics {
68-
var m Metrics
69-
70-
m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
71-
Name: "loki_write_encoded_bytes_total",
72-
Help: "Number of bytes encoded and ready to send.",
73-
}, []string{HostLabel, TenantLabel})
74-
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
75-
Name: "loki_write_sent_bytes_total",
76-
Help: "Number of bytes sent.",
77-
}, []string{HostLabel, TenantLabel})
78-
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
79-
Name: "loki_write_dropped_bytes_total",
80-
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
81-
}, []string{HostLabel, TenantLabel, ReasonLabel})
82-
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
83-
Name: "loki_write_sent_entries_total",
84-
Help: "Number of log entries sent to the ingester.",
85-
}, []string{HostLabel, TenantLabel})
86-
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
87-
Name: "loki_write_dropped_entries_total",
88-
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
89-
}, []string{HostLabel, TenantLabel, ReasonLabel})
90-
m.mutatedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
91-
Name: "loki_write_mutated_entries_total",
92-
Help: "The total number of log entries that have been mutated.",
93-
}, []string{HostLabel, TenantLabel, ReasonLabel})
94-
m.mutatedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
95-
Name: "loki_write_mutated_bytes_total",
96-
Help: "The total number of bytes that have been mutated.",
97-
}, []string{HostLabel, TenantLabel, ReasonLabel})
98-
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
99-
Name: "loki_write_request_duration_seconds",
100-
Help: "Duration of send requests.",
101-
}, []string{"status_code", HostLabel, TenantLabel})
102-
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
103-
Name: "loki_write_batch_retries_total",
104-
Help: "Number of times batches has had to be retried.",
105-
}, []string{HostLabel, TenantLabel})
106-
107-
m.countersWithHostTenant = []*prometheus.CounterVec{
108-
m.batchRetries, m.encodedBytes, m.sentBytes, m.sentEntries,
109-
}
110-
111-
m.countersWithHostTenantReason = []*prometheus.CounterVec{
112-
m.droppedBytes, m.droppedEntries, m.mutatedEntries, m.mutatedBytes,
113-
}
114-
115-
if reg != nil {
116-
m.encodedBytes = util.MustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
117-
m.sentBytes = util.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
118-
m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
119-
m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
120-
m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
121-
m.mutatedEntries = util.MustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec)
122-
m.mutatedBytes = util.MustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec)
123-
m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
124-
m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
125-
}
126-
127-
return &m
128-
}
129-
13038
// Client pushes entries to Loki and can be stopped
13139
type Client interface {
132-
loki.EntryHandler
133-
// Stop goroutine sending batch of entries without retries.
134-
StopNow()
135-
Name() string
40+
Chan() chan<- loki.Entry
41+
Stop()
13642
}
13743

13844
// Client for pushing logs in snappy-compressed protos over HTTP.
13945
type client struct {
140-
name string
14146
metrics *Metrics
14247
logger log.Logger
14348
cfg Config
@@ -148,17 +53,16 @@ type client struct {
14853
wg sync.WaitGroup
14954

15055
// ctx is used in any upstream calls from the `client`.
151-
ctx context.Context
152-
cancel context.CancelFunc
153-
maxStreams int
56+
ctx context.Context
57+
cancel context.CancelFunc
15458
}
15559

15660
// New makes a new Client.
157-
func New(metrics *Metrics, cfg Config, maxStreams int, logger log.Logger) (Client, error) {
158-
return newClient(metrics, cfg, maxStreams, logger)
61+
func New(metrics *Metrics, cfg Config, logger log.Logger) (Client, error) {
62+
return newClient(metrics, cfg, logger)
15963
}
16064

161-
func newClient(metrics *Metrics, cfg Config, maxStreams int, logger log.Logger) (*client, error) {
65+
func newClient(metrics *Metrics, cfg Config, logger log.Logger) (*client, error) {
16266
if cfg.URL.URL == nil {
16367
return nil, errors.New("client needs target URL")
16468
}
@@ -169,17 +73,12 @@ func newClient(metrics *Metrics, cfg Config, maxStreams int, logger log.Logger)
16973
ctx, cancel := context.WithCancel(context.Background())
17074

17175
c := &client{
172-
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
173-
cfg: cfg,
174-
entries: make(chan loki.Entry),
175-
metrics: metrics,
176-
name: GetClientName(cfg),
177-
ctx: ctx,
178-
cancel: cancel,
179-
maxStreams: maxStreams,
180-
}
181-
if cfg.Name != "" {
182-
c.name = cfg.Name
76+
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
77+
cfg: cfg,
78+
entries: make(chan loki.Entry),
79+
metrics: metrics,
80+
ctx: ctx,
81+
cancel: cancel,
18382
}
18483

18584
err := cfg.Client.Validate()
@@ -194,8 +93,7 @@ func newClient(metrics *Metrics, cfg Config, maxStreams int, logger log.Logger)
19493

19594
c.client.Timeout = cfg.Timeout
19695

197-
c.wg.Add(1)
198-
go c.run()
96+
c.wg.Go(func() { c.run() })
19997
return c, nil
20098
}
20199

@@ -233,8 +131,6 @@ func (c *client) run() {
233131
for tenantID, batch := range batches {
234132
c.sendBatch(tenantID, batch)
235133
}
236-
237-
c.wg.Done()
238134
}()
239135

240136
for {
@@ -249,7 +145,7 @@ func (c *client) run() {
249145

250146
// If the batch doesn't exist yet, we create a new one with the entry
251147
if !ok {
252-
batches[tenantID] = newBatch(c.maxStreams, e)
148+
batches[tenantID] = newBatch(c.cfg.MaxStreams, e)
253149
c.initBatchMetrics(tenantID)
254150
break
255151
}
@@ -259,12 +155,12 @@ func (c *client) run() {
259155
if batch.sizeBytesAfter(e.Entry) > c.cfg.BatchSize {
260156
c.sendBatch(tenantID, batch)
261157

262-
batches[tenantID] = newBatch(c.maxStreams, e)
158+
batches[tenantID] = newBatch(c.cfg.MaxStreams, e)
263159
break
264160
}
265161

266162
// The max size of the batch isn't reached, so we can add the entry
267-
err := batch.add(e)
163+
err := batch.add(e, 0)
268164
if err != nil {
269165
level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err)
270166
reason := ReasonGeneric
@@ -422,18 +318,7 @@ func (c *client) Stop() {
422318
c.wg.Wait()
423319
}
424320

425-
// StopNow stops the client without retries
426-
func (c *client) StopNow() {
427-
// cancel will stop retrying http requests.
428-
c.cancel()
429-
c.Stop()
430-
}
431-
432321
func (c *client) processEntry(e loki.Entry) (loki.Entry, string) {
433322
tenantID := c.getTenantID(e.Labels)
434323
return e, tenantID
435324
}
436-
437-
func (c *client) Name() string {
438-
return c.name
439-
}

0 commit comments

Comments
 (0)