From eaa4e6371c5433d85ea5169825f5236b71d07a7a Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Tue, 2 Dec 2025 14:04:17 -0500 Subject: [PATCH 1/3] Add component which can save prometheus metrics (#4970) --- internal/component/all/all.go | 1 + .../component/telemetry/save/component.go | 279 ++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 internal/component/telemetry/save/component.go diff --git a/internal/component/all/all.go b/internal/component/all/all.go index c2bf374708..15464adb49 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -176,6 +176,7 @@ import ( _ "github.com/grafana/alloy/internal/component/remote/kubernetes/secret" // Import remote.kubernetes.secret _ "github.com/grafana/alloy/internal/component/remote/s3" // Import remote.s3 _ "github.com/grafana/alloy/internal/component/remote/vault" // Import remote.vault + _ "github.com/grafana/alloy/internal/component/telemetry/save" // Import telemetry.save _ "github.com/grafana/alloy/internal/util/otelfeaturegatefix" // Gracefully handle duplicate OTEL feature gates ) diff --git a/internal/component/telemetry/save/component.go b/internal/component/telemetry/save/component.go new file mode 100644 index 0000000000..26eb7f9435 --- /dev/null +++ b/internal/component/telemetry/save/component.go @@ -0,0 +1,279 @@ +package save + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" +) + +func init() { + component.Register(component.Registration{ + Name: "telemetry.save", + Args: Arguments{}, + Exports: Exports{}, + Stability: featuregate.StabilityExperimental, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewComponent(opts, args.(Arguments)) + }, + }) +} + +// Arguments configures the telemetry.save component. +type Arguments struct { + OutputLocation string `alloy:"output_location,attr,optional"` +} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = Arguments{OutputLocation: "telemetry/save/"} +} + +// Exports are the set of fields exposed by the telemetry.save component. +type Exports struct { + Receiver storage.Appendable `alloy:"receiver,attr"` +} + +// Component is the telemetry.save component. +type Component struct { + mut sync.RWMutex + args Arguments + logger log.Logger + promMetricsFolder string +} + +var _ component.Component = (*Component)(nil) + +// NewComponent creates a new telemetry.save component. +func NewComponent(opts component.Options, args Arguments) (*Component, error) { + c := &Component{ + args: args, + logger: opts.Logger, + } + + level.Info(c.logger).Log("msg", "initializing telemetry.save component", "output_location", args.OutputLocation) + + // Ensure the output directory exists + dir := filepath.Dir(args.OutputLocation) + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("failed to create output directory: %w", err) + } + + promMetricsFolder := filepath.Join(dir, "prometheus") + if err := os.MkdirAll(promMetricsFolder, 0755); err != nil { + return nil, fmt.Errorf("failed to prometheus metrics directory: %w", err) + } + c.promMetricsFolder = promMetricsFolder + + // Export the receiver interface + opts.OnStateChange(Exports{Receiver: c}) + + return c, nil +} + +// Run starts the component, blocking until ctx is canceled. +func (c *Component) Run(ctx context.Context) error { + _ = level.Info(c.logger).Log("msg", "telemetry.save component started", "output_location", c.args.OutputLocation) + + <-ctx.Done() + + _ = level.Info(c.logger).Log("msg", "telemetry.save component stopped") + return nil +} + +// Update provides a new config to the component. +func (c *Component) Update(args component.Arguments) error { + newArgs := args.(Arguments) + + c.mut.Lock() + defer c.mut.Unlock() + + // Check if output location changed + if newArgs.OutputLocation == c.args.OutputLocation { + return nil + } + + // Ensure the new output directory exists + dir := filepath.Dir(newArgs.OutputLocation) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create output directory: %w", err) + } + + // Cleanup the old directory + oldDir := filepath.Dir(c.args.OutputLocation) + if err := os.RemoveAll(oldDir); err != nil { + level.Warn(c.logger).Log("msg", "failed to remove old output directory", "dir", oldDir, "err", err) + } + + c.args.OutputLocation = newArgs.OutputLocation + level.Info(c.logger).Log("msg", "telemetry.save component updated", "output_location", c.args.OutputLocation) + return nil +} + +// Appender returns an Appender for writing metrics. +func (c *Component) Appender(ctx context.Context) storage.Appender { + return &appender{ + component: c, + ctx: ctx, + } +} + +// Define the RecordType enum +type RecordType string + +const ( + RecordTypeSample RecordType = "sample" + RecordTypeExemplar RecordType = "exemplar" + RecordTypeHistogram RecordType = "histogram" +) + +// Sample is the base type for telemetry samples. +type Sample struct { + RecordType RecordType `json:"record_type"` + Labels map[string]string `json:"labels"` + Timestamp int64 `json:"timestamp"` +} + +// Update ValueSample +type ValueSample struct { + Sample + Value float64 `json:"value,omitempty"` +} + +// Update ExemplarSample +type ExemplarSample struct { + Sample + Exemplar *exemplar.Exemplar `json:"exemplar,omitempty"` +} + +// Update HistogramSample +type HistogramSample struct { + Sample + Histogram *histogram.Histogram `json:"histogram,omitempty"` + FloatHistogram *histogram.FloatHistogram `json:"float_histogram,omitempty"` +} + +// appender implements storage.Appender for writing metrics to file. +type appender struct { + component *Component + ctx context.Context + samples []any // Use a generic type to accommodate multiple concrete types +} + +// Append adds a sample to be written in JSON format. +func (a *appender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + sample := ValueSample{ + Sample: Sample{ + RecordType: RecordTypeSample, + Labels: l.Map(), + Timestamp: t, + }, + Value: v, + } + a.samples = append(a.samples, sample) + return 0, nil +} + +// AppendExemplar adds an exemplar for a series in JSON format. +func (a *appender) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + sample := ExemplarSample{ + Sample: Sample{ + RecordType: RecordTypeExemplar, + Labels: l.Map(), + }, + Exemplar: &e, + } + a.samples = append(a.samples, sample) + return 0, nil +} + +// AppendHistogram adds a histogram sample in JSON format. +func (a *appender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + sample := HistogramSample{ + Sample: Sample{ + RecordType: RecordTypeHistogram, + Labels: l.Map(), + Timestamp: t, + }, + Histogram: h, + FloatHistogram: fh, + } + a.samples = append(a.samples, sample) + return 0, nil +} + +// AppendCTZeroSample adds a CT zero sample (no-op for file appender). +// Mark unused parameters to suppress warnings +func (a *appender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { + return 0, nil +} + +// AppendHistogramCTZeroSample adds a histogram CT zero sample (no-op for file appender). +// Mark unused parameters to suppress warnings +func (a *appender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, nil +} + +// Commit writes all accumulated samples to the output file. +func (a *appender) Commit() error { + a.component.mut.RLock() + defer a.component.mut.RUnlock() + + if len(a.samples) == 0 { + return nil + } + + filePath := filepath.Join(a.component.promMetricsFolder, "metrics.json") + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open metrics file: %w", err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + _ = level.Error(a.component.logger).Log("msg", "failed to close file", "err", closeErr) + } + }() + + for _, sample := range a.samples { + jsonData, err := json.Marshal(sample) + if err != nil { + return fmt.Errorf("failed to marshal sample to JSON: %w", err) + } + if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { + return fmt.Errorf("failed to write sample to file: %w", err) + } + } + + a.samples = nil // Clear the buffer + return nil +} + +// Rollback discards all accumulated samples. +func (a *appender) Rollback() error { + a.samples = a.samples[:0] + return nil +} + +// SetOptions sets the options for the appender. +func (a *appender) SetOptions(_ *storage.AppendOptions) { + // Not implemented for this component +} + +// UpdateMetadata updates the metadata for a series. +func (a *appender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { + // Not implemented for this component + return 0, nil +} From e6b831e939bf91ac8d82c97cfb11da610ddda740 Mon Sep 17 00:00:00 2001 From: Jeff Levin Date: Tue, 2 Dec 2025 12:42:21 -0900 Subject: [PATCH 2/3] add loki writer (#4986) * add loki writer * refactor * Batch all samples from appender, rename receiver -> metrics_receiver, and clear the save directory on startup --------- Co-authored-by: Kyle Eckhart --- .../component/telemetry/save/component.go | 227 +++++------------- internal/component/telemetry/save/logs.go | 185 ++++++++++++++ .../component/telemetry/save/logs_test.go | 205 ++++++++++++++++ internal/component/telemetry/save/metrics.go | 168 +++++++++++++ 4 files changed, 618 insertions(+), 167 deletions(-) create mode 100644 internal/component/telemetry/save/logs.go create mode 100644 internal/component/telemetry/save/logs_test.go create mode 100644 internal/component/telemetry/save/metrics.go diff --git a/internal/component/telemetry/save/component.go b/internal/component/telemetry/save/component.go index 26eb7f9435..3316744904 100644 --- a/internal/component/telemetry/save/component.go +++ b/internal/component/telemetry/save/component.go @@ -2,7 +2,6 @@ package save import ( "context" - "encoding/json" "fmt" "os" "path/filepath" @@ -10,13 +9,10 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/featuregate" ) @@ -44,7 +40,8 @@ func (args *Arguments) SetToDefault() { // Exports are the set of fields exposed by the telemetry.save component. type Exports struct { - Receiver storage.Appendable `alloy:"receiver,attr"` + MetricsReceiver storage.Appendable `alloy:"metrics_receiver,attr"` + LogsReceiver loki.LogsReceiver `alloy:"logs_receiver,attr"` } // Component is the telemetry.save component. @@ -53,6 +50,9 @@ type Component struct { args Arguments logger log.Logger promMetricsFolder string + lokiLogsFolder string + logsReceiver loki.LogsReceiver + logsHandler *LogsHandler } var _ component.Component = (*Component)(nil) @@ -66,20 +66,46 @@ func NewComponent(opts component.Options, args Arguments) (*Component, error) { level.Info(c.logger).Log("msg", "initializing telemetry.save component", "output_location", args.OutputLocation) - // Ensure the output directory exists + // Ensure the output directory exists and is clean dir := filepath.Dir(args.OutputLocation) + if _, err := os.Stat(dir); err == nil { + // Directory exists, clear it + if err := os.RemoveAll(dir); err != nil { + return nil, fmt.Errorf("failed to clear existing output directory: %w", err) + } + } if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("failed to create output directory: %w", err) } + // Create prometheus metrics folder promMetricsFolder := filepath.Join(dir, "prometheus") if err := os.MkdirAll(promMetricsFolder, 0755); err != nil { - return nil, fmt.Errorf("failed to prometheus metrics directory: %w", err) + return nil, fmt.Errorf("failed to create prometheus metrics directory: %w", err) } c.promMetricsFolder = promMetricsFolder - // Export the receiver interface - opts.OnStateChange(Exports{Receiver: c}) + // Create loki logs folder + lokiLogsFolder := filepath.Join(dir, "loki") + if err := os.MkdirAll(lokiLogsFolder, 0755); err != nil { + return nil, fmt.Errorf("failed to create loki logs directory: %w", err) + } + c.lokiLogsFolder = lokiLogsFolder + + // Create logs receiver + c.logsReceiver = loki.NewLogsReceiver(loki.WithComponentID("telemetry.save")) + + // Initialize logs handler + c.logsHandler = NewLogsHandler(c) + + // Start the log entry handler goroutine + c.logsHandler.Start(c.logsReceiver) + + // Export the receiver interfaces + opts.OnStateChange(Exports{ + MetricsReceiver: c, + LogsReceiver: c.logsReceiver, + }) return c, nil } @@ -90,6 +116,9 @@ func (c *Component) Run(ctx context.Context) error { <-ctx.Done() + // Clean shutdown: stop logs handler + c.logsHandler.Stop() + _ = level.Info(c.logger).Log("msg", "telemetry.save component stopped") return nil } @@ -106,174 +135,38 @@ func (c *Component) Update(args component.Arguments) error { return nil } - // Ensure the new output directory exists + // Ensure the new output directory exists and is clean dir := filepath.Dir(newArgs.OutputLocation) + if _, err := os.Stat(dir); err == nil { + // Directory exists, clear it + if err := os.RemoveAll(dir); err != nil { + return fmt.Errorf("failed to clear existing output directory: %w", err) + } + } if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("failed to create output directory: %w", err) } + // Update prometheus and loki folders + promMetricsFolder := filepath.Join(dir, "prometheus") + if err := os.MkdirAll(promMetricsFolder, 0755); err != nil { + return fmt.Errorf("failed to create prometheus metrics directory: %w", err) + } + c.promMetricsFolder = promMetricsFolder + + lokiLogsFolder := filepath.Join(dir, "loki") + if err := os.MkdirAll(lokiLogsFolder, 0755); err != nil { + return fmt.Errorf("failed to create loki logs directory: %w", err) + } + c.lokiLogsFolder = lokiLogsFolder + // Cleanup the old directory oldDir := filepath.Dir(c.args.OutputLocation) if err := os.RemoveAll(oldDir); err != nil { level.Warn(c.logger).Log("msg", "failed to remove old output directory", "dir", oldDir, "err", err) } - c.args.OutputLocation = newArgs.OutputLocation + c.args = newArgs level.Info(c.logger).Log("msg", "telemetry.save component updated", "output_location", c.args.OutputLocation) return nil } - -// Appender returns an Appender for writing metrics. -func (c *Component) Appender(ctx context.Context) storage.Appender { - return &appender{ - component: c, - ctx: ctx, - } -} - -// Define the RecordType enum -type RecordType string - -const ( - RecordTypeSample RecordType = "sample" - RecordTypeExemplar RecordType = "exemplar" - RecordTypeHistogram RecordType = "histogram" -) - -// Sample is the base type for telemetry samples. -type Sample struct { - RecordType RecordType `json:"record_type"` - Labels map[string]string `json:"labels"` - Timestamp int64 `json:"timestamp"` -} - -// Update ValueSample -type ValueSample struct { - Sample - Value float64 `json:"value,omitempty"` -} - -// Update ExemplarSample -type ExemplarSample struct { - Sample - Exemplar *exemplar.Exemplar `json:"exemplar,omitempty"` -} - -// Update HistogramSample -type HistogramSample struct { - Sample - Histogram *histogram.Histogram `json:"histogram,omitempty"` - FloatHistogram *histogram.FloatHistogram `json:"float_histogram,omitempty"` -} - -// appender implements storage.Appender for writing metrics to file. -type appender struct { - component *Component - ctx context.Context - samples []any // Use a generic type to accommodate multiple concrete types -} - -// Append adds a sample to be written in JSON format. -func (a *appender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - sample := ValueSample{ - Sample: Sample{ - RecordType: RecordTypeSample, - Labels: l.Map(), - Timestamp: t, - }, - Value: v, - } - a.samples = append(a.samples, sample) - return 0, nil -} - -// AppendExemplar adds an exemplar for a series in JSON format. -func (a *appender) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - sample := ExemplarSample{ - Sample: Sample{ - RecordType: RecordTypeExemplar, - Labels: l.Map(), - }, - Exemplar: &e, - } - a.samples = append(a.samples, sample) - return 0, nil -} - -// AppendHistogram adds a histogram sample in JSON format. -func (a *appender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { - sample := HistogramSample{ - Sample: Sample{ - RecordType: RecordTypeHistogram, - Labels: l.Map(), - Timestamp: t, - }, - Histogram: h, - FloatHistogram: fh, - } - a.samples = append(a.samples, sample) - return 0, nil -} - -// AppendCTZeroSample adds a CT zero sample (no-op for file appender). -// Mark unused parameters to suppress warnings -func (a *appender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { - return 0, nil -} - -// AppendHistogramCTZeroSample adds a histogram CT zero sample (no-op for file appender). -// Mark unused parameters to suppress warnings -func (a *appender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - return 0, nil -} - -// Commit writes all accumulated samples to the output file. -func (a *appender) Commit() error { - a.component.mut.RLock() - defer a.component.mut.RUnlock() - - if len(a.samples) == 0 { - return nil - } - - filePath := filepath.Join(a.component.promMetricsFolder, "metrics.json") - file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return fmt.Errorf("failed to open metrics file: %w", err) - } - defer func() { - if closeErr := file.Close(); closeErr != nil { - _ = level.Error(a.component.logger).Log("msg", "failed to close file", "err", closeErr) - } - }() - - for _, sample := range a.samples { - jsonData, err := json.Marshal(sample) - if err != nil { - return fmt.Errorf("failed to marshal sample to JSON: %w", err) - } - if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { - return fmt.Errorf("failed to write sample to file: %w", err) - } - } - - a.samples = nil // Clear the buffer - return nil -} - -// Rollback discards all accumulated samples. -func (a *appender) Rollback() error { - a.samples = a.samples[:0] - return nil -} - -// SetOptions sets the options for the appender. -func (a *appender) SetOptions(_ *storage.AppendOptions) { - // Not implemented for this component -} - -// UpdateMetadata updates the metadata for a series. -func (a *appender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { - // Not implemented for this component - return 0, nil -} diff --git a/internal/component/telemetry/save/logs.go b/internal/component/telemetry/save/logs.go new file mode 100644 index 0000000000..9134073dc1 --- /dev/null +++ b/internal/component/telemetry/save/logs.go @@ -0,0 +1,185 @@ +package save + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/alloy/internal/component/common/loki" +) + +// LogEntry represents a log entry with its metadata for JSON serialization. +type LogEntry struct { + Timestamp time.Time `json:"timestamp"` + Line string `json:"line"` + Labels map[string]string `json:"labels"` + StructuredMetadata []LabelPair `json:"structured_metadata,omitempty"` + Parsed []LabelPair `json:"parsed,omitempty"` +} + +// LabelPair represents a key-value pair for labels. +type LabelPair struct { + Name string `json:"name"` + Value string `json:"value"` +} + +// LogsHandler manages the batching and writing of log entries. +type LogsHandler struct { + component *Component + logsBatch []loki.Entry + batchMut sync.Mutex + flushTimer *time.Timer + ctx context.Context + cancel context.CancelFunc +} + +// NewLogsHandler creates a new logs handler for the component. +func NewLogsHandler(component *Component) *LogsHandler { + ctx, cancel := context.WithCancel(context.Background()) + + h := &LogsHandler{ + component: component, + logsBatch: make([]loki.Entry, 0, 100), // Pre-allocate for 100 entries + flushTimer: time.NewTimer(5 * time.Second), + ctx: ctx, + cancel: cancel, + } + + h.flushTimer.Stop() // Don't start the timer yet + return h +} + +// Start begins processing log entries in a background goroutine. +func (h *LogsHandler) Start(logsReceiver loki.LogsReceiver) { + go h.handleLogEntries(logsReceiver) +} + +// Stop shuts down the logs handler gracefully. +func (h *LogsHandler) Stop() { + h.cancel() + h.flushLogBatch() + if h.flushTimer != nil { + h.flushTimer.Stop() + } +} + +// handleLogEntries processes incoming log entries and batches them for efficient writing. +func (h *LogsHandler) handleLogEntries(logsReceiver loki.LogsReceiver) { + const maxBatchSize = 50 // Max entries per batch + + for { + select { + case entry := <-logsReceiver.Chan(): + h.addLogToBatch(entry) + + h.batchMut.Lock() + batchSize := len(h.logsBatch) + h.batchMut.Unlock() + + // Flush if batch is full + if batchSize >= maxBatchSize { + h.flushLogBatch() + } + + case <-h.flushTimer.C: + // Periodic flush + h.flushLogBatch() + + case <-h.ctx.Done(): + // Handler is shutting down + h.flushLogBatch() + return + } + } +} + +// addLogToBatch adds a log entry to the current batch. +func (h *LogsHandler) addLogToBatch(entry loki.Entry) { + h.batchMut.Lock() + defer h.batchMut.Unlock() + + h.logsBatch = append(h.logsBatch, entry) + + // Start flush timer if this is the first entry in the batch + if len(h.logsBatch) == 1 { + h.flushTimer.Reset(5 * time.Second) + } +} + +// flushLogBatch writes all batched log entries to disk and clears the batch. +func (h *LogsHandler) flushLogBatch() { + h.batchMut.Lock() + defer h.batchMut.Unlock() + + if len(h.logsBatch) == 0 { + return + } + + h.component.mut.RLock() + lokiLogsFolder := h.component.lokiLogsFolder + h.component.mut.RUnlock() + + // Write all entries in the batch + filePath := filepath.Join(lokiLogsFolder, "logs.json") + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + _ = level.Error(h.component.logger).Log("msg", "failed to open logs file", "err", err) + return + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + _ = level.Error(h.component.logger).Log("msg", "failed to close logs file", "err", closeErr) + } + }() + + for _, entry := range h.logsBatch { + // Convert loki.Entry to our JSON-serializable format + logEntry := LogEntry{ + Timestamp: entry.Timestamp, + Line: entry.Line, + Labels: make(map[string]string), + } + + // Convert model.LabelSet to map[string]string + for k, v := range entry.Labels { + logEntry.Labels[string(k)] = string(v) + } + + // Convert structured metadata + for _, label := range entry.StructuredMetadata { + logEntry.StructuredMetadata = append(logEntry.StructuredMetadata, LabelPair{ + Name: label.Name, + Value: label.Value, + }) + } + + // Convert parsed labels + for _, label := range entry.Parsed { + logEntry.Parsed = append(logEntry.Parsed, LabelPair{ + Name: label.Name, + Value: label.Value, + }) + } + + jsonData, err := json.Marshal(logEntry) + if err != nil { + _ = level.Error(h.component.logger).Log("msg", "failed to marshal log entry to JSON", "err", err) + continue + } + + if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { + _ = level.Error(h.component.logger).Log("msg", "failed to write log entry to file", "err", err) + break + } + } + + // Clear the batch + h.logsBatch = h.logsBatch[:0] + + // Stop the flush timer + h.flushTimer.Stop() +} \ No newline at end of file diff --git a/internal/component/telemetry/save/logs_test.go b/internal/component/telemetry/save/logs_test.go new file mode 100644 index 0000000000..917f6641d9 --- /dev/null +++ b/internal/component/telemetry/save/logs_test.go @@ -0,0 +1,205 @@ +package save + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/loki/pkg/push" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/loki" +) + +func TestLogsReceiver(t *testing.T) { + // Create a temporary directory for testing + tempDir := t.TempDir() + + // Create component with temporary output location + args := Arguments{ + OutputLocation: filepath.Join(tempDir, "telemetry/save/"), + } + + opts := component.Options{ + Logger: log.NewNopLogger(), + OnStateChange: func(exports component.Exports) { + // No-op for testing + }, + } + + // Create the component + c, err := NewComponent(opts, args) + require.NoError(t, err) + + // Start the component + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = c.Run(ctx) + }() + + // Give component a moment to start + time.Sleep(100 * time.Millisecond) + + // Create a test log entry + testEntry := loki.Entry{ + Labels: model.LabelSet{ + "service": "test-service", + "level": "info", + }, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "This is a test log message", + StructuredMetadata: push.LabelsAdapter{ + {Name: "trace_id", Value: "abc123"}, + {Name: "span_id", Value: "def456"}, + }, + }, + } + + // Send the log entry + c.logsReceiver.Chan() <- testEntry + + // Give some time for the entry to be added to batch + time.Sleep(100 * time.Millisecond) + + // Force flush the batch immediately for testing + c.logsHandler.flushLogBatch() + + // Verify the log file was created and contains our entry + logFilePath := filepath.Join(c.lokiLogsFolder, "logs.json") + require.FileExists(t, logFilePath) + + // Read and verify the content + content, err := os.ReadFile(logFilePath) + require.NoError(t, err) + + var logEntry LogEntry + err = json.Unmarshal(content[:len(content)-1], &logEntry) // Remove trailing newline + require.NoError(t, err) + + // Verify the log entry fields + require.Equal(t, "This is a test log message", logEntry.Line) + require.Equal(t, "test-service", logEntry.Labels["service"]) + require.Equal(t, "info", logEntry.Labels["level"]) + require.Len(t, logEntry.StructuredMetadata, 2) + require.Equal(t, "trace_id", logEntry.StructuredMetadata[0].Name) + require.Equal(t, "abc123", logEntry.StructuredMetadata[0].Value) + require.Equal(t, "span_id", logEntry.StructuredMetadata[1].Name) + require.Equal(t, "def456", logEntry.StructuredMetadata[1].Value) +} + +func TestComponentExports(t *testing.T) { + // Create a temporary directory for testing + tempDir := t.TempDir() + + // Create component with temporary output location + args := Arguments{ + OutputLocation: filepath.Join(tempDir, "telemetry/save/"), + } + + var exports Exports + opts := component.Options{ + Logger: log.NewNopLogger(), + OnStateChange: func(e component.Exports) { + exports = e.(Exports) + }, + } + + // Create the component + c, err := NewComponent(opts, args) + require.NoError(t, err) + + // Verify exports are set correctly + require.NotNil(t, exports.MetricsReceiver) + require.NotNil(t, exports.LogsReceiver) + + // Verify the metrics receiver is the component itself + require.Equal(t, c, exports.MetricsReceiver) + + // Verify the logs receiver has a channel + require.NotNil(t, exports.LogsReceiver.Chan()) +} + +func TestLogsBatching(t *testing.T) { + // Create a temporary directory for testing + tempDir := t.TempDir() + + // Create component with temporary output location + args := Arguments{ + OutputLocation: filepath.Join(tempDir, "telemetry/save/"), + } + + opts := component.Options{ + Logger: log.NewNopLogger(), + OnStateChange: func(exports component.Exports) { + // No-op for testing + }, + } + + // Create the component + c, err := NewComponent(opts, args) + require.NoError(t, err) + + // Start the component + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = c.Run(ctx) + }() + + // Give component a moment to start + time.Sleep(100 * time.Millisecond) + + // Send multiple log entries + for i := 0; i < 5; i++ { + testEntry := loki.Entry{ + Labels: model.LabelSet{ + "service": model.LabelValue(fmt.Sprintf("test-service-%d", i)), + "level": "info", + }, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("Test log message %d", i), + }, + } + c.logsReceiver.Chan() <- testEntry + } + + // Give some time for entries to be batched + time.Sleep(200 * time.Millisecond) + + // Force flush the batch + c.logsHandler.flushLogBatch() + + // Verify the log file was created and contains all entries + logFilePath := filepath.Join(c.lokiLogsFolder, "logs.json") + require.FileExists(t, logFilePath) + + // Read and verify the content + content, err := os.ReadFile(logFilePath) + require.NoError(t, err) + + // Split by lines and verify we have 5 entries + lines := strings.Split(strings.TrimSpace(string(content)), "\n") + require.Len(t, lines, 5) + + // Verify each line is valid JSON + for i, line := range lines { + var logEntry LogEntry + err = json.Unmarshal([]byte(line), &logEntry) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("Test log message %d", i), logEntry.Line) + require.Equal(t, fmt.Sprintf("test-service-%d", i), logEntry.Labels["service"]) + } +} diff --git a/internal/component/telemetry/save/metrics.go b/internal/component/telemetry/save/metrics.go new file mode 100644 index 0000000000..9928694f80 --- /dev/null +++ b/internal/component/telemetry/save/metrics.go @@ -0,0 +1,168 @@ +package save + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +// Define the RecordType enum for metrics +type RecordType string + +const ( + RecordTypeSample RecordType = "sample" + RecordTypeExemplar RecordType = "exemplar" + RecordTypeHistogram RecordType = "histogram" +) + +// Sample is the base type for telemetry samples. +type Sample struct { + RecordType RecordType `json:"record_type"` + Labels map[string]string `json:"labels"` + Timestamp int64 `json:"timestamp"` +} + +// ValueSample represents a metric sample with a value. +type ValueSample struct { + Sample + Value float64 `json:"value,omitempty"` +} + +// ExemplarSample represents an exemplar sample. +type ExemplarSample struct { + Sample + Exemplar *exemplar.Exemplar `json:"exemplar,omitempty"` +} + +// HistogramSample represents a histogram sample. +type HistogramSample struct { + Sample + Histogram *histogram.Histogram `json:"histogram,omitempty"` + FloatHistogram *histogram.FloatHistogram `json:"float_histogram,omitempty"` +} + +// metricsAppender implements storage.Appender for writing metrics to file. +type metricsAppender struct { + component *Component + ctx context.Context + samples []any // Use a generic type to accommodate multiple concrete types +} + +// Appender returns an Appender for writing metrics. +func (c *Component) Appender(ctx context.Context) storage.Appender { + return &metricsAppender{ + component: c, + ctx: ctx, + } +} + +// Append adds a sample to be written in JSON format. +func (a *metricsAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + sample := ValueSample{ + Sample: Sample{ + RecordType: RecordTypeSample, + Labels: l.Map(), + Timestamp: t, + }, + Value: v, + } + a.samples = append(a.samples, sample) + return 0, nil +} + +// AppendExemplar adds an exemplar for a series in JSON format. +func (a *metricsAppender) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + sample := ExemplarSample{ + Sample: Sample{ + RecordType: RecordTypeExemplar, + Labels: l.Map(), + }, + Exemplar: &e, + } + a.samples = append(a.samples, sample) + return 0, nil +} + +// AppendHistogram adds a histogram sample in JSON format. +func (a *metricsAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + sample := HistogramSample{ + Sample: Sample{ + RecordType: RecordTypeHistogram, + Labels: l.Map(), + Timestamp: t, + }, + Histogram: h, + FloatHistogram: fh, + } + a.samples = append(a.samples, sample) + return 0, nil +} + +// AppendCTZeroSample adds a CT zero sample (no-op for file appender). +func (a *metricsAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64) (storage.SeriesRef, error) { + return 0, nil +} + +// AppendHistogramCTZeroSample adds a histogram CT zero sample (no-op for file appender). +func (a *metricsAppender) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _ int64, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { + return 0, nil +} + +// Commit writes all accumulated samples to the output file. +func (a *metricsAppender) Commit() error { + a.component.mut.RLock() + defer a.component.mut.RUnlock() + + if len(a.samples) == 0 { + return nil + } + + filePath := filepath.Join(a.component.promMetricsFolder, "metrics.json") + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open metrics file: %w", err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + _ = level.Error(a.component.logger).Log("msg", "failed to close file", "err", closeErr) + } + }() + + jsonData, err := json.Marshal(a.samples) + if err != nil { + return fmt.Errorf("failed to marshal sample to JSON: %w", err) + } + if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { + return fmt.Errorf("failed to write samples to file: %w", err) + } + + // Clear samples after successful write + a.samples = a.samples[:0] + return nil +} + +// Rollback discards all accumulated samples. +func (a *metricsAppender) Rollback() error { + a.samples = a.samples[:0] + return nil +} + +// SetOptions sets the options for the appender. +func (a *metricsAppender) SetOptions(_ *storage.AppendOptions) { + // Not implemented for this component +} + +// UpdateMetadata updates the metadata for a series. +func (a *metricsAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { + // Not implemented for this component + return 0, nil +} From fea90efe87710bb5c053bde234a15408b00fe25b Mon Sep 17 00:00:00 2001 From: Kyle Eckhart Date: Fri, 5 Dec 2025 12:35:19 -0500 Subject: [PATCH 3/3] Add OTLP save support (#5016) --- .../component/telemetry/save/component.go | 82 +++++- .../telemetry/save/component_test.go | 254 ++++++++++++++++++ internal/component/telemetry/save/otlp.go | 126 +++++++++ 3 files changed, 454 insertions(+), 8 deletions(-) create mode 100644 internal/component/telemetry/save/component_test.go create mode 100644 internal/component/telemetry/save/otlp.go diff --git a/internal/component/telemetry/save/component.go b/internal/component/telemetry/save/component.go index 3316744904..e6f1089966 100644 --- a/internal/component/telemetry/save/component.go +++ b/internal/component/telemetry/save/component.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/otelcol" "github.com/grafana/alloy/internal/featuregate" ) @@ -40,19 +41,27 @@ func (args *Arguments) SetToDefault() { // Exports are the set of fields exposed by the telemetry.save component. type Exports struct { - MetricsReceiver storage.Appendable `alloy:"metrics_receiver,attr"` - LogsReceiver loki.LogsReceiver `alloy:"logs_receiver,attr"` + MetricsReceiver storage.Appendable `alloy:"metrics_receiver,attr"` + LogsReceiver loki.LogsReceiver `alloy:"logs_receiver,attr"` + OTLP otelcol.ConsumerExports `alloy:"otlp,attr"` } // Component is the telemetry.save component. type Component struct { - mut sync.RWMutex - args Arguments - logger log.Logger + mut sync.RWMutex + args Arguments + logger log.Logger + promMetricsFolder string - lokiLogsFolder string - logsReceiver loki.LogsReceiver - logsHandler *LogsHandler + + lokiLogsFolder string + logsReceiver loki.LogsReceiver + logsHandler *LogsHandler + + otlpLogsFolder string + otlpMetricsFolder string + otlpTracesFolder string + otlpConsumer otelcol.Consumer } var _ component.Component = (*Component)(nil) @@ -92,6 +101,33 @@ func NewComponent(opts component.Options, args Arguments) (*Component, error) { } c.lokiLogsFolder = lokiLogsFolder + // Create OTLP folder + otlpFolder := filepath.Join(dir, "otlp") + if err := os.MkdirAll(otlpFolder, 0755); err != nil { + return nil, fmt.Errorf("failed to create otlp directory: %w", err) + } + + // Create OTLP logs folder + otlpLogsFolder := filepath.Join(otlpFolder, "logs") + if err := os.MkdirAll(otlpLogsFolder, 0755); err != nil { + return nil, fmt.Errorf("failed to create otlp logs directory: %w", err) + } + c.otlpLogsFolder = otlpLogsFolder + + // Create OTLP metrics folder + otlpMetricsFolder := filepath.Join(otlpFolder, "metrics") + if err := os.MkdirAll(otlpMetricsFolder, 0755); err != nil { + return nil, fmt.Errorf("failed to create otlp metrics directory: %w", err) + } + c.otlpMetricsFolder = otlpMetricsFolder + + // Create OTLP traces folder + otlpTracesFolder := filepath.Join(otlpFolder, "traces") + if err := os.MkdirAll(otlpTracesFolder, 0755); err != nil { + return nil, fmt.Errorf("failed to create otlp traces directory: %w", err) + } + c.otlpTracesFolder = otlpTracesFolder + // Create logs receiver c.logsReceiver = loki.NewLogsReceiver(loki.WithComponentID("telemetry.save")) @@ -101,10 +137,16 @@ func NewComponent(opts component.Options, args Arguments) (*Component, error) { // Start the log entry handler goroutine c.logsHandler.Start(c.logsReceiver) + // Initialize OTLP consumer + c.otlpConsumer = newOTLPConsumer(c) + // Export the receiver interfaces opts.OnStateChange(Exports{ MetricsReceiver: c, LogsReceiver: c.logsReceiver, + OTLP: otelcol.ConsumerExports{ + Input: c.otlpConsumer, + }, }) return c, nil @@ -160,6 +202,30 @@ func (c *Component) Update(args component.Arguments) error { } c.lokiLogsFolder = lokiLogsFolder + // Update OTLP folders + otlpFolder := filepath.Join(dir, "otlp") + if err := os.MkdirAll(otlpFolder, 0755); err != nil { + return fmt.Errorf("failed to create otlp directory: %w", err) + } + + otlpLogsFolder := filepath.Join(otlpFolder, "logs") + if err := os.MkdirAll(otlpLogsFolder, 0755); err != nil { + return fmt.Errorf("failed to create otlp logs directory: %w", err) + } + c.otlpLogsFolder = otlpLogsFolder + + otlpMetricsFolder := filepath.Join(otlpFolder, "metrics") + if err := os.MkdirAll(otlpMetricsFolder, 0755); err != nil { + return fmt.Errorf("failed to create otlp metrics directory: %w", err) + } + c.otlpMetricsFolder = otlpMetricsFolder + + otlpTracesFolder := filepath.Join(otlpFolder, "traces") + if err := os.MkdirAll(otlpTracesFolder, 0755); err != nil { + return fmt.Errorf("failed to create otlp traces directory: %w", err) + } + c.otlpTracesFolder = otlpTracesFolder + // Cleanup the old directory oldDir := filepath.Dir(c.args.OutputLocation) if err := os.RemoveAll(oldDir); err != nil { diff --git a/internal/component/telemetry/save/component_test.go b/internal/component/telemetry/save/component_test.go new file mode 100644 index 0000000000..ab044b0a44 --- /dev/null +++ b/internal/component/telemetry/save/component_test.go @@ -0,0 +1,254 @@ +package save_test + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "testing" + + "github.com/golang/snappy" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/telemetry/save" +) + +// decodeSample decodes a single sample based on its record type +func decodeSample(rawMessage json.RawMessage) (any, error) { + var base save.Sample + if err := json.Unmarshal(rawMessage, &base); err != nil { + return nil, err + } + + switch base.RecordType { + case save.RecordTypeSample: + var sample save.ValueSample + if err := json.Unmarshal(rawMessage, &sample); err != nil { + return nil, err + } + return sample, nil + case save.RecordTypeExemplar: + var exemplarSample save.ExemplarSample + if err := json.Unmarshal(rawMessage, &exemplarSample); err != nil { + return nil, err + } + return exemplarSample, nil + case save.RecordTypeHistogram: + var histogramSample save.HistogramSample + if err := json.Unmarshal(rawMessage, &histogramSample); err != nil { + return nil, err + } + return histogramSample, nil + default: + return nil, fmt.Errorf("unknown record type: %s", base.RecordType) + } +} + +// decodeSamples decodes different sample types from JSON messages +func decodeSamples(data []byte, samples []any) ([]any, error) { + var rawMessages []json.RawMessage + if err := json.Unmarshal(data, &rawMessages); err != nil { + return nil, err + } + + samples = samples[:0] + for _, rawMessage := range rawMessages { + sample, err := decodeSample(rawMessage) + if err != nil { + return nil, err + } + samples = append(samples, sample) + } + + return samples, nil +} + +func samplesToWriteRequest(samples []any) (*prompb.WriteRequest, error) { + var timeSeries []prompb.TimeSeries + + for _, sample := range samples { + switch s := sample.(type) { + case save.ValueSample: + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: convertLabels(s.Labels), + Samples: []prompb.Sample{ + { + Timestamp: s.Timestamp, + Value: s.Value, + }, + }, + }) + case save.ExemplarSample: + if s.Exemplar != nil { + // Extract exemplar labels from the exemplar struct + exemplarLabels := make(map[string]string) + if !s.Exemplar.Labels.IsEmpty() { + exemplarLabels = s.Exemplar.Labels.Map() + } + + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: convertLabels(s.Labels), + Exemplars: []prompb.Exemplar{ + { + Timestamp: s.Exemplar.Ts, + Value: s.Exemplar.Value, + Labels: convertLabels(exemplarLabels), + }, + }, + }) + } + case save.HistogramSample: + var histogramPb prompb.Histogram + histogramPb.Timestamp = s.Timestamp + + // Handle native histogram + if s.Histogram != nil { + histogramPb.Count = &prompb.Histogram_CountInt{CountInt: s.Histogram.Count} + histogramPb.Sum = s.Histogram.Sum + histogramPb.Schema = s.Histogram.Schema + histogramPb.ZeroThreshold = s.Histogram.ZeroThreshold + histogramPb.ZeroCount = &prompb.Histogram_ZeroCountInt{ZeroCountInt: s.Histogram.ZeroCount} + histogramPb.NegativeSpans = convertSpans(s.Histogram.NegativeSpans) + histogramPb.PositiveSpans = convertSpans(s.Histogram.PositiveSpans) + histogramPb.NegativeDeltas = s.Histogram.NegativeBuckets + histogramPb.PositiveDeltas = s.Histogram.PositiveBuckets + } + + // Handle float histogram + if s.FloatHistogram != nil { + histogramPb.Count = &prompb.Histogram_CountFloat{CountFloat: s.FloatHistogram.Count} + histogramPb.Sum = s.FloatHistogram.Sum + histogramPb.Schema = s.FloatHistogram.Schema + histogramPb.ZeroThreshold = s.FloatHistogram.ZeroThreshold + histogramPb.ZeroCount = &prompb.Histogram_ZeroCountFloat{ZeroCountFloat: s.FloatHistogram.ZeroCount} + histogramPb.NegativeSpans = convertSpans(s.FloatHistogram.NegativeSpans) + histogramPb.PositiveSpans = convertSpans(s.FloatHistogram.PositiveSpans) + // Convert float buckets to int64 deltas (this is a simplification) + histogramPb.NegativeDeltas = make([]int64, len(s.FloatHistogram.NegativeBuckets)) + for i, v := range s.FloatHistogram.NegativeBuckets { + histogramPb.NegativeDeltas[i] = int64(v) + } + histogramPb.PositiveDeltas = make([]int64, len(s.FloatHistogram.PositiveBuckets)) + for i, v := range s.FloatHistogram.PositiveBuckets { + histogramPb.PositiveDeltas[i] = int64(v) + } + } + + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: convertLabels(s.Labels), + Histograms: []prompb.Histogram{histogramPb}, + }) + default: + return nil, fmt.Errorf("unsupported sample type: %T", s) + } + } + + return &prompb.WriteRequest{ + Timeseries: timeSeries, + }, nil +} + +func convertSpans(spans []histogram.Span) []prompb.BucketSpan { + result := make([]prompb.BucketSpan, len(spans)) + for i, span := range spans { + result[i] = prompb.BucketSpan{ + Offset: span.Offset, + Length: span.Length, + } + } + return result +} + +func convertLabels(labels map[string]string) []prompb.Label { + var result []prompb.Label + for k, v := range labels { + result = append(result, prompb.Label{ + Name: k, + Value: v, + }) + } + return result +} + +func TestSendMetrics(t *testing.T) { + metricsFilePath := filepath.Join("..", "..", "..", "..", "telemetry", "save", "prometheus", "metrics.json") + + file, err := os.Open(metricsFilePath) + if err != nil { + t.Skipf("Skipping test, metrics file not found: %v", err) + return + } + defer func() { _ = file.Close() }() + + var samples []any + decoder := json.NewDecoder(file) + + for { + var rawMessage json.RawMessage + if err := decoder.Decode(&rawMessage); err == io.EOF { + break // End of file + } + require.NoError(t, err) + + samples, err = decodeSamples(rawMessage, samples) + require.NoError(t, err) + + if len(samples) > 0 { + writeRequest, err := samplesToWriteRequest(samples) + require.NoError(t, err) + + t.Logf("Converted %d time series to remote write request", len(writeRequest.Timeseries)) + + err = sendWriteRequest(writeRequest, "http://localhost:9009/api/v1/push") + if err != nil { + t.Logf("Failed to send WriteRequest (this is expected if the endpoint is not running): %v", err) + } else { + t.Logf("Successfully sent WriteRequest to http://localhost:9009/api/v1/push") + } + } + } +} + +// sendWriteRequest encodes a WriteRequest with snappy compression and sends it to the given endpoint +func sendWriteRequest(writeRequest *prompb.WriteRequest, endpoint string) error { + // Encode the WriteRequest to Protobuf binary format + data, err := writeRequest.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal WriteRequest: %w", err) + } + + // Compress the encoded request using Snappy + compressed := snappy.Encode(nil, data) + + // Create HTTP request + req, err := http.NewRequest("POST", endpoint, bytes.NewReader(compressed)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + // Set required headers for Prometheus remote write + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + // Send the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send HTTP request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + // Check response status + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + bodyBytes, _ := io.ReadAll(resp.Body) + return fmt.Errorf("HTTP request failed with status %d: %s", resp.StatusCode, string(bodyBytes)) + } + + return nil +} diff --git a/internal/component/telemetry/save/otlp.go b/internal/component/telemetry/save/otlp.go new file mode 100644 index 0000000000..7d3008fed1 --- /dev/null +++ b/internal/component/telemetry/save/otlp.go @@ -0,0 +1,126 @@ +package save + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/go-kit/log/level" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/grafana/alloy/internal/component/otelcol" +) + +// otlpConsumer implements otelcol.Consumer (all three: Logs, Metrics, Traces) for saving OTLP data to files. +type otlpConsumer struct { + component *Component + logsMarshaler *plog.JSONMarshaler + metricsMarshaler *pmetric.JSONMarshaler + tracesMarshaler *ptrace.JSONMarshaler +} + +// newOTLPConsumer creates a new combined OTLP consumer. +func newOTLPConsumer(component *Component) otelcol.Consumer { + return &otlpConsumer{ + component: component, + logsMarshaler: &plog.JSONMarshaler{}, + metricsMarshaler: &pmetric.JSONMarshaler{}, + tracesMarshaler: &ptrace.JSONMarshaler{}, + } +} + +// Capabilities returns the consumer capabilities. +func (c *otlpConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// ConsumeLogs saves the OTLP logs to a file. +func (c *otlpConsumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error { + c.component.mut.RLock() + otlpLogsFolder := c.component.otlpLogsFolder + c.component.mut.RUnlock() + + jsonData, err := c.logsMarshaler.MarshalLogs(logs) + if err != nil { + return fmt.Errorf("failed to marshal OTLP logs to JSON: %w", err) + } + + filePath := filepath.Join(otlpLogsFolder, "logs.json") + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open OTLP logs file: %w", err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + _ = level.Error(c.component.logger).Log("msg", "failed to close OTLP logs file", "err", closeErr) + } + }() + + if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { + return fmt.Errorf("failed to write OTLP logs to file: %w", err) + } + + return nil +} + +// ConsumeMetrics saves the OTLP metrics to a file. +func (c *otlpConsumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { + c.component.mut.RLock() + otlpMetricsFolder := c.component.otlpMetricsFolder + c.component.mut.RUnlock() + + jsonData, err := c.metricsMarshaler.MarshalMetrics(metrics) + if err != nil { + return fmt.Errorf("failed to marshal OTLP metrics to JSON: %w", err) + } + + filePath := filepath.Join(otlpMetricsFolder, "metrics.json") + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open OTLP metrics file: %w", err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + _ = level.Error(c.component.logger).Log("msg", "failed to close OTLP metrics file", "err", closeErr) + } + }() + + if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { + return fmt.Errorf("failed to write OTLP metrics to file: %w", err) + } + + return nil +} + +// ConsumeTraces saves the OTLP traces to a file. +func (c *otlpConsumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { + c.component.mut.RLock() + otlpTracesFolder := c.component.otlpTracesFolder + c.component.mut.RUnlock() + + jsonData, err := c.tracesMarshaler.MarshalTraces(traces) + if err != nil { + return fmt.Errorf("failed to marshal OTLP traces to JSON: %w", err) + } + + filePath := filepath.Join(otlpTracesFolder, "traces.json") + file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open OTLP traces file: %w", err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + _ = level.Error(c.component.logger).Log("msg", "failed to close OTLP traces file", "err", closeErr) + } + }() + + if _, err := file.WriteString(string(jsonData) + "\n"); err != nil { + return fmt.Errorf("failed to write OTLP traces to file: %w", err) + } + + return nil +}