Skip to content

Commit ea1a478

Browse files
authored
chore: refactor loki.write manager (#4863)
* Add an interface for the manager and split the current one into two implementations: 1. MemoryManager, a manager that keeps all data in memory (the "normal" one). 2. WALManager, a manager that keeps data in the WAL. * Rename and use the Consumer interface instead * Rename to FanoutConsumer * Only have two interfaces and add comments * Rename and update comments
1 parent 7d0f15e commit ea1a478

File tree

13 files changed

+729
-763
lines changed

13 files changed

+729
-763
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package client
2+
3+
import "github.com/grafana/alloy/internal/component/common/loki"
4+
5+
// Consumer is an interface for consuming Loki log entries. It provides a channel
// to send entries to and a method to stop the consumer.
type Consumer interface {
	// Chan returns the send-only channel on which entries are handed to the
	// consumer.
	Chan() chan<- loki.Entry
	// Stop stops the consumer.
	Stop()
}
11+
12+
// DrainableConsumer extends Consumer with the ability to stop and drain any
// remaining entries. This is useful for graceful shutdowns, particularly when
// using write-ahead logs (WAL) where entries may be buffered and need to be
// fully processed before stopping.
type DrainableConsumer interface {
	Consumer
	// StopAndDrain stops the consumer and drains any remaining entries.
	StopAndDrain()
}

internal/component/common/loki/client/client.go renamed to internal/component/common/loki/client/consumer_fanout.go

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7+
"crypto/sha256"
78
"errors"
89
"fmt"
910
"io"
@@ -13,17 +14,107 @@ import (
1314
"time"
1415

1516
"github.com/go-kit/log"
16-
"github.com/grafana/dskit/backoff"
17+
"github.com/prometheus/client_golang/prometheus"
1718
"github.com/prometheus/common/config"
1819
"github.com/prometheus/common/model"
1920

20-
lokiutil "github.com/grafana/alloy/internal/loki/util"
21-
2221
"github.com/grafana/alloy/internal/component/common/loki"
22+
lokiutil "github.com/grafana/alloy/internal/loki/util"
2323
"github.com/grafana/alloy/internal/runtime/logging/level"
2424
"github.com/grafana/alloy/internal/useragent"
25+
"github.com/grafana/dskit/backoff"
2526
)
2627

28+
func NewFanoutConsumer(logger log.Logger, reg prometheus.Registerer, clientCfgs ...Config) (*FanoutConsumer, error) {
29+
if len(clientCfgs) == 0 {
30+
return nil, fmt.Errorf("at least one client config must be provided")
31+
}
32+
33+
m := &FanoutConsumer{
34+
clients: make([]*client, 0, len(clientCfgs)),
35+
recv: make(chan loki.Entry),
36+
}
37+
38+
var (
39+
metrics = NewMetrics(reg)
40+
clientsCheck = make(map[string]struct{})
41+
)
42+
43+
for _, cfg := range clientCfgs {
44+
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
45+
clientName := getClientName(cfg)
46+
if _, ok := clientsCheck[clientName]; ok {
47+
return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for name: %s", cfg.Name)
48+
}
49+
50+
clientsCheck[clientName] = struct{}{}
51+
client, err := newClient(metrics, cfg, logger)
52+
if err != nil {
53+
return nil, fmt.Errorf("error starting client: %w", err)
54+
}
55+
56+
m.clients = append(m.clients, client)
57+
}
58+
59+
m.wg.Go(m.run)
60+
return m, nil
61+
}
62+
63+
// Compile-time check that *FanoutConsumer implements Consumer.
var _ Consumer = (*FanoutConsumer)(nil)

// FanoutConsumer is a Consumer that forwards every entry received on its
// channel to each of its clients.
type FanoutConsumer struct {
	clients []*client       // destinations each received entry is sent to
	wg      sync.WaitGroup  // tracks the run goroutine started by NewFanoutConsumer
	once    sync.Once       // ensures recv is closed only once by Stop
	recv    chan loki.Entry // unbuffered channel exposed through Chan
}
71+
72+
func (c *FanoutConsumer) run() {
73+
for e := range c.recv {
74+
for _, c := range c.clients {
75+
c.Chan() <- e
76+
}
77+
}
78+
}
79+
80+
func (c *FanoutConsumer) Chan() chan<- loki.Entry {
81+
return c.recv
82+
}
83+
84+
func (c *FanoutConsumer) Stop() {
85+
// First stop the receiving channel.
86+
c.once.Do(func() { close(c.recv) })
87+
c.wg.Wait()
88+
89+
var stopWG sync.WaitGroup
90+
// Stop all clients.
91+
for _, c := range c.clients {
92+
stopWG.Go(func() {
93+
c.Stop()
94+
})
95+
}
96+
97+
// Wait for all clients to stop.
98+
stopWG.Wait()
99+
}
100+
101+
// getClientName computes the specific name for each client config. The name is either the configured Name setting in Config,
102+
// or a hash of the config as whole, this allows us to detect repeated configs.
103+
func getClientName(cfg Config) string {
104+
if cfg.Name != "" {
105+
return cfg.Name
106+
}
107+
return asSha256(cfg)
108+
}
109+
110+
// asSha256 returns the first six hexadecimal characters of the SHA-256 digest
// of o's default ("%v") formatting.
func asSha256(o interface{}) string {
	digest := sha256.Sum256([]byte(fmt.Sprintf("%v", o)))
	return fmt.Sprintf("%x", digest[:])[:6]
}
117+
27118
const (
28119
contentType = "application/x-protobuf"
29120
maxErrMsgLen = 1024
@@ -35,12 +126,6 @@ const (
35126

36127
var userAgent = useragent.Get()
37128

38-
// Client pushes entries to Loki and can be stopped
39-
type Client interface {
40-
Chan() chan<- loki.Entry
41-
Stop()
42-
}
43-
44129
// Client for pushing logs in snappy-compressed protos over HTTP.
45130
type client struct {
46131
metrics *Metrics
@@ -57,11 +142,6 @@ type client struct {
57142
cancel context.CancelFunc
58143
}
59144

60-
// New makes a new Client.
61-
func New(metrics *Metrics, cfg Config, logger log.Logger) (Client, error) {
62-
return newClient(metrics, cfg, logger)
63-
}
64-
65145
func newClient(metrics *Metrics, cfg Config, logger log.Logger) (*client, error) {
66146
if cfg.URL.URL == nil {
67147
return nil, errors.New("client needs target URL")

internal/component/common/loki/client/client_test.go renamed to internal/component/common/loki/client/consumer_fanout_test.go

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package client
22

33
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/url"
48
"strings"
59
"testing"
610
"time"
@@ -17,9 +21,148 @@ import (
1721
"github.com/stretchr/testify/require"
1822

1923
"github.com/grafana/alloy/internal/component/common/loki"
24+
"github.com/grafana/alloy/internal/component/common/loki/utils"
2025
"github.com/grafana/alloy/internal/loki/util"
2126
)
2227

28+
func TestFanoutConsumer(t *testing.T) {
29+
testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t)
30+
31+
consumer, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), testClientConfig)
32+
require.NoError(t, err)
33+
34+
receivedRequests := utils.NewSyncSlice[utils.RemoteWriteRequest]()
35+
go func() {
36+
for req := range rwReceivedReqs {
37+
receivedRequests.Append(req)
38+
}
39+
}()
40+
41+
defer func() {
42+
consumer.Stop()
43+
closeServer()
44+
}()
45+
46+
var testLabels = model.LabelSet{
47+
"pizza-flavour": "fugazzeta",
48+
}
49+
var totalLines = 100
50+
for i := range totalLines {
51+
consumer.Chan() <- loki.Entry{
52+
Labels: testLabels,
53+
Entry: push.Entry{
54+
Timestamp: time.Now(),
55+
Line: fmt.Sprintf("line%d", i),
56+
},
57+
}
58+
}
59+
60+
require.Eventually(t, func() bool {
61+
return receivedRequests.Length() == totalLines
62+
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")
63+
64+
var seenEntries = map[string]struct{}{}
65+
// assert over rw client received entries
66+
defer receivedRequests.DoneIterate()
67+
for _, req := range receivedRequests.StartIterate() {
68+
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
69+
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
70+
require.Equal(t, `{pizza-flavour="fugazzeta"}`, req.Request.Streams[0].Labels)
71+
seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{}
72+
}
73+
require.Len(t, seenEntries, totalLines)
74+
}
75+
76+
func TestFanoutConsumer_MultipleConfigs(t *testing.T) {
77+
testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t)
78+
testClientConfig2, rwReceivedReqs2, closeServer2 := newServerAndClientConfig(t)
79+
testClientConfig2.Name = "test-client-2"
80+
81+
// start writer and consumer
82+
consumer, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), testClientConfig, testClientConfig2)
83+
require.NoError(t, err)
84+
85+
receivedRequests := utils.NewSyncSlice[utils.RemoteWriteRequest]()
86+
ctx, cancel := context.WithCancel(t.Context())
87+
go func(ctx context.Context) {
88+
for {
89+
select {
90+
case req := <-rwReceivedReqs:
91+
receivedRequests.Append(req)
92+
case req := <-rwReceivedReqs2:
93+
receivedRequests.Append(req)
94+
case <-ctx.Done():
95+
return
96+
}
97+
}
98+
}(ctx)
99+
100+
defer func() {
101+
consumer.Stop()
102+
closeServer()
103+
closeServer2()
104+
cancel()
105+
}()
106+
107+
var testLabels = model.LabelSet{
108+
"pizza-flavour": "fugazzeta",
109+
}
110+
var totalLines = 100
111+
for i := range totalLines {
112+
consumer.Chan() <- loki.Entry{
113+
Labels: testLabels,
114+
Entry: push.Entry{
115+
Timestamp: time.Now(),
116+
Line: fmt.Sprintf("line%d", i),
117+
},
118+
}
119+
}
120+
121+
// times 2 due to clients being run
122+
expectedTotalLines := totalLines * 2
123+
require.Eventually(t, func() bool {
124+
return receivedRequests.Length() == expectedTotalLines
125+
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")
126+
127+
var seenEntries int
128+
// assert over rw client received entries
129+
defer receivedRequests.DoneIterate()
130+
for _, req := range receivedRequests.StartIterate() {
131+
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
132+
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
133+
seenEntries += 1
134+
}
135+
require.Equal(t, seenEntries, expectedTotalLines)
136+
}
137+
138+
func TestFanoutConsumer_InvalidConfig(t *testing.T) {
139+
t.Run("no clients", func(t *testing.T) {
140+
_, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry())
141+
require.Error(t, err)
142+
})
143+
144+
t.Run("repeated client", func(t *testing.T) {
145+
host, _ := url.Parse("http://localhost:3100")
146+
config := Config{URL: flagext.URLValue{URL: host}}
147+
_, err := NewFanoutConsumer(log.NewNopLogger(), prometheus.NewRegistry(), config, config)
148+
require.Error(t, err)
149+
})
150+
}
151+
152+
func TestFanoutConsumer_NoDuplicateMetricsPanic(t *testing.T) {
153+
var (
154+
host, _ = url.Parse("http://localhost:3100")
155+
reg = prometheus.NewRegistry()
156+
)
157+
158+
require.NotPanics(t, func() {
159+
for range 2 {
160+
_, err := NewFanoutConsumer(log.NewNopLogger(), reg, Config{URL: flagext.URLValue{URL: host}})
161+
require.NoError(t, err)
162+
}
163+
})
164+
}
165+
23166
var logEntries = []loki.Entry{
24167
{Labels: model.LabelSet{}, Entry: push.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
25168
{Labels: model.LabelSet{}, Entry: push.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
@@ -435,7 +578,7 @@ func TestClient_Handle(t *testing.T) {
435578
}
436579

437580
m := NewMetrics(reg)
438-
c, err := New(m, cfg, log.NewNopLogger())
581+
c, err := newClient(m, cfg, log.NewNopLogger())
439582
require.NoError(t, err)
440583

441584
// Send all the input log entries
@@ -471,3 +614,30 @@ func TestClient_Handle(t *testing.T) {
471614
})
472615
}
473616
}
617+
618+
func newServerAndClientConfig(t *testing.T) (Config, chan utils.RemoteWriteRequest, func()) {
619+
receivedReqsChan := make(chan utils.RemoteWriteRequest, 10)
620+
621+
// Start a local HTTP server
622+
server := utils.NewRemoteWriteServer(receivedReqsChan, http.StatusOK)
623+
require.NotNil(t, server)
624+
625+
testClientURL, _ := url.Parse(server.URL)
626+
testClientConfig := Config{
627+
Name: "test-client",
628+
URL: flagext.URLValue{URL: testClientURL},
629+
Timeout: time.Second * 2,
630+
BatchSize: 1,
631+
BackoffConfig: backoff.Config{
632+
MaxRetries: 0,
633+
},
634+
Queue: QueueConfig{
635+
Capacity: 10, // buffered channel of size 10
636+
DrainTimeout: time.Second * 10,
637+
},
638+
}
639+
return testClientConfig, receivedReqsChan, func() {
640+
server.Close()
641+
close(receivedReqsChan)
642+
}
643+
}

0 commit comments

Comments
 (0)