diff --git a/docs/api.md b/docs/api.md index 188126b4a..3d19e1b32 100644 --- a/docs/api.md +++ b/docs/api.md @@ -324,6 +324,10 @@ Following is the supported API format for writing to loki: timestampScale: timestamp units scale (e.g. for UNIX = 1s) format: the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default) reorder: reorder json map keys + clientProtocol: type of client protocol to use: 'http' or 'grpc' (default: 'http') + grpcConfig: gRPC client configuration (used only for gRPC client type) + keepAlive: keep alive interval + keepAliveTimeout: keep alive timeout ## Write Standard Output Following is the supported API format for writing to standard output: diff --git a/go.mod b/go.mod index 9ad7c921d..e53a33c05 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/minio/minio-go/v7 v7.0.95 github.com/mitchellh/mapstructure v1.5.0 github.com/netobserv/gopipes v0.3.0 - github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 + github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 github.com/netobserv/netobserv-ebpf-agent v1.9.2-community github.com/netsampler/goflow2 v1.3.7 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index f227eb3c1..9dad29260 100644 --- a/go.sum +++ b/go.sum @@ -255,8 +255,8 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60= github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU= -github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 h1:hjzhVZSSKIOmAzHbGUV4JhVIPkgKs/UtrWDx6JSVKMw= -github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8= +github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 h1:rxQipq0xpoiao7ifls/82JCcOVALC4n08ppTLCUFGL4= +github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8= github.com/netobserv/netobserv-ebpf-agent v1.9.2-community h1:ghW16OO4QRWj0Uh1gMYX+NjAlgx2sZmCsO3Tkwoj4Do= github.com/netobserv/netobserv-ebpf-agent v1.9.2-community/go.mod h1:17OaUNAwx0LxoeV/SaHlJIJP6bpN7zSvUP3GtZelESQ= github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc= diff --git a/pkg/api/write_loki.go b/pkg/api/write_loki.go index cdc9a36bc..0794c70b5 100644 --- a/pkg/api/write_loki.go +++ b/pkg/api/write_loki.go @@ -20,6 +20,7 @@ package api import ( "errors" "fmt" + "time" promConfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -46,6 +47,15 @@ type WriteLoki struct { TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. 
for UNIX = 1s)"` Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)"` Reorder bool `yaml:"reorder,omitempty" json:"reorder,omitempty" doc:"reorder json map keys"` + + // Client protocol selection + ClientProtocol string `yaml:"clientProtocol,omitempty" json:"clientProtocol,omitempty" doc:"type of client protocol to use: 'http' or 'grpc' (default: 'http')"` + GRPCConfig *GRPCLokiConfig `yaml:"grpcConfig,omitempty" json:"grpcConfig,omitempty" doc:"gRPC client configuration (used only for gRPC client type)"` +} + +type GRPCLokiConfig struct { + KeepAlive string `yaml:"keepAlive,omitempty" json:"keepAlive,omitempty" doc:"keep alive interval"` + KeepAliveTimeout string `yaml:"keepAliveTimeout,omitempty" json:"keepAliveTimeout,omitempty" doc:"keep alive timeout"` } func (w *WriteLoki) SetDefaults() { @@ -76,6 +86,26 @@ func (w *WriteLoki) SetDefaults() { if w.Format == "" { w.Format = "json" } + if w.ClientProtocol == "" { + w.ClientProtocol = "http" + } + + // Set defaults for gRPC config if gRPC client protocol is selected + if w.ClientProtocol == "grpc" { + if w.GRPCConfig == nil { + w.GRPCConfig = &GRPCLokiConfig{} + } + w.GRPCConfig.SetDefaults() + } +} + +func (g *GRPCLokiConfig) SetDefaults() { + if g.KeepAlive == "" { + g.KeepAlive = "30s" + } + if g.KeepAliveTimeout == "" { + g.KeepAliveTimeout = "5s" + } } func (w *WriteLoki) Validate() error { @@ -85,11 +115,51 @@ func (w *WriteLoki) Validate() error { if w.TimestampScale == "" { return errors.New("timestampUnit must be a valid Duration > 0 (e.g. 1m, 1s or 1ms)") } - if w.URL == "" { - return errors.New("url can't be empty") - } if w.BatchSize <= 0 { return fmt.Errorf("invalid batchSize: %v. Required > 0", w.BatchSize) } + + // Validate client protocol + if w.ClientProtocol != "" && w.ClientProtocol != "http" && w.ClientProtocol != "grpc" { + return fmt.Errorf("invalid clientProtocol: %s. 
Must be 'http' or 'grpc'", w.ClientProtocol) + } + + // Validate based on client protocol + switch w.ClientProtocol { + case "http", "": + if w.URL == "" { + return errors.New("url can't be empty for HTTP client") + } + case "grpc": + if w.URL == "" { + return errors.New("url can't be empty for gRPC client") + } + if w.GRPCConfig == nil { + return errors.New("grpcConfig is required when using gRPC client protocol") + } + if err := w.GRPCConfig.Validate(); err != nil { + return fmt.Errorf("gRPC config validation failed: %w", err) + } + } + + return nil +} + +func (g *GRPCLokiConfig) Validate() error { + if g == nil { + return errors.New("gRPC config cannot be nil") + } + // Validate duration fields + if g.KeepAlive != "" { + if _, err := time.ParseDuration(g.KeepAlive); err != nil { + return fmt.Errorf("invalid keepAlive duration: %w", err) + } + } + if g.KeepAliveTimeout != "" { + if _, err := time.ParseDuration(g.KeepAliveTimeout); err != nil { + return fmt.Errorf("invalid keepAliveTimeout duration: %w", err) + } + } + return nil } diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index 8e4de72d3..82f1035e7 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -30,6 +30,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/utils" logAdapter "github.com/go-kit/kit/log/logrus" + "github.com/netobserv/loki-client-go/grpc" "github.com/netobserv/loki-client-go/loki" "github.com/netobserv/loki-client-go/pkg/backoff" "github.com/netobserv/loki-client-go/pkg/urlutil" @@ -49,7 +50,6 @@ type emitter interface { // Loki record writer type Loki struct { - lokiConfig loki.Config apiConfig api.WriteLoki timestampScale float64 saneLabels map[string]model.LabelName @@ -61,7 +61,46 @@ type Loki struct { formatter func(config.GenericMap) string } -func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) { +func createLokiClient(c *api.WriteLoki) (emitter, error) { + switch c.ClientProtocol { + case "grpc": + return createGRPCClient(c) + case "http", "": + return createHTTPClient(c) + default: + return nil, fmt.Errorf("unsupported client protocol: %s", c.ClientProtocol) + } +} + +func createHTTPClient(c *api.WriteLoki) (emitter, error) { + cfg, err := buildHTTPLokiConfig(c) + if err != nil { + return nil, err + } + + client, err := loki.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki"))) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP Loki client: %w", err) + } + + return client, nil +} + +func createGRPCClient(c *api.WriteLoki) (emitter, error) { + cfg, err := buildGRPCLokiConfig(c) + if err != nil { + return nil, err + } + + client, err := grpc.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki-grpc"))) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC Loki client: %w", err) + } + + return client, nil +} + +func buildHTTPLokiConfig(c *api.WriteLoki) (loki.Config, error) { batchWait, err := time.ParseDuration(c.BatchWait) if err != nil { return loki.Config{}, fmt.Errorf("failed in parsing BatchWait : %w", err) @@ -105,6 +144,69 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) { return cfg, nil } +func buildGRPCLokiConfig(c *api.WriteLoki) (grpc.Config, error) { + if c.GRPCConfig == nil { + return grpc.Config{}, fmt.Errorf("gRPC config is required for gRPC client protocol") + } + + batchWait, err := time.ParseDuration(c.BatchWait) + if err != nil { + return grpc.Config{}, fmt.Errorf("failed in parsing BatchWait: %w", err) + } + + 
timeout, err := time.ParseDuration(c.Timeout) + if err != nil { + return grpc.Config{}, fmt.Errorf("failed in parsing Timeout: %w", err) + } + + minBackoff, err := time.ParseDuration(c.MinBackoff) + if err != nil { + return grpc.Config{}, fmt.Errorf("failed in parsing MinBackoff: %w", err) + } + + maxBackoff, err := time.ParseDuration(c.MaxBackoff) + if err != nil { + return grpc.Config{}, fmt.Errorf("failed in parsing MaxBackoff: %w", err) + } + + keepAlive, err := time.ParseDuration(c.GRPCConfig.KeepAlive) + if err != nil { + return grpc.Config{}, fmt.Errorf("failed in parsing KeepAlive: %w", err) + } + + keepAliveTimeout, err := time.ParseDuration(c.GRPCConfig.KeepAliveTimeout) + if err != nil { + return grpc.Config{}, fmt.Errorf("failed in parsing KeepAliveTimeout: %w", err) + } + + cfg := grpc.Config{ + ServerAddress: c.URL, + TenantID: c.TenantID, + BatchWait: batchWait, + BatchSize: c.BatchSize, + Timeout: timeout, + KeepAlive: keepAlive, + KeepAliveTimeout: keepAliveTimeout, + BackoffConfig: backoff.BackoffConfig{ + MinBackoff: minBackoff, + MaxBackoff: maxBackoff, + MaxRetries: c.MaxRetries, + }, + } + + // Set external labels from static labels + if len(c.StaticLabels) > 0 { + cfg.ExternalLabels.LabelSet = c.StaticLabels + } + + // Configure TLS from shared ClientConfig (same as HTTP client) + if c.ClientConfig != nil { + cfg.TLS = c.ClientConfig.TLSConfig + } + + return cfg, nil +} + func (l *Loki) ProcessRecord(in config.GenericMap) error { labels, lines := l.splitLabelsLines(in) @@ -219,13 +321,10 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo return nil, fmt.Errorf("the provided config is not valid: %w", err) } - lokiConfig, buildconfigErr := buildLokiConfig(&lokiConfigIn) - if buildconfigErr != nil { - return nil, buildconfigErr - } - client, newWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki"))) - if newWithLoggerErr != nil { - return nil, newWithLoggerErr + // Create the appropriate client based on clientProtocol + client, err := createLokiClient(&lokiConfigIn) + if err != nil { + return nil, fmt.Errorf("failed to create Loki client: %w", err) } timestampScale, err := time.ParseDuration(lokiConfigIn.TimestampScale) @@ -253,7 +352,6 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo f := formatter(lokiConfigIn.Format, lokiConfigIn.Reorder) l := &Loki{ - lokiConfig: lokiConfig, apiConfig: lokiConfigIn, timestampScale: float64(timestampScale), saneLabels: saneLabels, diff --git a/pkg/pipeline/write/write_loki_test.go b/pkg/pipeline/write/write_loki_test.go index 4c526c45b..c8770f293 100644 --- a/pkg/pipeline/write/write_loki_test.go +++ b/pkg/pipeline/write/write_loki_test.go @@ -30,6 +30,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/test" + promConfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -89,15 +90,14 @@ parameters: loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) - assert.Equal(t, "https://foo:8888/loki/api/v1/push", loki.lokiConfig.URL.String()) - assert.Equal(t, "theTenant", loki.lokiConfig.TenantID) - assert.Equal(t, time.Minute, loki.lokiConfig.BatchWait) - minBackoff, _ := time.ParseDuration(loki.apiConfig.MinBackoff) - assert.Equal(t, minBackoff, 
loki.lokiConfig.BackoffConfig.MinBackoff) + // Test that the API config was properly set and defaults applied + assert.Equal(t, "https://foo:8888/", loki.apiConfig.URL) + assert.Equal(t, "theTenant", loki.apiConfig.TenantID) + assert.Equal(t, "1m", loki.apiConfig.BatchWait) + assert.Equal(t, "5s", loki.apiConfig.MinBackoff) // Make sure default batch size is set - assert.Equal(t, 102400, loki.lokiConfig.BatchSize) - assert.Equal(t, loki.apiConfig.BatchSize, loki.lokiConfig.BatchSize) + assert.Equal(t, 102400, loki.apiConfig.BatchSize) } func Test_buildLokiConfig_ClientDeserialization(t *testing.T) { @@ -122,8 +122,9 @@ parameters: loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) require.NoError(t, err) - proxyFunc := loki.lokiConfig.Client.Proxy() - assert.Nil(t, proxyFunc) + // Test that the client was created successfully and API config is preserved + assert.NotNil(t, loki.client) + assert.NotNil(t, loki.apiConfig.ClientConfig) } func TestLoki_ProcessRecord(t *testing.T) { @@ -369,6 +370,273 @@ func hundredFlows() []config.GenericMap { return flows } +func TestGRPCClientCreation(t *testing.T) { + params := api.WriteLoki{ + URL: "localhost:9095", + ClientProtocol: "grpc", + TenantID: "test-tenant", + GRPCConfig: &api.GRPCLokiConfig{ + KeepAlive: "30s", + KeepAliveTimeout: "5s", + }, + } + + loki, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: ¶ms}}) + require.NoError(t, err) + require.NotNil(t, loki) + require.NotNil(t, loki.client) +} + +func TestGRPCClientCreationWithTLS(t *testing.T) { + params := api.WriteLoki{ + URL: "loki.example.com:9095", + ClientProtocol: "grpc", + TenantID: "test-tenant", + ClientConfig: &promConfig.HTTPClientConfig{ + TLSConfig: promConfig.TLSConfig{ + CertFile: "/path/to/cert.pem", + KeyFile: "/path/to/key.pem", + CAFile: "/path/to/ca.pem", + ServerName: "loki.example.com", + InsecureSkipVerify: false, + }, + }, + GRPCConfig: &api.GRPCLokiConfig{ + KeepAlive: "30s", + KeepAliveTimeout: "5s", + }, + } + + // This test expects to fail due to missing certificate files + // We're testing the TLS config validation, not actual connection + _, err := NewWriteLoki(operational.NewMetrics(&config.MetricsSettings{}), config.StageParam{Write: &config.Write{Loki: ¶ms}}) + require.Error(t, err) + require.Contains(t, err.Error(), "no such file or directory") +} + +func TestBuildGRPCLokiConfig(t *testing.T) { + tests := []struct { + name string + input *api.WriteLoki + wantErr bool + validate func(t *testing.T, cfg interface{}) + }{ + { + name: "valid basic gRPC config", + input: &api.WriteLoki{ + URL: "localhost:9095", + ClientProtocol: "grpc", + BatchWait: "2s", + BatchSize: 50000, + Timeout: "15s", + MinBackoff: "2s", + MaxBackoff: "10s", + MaxRetries: 5, + TenantID: "test-tenant", + GRPCConfig: &api.GRPCLokiConfig{ + KeepAlive: "60s", + KeepAliveTimeout: "10s", + }, + }, + wantErr: false, + validate: func(t *testing.T, cfg interface{}) { + // Basic validation that config was created without error + require.NotNil(t, cfg) + }, + }, + { + name: "missing gRPC config", + input: &api.WriteLoki{ + URL: "localhost:9095", + ClientProtocol: "grpc", + TenantID: "test-tenant", + GRPCConfig: nil, + }, + wantErr: true, // buildGRPCLokiConfig validates directly (SetDefaults not called in this unit test) + }, + { + name: "invalid duration in gRPC config", + input: &api.WriteLoki{ + URL: "localhost:9095", + ClientProtocol: "grpc", + BatchWait: "invalid-duration", + 
TenantID: "test-tenant", + GRPCConfig: &api.GRPCLokiConfig{}, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg, err := buildGRPCLokiConfig(tt.input) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + if tt.validate != nil { + tt.validate(t, cfg) + } + }) + } +} + +func TestGRPCConfigValidation(t *testing.T) { + tests := []struct { + name string + config *api.GRPCLokiConfig + wantErr bool + errMsg string + }{ + { + name: "valid config", + config: &api.GRPCLokiConfig{ + KeepAlive: "30s", + KeepAliveTimeout: "5s", + }, + wantErr: false, + }, + { + name: "nil config", + config: nil, + wantErr: true, + errMsg: "cannot be nil", + }, + { + name: "invalid keepAlive duration", + config: &api.GRPCLokiConfig{ + KeepAlive: "invalid-duration", + }, + wantErr: true, + errMsg: "invalid keepAlive duration", + }, + { + name: "invalid keepAliveTimeout duration", + config: &api.GRPCLokiConfig{ + KeepAlive: "30s", + KeepAliveTimeout: "invalid-timeout", + }, + wantErr: true, + errMsg: "invalid keepAliveTimeout duration", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.wantErr { + require.Error(t, err) + if tt.errMsg != "" { + require.Contains(t, err.Error(), tt.errMsg) + } + return + } + require.NoError(t, err) + }) + } +} + +func TestGRPCConfigDefaults(t *testing.T) { + config := &api.GRPCLokiConfig{} + + config.SetDefaults() + + assert.Equal(t, "30s", config.KeepAlive) + assert.Equal(t, "5s", config.KeepAliveTimeout) +} + +func TestWriteLokiValidation(t *testing.T) { + tests := []struct { + name string + config *api.WriteLoki + wantErr bool + errMsg string + }{ + { + name: "valid HTTP config", + config: &api.WriteLoki{ + URL: "http://localhost:3100", + ClientProtocol: "http", + BatchSize: 1024, + }, + wantErr: false, + }, + { + name: "valid gRPC config", + config: &api.WriteLoki{ + URL: "localhost:9095", + ClientProtocol: "grpc", + BatchSize: 1024, + GRPCConfig: &api.GRPCLokiConfig{}, + }, + wantErr: false, + }, + { + name: "invalid client type", + config: &api.WriteLoki{ + ClientProtocol: "websocket", + BatchSize: 1024, + }, + wantErr: true, + errMsg: "invalid clientProtocol", + }, + { + name: "missing URL for HTTP", + config: &api.WriteLoki{ + ClientProtocol: "http", + BatchSize: 1024, + }, + wantErr: true, + errMsg: "url can't be empty", + }, + { + name: "missing URL for gRPC", + config: &api.WriteLoki{ + ClientProtocol: "grpc", + BatchSize: 1024, + GRPCConfig: &api.GRPCLokiConfig{}, + }, + wantErr: true, + errMsg: "url can't be empty", + }, + { + name: "auto-created gRPC config with defaults", + config: &api.WriteLoki{ + URL: "localhost:9095", + ClientProtocol: "grpc", + BatchSize: 1024, + }, + wantErr: false, // SetDefaults will auto-create GRPCConfig + }, + { + name: "invalid batch size", + config: &api.WriteLoki{ + URL: "http://localhost:3100", + ClientProtocol: "http", + BatchSize: -1, + }, + wantErr: true, + errMsg: "invalid batchSize", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.config.SetDefaults() + err := tt.config.Validate() + if tt.wantErr { + require.Error(t, err) + if tt.errMsg != "" { + require.Contains(t, err.Error(), tt.errMsg) + } + return + } + require.NoError(t, err) + }) + } +} + func BenchmarkWriteLoki(b *testing.B) { logrus.SetLevel(logrus.ErrorLevel) lokiFlows := make(chan map[string]interface{}, 256) diff --git a/vendor/github.com/netobserv/loki-client-go/grpc/batch.go 
b/vendor/github.com/netobserv/loki-client-go/grpc/batch.go new file mode 100644 index 000000000..3cc2e764c --- /dev/null +++ b/vendor/github.com/netobserv/loki-client-go/grpc/batch.go @@ -0,0 +1,109 @@ +package grpc + +import ( + "time" + + "github.com/netobserv/loki-client-go/pkg/logproto" + "github.com/prometheus/common/model" +) + +// entry represents a log entry with tenant and label information +type entry struct { + tenantID string + labels model.LabelSet + logproto.Entry +} + +// batch holds pending log streams waiting to be sent to Loki via GRPC. +// Similar to HTTP batch but optimized for GRPC operations. +type batch struct { + streams map[string]*logproto.Stream + bytes int + createdAt time.Time + tenantID string // GRPC batches are per-tenant for connection management +} + +// newBatch creates a new batch for a specific tenant +func newBatch(tenantID string, entries ...entry) *batch { + b := &batch{ + streams: map[string]*logproto.Stream{}, + bytes: 0, + createdAt: time.Now(), + tenantID: tenantID, + } + + // Add entries to the batch + for _, entry := range entries { + b.add(entry) + } + + return b +} + +// add an entry to the batch +func (b *batch) add(entry entry) { + b.bytes += len(entry.Line) + + // Append the entry to an already existing stream (if any) + labels := entry.labels.String() + if stream, ok := b.streams[labels]; ok { + stream.Entries = append(stream.Entries, entry.Entry) + return + } + + // Add the entry as a new stream + b.streams[labels] = &logproto.Stream{ + Labels: labels, + Entries: []logproto.Entry{entry.Entry}, + } +} + +// sizeBytes returns the current batch size in bytes +func (b *batch) sizeBytes() int { + return b.bytes +} + +// sizeBytesAfter returns the size of the batch after the input entry +// will be added to the batch itself +func (b *batch) sizeBytesAfter(entry entry) int { + return b.bytes + len(entry.Line) +} + +// age of the batch since its creation +func (b *batch) age() time.Duration { + return time.Since(b.createdAt) +} + +// createPushRequest creates a push request from the batch +func (b *batch) createPushRequest() (*logproto.PushRequest, int) { + req := &logproto.PushRequest{ + Streams: make([]logproto.Stream, 0, len(b.streams)), + } + + entriesCount := 0 + for _, stream := range b.streams { + req.Streams = append(req.Streams, *stream) + entriesCount += len(stream.Entries) + } + + return req, entriesCount +} + +// isEmpty returns true if the batch has no entries +func (b *batch) isEmpty() bool { + return len(b.streams) == 0 +} + +// streamCount returns the number of streams in the batch +func (b *batch) streamCount() int { + return len(b.streams) +} + +// entryCount returns the total number of entries across all streams +func (b *batch) entryCount() int { + count := 0 + for _, stream := range b.streams { + count += len(stream.Entries) + } + return count +} diff --git a/vendor/github.com/netobserv/loki-client-go/grpc/client.go b/vendor/github.com/netobserv/loki-client-go/grpc/client.go new file mode 100644 index 000000000..5cb293868 --- /dev/null +++ b/vendor/github.com/netobserv/loki-client-go/grpc/client.go @@ -0,0 +1,340 @@ +package grpc + +import ( + "context" + "errors" + "fmt" + "os" + "strconv" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/netobserv/loki-client-go/pkg/backoff" + "github.com/netobserv/loki-client-go/pkg/logproto" + "github.com/netobserv/loki-client-go/pkg/metrics" + "github.com/prometheus/common/model" + 
"github.com/prometheus/common/version" + "github.com/prometheus/prometheus/promql/parser" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const ( + // Label reserved to override the tenant ID while processing pipeline stages + ReservedLabelTenantID = "__tenant_id__" + + transportGRPC = "grpc" +) + +var ( + UserAgent = fmt.Sprintf("loki-grpc-client/%s", version.Version) +) + +func init() { + metrics.RegisterMetrics() +} + +// Client for pushing logs via GRPC +type Client struct { + logger log.Logger + cfg Config + conn *grpc.ClientConn + pusher logproto.PusherClient + quit chan struct{} + once sync.Once + entries chan entry + wg sync.WaitGroup + + externalLabels model.LabelSet +} + +// New creates a new GRPC client from config +func New(cfg Config) (*Client, error) { + logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowWarn()) + return NewWithLogger(cfg, logger) +} + +// NewWithDefault creates a new client with default configuration +func NewWithDefault(serverAddress string) (*Client, error) { + cfg, err := NewDefaultConfig(serverAddress) + if err != nil { + return nil, err + } + return New(cfg) +} + +// NewWithLogger creates a new GRPC client with a logger and config +func NewWithLogger(cfg Config, logger log.Logger) (*Client, error) { + if cfg.ServerAddress == "" { + return nil, errors.New("grpc client needs server address") + } + + c := &Client{ + logger: log.With(logger, "component", "grpc-client", "host", cfg.ServerAddress), + cfg: cfg, + quit: make(chan struct{}), + entries: make(chan entry), + externalLabels: cfg.ExternalLabels.LabelSet, + } + + // Initialize connection + if err := c.connect(); err != nil { + return nil, fmt.Errorf("failed to connect to GRPC server: %w", err) + } + + // Initialize counters to 0 + for _, counter := range metrics.CountersWithHost { + counter.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Add(0) + } + + c.wg.Add(1) + go c.run() + return c, nil +} + +// connect establishes GRPC connection +func (c *Client) connect() error { + opts, err := c.cfg.BuildDialOptions() + if err != nil { + return err + } + + conn, err := grpc.NewClient(c.cfg.ServerAddress, opts...) 
+ if err != nil { + return err + } + + c.conn = conn + c.pusher = logproto.NewPusherClient(conn) + + level.Info(c.logger).Log("msg", "connected to GRPC server", "address", c.cfg.ServerAddress) + return nil +} + +func (c *Client) run() { + batches := map[string]*batch{} + + // Batch timer logic similar to HTTP client + minWaitCheckFrequency := 10 * time.Millisecond + maxWaitCheckFrequency := c.cfg.BatchWait / 10 + if maxWaitCheckFrequency < minWaitCheckFrequency { + maxWaitCheckFrequency = minWaitCheckFrequency + } + + maxWaitCheck := time.NewTicker(maxWaitCheckFrequency) + + defer func() { + // Send all pending batches + for tenantID, batch := range batches { + c.sendBatch(tenantID, batch) + } + c.wg.Done() + }() + + for { + select { + case <-c.quit: + return + + case e := <-c.entries: + batch, ok := batches[e.tenantID] + + // Create new batch if doesn't exist + if !ok { + batches[e.tenantID] = newBatch(e.tenantID, e) + break + } + + // Send batch if adding entry would exceed max size + if batch.sizeBytesAfter(e) > c.cfg.BatchSize { + c.sendBatch(e.tenantID, batch) + batches[e.tenantID] = newBatch(e.tenantID, e) + break + } + + // Add entry to existing batch + batch.add(e) + + case <-maxWaitCheck.C: + // Send batches that have reached max wait time + for tenantID, batch := range batches { + if batch.age() < c.cfg.BatchWait { + continue + } + + c.sendBatch(tenantID, batch) + delete(batches, tenantID) + } + } + } +} + +func (c *Client) sendBatch(tenantID string, batch *batch) { + req, entriesCount := batch.createPushRequest() + + if len(req.Streams) == 0 { + return + } + + // Calculate wire bytes (protobuf size) to match HTTP client behavior + wireBytes := float64(proto.Size(req)) + metrics.EncodedBytes.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Add(wireBytes) + + backoffCtx := context.Background() + backoffInstance := backoff.New(backoffCtx, c.cfg.BackoffConfig) + var status string + var err error + + for backoffInstance.Ongoing() { + start := time.Now() + // Create a fresh context for each retry attempt + pushCtx := context.Background() + err = c.push(pushCtx, tenantID, req) + + // Convert error to status code for metrics + status = c.getStatusCode(err) + metrics.RequestDuration.WithLabelValues(status, c.cfg.ServerAddress, transportGRPC).Observe(time.Since(start).Seconds()) + + if err == nil { + // Success metrics + metrics.SentEntries.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Add(float64(entriesCount)) + metrics.SentBytes.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Add(wireBytes) + + c.updateStreamLagMetrics(req.Streams) + return + } + + level.Warn(c.logger).Log("msg", "error sending batch via GRPC, will retry", "status", status, "error", err) + metrics.BatchRetries.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Inc() + backoffInstance.Wait() + } + + // Failed after all retries + if err != nil { + level.Error(c.logger).Log("msg", "final error sending batch via GRPC", "status", status, "error", err) + metrics.DroppedEntries.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Add(float64(entriesCount)) + metrics.DroppedBytes.WithLabelValues(c.cfg.ServerAddress, transportGRPC).Add(wireBytes) + } +} + +func (c *Client) push(ctx context.Context, tenantID string, req *logproto.PushRequest) error { + ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) + defer cancel() + + // Add tenant ID to metadata if specified + if tenantID != "" { + ctx = metadata.AppendToOutgoingContext(ctx, "x-scope-orgid", tenantID) + } + + // Add user agent + ctx = 
metadata.AppendToOutgoingContext(ctx, "user-agent", UserAgent) + + // gRPC handles connection management automatically + _, err := c.pusher.Push(ctx, req) + return err +} + +func (c *Client) getStatusCode(err error) string { + if err == nil { + return "200" + } + + st, ok := status.FromError(err) + if !ok { + return "Unknown" + } + + // Convert gRPC status codes to HTTP-like status codes for metrics compatibility + switch st.Code() { + case codes.OK: + return "200" + case codes.ResourceExhausted: + return "429" // Rate limited + case codes.Internal, codes.Aborted, codes.Unavailable: + return "500" + case codes.DeadlineExceeded: + return "504" // Gateway timeout + default: + return strconv.Itoa(int(st.Code())) + } +} + +func (c *Client) getTenantID(labels model.LabelSet) string { + // Check if overridden in pipeline stages + if value, ok := labels[ReservedLabelTenantID]; ok { + return string(value) + } + + // Check config + if c.cfg.TenantID != "" { + return c.cfg.TenantID + } + + return "" +} + +// Stop the client +func (c *Client) Stop() { + c.once.Do(func() { + close(c.quit) + + if c.conn != nil { + c.conn.Close() + } + }) + c.wg.Wait() +} + +// updateStreamLagMetrics updates lag metrics to match HTTP client behavior +func (c *Client) updateStreamLagMetrics(streams []logproto.Stream) { + for _, s := range streams { + lbls, err := parser.ParseMetric(s.Labels) + if err != nil { + // is this possible? + level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) + return + } + var lblSet model.LabelSet + for i := range lbls { + if lbls[i].Name == metrics.LatencyLabel { + lblSet = model.LabelSet{ + model.LabelName(metrics.HostLabel): model.LabelValue(c.cfg.ServerAddress), + model.LabelName(metrics.LatencyLabel): model.LabelValue(lbls[i].Value), + } + } + } + if lblSet != nil { + metrics.StreamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) + } + } +} + +// Handle implements EntryHandler; adds a new line to the next batch; send is async +func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error { + if len(c.externalLabels) > 0 { + ls = c.externalLabels.Merge(ls) + } + + // Get tenant ID and remove special label + tenantID := c.getTenantID(ls) + if _, ok := ls[ReservedLabelTenantID]; ok { + ls = ls.Clone() + delete(ls, ReservedLabelTenantID) + } + + c.entries <- entry{tenantID, ls, logproto.Entry{ + Timestamp: t, + Line: s, + }} + return nil +} + +func (c *Client) UnregisterLatencyMetric(labels model.LabelSet) { + labels[metrics.HostLabel] = model.LabelValue(c.cfg.ServerAddress) + metrics.StreamLag.Delete(labels) +} diff --git a/vendor/github.com/netobserv/loki-client-go/grpc/config.go b/vendor/github.com/netobserv/loki-client-go/grpc/config.go new file mode 100644 index 000000000..31f9b8144 --- /dev/null +++ b/vendor/github.com/netobserv/loki-client-go/grpc/config.go @@ -0,0 +1,160 @@ +package grpc + +import ( + "flag" + "time" + + "github.com/netobserv/loki-client-go/pkg/backoff" + "github.com/netobserv/loki-client-go/pkg/labelutil" + promConfig "github.com/prometheus/common/config" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" +) + +// Default configuration values for GRPC client +const ( + DefaultBatchWait = 1 * time.Second + DefaultBatchSize int = 1024 * 1024 + DefaultMinBackoff = 500 * time.Millisecond + DefaultMaxBackoff = 5 * time.Minute + DefaultMaxRetries int = 
10 + DefaultTimeout = 10 * time.Second + DefaultKeepAlive = 30 * time.Second + DefaultKeepAliveTimeout = 5 * time.Second +) + +// Config describes configuration for a GRPC pusher client. +type Config struct { + // Server address (host:port) + ServerAddress string `yaml:"server_address"` + + // Batching configuration + BatchWait time.Duration `yaml:"batch_wait"` + BatchSize int `yaml:"batch_size"` + + // Connection configuration + Timeout time.Duration `yaml:"timeout"` + + // TLS configuration (uses Prometheus common TLSConfig for consistency) + TLS promConfig.TLSConfig `yaml:"tls"` + + // Keep alive configuration + KeepAlive time.Duration `yaml:"keep_alive"` + KeepAliveTimeout time.Duration `yaml:"keep_alive_timeout"` + + // Retry configuration + BackoffConfig backoff.BackoffConfig `yaml:"backoff_config"` + + // Labels to add to any time series when communicating with loki + ExternalLabels labelutil.LabelSet `yaml:"external_labels,omitempty"` + + // Tenant ID for multi-tenant mode (empty string means single tenant) + TenantID string `yaml:"tenant_id"` +} + +// NewDefaultConfig creates a default configuration for a given GRPC server address. +func NewDefaultConfig(serverAddress string) (Config, error) { + var cfg Config + f := &flag.FlagSet{} + cfg.RegisterFlags(f) + if err := f.Parse(nil); err != nil { + return cfg, err + } + cfg.ServerAddress = serverAddress + return cfg, nil +} + +// RegisterFlags registers configuration flags +func (c *Config) RegisterFlags(f *flag.FlagSet) { + c.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix registers flags with a given prefix +func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&c.ServerAddress, prefix+"grpc.server-address", "", "GRPC server address (host:port)") + f.DurationVar(&c.BatchWait, prefix+"grpc.batch-wait", DefaultBatchWait, "Maximum wait period before sending batch.") + f.IntVar(&c.BatchSize, prefix+"grpc.batch-size-bytes", DefaultBatchSize, "Maximum batch size to accrue before sending.") + + f.DurationVar(&c.Timeout, prefix+"grpc.timeout", DefaultTimeout, "Maximum time to wait for server to respond to a request") + + f.StringVar(&c.TLS.CertFile, prefix+"grpc.tls.cert-file", "", "Path to client certificate file") + f.StringVar(&c.TLS.KeyFile, prefix+"grpc.tls.key-file", "", "Path to client key file") + f.StringVar(&c.TLS.CAFile, prefix+"grpc.tls.ca-file", "", "Path to CA certificate file") + f.StringVar(&c.TLS.ServerName, prefix+"grpc.tls.server-name", "", "Server name for certificate verification") + f.BoolVar(&c.TLS.InsecureSkipVerify, prefix+"grpc.tls.insecure-skip-verify", false, "Skip certificate verification") + + f.DurationVar(&c.KeepAlive, prefix+"grpc.keep-alive", DefaultKeepAlive, "Keep alive interval") + f.DurationVar(&c.KeepAliveTimeout, prefix+"grpc.keep-alive-timeout", DefaultKeepAliveTimeout, "Keep alive timeout") + + f.IntVar(&c.BackoffConfig.MaxRetries, prefix+"grpc.max-retries", DefaultMaxRetries, "Maximum number of retries when sending batches.") + f.DurationVar(&c.BackoffConfig.MinBackoff, prefix+"grpc.min-backoff", DefaultMinBackoff, "Initial backoff time between retries.") + f.DurationVar(&c.BackoffConfig.MaxBackoff, prefix+"grpc.max-backoff", DefaultMaxBackoff, "Maximum backoff time between retries.") + + f.Var(&c.ExternalLabels, prefix+"grpc.external-labels", "list of external labels to add to each log (e.g: --grpc.external-labels=lb1=v1,lb2=v2)") + f.StringVar(&c.TenantID, prefix+"grpc.tenant-id", "", "Tenant ID to use when pushing logs to Loki.") +} + +// 
BuildDialOptions creates GRPC dial options from the configuration +func (c *Config) BuildDialOptions() ([]grpc.DialOption, error) { + var opts []grpc.DialOption + + // Keep alive settings + opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: c.KeepAlive, + Timeout: c.KeepAliveTimeout, + PermitWithoutStream: true, + })) + + // TLS configuration - check if any TLS field is set to determine if TLS should be enabled + tlsEnabled := c.TLS.CAFile != "" || c.TLS.CertFile != "" || c.TLS.KeyFile != "" || + c.TLS.CA != "" || c.TLS.Cert != "" || string(c.TLS.Key) != "" + + if tlsEnabled { + // Use Prometheus common config to build TLS config + tlsConfig, err := promConfig.NewTLSConfig(&c.TLS) + if err != nil { + return nil, err + } + + creds := credentials.NewTLS(tlsConfig) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + return opts, nil +} + +// UnmarshalYAML implements YAML unmarshaler +func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { + type raw Config + var cfg raw + + if c.ServerAddress != "" { + // Use existing values as defaults + cfg = raw(*c) + } else { + // Set sane defaults + cfg = raw{ + BackoffConfig: backoff.BackoffConfig{ + MaxBackoff: DefaultMaxBackoff, + MaxRetries: DefaultMaxRetries, + MinBackoff: DefaultMinBackoff, + }, + BatchSize: DefaultBatchSize, + BatchWait: DefaultBatchWait, + Timeout: DefaultTimeout, + KeepAlive: DefaultKeepAlive, + KeepAliveTimeout: DefaultKeepAliveTimeout, + } + } + + if err := unmarshal(&cfg); err != nil { + return err + } + + *c = Config(cfg) + return nil +} diff --git a/vendor/github.com/netobserv/loki-client-go/loki/client.go b/vendor/github.com/netobserv/loki-client-go/loki/client.go index bf3093b5e..691c05459 100644 --- a/vendor/github.com/netobserv/loki-client-go/loki/client.go +++ b/vendor/github.com/netobserv/loki-client-go/loki/client.go @@ -14,14 +14,11 @@ import ( "time" "github.com/netobserv/loki-client-go/pkg/backoff" + "github.com/netobserv/loki-client-go/pkg/metrics" "github.com/prometheus/prometheus/promql/parser" - "github.com/netobserv/loki-client-go/pkg/metric" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/version" @@ -39,74 +36,15 @@ const ( // pipeline stages ReservedLabelTenantID = "__tenant_id__" - LatencyLabel = "filename" - HostLabel = "host" - MetricPrefix = "netobserv" + transportHTTP = "http" ) var ( - encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: MetricPrefix, - Name: "loki_encoded_bytes_total", - Help: "Number of bytes encoded and ready to send.", - }, []string{HostLabel}) - sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: MetricPrefix, - Name: "loki_sent_bytes_total", - Help: "Number of bytes sent.", - }, []string{HostLabel}) - droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: MetricPrefix, - Name: "loki_dropped_bytes_total", - Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", - }, []string{HostLabel}) - sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: MetricPrefix, - Name: "loki_sent_entries_total", - Help: "Number of log entries sent to the ingester.", - }, []string{HostLabel}) - droppedEntries = 
prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: MetricPrefix, - Name: "loki_dropped_entries_total", - Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", - }, []string{HostLabel}) - requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: MetricPrefix, - Name: "loki_request_duration_seconds", - Help: "Duration of send requests.", - }, []string{"status_code", HostLabel}) - batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: MetricPrefix, - Name: "loki_batch_retries_total", - Help: "Number of times batches has had to be retried.", - }, []string{HostLabel}) - streamLag *metric.Gauges - - countersWithHost = []*prometheus.CounterVec{ - encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries, - } - UserAgent = fmt.Sprintf("promtail/%s", version.Version) ) func init() { - prometheus.MustRegister(encodedBytes) - prometheus.MustRegister(sentBytes) - prometheus.MustRegister(droppedBytes) - prometheus.MustRegister(sentEntries) - prometheus.MustRegister(droppedEntries) - prometheus.MustRegister(requestDuration) - prometheus.MustRegister(batchRetries) - var err error - streamLag, err = metric.NewGauges(MetricPrefix+"_loki_stream_lag_seconds", - "Difference between current time and last batch timestamp for successful sends", - metric.GaugeConfig{Action: "set"}, - int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric. - ) - if err != nil { - panic(err) - } - prometheus.MustRegister(streamLag) + metrics.RegisterMetrics() } // Client for pushing logs in snappy-compressed protos over HTTP. @@ -172,8 +110,8 @@ func NewWithLogger(cfg Config, logger log.Logger) (*Client, error) { // Initialize counters to 0 so the metrics are exported before the first // occurrence of incrementing to avoid missing metrics. 
- for _, counter := range countersWithHost { - counter.WithLabelValues(c.cfg.URL.Host).Add(0) + for _, counter := range metrics.CountersWithHost { + counter.WithLabelValues(c.cfg.URL.Host, transportHTTP).Add(0) } c.wg.Add(1) @@ -264,7 +202,7 @@ func (c *Client) sendBatch(tenantID string, batch *batch) { return } bufBytes := float64(len(buf)) - encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + metrics.EncodedBytes.WithLabelValues(c.cfg.URL.Host, transportHTTP).Add(bufBytes) ctx := context.Background() backoff := backoff.New(ctx, c.cfg.BackoffConfig) @@ -272,11 +210,11 @@ func (c *Client) sendBatch(tenantID string, batch *batch) { for backoff.Ongoing() { start := time.Now() status, err = c.send(ctx, tenantID, buf) - requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) + metrics.RequestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, transportHTTP).Observe(time.Since(start).Seconds()) if err == nil { - sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + metrics.SentBytes.WithLabelValues(c.cfg.URL.Host, transportHTTP).Add(bufBytes) + metrics.SentEntries.WithLabelValues(c.cfg.URL.Host, transportHTTP).Add(float64(entriesCount)) for _, s := range batch.streams { lbls, err := parser.ParseMetric(s.Labels) if err != nil { @@ -286,15 +224,15 @@ func (c *Client) sendBatch(tenantID string, batch *batch) { } var lblSet model.LabelSet for i := range lbls { - if lbls[i].Name == LatencyLabel { + if lbls[i].Name == metrics.LatencyLabel { lblSet = model.LabelSet{ - model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host), - model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value), + model.LabelName(metrics.HostLabel): model.LabelValue(c.cfg.URL.Host), + model.LabelName(metrics.LatencyLabel): model.LabelValue(lbls[i].Value), } } } if lblSet != nil { - streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) + metrics.StreamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) } } return @@ -306,14 +244,14 @@ func (c *Client) sendBatch(tenantID string, batch *batch) { } level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) - batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() + metrics.BatchRetries.WithLabelValues(c.cfg.URL.Host, transportHTTP).Inc() backoff.Wait() } if err != nil { level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err) - droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + metrics.DroppedBytes.WithLabelValues(c.cfg.URL.Host, transportHTTP).Add(bufBytes) + metrics.DroppedEntries.WithLabelValues(c.cfg.URL.Host, transportHTTP).Add(float64(entriesCount)) } } @@ -399,6 +337,6 @@ func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error { } func (c *Client) UnregisterLatencyMetric(labels model.LabelSet) { - labels[HostLabel] = model.LabelValue(c.cfg.URL.Host) - streamLag.Delete(labels) + labels[metrics.HostLabel] = model.LabelValue(c.cfg.URL.Host) + metrics.StreamLag.Delete(labels) } diff --git a/vendor/github.com/netobserv/loki-client-go/pkg/metrics/metrics.go b/vendor/github.com/netobserv/loki-client-go/pkg/metrics/metrics.go new file mode 100644 index 000000000..b2f2a3c74 --- /dev/null +++ b/vendor/github.com/netobserv/loki-client-go/pkg/metrics/metrics.go @@ -0,0 +1,94 @@ +package metrics + 
+import ( + "sync" + "time" + + "github.com/netobserv/loki-client-go/pkg/metric" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + LatencyLabel = "filename" + HostLabel = "host" + MetricPrefix = "netobserv" +) + +var ( + // Shared metrics for both HTTP and gRPC clients + EncodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricPrefix, + Name: "loki_encoded_bytes_total", + Help: "Number of bytes encoded and ready to send.", + }, []string{HostLabel, "transport"}) + + SentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricPrefix, + Name: "loki_sent_bytes_total", + Help: "Number of bytes sent.", + }, []string{HostLabel, "transport"}) + + DroppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricPrefix, + Name: "loki_dropped_bytes_total", + Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", + }, []string{HostLabel, "transport"}) + + SentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricPrefix, + Name: "loki_sent_entries_total", + Help: "Number of log entries sent to the ingester.", + }, []string{HostLabel, "transport"}) + + DroppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricPrefix, + Name: "loki_dropped_entries_total", + Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", + }, []string{HostLabel, "transport"}) + + RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: MetricPrefix, + Name: "loki_request_duration_seconds", + Help: "Duration of send requests.", + }, []string{"status_code", HostLabel, "transport"}) + + BatchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricPrefix, + Name: "loki_batch_retries_total", + Help: "Number of times batches has had to be retried.", + }, []string{HostLabel, "transport"}) + + + StreamLag *metric.Gauges + + // CountersWithHost are the counters that have host as a label + CountersWithHost = []*prometheus.CounterVec{ + EncodedBytes, SentBytes, DroppedBytes, SentEntries, DroppedEntries, BatchRetries, + } + + registrationOnce sync.Once +) + +// RegisterMetrics registers all metrics with prometheus +func RegisterMetrics() { + registrationOnce.Do(func() { + prometheus.MustRegister(EncodedBytes) + prometheus.MustRegister(SentBytes) + prometheus.MustRegister(DroppedBytes) + prometheus.MustRegister(SentEntries) + prometheus.MustRegister(DroppedEntries) + prometheus.MustRegister(RequestDuration) + prometheus.MustRegister(BatchRetries) + + var err error + StreamLag, err = metric.NewGauges(MetricPrefix+"_stream_lag_seconds", + "Difference between current time and last batch timestamp for successful sends", + metric.GaugeConfig{Action: "set"}, + int64(1*time.Minute.Seconds()), + ) + if err != nil { + panic(err) + } + prometheus.MustRegister(StreamLag) + }) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index faeb2a5bf..5e82de896 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -305,14 +305,16 @@ github.com/mxk/go-flowrate/flowrate ## explicit; go 1.18 github.com/netobserv/gopipes/pkg/node github.com/netobserv/gopipes/pkg/node/internal/connect -# github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 +# github.com/netobserv/loki-client-go v0.0.0-20251014110557-40bc8d2e6cf3 ## explicit; go 1.23.0 +github.com/netobserv/loki-client-go/grpc github.com/netobserv/loki-client-go/loki github.com/netobserv/loki-client-go/pkg/backoff 
github.com/netobserv/loki-client-go/pkg/helpers github.com/netobserv/loki-client-go/pkg/labelutil github.com/netobserv/loki-client-go/pkg/logproto github.com/netobserv/loki-client-go/pkg/metric +github.com/netobserv/loki-client-go/pkg/metrics github.com/netobserv/loki-client-go/pkg/urlutil # github.com/netobserv/netobserv-ebpf-agent v1.9.2-community ## explicit; go 1.24.0
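
---

For reviewers, a hedged sketch of how the new options could be wired into a pipeline configuration. The key names (`clientProtocol`, `grpcConfig`, `keepAlive`, `keepAliveTimeout`) come from the docs/api.md additions above; the surrounding stage layout (`write.type: loki` under `parameters`) follows the existing test fixtures and is assumed here rather than verified against the full config docs.

```yaml
# Illustrative only: stage/host names are placeholders, not part of this change.
parameters:
  - name: write_loki
    write:
      type: loki
      loki:
        # For gRPC the URL is passed through as ServerAddress (host:port, no scheme),
        # matching the "localhost:9095" form used in the new tests.
        url: loki-gateway.example.svc:9095
        tenantID: netobserv
        batchWait: 1s
        batchSize: 102400
        clientProtocol: grpc          # defaults to 'http' when omitted
        grpcConfig:                   # optional; SetDefaults auto-creates it for grpc
          keepAlive: 30s              # default applied when empty
          keepAliveTimeout: 5s        # default applied when empty
```

Note that when `clientProtocol: grpc` is set, `WriteLoki.SetDefaults` auto-creates `grpcConfig` and fills `keepAlive: 30s` / `keepAliveTimeout: 5s`, so the `grpcConfig` block above can be omitted entirely; TLS settings are taken from the shared `clientConfig.tlsConfig` as with the HTTP client.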