Skip to content

Commit 4595b07

Browse files
authored
chore: remove unused fields in loki client code (#4579)
* loki: remove unused fields
1 parent 911431c commit 4595b07

File tree

10 files changed

+54
-206
lines changed

10 files changed

+54
-206
lines changed

internal/component/common/loki/client/client.go

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -148,19 +148,17 @@ type client struct {
148148
wg sync.WaitGroup
149149

150150
// ctx is used in any upstream calls from the `client`.
151-
ctx context.Context
152-
cancel context.CancelFunc
153-
maxStreams int
154-
maxLineSize int
155-
maxLineSizeTruncate bool
151+
ctx context.Context
152+
cancel context.CancelFunc
153+
maxStreams int
156154
}
157155

158156
// New makes a new Client.
159-
func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (Client, error) {
160-
return newClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger)
157+
func New(metrics *Metrics, cfg Config, maxStreams int, logger log.Logger) (Client, error) {
158+
return newClient(metrics, cfg, maxStreams, logger)
161159
}
162160

163-
func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (*client, error) {
161+
func newClient(metrics *Metrics, cfg Config, maxStreams int, logger log.Logger) (*client, error) {
164162
if cfg.URL.URL == nil {
165163
return nil, errors.New("client needs target URL")
166164
}
@@ -171,16 +169,14 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLin
171169
ctx, cancel := context.WithCancel(context.Background())
172170

173171
c := &client{
174-
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
175-
cfg: cfg,
176-
entries: make(chan loki.Entry),
177-
metrics: metrics,
178-
name: GetClientName(cfg),
179-
ctx: ctx,
180-
cancel: cancel,
181-
maxStreams: maxStreams,
182-
maxLineSize: maxLineSize,
183-
maxLineSizeTruncate: maxLineSizeTruncate,
172+
logger: log.With(logger, "component", "client", "host", cfg.URL.Host),
173+
cfg: cfg,
174+
entries: make(chan loki.Entry),
175+
metrics: metrics,
176+
name: GetClientName(cfg),
177+
ctx: ctx,
178+
cancel: cancel,
179+
maxStreams: maxStreams,
184180
}
185181
if cfg.Name != "" {
186182
c.name = cfg.Name
@@ -249,20 +245,6 @@ func (c *client) run() {
249245
}
250246

251247
e, tenantID := c.processEntry(e)
252-
253-
// Either drop or mutate the log entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled.
254-
if c.maxLineSize != 0 && len(e.Line) > c.maxLineSize {
255-
if !c.maxLineSizeTruncate {
256-
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
257-
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line)))
258-
break
259-
}
260-
261-
c.metrics.mutatedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
262-
c.metrics.mutatedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line) - c.maxLineSize))
263-
e.Line = e.Line[:c.maxLineSize]
264-
}
265-
266248
batch, ok := batches[tenantID]
267249

268250
// If the batch doesn't exist yet, we create a new one with the entry

internal/component/common/loki/client/client_test.go

Lines changed: 12 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,16 @@ var logEntries = []loki.Entry{
4747

4848
func TestClient_Handle(t *testing.T) {
4949
tests := map[string]struct {
50-
clientBatchSize int
51-
clientBatchWait time.Duration
52-
clientMaxRetries int
53-
clientMaxLineSize int
54-
clientMaxLineSizeTruncate bool
55-
clientTenantID string
56-
clientDropRateLimited bool
57-
serverResponseStatus int
58-
inputEntries []loki.Entry
59-
inputDelay time.Duration
60-
expectedReqs []utils.RemoteWriteRequest
61-
expectedMetrics string
50+
clientBatchSize int
51+
clientBatchWait time.Duration
52+
clientMaxRetries int
53+
clientTenantID string
54+
clientDropRateLimited bool
55+
serverResponseStatus int
56+
inputEntries []loki.Entry
57+
inputDelay time.Duration
58+
expectedReqs []utils.RemoteWriteRequest
59+
expectedMetrics string
6260
}{
6361
"batch log entries together until the batch size is reached": {
6462
clientBatchSize: 10,
@@ -100,90 +98,6 @@ func TestClient_Handle(t *testing.T) {
10098
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
10199
`,
102100
},
103-
"dropping log entries that have max_line_size exceeded": {
104-
clientBatchSize: 10,
105-
clientBatchWait: 100 * time.Millisecond,
106-
clientMaxRetries: 3,
107-
clientMaxLineSize: 10, // any log line more than this length should be discarded
108-
clientMaxLineSizeTruncate: false,
109-
serverResponseStatus: 200,
110-
inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[6]}, // this logEntries[6] entries has line more than size 10
111-
expectedReqs: []utils.RemoteWriteRequest{
112-
{
113-
TenantID: "",
114-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}}}},
115-
},
116-
},
117-
expectedMetrics: `
118-
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
119-
# TYPE loki_write_sent_entries_total counter
120-
loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0
121-
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
122-
# TYPE loki_write_dropped_entries_total counter
123-
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
124-
loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 1
125-
loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
126-
loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
127-
# HELP loki_write_mutated_entries_total The total number of log entries that have been mutated.
128-
# TYPE loki_write_mutated_entries_total counter
129-
loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
130-
loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
131-
loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
132-
loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
133-
# HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated.
134-
# TYPE loki_write_mutated_bytes_total counter
135-
loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
136-
loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 0
137-
loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
138-
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
139-
`,
140-
},
141-
"truncating log entries that have max_line_size exceeded": {
142-
clientBatchSize: 10,
143-
clientBatchWait: 100 * time.Millisecond,
144-
clientMaxRetries: 3,
145-
clientMaxLineSize: 10,
146-
clientMaxLineSizeTruncate: true,
147-
serverResponseStatus: 200,
148-
inputEntries: []loki.Entry{logEntries[0], logEntries[1], logEntries[6]}, // logEntries[6]'s line is greater than 10 bytes
149-
expectedReqs: []utils.RemoteWriteRequest{
150-
{
151-
TenantID: "",
152-
Request: logproto.PushRequest{Streams: []logproto.Stream{{Labels: "{}", Entries: []logproto.Entry{
153-
logEntries[0].Entry,
154-
logEntries[1].Entry,
155-
{
156-
Timestamp: logEntries[6].Entry.Timestamp,
157-
Line: logEntries[6].Line[:10],
158-
},
159-
}}}},
160-
},
161-
},
162-
expectedMetrics: `
163-
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
164-
# TYPE loki_write_sent_entries_total counter
165-
loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0
166-
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
167-
# TYPE loki_write_dropped_entries_total counter
168-
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
169-
loki_write_dropped_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 0
170-
loki_write_dropped_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
171-
loki_write_dropped_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
172-
# HELP loki_write_mutated_entries_total The total number of log entries that have been mutated.
173-
# TYPE loki_write_mutated_entries_total counter
174-
loki_write_mutated_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
175-
loki_write_mutated_entries_total{host="__HOST__",reason="line_too_long",tenant=""} 1
176-
loki_write_mutated_entries_total{host="__HOST__",reason="rate_limited",tenant=""} 0
177-
loki_write_mutated_entries_total{host="__HOST__",reason="stream_limited",tenant=""} 0
178-
# HELP loki_write_mutated_bytes_total The total number of bytes that have been mutated.
179-
# TYPE loki_write_mutated_bytes_total counter
180-
loki_write_mutated_bytes_total{host="__HOST__",reason="ingester_error",tenant=""} 0
181-
loki_write_mutated_bytes_total{host="__HOST__",reason="line_too_long",tenant=""} 4
182-
loki_write_mutated_bytes_total{host="__HOST__",reason="rate_limited",tenant=""} 0
183-
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
184-
`,
185-
},
186-
187101
"batch log entries together until the batch wait time is reached": {
188102
clientBatchSize: 10,
189103
clientBatchWait: 100 * time.Millisecond,
@@ -526,7 +440,7 @@ func TestClient_Handle(t *testing.T) {
526440
}
527441

528442
m := NewMetrics(reg)
529-
c, err := New(m, cfg, 0, testData.clientMaxLineSize, testData.clientMaxLineSizeTruncate, log.NewNopLogger())
443+
c, err := New(m, cfg, 0, log.NewNopLogger())
530444
require.NoError(t, err)
531445

532446
// Send all the input log entries
@@ -666,7 +580,7 @@ func TestClient_StopNow(t *testing.T) {
666580
}
667581

668582
m := NewMetrics(reg)
669-
cl, err := New(m, cfg, 0, 0, false, log.NewNopLogger())
583+
cl, err := New(m, cfg, 0, log.NewNopLogger())
670584
require.NoError(t, err)
671585

672586
// Send all the input log entries

internal/component/common/loki/client/manager.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/prometheus/client_golang/prometheus"
1313

1414
"github.com/grafana/alloy/internal/component/common/loki"
15-
"github.com/grafana/alloy/internal/component/common/loki/limit"
1615
"github.com/grafana/alloy/internal/component/common/loki/wal"
1716
)
1817

@@ -87,7 +86,7 @@ type Manager struct {
8786
}
8887

8988
// NewManager creates a new Manager
90-
func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg prometheus.Registerer, walCfg wal.Config, notifier WriterEventsNotifier, clientCfgs ...Config) (*Manager, error) {
89+
func NewManager(metrics *Metrics, logger log.Logger, maxStreams int, reg prometheus.Registerer, walCfg wal.Config, notifier WriterEventsNotifier, clientCfgs ...Config) (*Manager, error) {
9190
var fake struct{}
9291

9392
walWatcherMetrics := wal.NewWatcherMetrics(reg)
@@ -120,7 +119,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
120119
}
121120
markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName))
122121

123-
queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
122+
queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, maxStreams, logger, markerHandler)
124123
if err != nil {
125124
return nil, fmt.Errorf("error starting queue client: %w", err)
126125
}
@@ -141,7 +140,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
141140
client: queue,
142141
})
143142
} else {
144-
client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
143+
client, err := New(metrics, cfg, maxStreams, logger)
145144
if err != nil {
146145
return nil, fmt.Errorf("error starting client: %w", err)
147146
}

internal/component/common/loki/client/manager_test.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,11 @@ import (
1818
"github.com/stretchr/testify/require"
1919

2020
"github.com/grafana/alloy/internal/component/common/loki"
21-
"github.com/grafana/alloy/internal/component/common/loki/limit"
2221
"github.com/grafana/alloy/internal/component/common/loki/utils"
2322
"github.com/grafana/alloy/internal/component/common/loki/wal"
2423
)
2524

26-
var (
27-
testLimitsConfig = limit.Config{
28-
MaxLineSizeTruncate: false,
29-
MaxStreams: 0,
30-
MaxLineSize: 0,
31-
}
32-
nilMetrics = NewMetrics(nil)
33-
)
25+
var nilMetrics = NewMetrics(nil)
3426

3527
// TestManager_NoDuplicateMetricsPanic ensures that creating two managers does
3628
// not lead to duplicate metrics registration.
@@ -44,7 +36,7 @@ func TestManager_NoDuplicateMetricsPanic(t *testing.T) {
4436

4537
require.NotPanics(t, func() {
4638
for i := 0; i < 2; i++ {
47-
_, err := NewManager(metrics, log.NewLogfmtLogger(os.Stdout), testLimitsConfig, reg, wal.Config{
39+
_, err := NewManager(metrics, log.NewLogfmtLogger(os.Stdout), 0, reg, wal.Config{
4840
WatchConfig: wal.DefaultWatchConfig,
4941
}, NilNotifier, Config{
5042
URL: flagext.URLValue{URL: host},
@@ -58,7 +50,7 @@ func TestManager_ErrorCreatingWhenNoClientConfigsProvided(t *testing.T) {
5850
for _, walEnabled := range []bool{true, false} {
5951
t.Run(fmt.Sprintf("wal-enabled = %t", walEnabled), func(t *testing.T) {
6052
walDir := t.TempDir()
61-
_, err := NewManager(nilMetrics, log.NewLogfmtLogger(os.Stdout), testLimitsConfig, prometheus.NewRegistry(), wal.Config{
53+
_, err := NewManager(nilMetrics, log.NewLogfmtLogger(os.Stdout), 0, prometheus.NewRegistry(), wal.Config{
6254
Dir: walDir,
6355
Enabled: walEnabled,
6456
WatchConfig: wal.DefaultWatchConfig,
@@ -79,7 +71,7 @@ func TestManager_ErrorCreatingWhenRepeatedConfigs(t *testing.T) {
7971
for _, walEnabled := range []bool{true, false} {
8072
t.Run(fmt.Sprintf("wal-enabled = %t", walEnabled), func(t *testing.T) {
8173
walDir := t.TempDir()
82-
_, err := NewManager(nilMetrics, log.NewLogfmtLogger(os.Stdout), testLimitsConfig, prometheus.NewRegistry(), wal.Config{
74+
_, err := NewManager(nilMetrics, log.NewLogfmtLogger(os.Stdout), 0, prometheus.NewRegistry(), wal.Config{
8375
Dir: walDir,
8476
Enabled: walEnabled,
8577
WatchConfig: wal.DefaultWatchConfig,
@@ -143,7 +135,7 @@ func TestManager_WALEnabled(t *testing.T) {
143135
// start writer and manager
144136
writer, err := wal.NewWriter(walConfig, logger, reg)
145137
require.NoError(t, err)
146-
manager, err := NewManager(clientMetrics, logger, testLimitsConfig, prometheus.NewRegistry(), walConfig, writer, testClientConfig)
138+
manager, err := NewManager(clientMetrics, logger, 0, prometheus.NewRegistry(), walConfig, writer, testClientConfig)
147139
require.NoError(t, err)
148140
require.Equal(t, "wal:test-client", manager.Name())
149141

@@ -199,7 +191,7 @@ func TestManager_WALDisabled(t *testing.T) {
199191
clientMetrics := NewMetrics(reg)
200192

201193
// start writer and manager
202-
manager, err := NewManager(clientMetrics, logger, testLimitsConfig, prometheus.NewRegistry(), walConfig, NilNotifier, testClientConfig)
194+
manager, err := NewManager(clientMetrics, logger, 0, prometheus.NewRegistry(), walConfig, NilNotifier, testClientConfig)
203195
require.NoError(t, err)
204196
require.Equal(t, "multi:test-client", manager.Name())
205197

@@ -258,7 +250,7 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) {
258250
clientMetrics := NewMetrics(reg)
259251

260252
// start writer and manager
261-
manager, err := NewManager(clientMetrics, logger, testLimitsConfig, prometheus.NewRegistry(), walConfig, NilNotifier, testClientConfig, testClientConfig2)
253+
manager, err := NewManager(clientMetrics, logger, 0, prometheus.NewRegistry(), walConfig, NilNotifier, testClientConfig, testClientConfig2)
262254
require.NoError(t, err)
263255
require.Equal(t, "multi:test-client,test-client-2", manager.Name())
264256

internal/component/common/loki/client/queue_client.go

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -171,21 +171,19 @@ type queueClient struct {
171171
seriesLock sync.RWMutex
172172

173173
// ctx is used in any upstream calls from the `client`.
174-
ctx context.Context
175-
cancel context.CancelFunc
176-
maxStreams int
177-
maxLineSize int
178-
maxLineSizeTruncate bool
179-
quit chan struct{}
180-
markerHandler MarkerHandler
174+
ctx context.Context
175+
cancel context.CancelFunc
176+
maxStreams int
177+
quit chan struct{}
178+
markerHandler MarkerHandler
181179
}
182180

183181
// NewQueue creates a new queueClient.
184-
func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) {
185-
return newQueueClient(metrics, queueClientMetrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler)
182+
func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams int, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) {
183+
return newQueueClient(metrics, queueClientMetrics, cfg, maxStreams, logger, markerHandler)
186184
}
187185

188-
func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) {
186+
func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config, maxStreams int, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) {
189187
if cfg.URL.URL == nil {
190188
return nil, errors.New("client needs target URL")
191189
}
@@ -206,11 +204,9 @@ func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config,
206204
series: make(map[chunks.HeadSeriesRef]model.LabelSet),
207205
seriesSegment: make(map[chunks.HeadSeriesRef]int),
208206

209-
ctx: ctx,
210-
cancel: cancel,
211-
maxStreams: maxStreams,
212-
maxLineSize: maxLineSize,
213-
maxLineSizeTruncate: maxLineSizeTruncate,
207+
ctx: ctx,
208+
cancel: cancel,
209+
maxStreams: maxStreams,
214210
}
215211

216212
// The buffered channel size is calculated using the configured capacity, which is the worst case number of bytes
@@ -300,19 +296,6 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error {
300296
func (c *queueClient) appendSingleEntry(segmentNum int, lbs model.LabelSet, e logproto.Entry) {
301297
lbs, tenantID := c.processLabels(lbs)
302298

303-
// Either drop or mutate the log entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled.
304-
if c.maxLineSize != 0 && len(e.Line) > c.maxLineSize {
305-
if !c.maxLineSizeTruncate {
306-
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
307-
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line)))
308-
return
309-
}
310-
311-
c.metrics.mutatedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc()
312-
c.metrics.mutatedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line) - c.maxLineSize))
313-
e.Line = e.Line[:c.maxLineSize]
314-
}
315-
316299
// TODO: can I make this locking more fine grained?
317300
c.batchesMtx.Lock()
318301

0 commit comments

Comments
 (0)