From 2b6e9895a0e26c0461851ab4331c133c20d95c9d Mon Sep 17 00:00:00 2001 From: Christian Nygaard Date: Wed, 3 Dec 2025 00:01:19 +0100 Subject: [PATCH 1/2] fix(loki): prevent blocked fanout destination from blocking others Implement per-destination buffered queues for loki.process, loki.relabel, and loki.secretfilter components to prevent a slow or blocked downstream receiver from blocking log delivery to other destinations. Key changes: - Each downstream destination gets its own buffered queue with a dedicated worker goroutine, ensuring FIFO ordering while preventing head-of-line blocking - Configurable queue size via max_forward_queue_size (default: 100,000 entries per destination) - Optional block_on_full mode (default: false): - false: drop entries when queue is full (non-blocking) - true: retry with exponential backoff (5ms to 5s max) - New metrics for observability: - *_dropped_entries_total: entries dropped due to full queue - *_enqueue_retries_total: retry attempts when block_on_full=true Fixes: https://github.com/grafana/alloy/issues/2194 --- internal/component/loki/process/metrics.go | 38 ++++ internal/component/loki/process/process.go | 181 ++++++++++++++- .../component/loki/process/process_test.go | 176 ++++++++++++++- internal/component/loki/relabel/metrics.go | 14 ++ internal/component/loki/relabel/relabel.go | 185 +++++++++++++-- .../loki/secretfilter/secretfilter.go | 211 ++++++++++++++++-- 6 files changed, 764 insertions(+), 41 deletions(-) create mode 100644 internal/component/loki/process/metrics.go diff --git a/internal/component/loki/process/metrics.go b/internal/component/loki/process/metrics.go new file mode 100644 index 0000000000..de573b0ac2 --- /dev/null +++ b/internal/component/loki/process/metrics.go @@ -0,0 +1,38 @@ +package process + +import ( + "github.com/grafana/alloy/internal/util" + prometheus_client "github.com/prometheus/client_golang/prometheus" +) + +// forwardMetrics contains metrics related to log entry forwarding. +type forwardMetrics struct { + // Number of log entries dropped because the destination queue was full. + droppedEntriesTotal prometheus_client.Counter + + // Number of times we retried enqueueing when block_on_full is enabled. + enqueueRetriesTotal prometheus_client.Counter +} + +// newForwardMetrics creates a new set of forward metrics. If reg is non-nil, +// the metrics will also be registered. 
+func newForwardMetrics(reg prometheus_client.Registerer) *forwardMetrics { + var m forwardMetrics + + m.droppedEntriesTotal = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "loki_process_dropped_entries_total", + Help: "Total number of log entries dropped because the destination queue was full", + }) + + m.enqueueRetriesTotal = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "loki_process_enqueue_retries_total", + Help: "Total number of times enqueueing was retried when block_on_full is enabled", + }) + + if reg != nil { + m.droppedEntriesTotal = util.MustRegisterOrGet(reg, m.droppedEntriesTotal).(prometheus_client.Counter) + m.enqueueRetriesTotal = util.MustRegisterOrGet(reg, m.enqueueRetriesTotal).(prometheus_client.Counter) + } + + return &m +} diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index 15e12c3741..c1f05784d5 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -40,6 +40,29 @@ func init() { type Arguments struct { ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"` Stages []stages.StageConfig `alloy:"stage,enum,optional"` + + // MaxForwardQueueSize controls the maximum number of log entries buffered + // per downstream component. This prevents a slow destination from blocking + // other destinations. Default is 100000. + MaxForwardQueueSize int `alloy:"max_forward_queue_size,attr,optional"` + + // BlockOnFull controls behavior when a destination queue is full. + // If false (default), log entries are dropped when the queue is full. + // If true, the component will retry with exponential backoff, which may + // slow down the entire pipeline but prevents data loss. + BlockOnFull bool `alloy:"block_on_full,attr,optional"` +} + +// DefaultArguments provides the default arguments for the loki.process +// component. +var DefaultArguments = Arguments{ + MaxForwardQueueSize: 100_000, + BlockOnFull: false, +} + +// SetToDefault implements syntax.Defaulter. +func (a *Arguments) SetToDefault() { + *a = DefaultArguments } // Exports exposes the receiver that can be used to send log entries to @@ -64,12 +87,115 @@ type Component struct { entryHandler loki.EntryHandler stages []stages.StageConfig - fanoutMut sync.RWMutex - fanout []loki.LogsReceiver + fanoutMut sync.RWMutex + fanout []loki.LogsReceiver + queues []*destinationQueue + maxForwardQueueSize int + blockOnFull bool + metrics *forwardMetrics debugDataPublisher livedebugging.DebugDataPublisher } +// Backoff constants for blocking mode, similar to Prometheus remote write. +const ( + minBackoff = 5 * time.Millisecond + maxBackoff = 5 * time.Second +) + +// destinationQueue manages a buffered queue for a single destination to ensure +// FIFO ordering while preventing a slow destination from blocking others. +type destinationQueue struct { + receiver loki.LogsReceiver + buffer chan loki.Entry + stopCh chan struct{} + wg sync.WaitGroup +} + +func newDestinationQueue(receiver loki.LogsReceiver, size int) *destinationQueue { + dq := &destinationQueue{ + receiver: receiver, + buffer: make(chan loki.Entry, size), + stopCh: make(chan struct{}), + } + dq.wg.Add(1) + go dq.run() + return dq +} + +func (dq *destinationQueue) run() { + defer dq.wg.Done() + for { + select { + case <-dq.stopCh: + return + case entry := <-dq.buffer: + select { + case <-dq.stopCh: + return + case dq.receiver.Chan() <- entry: + } + } + } +} + +// send attempts to queue an entry for sending without blocking. 
+// Returns true if queued, false if buffer is full. +func (dq *destinationQueue) send(entry loki.Entry) bool { + select { + case dq.buffer <- entry: + return true + default: + return false + } +} + +// sendWithBackoff attempts to queue an entry, retrying with exponential backoff +// if the buffer is full. Returns true if queued, false if stopped during retry. +// The metrics parameter is used to track retry attempts. +func (dq *destinationQueue) sendWithBackoff(entry loki.Entry, metrics *forwardMetrics) bool { + // First try without blocking + select { + case dq.buffer <- entry: + return true + default: + } + + // Buffer is full, retry with backoff + backoff := minBackoff + for { + select { + case <-dq.stopCh: + return false + default: + } + + metrics.enqueueRetriesTotal.Inc() + + select { + case <-dq.stopCh: + return false + case <-time.After(backoff): + } + + select { + case dq.buffer <- entry: + return true + default: + // Still full, increase backoff + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + +func (dq *destinationQueue) stop() { + close(dq.stopCh) + dq.wg.Wait() +} + // New creates a new loki.process component. func New(o component.Options, args Arguments) (*Component, error) { debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) @@ -79,6 +205,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, + metrics: newForwardMetrics(o.Registerer), debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } @@ -110,6 +237,14 @@ func (c *Component) Run(ctx context.Context) error { wgOut.Wait() } c.mut.RUnlock() + + // Stop all destination queues + c.fanoutMut.Lock() + for _, q := range c.queues { + q.stop() + } + c.queues = nil + c.fanoutMut.Unlock() }() wgIn := &sync.WaitGroup{} wgIn.Add(1) @@ -125,11 +260,29 @@ func (c *Component) Run(ctx context.Context) error { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) - // Update c.fanout first in case anything else fails. + // Update fanout and queues. Each destination gets its own queue to ensure + // FIFO ordering while preventing a slow destination from blocking others. + // See https://github.com/grafana/alloy/issues/2194 + queueSize := newArgs.MaxForwardQueueSize + if queueSize <= 0 { + queueSize = DefaultArguments.MaxForwardQueueSize + } c.fanoutMut.Lock() + oldQueues := c.queues c.fanout = newArgs.ForwardTo + c.maxForwardQueueSize = queueSize + c.blockOnFull = newArgs.BlockOnFull + c.queues = make([]*destinationQueue, len(newArgs.ForwardTo)) + for i, receiver := range newArgs.ForwardTo { + c.queues[i] = newDestinationQueue(receiver, queueSize) + } c.fanoutMut.Unlock() + // Stop old queues after releasing the lock to avoid blocking + for _, q := range oldQueues { + q.stop() + } + // Then update the pipeline itself. c.mut.Lock() defer c.mut.Unlock() @@ -200,7 +353,8 @@ func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) { return case entry := <-c.processOut: c.fanoutMut.RLock() - fanout := c.fanout + queues := c.queues + blockOnFull := c.blockOnFull c.fanoutMut.RUnlock() // The log entry is the same for every fanout, @@ -219,11 +373,20 @@ func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) { }, )) - for _, f := range fanout { - select { - case <-shutdownCh: - return - case f.Chan() <- entry: + // Send to each destination's queue. 
Each destination has its own + // buffered queue with a dedicated worker goroutine, ensuring FIFO + // ordering while preventing a slow destination from blocking others. + // See https://github.com/grafana/alloy/issues/2194 + for _, q := range queues { + var sent bool + if blockOnFull { + sent = q.sendWithBackoff(entry, c.metrics) + } else { + sent = q.send(entry) + } + if !sent { + c.metrics.droppedEntriesTotal.Inc() + level.Warn(c.opts.Logger).Log("msg", "dropping log entry because destination queue is full", "labels", entry.Labels.String()) } } } diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index c90b52500a..8bbd90c3ca 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -881,9 +881,11 @@ func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSend t.component.Update(args) - // Check the component metrics. + // Check the component metrics. Filter for only the custom stage metrics + // that this test is interested in (exclude forward queue metrics). if err := testutil.GatherAndCompare(t.registry, - strings.NewReader(expectedMetricsBeforeSendingLogs)); err != nil { + strings.NewReader(expectedMetricsBeforeSendingLogs), + "loki_process_custom_paulin_test", "loki_process_custom_paulin_test_3"); err != nil { require.NoError(t.t, err) } @@ -904,9 +906,175 @@ func (t *tester) updateAndTest(numLogsToSend int, cfg, expectedMetricsBeforeSend } } - // Check the component metrics. + // Check the component metrics. Filter for only the custom stage metrics + // that this test is interested in (exclude forward queue metrics). if err := testutil.GatherAndCompare(t.registry, - strings.NewReader(expectedMetricsAfterSendingLogs)); err != nil { + strings.NewReader(expectedMetricsAfterSendingLogs), + "loki_process_custom_paulin_test", "loki_process_custom_paulin_test_3"); err != nil { require.NoError(t.t, err) } } + +// TestBlockedReceiverDoesNotBlockOthers verifies that when one downstream +// receiver is blocked, other receivers continue to receive logs. 
+// This is a regression test for https://github.com/grafana/alloy/issues/2194 +func TestBlockedReceiverDoesNotBlockOthers(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + // blockedReceiver simulates a downstream component that is not consuming logs + // (e.g., loki.write with a blocked remote endpoint) + blockedReceiver := loki.NewLogsReceiver() + + // healthyReceiver simulates a downstream component that is consuming logs normally + healthyReceiver := loki.NewLogsReceiver() + + // Create and run the component with both receivers + opts := component.Options{ + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) {}, + GetServiceData: getServiceData, + } + args := Arguments{ + ForwardTo: []loki.LogsReceiver{blockedReceiver, healthyReceiver}, + Stages: nil, // No stages, just fanout + } + + c, err := New(opts, args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + c.Run(ctx) + wg.Done() + }() + + // Send multiple log entries + numLogs := 5 + for i := 0; i < numLogs; i++ { + logEntry := loki.Entry{ + Labels: model.LabelSet{"test": "blocked_receiver"}, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("log message %d", i), + }, + } + c.receiver.Chan() <- logEntry + } + + // The healthy receiver should receive all logs even though blockedReceiver is not consuming + receivedCount := 0 + timeout := time.After(2 * time.Second) + + for receivedCount < numLogs { + select { + case <-healthyReceiver.Chan(): + receivedCount++ + case <-timeout: + t.Fatalf("healthy receiver only received %d/%d logs; blocked receiver is blocking other destinations", receivedCount, numLogs) + } + } + + require.Equal(t, numLogs, receivedCount, "healthy receiver should receive all logs") +} + +// TestSlowReceiverPreservesOrdering verifies that when one downstream receiver +// is slow (but not fully blocked), log ordering is preserved for all receivers. +// This ensures the per-destination queue implementation maintains FIFO ordering. 
+func TestSlowReceiverPreservesOrdering(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + // slowReceiver simulates a downstream component that processes logs slowly + slowReceiver := loki.NewLogsReceiver() + + // fastReceiver simulates a downstream component that processes logs quickly + fastReceiver := loki.NewLogsReceiver() + + // Create and run the component with both receivers + opts := component.Options{ + Logger: util.TestAlloyLogger(t), + Registerer: prometheus.NewRegistry(), + OnStateChange: func(e component.Exports) {}, + GetServiceData: getServiceData, + } + args := Arguments{ + ForwardTo: []loki.LogsReceiver{slowReceiver, fastReceiver}, + Stages: nil, + } + + c, err := New(opts, args) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + c.Run(ctx) + wg.Done() + }() + + // Send multiple log entries with sequential identifiers + numLogs := 20 + for i := 0; i < numLogs; i++ { + logEntry := loki.Entry{ + Labels: model.LabelSet{"test": "ordering", "seq": model.LabelValue(fmt.Sprintf("%d", i))}, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("log message %d", i), + }, + } + c.receiver.Chan() <- logEntry + } + + // Collect logs from both receivers and verify ordering + collectLogs := func(receiver loki.LogsReceiver, name string) []int { + var received []int + timeout := time.After(5 * time.Second) + for len(received) < numLogs { + select { + case entry := <-receiver.Chan(): + seq := int(entry.Labels["seq"][0] - '0') + if len(entry.Labels["seq"]) > 1 { + seq = seq*10 + int(entry.Labels["seq"][1]-'0') + } + received = append(received, seq) + // Simulate slow processing for slowReceiver + if name == "slow" { + time.Sleep(10 * time.Millisecond) + } + case <-timeout: + t.Fatalf("%s receiver only received %d/%d logs", name, len(received), numLogs) + } + } + return received + } + + // Start collecting from both receivers concurrently + var slowLogs, fastLogs []int + var collectWg sync.WaitGroup + collectWg.Add(2) + + go func() { + defer collectWg.Done() + slowLogs = collectLogs(slowReceiver, "slow") + }() + + go func() { + defer collectWg.Done() + fastLogs = collectLogs(fastReceiver, "fast") + }() + + collectWg.Wait() + + // Verify ordering is preserved for both receivers + for i := 0; i < numLogs; i++ { + require.Equal(t, i, slowLogs[i], "slow receiver logs should be in order") + require.Equal(t, i, fastLogs[i], "fast receiver logs should be in order") + } +} diff --git a/internal/component/loki/relabel/metrics.go b/internal/component/loki/relabel/metrics.go index 7cdcbe0989..d2011d8b5c 100644 --- a/internal/component/loki/relabel/metrics.go +++ b/internal/component/loki/relabel/metrics.go @@ -11,6 +11,10 @@ type metrics struct { cacheHits prometheus_client.Counter cacheMisses prometheus_client.Counter cacheSize prometheus_client.Gauge + + // Forward queue metrics + droppedEntriesTotal prometheus_client.Counter + enqueueRetriesTotal prometheus_client.Counter } // newMetrics creates a new set of metrics. 
If reg is non-nil, the metrics @@ -38,6 +42,14 @@ func newMetrics(reg prometheus_client.Registerer) *metrics { Name: "loki_relabel_cache_size", Help: "Total size of relabel cache", }) + m.droppedEntriesTotal = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "loki_relabel_dropped_entries_total", + Help: "Total number of log entries dropped because the destination queue was full", + }) + m.enqueueRetriesTotal = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "loki_relabel_enqueue_retries_total", + Help: "Total number of times enqueueing was retried when block_on_full is enabled", + }) if reg != nil { m.entriesProcessed = util.MustRegisterOrGet(reg, m.entriesProcessed).(prometheus_client.Counter) @@ -45,6 +57,8 @@ func newMetrics(reg prometheus_client.Registerer) *metrics { m.cacheMisses = util.MustRegisterOrGet(reg, m.cacheMisses).(prometheus_client.Counter) m.cacheHits = util.MustRegisterOrGet(reg, m.cacheHits).(prometheus_client.Counter) m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(prometheus_client.Gauge) + m.droppedEntriesTotal = util.MustRegisterOrGet(reg, m.droppedEntriesTotal).(prometheus_client.Counter) + m.enqueueRetriesTotal = util.MustRegisterOrGet(reg, m.enqueueRetriesTotal).(prometheus_client.Counter) } return &m diff --git a/internal/component/loki/relabel/relabel.go b/internal/component/loki/relabel/relabel.go index 3feef9c393..10e099bca7 100644 --- a/internal/component/loki/relabel/relabel.go +++ b/internal/component/loki/relabel/relabel.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sync" + "time" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" @@ -41,12 +42,25 @@ type Arguments struct { // The maximum number of items to hold in the component's LRU cache. MaxCacheSize int `alloy:"max_cache_size,attr,optional"` + + // MaxForwardQueueSize controls the maximum number of log entries buffered + // per downstream component. This prevents a slow destination from blocking + // other destinations. Default is 100000. + MaxForwardQueueSize int `alloy:"max_forward_queue_size,attr,optional"` + + // BlockOnFull controls behavior when a destination queue is full. + // If false (default), log entries are dropped when the queue is full. + // If true, the component will retry with exponential backoff, which may + // slow down the entire pipeline but prevents data loss. + BlockOnFull bool `alloy:"block_on_full,attr,optional"` } // DefaultArguments provides the default arguments for the loki.relabel // component. var DefaultArguments = Arguments{ - MaxCacheSize: 10_000, + MaxCacheSize: 10_000, + MaxForwardQueueSize: 100_000, + BlockOnFull: false, } // SetToDefault implements syntax.Defaulter. @@ -65,10 +79,13 @@ type Component struct { opts component.Options metrics *metrics - mut sync.RWMutex - rcs []*relabel.Config - receiver loki.LogsReceiver - fanout []loki.LogsReceiver + mut sync.RWMutex + rcs []*relabel.Config + receiver loki.LogsReceiver + fanout []loki.LogsReceiver + queues []*destinationQueue + maxForwardQueueSize int + blockOnFull bool cache *lru.Cache maxCacheSize int @@ -78,6 +95,105 @@ type Component struct { builder labels.ScratchBuilder } +// Backoff constants for blocking mode, similar to Prometheus remote write. +const ( + minBackoff = 5 * time.Millisecond + maxBackoff = 5 * time.Second +) + +// destinationQueue manages a buffered queue for a single destination to ensure +// FIFO ordering while preventing a slow destination from blocking others. 
+type destinationQueue struct { + receiver loki.LogsReceiver + buffer chan loki.Entry + stopCh chan struct{} + wg sync.WaitGroup +} + +func newDestinationQueue(receiver loki.LogsReceiver, size int) *destinationQueue { + dq := &destinationQueue{ + receiver: receiver, + buffer: make(chan loki.Entry, size), + stopCh: make(chan struct{}), + } + dq.wg.Add(1) + go dq.run() + return dq +} + +func (dq *destinationQueue) run() { + defer dq.wg.Done() + for { + select { + case <-dq.stopCh: + return + case entry := <-dq.buffer: + select { + case <-dq.stopCh: + return + case dq.receiver.Chan() <- entry: + } + } + } +} + +// send attempts to queue an entry for sending without blocking. +// Returns true if queued, false if buffer is full. +func (dq *destinationQueue) send(entry loki.Entry) bool { + select { + case dq.buffer <- entry: + return true + default: + return false + } +} + +// sendWithBackoff attempts to queue an entry, retrying with exponential backoff +// if the buffer is full. Returns true if queued, false if stopped during retry. +// The metrics parameter is used to track retry attempts. +func (dq *destinationQueue) sendWithBackoff(entry loki.Entry, m *metrics) bool { + // First try without blocking + select { + case dq.buffer <- entry: + return true + default: + } + + // Buffer is full, retry with backoff + backoff := minBackoff + for { + select { + case <-dq.stopCh: + return false + default: + } + + m.enqueueRetriesTotal.Inc() + + select { + case <-dq.stopCh: + return false + case <-time.After(backoff): + } + + select { + case dq.buffer <- entry: + return true + default: + // Still full, increase backoff + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + +func (dq *destinationQueue) stop() { + close(dq.stopCh) + dq.wg.Wait() +} + var ( _ component.Component = (*Component)(nil) _ component.LiveDebugging = (*Component)(nil) @@ -119,6 +235,16 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { + defer func() { + // Stop all destination queues + c.mut.Lock() + for _, q := range c.queues { + q.stop() + } + c.queues = nil + c.mut.Unlock() + }() + componentID := livedebugging.ComponentID(c.opts.ID) for { select { @@ -148,11 +274,25 @@ func (c *Component) Run(ctx context.Context) error { c.metrics.entriesOutgoing.Inc() entry.Labels = lbls - for _, f := range c.fanout { - select { - case <-ctx.Done(): - return nil - case f.Chan() <- entry: + + // Send to each destination's queue. Each destination has its own + // buffered queue with a dedicated worker goroutine, ensuring FIFO + // ordering while preventing a slow destination from blocking others. + // See https://github.com/grafana/alloy/issues/2194 + c.mut.RLock() + queues := c.queues + blockOnFull := c.blockOnFull + c.mut.RUnlock() + for _, q := range queues { + var sent bool + if blockOnFull { + sent = q.sendWithBackoff(entry, c.metrics) + } else { + sent = q.send(entry) + } + if !sent { + c.metrics.droppedEntriesTotal.Inc() + level.Warn(c.opts.Logger).Log("msg", "dropping log entry because destination queue is full", "labels", entry.Labels.String()) } } } @@ -161,11 +301,18 @@ func (c *Component) Run(ctx context.Context) error { // Update implements component.Component. 
func (c *Component) Update(args component.Arguments) error { - c.mut.Lock() - defer c.mut.Unlock() - newArgs := args.(Arguments) newRCS := alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs) + + // Update fanout and queues. Each destination gets its own queue to ensure + // FIFO ordering while preventing a slow destination from blocking others. + // See https://github.com/grafana/alloy/issues/2194 + queueSize := newArgs.MaxForwardQueueSize + if queueSize <= 0 { + queueSize = DefaultArguments.MaxForwardQueueSize + } + c.mut.Lock() + oldQueues := c.queues if relabelingChanged(c.rcs, newRCS) { level.Debug(c.opts.Logger).Log("msg", "received new relabel configs, purging cache") c.cache.Purge() @@ -179,6 +326,18 @@ func (c *Component) Update(args component.Arguments) error { } c.rcs = newRCS c.fanout = newArgs.ForwardTo + c.maxForwardQueueSize = queueSize + c.blockOnFull = newArgs.BlockOnFull + c.queues = make([]*destinationQueue, len(newArgs.ForwardTo)) + for i, receiver := range newArgs.ForwardTo { + c.queues[i] = newDestinationQueue(receiver, queueSize) + } + c.mut.Unlock() + + // Stop old queues after releasing the lock to avoid blocking + for _, q := range oldQueues { + q.stop() + } c.opts.OnStateChange(Exports{Receiver: c.receiver, Rules: newArgs.RelabelConfigs}) diff --git a/internal/component/loki/secretfilter/secretfilter.go b/internal/component/loki/secretfilter/secretfilter.go index 7a18ce283e..481cbc4a9f 100644 --- a/internal/component/loki/secretfilter/secretfilter.go +++ b/internal/component/loki/secretfilter/secretfilter.go @@ -71,6 +71,17 @@ type Arguments struct { PartialMask uint `alloy:"partial_mask,attr,optional"` // Show the first N characters of the secret (default: 0) OriginLabel string `alloy:"origin_label,attr,optional"` // The label name to use for tracking metrics by origin (if empty, no origin metrics are collected) EnableEntropy bool `alloy:"enable_entropy,attr,optional"` // Enable entropy calculation for secrets (default: false) + + // MaxForwardQueueSize controls the maximum number of log entries buffered + // per downstream component. This prevents a slow destination from blocking + // other destinations. Default is 100000. + MaxForwardQueueSize int `alloy:"max_forward_queue_size,attr,optional"` + + // BlockOnFull controls behavior when a destination queue is full. + // If false (default), log entries are dropped when the queue is full. + // If true, the component will retry with exponential backoff, which may + // slow down the entire pipeline but prevents data loss. + BlockOnFull bool `alloy:"block_on_full,attr,optional"` } // Exports holds the values exported by the loki.secretfilter component. @@ -79,7 +90,10 @@ type Exports struct { } // DefaultArguments defines the default settings for log scraping. -var DefaultArguments = Arguments{} +var DefaultArguments = Arguments{ + MaxForwardQueueSize: 100_000, + BlockOnFull: false, +} // SetToDefault implements syntax.Defaulter. 
func (args *Arguments) SetToDefault() { @@ -95,17 +109,119 @@ var ( type Component struct { opts component.Options - mut sync.RWMutex - args Arguments - receiver loki.LogsReceiver - fanout []loki.LogsReceiver - Rules []Rule - AllowList []AllowRule + mut sync.RWMutex + args Arguments + receiver loki.LogsReceiver + fanout []loki.LogsReceiver + queues []*destinationQueue + maxForwardQueueSize int + blockOnFull bool + Rules []Rule + AllowList []AllowRule metrics *metrics debugDataPublisher livedebugging.DebugDataPublisher } +// Backoff constants for blocking mode, similar to Prometheus remote write. +const ( + minBackoff = 5 * time.Millisecond + maxBackoff = 5 * time.Second +) + +// destinationQueue manages a buffered queue for a single destination to ensure +// FIFO ordering while preventing a slow destination from blocking others. +type destinationQueue struct { + receiver loki.LogsReceiver + buffer chan loki.Entry + stopCh chan struct{} + wg sync.WaitGroup +} + +func newDestinationQueue(receiver loki.LogsReceiver, size int) *destinationQueue { + dq := &destinationQueue{ + receiver: receiver, + buffer: make(chan loki.Entry, size), + stopCh: make(chan struct{}), + } + dq.wg.Add(1) + go dq.run() + return dq +} + +func (dq *destinationQueue) run() { + defer dq.wg.Done() + for { + select { + case <-dq.stopCh: + return + case entry := <-dq.buffer: + select { + case <-dq.stopCh: + return + case dq.receiver.Chan() <- entry: + } + } + } +} + +// send attempts to queue an entry for sending without blocking. +// Returns true if queued, false if buffer is full. +func (dq *destinationQueue) send(entry loki.Entry) bool { + select { + case dq.buffer <- entry: + return true + default: + return false + } +} + +// sendWithBackoff attempts to queue an entry, retrying with exponential backoff +// if the buffer is full. Returns true if queued, false if stopped during retry. +// The metrics parameter is used to track retry attempts. +func (dq *destinationQueue) sendWithBackoff(entry loki.Entry, m *metrics) bool { + // First try without blocking + select { + case dq.buffer <- entry: + return true + default: + } + + // Buffer is full, retry with backoff + backoff := minBackoff + for { + select { + case <-dq.stopCh: + return false + default: + } + + m.enqueueRetriesTotal.Inc() + + select { + case <-dq.stopCh: + return false + case <-time.After(backoff): + } + + select { + case dq.buffer <- entry: + return true + default: + // Still full, increase backoff + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + +func (dq *destinationQueue) stop() { + close(dq.stopCh) + dq.wg.Wait() +} + // This struct is used to parse the gitleaks.toml file // Non-exhaustive representation. See https://github.com/gitleaks/gitleaks/blob/master/config/config.go // @@ -152,6 +268,10 @@ type metrics struct { // Summary of time taken for redaction log processing processingDuration prometheus.Summary + + // Forward queue metrics + droppedEntriesTotal prometheus.Counter + enqueueRetriesTotal prometheus.Counter } // newMetrics creates a new set of metrics for the secretfilter component. 
@@ -201,6 +321,18 @@ func newMetrics(reg prometheus.Registerer, originLabel string) *metrics { }, }) + m.droppedEntriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: "loki_secretfilter", + Name: "dropped_entries_total", + Help: "Total number of log entries dropped because the destination queue was full.", + }) + + m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: "loki_secretfilter", + Name: "enqueue_retries_total", + Help: "Total number of times enqueueing was retried when block_on_full is enabled.", + }) + if reg != nil { m.secretsRedactedTotal = util.MustRegisterOrGet(reg, m.secretsRedactedTotal).(prometheus.Counter) m.secretsRedactedByRule = util.MustRegisterOrGet(reg, m.secretsRedactedByRule).(*prometheus.CounterVec) @@ -210,6 +342,8 @@ func newMetrics(reg prometheus.Registerer, originLabel string) *metrics { } m.secretsAllowlistedTotal = util.MustRegisterOrGet(reg, m.secretsAllowlistedTotal).(*prometheus.CounterVec) m.processingDuration = util.MustRegisterOrGet(reg, m.processingDuration).(prometheus.Summary) + m.droppedEntriesTotal = util.MustRegisterOrGet(reg, m.droppedEntriesTotal).(prometheus.Counter) + m.enqueueRetriesTotal = util.MustRegisterOrGet(reg, m.enqueueRetriesTotal).(prometheus.Counter) } return &m @@ -243,6 +377,16 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { + defer func() { + // Stop all destination queues + c.mut.Lock() + for _, q := range c.queues { + q.stop() + } + c.queues = nil + c.mut.Unlock() + }() + componentID := livedebugging.ComponentID(c.opts.ID) for { @@ -263,15 +407,26 @@ func (c *Component) Run(ctx context.Context) error { }, )) - for _, f := range c.fanout { - select { - case <-ctx.Done(): - c.mut.RUnlock() - return nil - case f.Chan() <- newEntry: + // Send to each destination's queue. Each destination has its own + // buffered queue with a dedicated worker goroutine, ensuring FIFO + // ordering while preventing a slow destination from blocking others. + // See https://github.com/grafana/alloy/issues/2194 + queues := c.queues + blockOnFull := c.blockOnFull + metrics := c.metrics + c.mut.RUnlock() + for _, q := range queues { + var sent bool + if blockOnFull { + sent = q.sendWithBackoff(newEntry, metrics) + } else { + sent = q.send(newEntry) + } + if !sent { + metrics.droppedEntriesTotal.Inc() + level.Warn(c.opts.Logger).Log("msg", "dropping log entry because destination queue is full", "labels", entry.Labels.String()) } } - c.mut.RUnlock() } } } @@ -406,11 +561,24 @@ func hashSecret(secret string) string { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) + // Update fanout and queues. Each destination gets its own queue to ensure + // FIFO ordering while preventing a slow destination from blocking others. 
+ // See https://github.com/grafana/alloy/issues/2194 + queueSize := newArgs.MaxForwardQueueSize + if queueSize <= 0 { + queueSize = DefaultArguments.MaxForwardQueueSize + } c.mut.Lock() - defer c.mut.Unlock() + oldQueues := c.queues c.args = newArgs c.fanout = newArgs.ForwardTo + c.maxForwardQueueSize = queueSize + c.blockOnFull = newArgs.BlockOnFull + c.queues = make([]*destinationQueue, len(newArgs.ForwardTo)) + for i, receiver := range newArgs.ForwardTo { + c.queues[i] = newDestinationQueue(receiver, queueSize) + } c.metrics = newMetrics(c.opts.Registerer, newArgs.OriginLabel) @@ -420,12 +588,14 @@ func (c *Component) Update(args component.Arguments) error { // If no config file is explicitly provided, use the embedded one _, err := toml.DecodeFS(embedFs, "gitleaks.toml", &gitleaksCfg) if err != nil { + c.mut.Unlock() return err } } else { // If a config file is provided, use that _, err := toml.DecodeFile(c.args.GitleaksConfig, &gitleaksCfg) if err != nil { + c.mut.Unlock() return err } } @@ -455,6 +625,7 @@ func (c *Component) Update(args component.Arguments) error { re, err := regexp.Compile(rule.Regex) if err != nil { level.Error(c.opts.Logger).Log("msg", "error compiling regex", "error", err) + c.mut.Unlock() return err } // If the rule regex matches the empty string, skip this rule @@ -479,6 +650,7 @@ func (c *Component) Update(args component.Arguments) error { re, err := regexp.Compile(r) if err != nil { level.Error(c.opts.Logger).Log("msg", "error compiling allowlist regex", "error", err) + c.mut.Unlock() return err } allowlist = append(allowlist, AllowRule{Regex: re, Source: fmt.Sprintf("rule %s", rule.ID)}) @@ -488,6 +660,7 @@ func (c *Component) Update(args component.Arguments) error { re, err := regexp.Compile(r) if err != nil { level.Error(c.opts.Logger).Log("msg", "error compiling allowlist regex", "error", err) + c.mut.Unlock() return err } allowlist = append(allowlist, AllowRule{Regex: re, Source: fmt.Sprintf("rule %s", rule.ID)}) @@ -519,6 +692,7 @@ func (c *Component) Update(args component.Arguments) error { re, err := regexp.Compile(r) if err != nil { level.Error(c.opts.Logger).Log("msg", "error compiling allowlist regex", "error", err) + c.mut.Unlock() return err } c.AllowList = append(c.AllowList, AllowRule{Regex: re, Source: "alloy config"}) @@ -528,6 +702,7 @@ func (c *Component) Update(args component.Arguments) error { re, err := regexp.Compile(r) if err != nil { level.Error(c.opts.Logger).Log("msg", "error compiling allowlist regex", "error", err) + c.mut.Unlock() return err } c.AllowList = append(c.AllowList, AllowRule{Regex: re, Source: "gitleaks config"}) @@ -539,6 +714,12 @@ func (c *Component) Update(args component.Arguments) error { } level.Info(c.opts.Logger).Log("Compiled regexes for secret detection", len(c.Rules)) + c.mut.Unlock() + + // Stop old queues after releasing the lock to avoid blocking + for _, q := range oldQueues { + q.stop() + } return nil } From f3e67ef966507610e169aa3612245b69ad78f041 Mon Sep 17 00:00:00 2001 From: Christian Nygaard Date: Wed, 3 Dec 2025 00:14:40 +0100 Subject: [PATCH 2/2] docs: add documentation for fanout queue options Add documentation for the new max_forward_queue_size and block_on_full arguments in loki.process, loki.relabel, and loki.secretfilter. Update CHANGELOG.md with the bugfix entry. 
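
Illustrative example (not part of this patch): a minimal pipeline using the
new arguments might look like the following. The component label and the
loki.write destinations are placeholders assumed to be defined elsewhere in
the configuration.

    loki.process "fanout" {
      forward_to = [
        loki.write.primary.receiver,
        loki.write.secondary.receiver,
      ]

      // Buffer up to 50000 entries per destination (default: 100000).
      max_forward_queue_size = 50000

      // Default: drop entries when a destination queue is full.
      // Set to true to retry with exponential backoff (5ms doubling up
      // to 5s) instead of dropping, at the cost of possible back-pressure
      // on the whole pipeline.
      block_on_full = false
    }

The same two arguments apply to loki.relabel and loki.secretfilter.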
--- CHANGELOG.md | 1 + .../reference/components/loki/loki.process.md | 13 ++++++--- .../reference/components/loki/loki.relabel.md | 13 ++++++--- .../components/loki/loki.secretfilter.md | 28 +++++++++++-------- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24a72762e3..558d6964b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Main (unreleased) ### Bugfixes +- Fixed an issue where `loki.process`, `loki.relabel`, and `loki.secretfilter` would block all downstream receivers if one receiver was slow or blocked. Each component now uses per-destination queues with configurable `max_forward_queue_size` and optional `block_on_full` mode. (@cnygaard) - (_Public Preview_) Additions to `database_observability.postgres` component: - `schema_details` diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index 226670e9fe..c24c2ff63d 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -35,11 +35,16 @@ loki.process "