Skip to content

Commit 598934e

Browse files
grpc support to write logs to Loki
1 parent 838c22c commit 598934e

File tree

4 files changed

+584
-24
lines changed

4 files changed

+584
-24
lines changed

docs/api.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,26 @@ Following is the supported API format for writing to loki:
318318
labels: map of record fields to be used as labels
319319
staticLabels: map of common labels to set on each flow
320320
ignoreList: map of record fields to be removed from the record
321-
clientConfig: clientConfig
321+
clientConfig: HTTP client configuration (used only for HTTP client type)
322322
timestampLabel: label to use for time indexing
323323
timestampScale: timestamp units scale (e.g. for UNIX = 1s)
324324
format: the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)
325325
reorder: reorder json map keys
326+
clientType: type of client to use: 'http' or 'grpc' (default: 'http')
327+
grpcConfig: gRPC client configuration (used only for gRPC client type)
328+
serverAddress: gRPC server address (host:port)
329+
maxRecvMsgSize: maximum message size the client can receive
330+
maxSendMsgSize: maximum message size the client can send
331+
keepAlive: keep alive interval
332+
keepAliveTimeout: keep alive timeout
333+
useStreaming: use streaming for real-time log pushing
334+
tls: TLS configuration
335+
enabled: enable TLS
336+
certFile: path to client certificate file
337+
keyFile: path to client key file
338+
caFile: path to CA certificate file
339+
serverName: server name for certificate verification
340+
insecureSkipVerify: skip certificate verification (insecure)
326341
</pre>
327342
## Write Standard Output
328343
Following is the supported API format for writing to standard output:

pkg/api/write_loki.go

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package api
2020
import (
2121
"errors"
2222
"fmt"
23+
"time"
2324

2425
promConfig "github.com/prometheus/common/config"
2526
"github.com/prometheus/common/model"
@@ -37,7 +38,7 @@ type WriteLoki struct {
3738
Labels []string `yaml:"labels,omitempty" json:"labels,omitempty" doc:"map of record fields to be used as labels"`
3839
StaticLabels model.LabelSet `yaml:"staticLabels,omitempty" json:"staticLabels,omitempty" doc:"map of common labels to set on each flow"`
3940
IgnoreList []string `yaml:"ignoreList,omitempty" json:"ignoreList,omitempty" doc:"map of record fields to be removed from the record"`
40-
ClientConfig *promConfig.HTTPClientConfig `yaml:"clientConfig,omitempty" json:"clientConfig,omitempty" doc:"clientConfig"`
41+
ClientConfig *promConfig.HTTPClientConfig `yaml:"clientConfig,omitempty" json:"clientConfig,omitempty" doc:"HTTP client configuration (used only for HTTP client type)"`
4142
TimestampLabel model.LabelName `yaml:"timestampLabel,omitempty" json:"timestampLabel,omitempty" doc:"label to use for time indexing"`
4243
// TimestampScale provides the scale in time of the units from the timestamp
4344
// E.g. UNIX timescale is '1s' (one second) while other clock sources might have
@@ -46,6 +47,29 @@ type WriteLoki struct {
4647
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
4748
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)"`
4849
Reorder bool `yaml:"reorder,omitempty" json:"reorder,omitempty" doc:"reorder json map keys"`
50+
51+
// Client type selection
52+
ClientType string `yaml:"clientType,omitempty" json:"clientType,omitempty" doc:"type of client to use: 'http' or 'grpc' (default: 'http')"`
53+
GRPCConfig *GRPCLokiConfig `yaml:"grpcConfig,omitempty" json:"grpcConfig,omitempty" doc:"gRPC client configuration (used only for gRPC client type)"`
54+
}
55+
56+
type GRPCLokiConfig struct {
57+
ServerAddress string `yaml:"serverAddress,omitempty" json:"serverAddress,omitempty" doc:"gRPC server address (host:port)"`
58+
MaxRecvMsgSize int `yaml:"maxRecvMsgSize,omitempty" json:"maxRecvMsgSize,omitempty" doc:"maximum message size the client can receive"`
59+
MaxSendMsgSize int `yaml:"maxSendMsgSize,omitempty" json:"maxSendMsgSize,omitempty" doc:"maximum message size the client can send"`
60+
KeepAlive string `yaml:"keepAlive,omitempty" json:"keepAlive,omitempty" doc:"keep alive interval"`
61+
KeepAliveTimeout string `yaml:"keepAliveTimeout,omitempty" json:"keepAliveTimeout,omitempty" doc:"keep alive timeout"`
62+
UseStreaming bool `yaml:"useStreaming,omitempty" json:"useStreaming,omitempty" doc:"use streaming for real-time log pushing"`
63+
TLS *GRPCTLSConfig `yaml:"tls,omitempty" json:"tls,omitempty" doc:"TLS configuration"`
64+
}
65+
66+
type GRPCTLSConfig struct {
67+
Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty" doc:"enable TLS"`
68+
CertFile string `yaml:"certFile,omitempty" json:"certFile,omitempty" doc:"path to client certificate file"`
69+
KeyFile string `yaml:"keyFile,omitempty" json:"keyFile,omitempty" doc:"path to client key file"`
70+
CAFile string `yaml:"caFile,omitempty" json:"caFile,omitempty" doc:"path to CA certificate file"`
71+
ServerName string `yaml:"serverName,omitempty" json:"serverName,omitempty" doc:"server name for certificate verification"`
72+
InsecureSkipVerify bool `yaml:"insecureSkipVerify,omitempty" json:"insecureSkipVerify,omitempty" doc:"skip certificate verification (insecure)"`
4973
}
5074

5175
func (w *WriteLoki) SetDefaults() {
@@ -76,6 +100,29 @@ func (w *WriteLoki) SetDefaults() {
76100
if w.Format == "" {
77101
w.Format = "json"
78102
}
103+
if w.ClientType == "" {
104+
w.ClientType = "http"
105+
}
106+
107+
// Set defaults for gRPC config if gRPC client type is selected
108+
if w.ClientType == "grpc" && w.GRPCConfig != nil {
109+
w.GRPCConfig.SetDefaults()
110+
}
111+
}
112+
113+
func (g *GRPCLokiConfig) SetDefaults() {
114+
if g.MaxRecvMsgSize == 0 {
115+
g.MaxRecvMsgSize = 1024 * 1024 * 64 // 64MB
116+
}
117+
if g.MaxSendMsgSize == 0 {
118+
g.MaxSendMsgSize = 1024 * 1024 * 16 // 16MB
119+
}
120+
if g.KeepAlive == "" {
121+
g.KeepAlive = "30s"
122+
}
123+
if g.KeepAliveTimeout == "" {
124+
g.KeepAliveTimeout = "5s"
125+
}
79126
}
80127

81128
func (w *WriteLoki) Validate() error {
@@ -85,11 +132,52 @@ func (w *WriteLoki) Validate() error {
85132
if w.TimestampScale == "" {
86133
return errors.New("timestampUnit must be a valid Duration > 0 (e.g. 1m, 1s or 1ms)")
87134
}
88-
if w.URL == "" {
89-
return errors.New("url can't be empty")
90-
}
91135
if w.BatchSize <= 0 {
92136
return fmt.Errorf("invalid batchSize: %v. Required > 0", w.BatchSize)
93137
}
138+
139+
// Validate client type
140+
if w.ClientType != "" && w.ClientType != "http" && w.ClientType != "grpc" {
141+
return fmt.Errorf("invalid clientType: %s. Must be 'http' or 'grpc'", w.ClientType)
142+
}
143+
144+
// Validate based on client type
145+
switch w.ClientType {
146+
case "http", "":
147+
if w.URL == "" {
148+
return errors.New("url can't be empty for HTTP client")
149+
}
150+
case "grpc":
151+
if w.GRPCConfig == nil {
152+
return errors.New("grpcConfig is required when using gRPC client type")
153+
}
154+
if err := w.GRPCConfig.Validate(); err != nil {
155+
return fmt.Errorf("gRPC config validation failed: %w", err)
156+
}
157+
}
158+
159+
return nil
160+
}
161+
162+
func (g *GRPCLokiConfig) Validate() error {
163+
if g == nil {
164+
return errors.New("gRPC config cannot be nil")
165+
}
166+
if g.ServerAddress == "" {
167+
return errors.New("gRPC serverAddress cannot be empty")
168+
}
169+
170+
// Validate duration fields
171+
if g.KeepAlive != "" {
172+
if _, err := time.ParseDuration(g.KeepAlive); err != nil {
173+
return fmt.Errorf("invalid keepAlive duration: %w", err)
174+
}
175+
}
176+
if g.KeepAliveTimeout != "" {
177+
if _, err := time.ParseDuration(g.KeepAliveTimeout); err != nil {
178+
return fmt.Errorf("invalid keepAliveTimeout duration: %w", err)
179+
}
180+
}
181+
94182
return nil
95183
}

pkg/pipeline/write/write_loki.go

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
3131

3232
logAdapter "github.com/go-kit/kit/log/logrus"
33+
"github.com/netobserv/loki-client-go/grpc"
3334
"github.com/netobserv/loki-client-go/loki"
3435
"github.com/netobserv/loki-client-go/pkg/backoff"
3536
"github.com/netobserv/loki-client-go/pkg/urlutil"
@@ -49,7 +50,6 @@ type emitter interface {
4950

5051
// Loki record writer
5152
type Loki struct {
52-
lokiConfig loki.Config
5353
apiConfig api.WriteLoki
5454
timestampScale float64
5555
saneLabels map[string]model.LabelName
@@ -61,7 +61,46 @@ type Loki struct {
6161
formatter func(config.GenericMap) string
6262
}
6363

64-
func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
64+
func createLokiClient(c *api.WriteLoki) (emitter, error) {
65+
switch c.ClientType {
66+
case "grpc":
67+
return createGRPCClient(c)
68+
case "http", "":
69+
return createHTTPClient(c)
70+
default:
71+
return nil, fmt.Errorf("unsupported client type: %s", c.ClientType)
72+
}
73+
}
74+
75+
func createHTTPClient(c *api.WriteLoki) (emitter, error) {
76+
cfg, err := buildHTTPLokiConfig(c)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
client, err := loki.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki")))
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to create HTTP Loki client: %w", err)
84+
}
85+
86+
return client, nil
87+
}
88+
89+
func createGRPCClient(c *api.WriteLoki) (emitter, error) {
90+
cfg, err := buildGRPCLokiConfig(c)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
client, err := grpc.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki-grpc")))
96+
if err != nil {
97+
return nil, fmt.Errorf("failed to create gRPC Loki client: %w", err)
98+
}
99+
100+
return client, nil
101+
}
102+
103+
func buildHTTPLokiConfig(c *api.WriteLoki) (loki.Config, error) {
65104
batchWait, err := time.ParseDuration(c.BatchWait)
66105
if err != nil {
67106
return loki.Config{}, fmt.Errorf("failed in parsing BatchWait : %w", err)
@@ -105,6 +144,79 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
105144
return cfg, nil
106145
}
107146

147+
func buildGRPCLokiConfig(c *api.WriteLoki) (grpc.Config, error) {
148+
if c.GRPCConfig == nil {
149+
return grpc.Config{}, fmt.Errorf("gRPC config is required for gRPC client type")
150+
}
151+
152+
batchWait, err := time.ParseDuration(c.BatchWait)
153+
if err != nil {
154+
return grpc.Config{}, fmt.Errorf("failed in parsing BatchWait: %w", err)
155+
}
156+
157+
timeout, err := time.ParseDuration(c.Timeout)
158+
if err != nil {
159+
return grpc.Config{}, fmt.Errorf("failed in parsing Timeout: %w", err)
160+
}
161+
162+
minBackoff, err := time.ParseDuration(c.MinBackoff)
163+
if err != nil {
164+
return grpc.Config{}, fmt.Errorf("failed in parsing MinBackoff: %w", err)
165+
}
166+
167+
maxBackoff, err := time.ParseDuration(c.MaxBackoff)
168+
if err != nil {
169+
return grpc.Config{}, fmt.Errorf("failed in parsing MaxBackoff: %w", err)
170+
}
171+
172+
keepAlive, err := time.ParseDuration(c.GRPCConfig.KeepAlive)
173+
if err != nil {
174+
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAlive: %w", err)
175+
}
176+
177+
keepAliveTimeout, err := time.ParseDuration(c.GRPCConfig.KeepAliveTimeout)
178+
if err != nil {
179+
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAliveTimeout: %w", err)
180+
}
181+
182+
cfg := grpc.Config{
183+
ServerAddress: c.GRPCConfig.ServerAddress,
184+
TenantID: c.TenantID,
185+
BatchWait: batchWait,
186+
BatchSize: c.BatchSize,
187+
Timeout: timeout,
188+
MaxRecvMsgSize: c.GRPCConfig.MaxRecvMsgSize,
189+
MaxSendMsgSize: c.GRPCConfig.MaxSendMsgSize,
190+
KeepAlive: keepAlive,
191+
KeepAliveTimeout: keepAliveTimeout,
192+
UseStreaming: c.GRPCConfig.UseStreaming,
193+
BackoffConfig: backoff.BackoffConfig{
194+
MinBackoff: minBackoff,
195+
MaxBackoff: maxBackoff,
196+
MaxRetries: c.MaxRetries,
197+
},
198+
}
199+
200+
// Set external labels from static labels
201+
if len(c.StaticLabels) > 0 {
202+
cfg.ExternalLabels.LabelSet = c.StaticLabels
203+
}
204+
205+
// Configure TLS if provided
206+
if c.GRPCConfig.TLS != nil {
207+
cfg.TLS = grpc.TLSConfig{
208+
Enabled: c.GRPCConfig.TLS.Enabled,
209+
CertFile: c.GRPCConfig.TLS.CertFile,
210+
KeyFile: c.GRPCConfig.TLS.KeyFile,
211+
CAFile: c.GRPCConfig.TLS.CAFile,
212+
ServerName: c.GRPCConfig.TLS.ServerName,
213+
InsecureSkipVerify: c.GRPCConfig.TLS.InsecureSkipVerify,
214+
}
215+
}
216+
217+
return cfg, nil
218+
}
219+
108220
func (l *Loki) ProcessRecord(in config.GenericMap) error {
109221
labels, lines := l.splitLabelsLines(in)
110222

@@ -219,13 +331,10 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
219331
return nil, fmt.Errorf("the provided config is not valid: %w", err)
220332
}
221333

222-
lokiConfig, buildconfigErr := buildLokiConfig(&lokiConfigIn)
223-
if buildconfigErr != nil {
224-
return nil, buildconfigErr
225-
}
226-
client, newWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki")))
227-
if newWithLoggerErr != nil {
228-
return nil, newWithLoggerErr
334+
// Create the appropriate client based on clientType
335+
client, err := createLokiClient(&lokiConfigIn)
336+
if err != nil {
337+
return nil, fmt.Errorf("failed to create Loki client: %w", err)
229338
}
230339

231340
timestampScale, err := time.ParseDuration(lokiConfigIn.TimestampScale)
@@ -253,7 +362,6 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
253362

254363
f := formatter(lokiConfigIn.Format, lokiConfigIn.Reorder)
255364
l := &Loki{
256-
lokiConfig: lokiConfig,
257365
apiConfig: lokiConfigIn,
258366
timestampScale: float64(timestampScale),
259367
saneLabels: saneLabels,

0 commit comments

Comments
 (0)