1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ Main (unreleased)

### Bugfixes

- Fixed an issue where `loki.process`, `loki.relabel`, and `loki.secretfilter` would block all downstream receivers if one receiver was slow or blocked. Each component now uses per-destination queues with configurable `max_forward_queue_size` and optional `block_on_full` mode. (@cnygaard)

- (_Public Preview_) Additions to `database_observability.postgres` component:
- `schema_details`
13 changes: 9 additions & 4 deletions docs/sources/reference/components/loki/loki.process.md
@@ -35,11 +35,16 @@ loki.process "<LABEL>" {

## Arguments

You can use the following argument with `loki.process`:
You can use the following arguments with `loki.process`:

| Name | Type | Description | Default | Required |
| ------------ | -------------------- | ---------------------------------------------- | ------- | -------- |
| `forward_to` | `list(LogsReceiver)` | Where to forward log entries after processing. | | yes |
| Name | Type | Description | Default | Required |
| ------------------------ | -------------------- | --------------------------------------------------------------------------- | -------- | -------- |
| `forward_to` | `list(LogsReceiver)` | Where to forward log entries after processing. | | yes |
| `max_forward_queue_size` | `int` | Maximum number of log entries to buffer per destination before dropping. | `100000` | no |
| `block_on_full` | `bool` | Block instead of dropping when the queue is full, retrying with backoff. | `false` | no |

When `block_on_full` is `false` (default), log entries are dropped if a destination's queue is full.
When `block_on_full` is `true`, the component retries with exponential backoff (5ms to 5s), which may slow the pipeline but prevents data loss.
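For illustration, a minimal `loki.process` block that opts into blocking mode might look like the following sketch. The component label and the `loki.write.default` destination are placeholders, not part of this change:

loki.process "buffered" {
  // Forward processed entries to an assumed loki.write component labeled "default".
  forward_to = [loki.write.default.receiver]

  // Buffer up to 50,000 entries per destination and retry with backoff
  // instead of dropping when a destination falls behind.
  max_forward_queue_size = 50000
  block_on_full          = true
}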

## Blocks

13 changes: 9 additions & 4 deletions docs/sources/reference/components/loki/loki.relabel.md
@@ -44,10 +44,15 @@ loki.relabel "<LABEL>" {

You can use the following arguments with `loki.relabel`:

| Name | Type | Description | Default | Required |
| ---------------- | ---------------- | -------------------------------------------------------------- | -------- | -------- |
| `forward_to` | `list(receiver)` | Where to forward log entries after relabeling. | | yes |
| `max_cache_size` | `int` | The maximum number of elements to hold in the relabeling cache | `10000` | no |
| Name | Type | Description | Default | Required |
| ------------------------ | ---------------- | --------------------------------------------------------------------------- | -------- | -------- |
| `forward_to` | `list(receiver)` | Where to forward log entries after relabeling. | | yes |
| `max_cache_size` | `int` | The maximum number of elements to hold in the relabeling cache. | `10000` | no |
| `max_forward_queue_size` | `int` | Maximum number of log entries to buffer per destination before dropping. | `100000` | no |
| `block_on_full` | `bool` | Block instead of dropping when the queue is full, retrying with backoff. | `false` | no |

When `block_on_full` is `false` (default), log entries are dropped if a destination's queue is full.
When `block_on_full` is `true`, the component retries with exponential backoff (5ms to 5s), which may slow the pipeline but prevents data loss.
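As an illustrative sketch, a `loki.relabel` block that keeps the default drop-on-full behavior but enlarges the per-destination buffer might look like this. The label, rule, and `loki.write.default` destination are placeholders:

loki.relabel "drop_debug" {
  forward_to = [loki.write.default.receiver]

  // Absorb short bursts with a larger queue; entries are still dropped
  // if a destination stays blocked long enough to fill it.
  max_forward_queue_size = 500000

  rule {
    source_labels = ["level"]
    regex         = "debug"
    action        = "drop"
  }
}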

## Blocks

28 changes: 16 additions & 12 deletions docs/sources/reference/components/loki/loki.secretfilter.md
@@ -40,18 +40,22 @@ loki.secretfilter "<LABEL>" {

You can use the following arguments with `loki.secretfilter`:

| Name | Type | Description | Default | Required |
| ----------------- | -------------------- | -------------------------------------------------------------- | ---------------------------------- | -------- |
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| `allowlist` | `list(string)` | List of regular expressions to allowlist matching secrets. | `[]` | no |
| `enable_entropy` | `bool` | Enable entropy-based filtering. | `false` | no |
| `gitleaks_config` | `string` | Path to the custom `gitleaks.toml` file. | Embedded Gitleaks file | no |
| `include_generic` | `bool` | Include the generic API key rule. | `false` | no |
| `origin_label` | `string` | Loki label to use for the `secrets_redacted_by_origin` metric. | `""` | no |
| `partial_mask` | `int` | Show the first N characters of the secret. | `0` | no |
| `redact_with` | `string` | String to use to redact secrets. | `"<REDACTED-SECRET:$SECRET_NAME>"` | no |
| `types` | `list(string)` | List of secret types to look for. | All types | no |

| Name | Type | Description | Default | Required |
| ------------------------ | -------------------- | --------------------------------------------------------------------------- | ---------------------------------- | -------- |
| `forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes |
| `allowlist` | `list(string)` | List of regular expressions to allowlist matching secrets. | `[]` | no |
| `block_on_full` | `bool` | Block instead of dropping when the queue is full, retrying with backoff. | `false` | no |
| `enable_entropy` | `bool` | Enable entropy-based filtering. | `false` | no |
| `gitleaks_config` | `string` | Path to the custom `gitleaks.toml` file. | Embedded Gitleaks file | no |
| `include_generic` | `bool` | Include the generic API key rule. | `false` | no |
| `max_forward_queue_size` | `int` | Maximum number of log entries to buffer per destination before dropping. | `100000` | no |
| `origin_label` | `string` | Loki label to use for the `secrets_redacted_by_origin` metric. | `""` | no |
| `partial_mask` | `int` | Show the first N characters of the secret. | `0` | no |
| `redact_with` | `string` | String to use to redact secrets. | `"<REDACTED-SECRET:$SECRET_NAME>"` | no |
| `types` | `list(string)` | List of secret types to look for. | All types | no |

When `block_on_full` is `false` (default), log entries are dropped if a destination's queue is full.
When `block_on_full` is `true`, the component retries with exponential backoff (5ms to 5s), which may slow the pipeline but prevents data loss.
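A comparable sketch for `loki.secretfilter`, preferring backpressure over dropping redacted lines (label and destination are placeholders):

loki.secretfilter "scrub" {
  forward_to   = [loki.write.default.receiver]
  partial_mask = 4

  // Block with backoff rather than drop when a destination's queue fills up.
  block_on_full = true
}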

The `gitleaks_config` argument is the path to the custom `gitleaks.toml` file.
If you don't provide the path to a custom configuration file, the Gitleaks configuration file [embedded in the component][embedded-config] is used.
38 changes: 38 additions & 0 deletions internal/component/loki/process/metrics.go
@@ -0,0 +1,38 @@
package process

import (
"github.com/grafana/alloy/internal/util"
prometheus_client "github.com/prometheus/client_golang/prometheus"
)

// forwardMetrics contains metrics related to log entry forwarding.
type forwardMetrics struct {
// Number of log entries dropped because the destination queue was full.
droppedEntriesTotal prometheus_client.Counter

// Number of times we retried enqueueing when block_on_full is enabled.
enqueueRetriesTotal prometheus_client.Counter
}

// newForwardMetrics creates a new set of forward metrics. If reg is non-nil,
// the metrics will also be registered.
func newForwardMetrics(reg prometheus_client.Registerer) *forwardMetrics {
var m forwardMetrics

m.droppedEntriesTotal = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "loki_process_dropped_entries_total",
Help: "Total number of log entries dropped because the destination queue was full",
})

m.enqueueRetriesTotal = prometheus_client.NewCounter(prometheus_client.CounterOpts{
Name: "loki_process_enqueue_retries_total",
Help: "Total number of times enqueueing was retried when block_on_full is enabled",
})

if reg != nil {
m.droppedEntriesTotal = util.MustRegisterOrGet(reg, m.droppedEntriesTotal).(prometheus_client.Counter)
m.enqueueRetriesTotal = util.MustRegisterOrGet(reg, m.enqueueRetriesTotal).(prometheus_client.Counter)
}

return &m
}
181 changes: 172 additions & 9 deletions internal/component/loki/process/process.go
@@ -40,6 +40,29 @@ func init() {
type Arguments struct {
ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"`
Stages []stages.StageConfig `alloy:"stage,enum,optional"`

// MaxForwardQueueSize controls the maximum number of log entries buffered
// per downstream component. This prevents a slow destination from blocking
// other destinations. Default is 100000.
MaxForwardQueueSize int `alloy:"max_forward_queue_size,attr,optional"`

// BlockOnFull controls behavior when a destination queue is full.
// If false (default), log entries are dropped when the queue is full.
// If true, the component will retry with exponential backoff, which may
// slow down the entire pipeline but prevents data loss.
BlockOnFull bool `alloy:"block_on_full,attr,optional"`
}

// DefaultArguments provides the default arguments for the loki.process
// component.
var DefaultArguments = Arguments{
MaxForwardQueueSize: 100_000,
BlockOnFull: false,
}

// SetToDefault implements syntax.Defaulter.
func (a *Arguments) SetToDefault() {
*a = DefaultArguments
}

// Exports exposes the receiver that can be used to send log entries to
@@ -64,12 +87,115 @@ type Component struct {
entryHandler loki.EntryHandler
stages []stages.StageConfig

fanoutMut sync.RWMutex
fanout []loki.LogsReceiver
fanoutMut sync.RWMutex
fanout []loki.LogsReceiver
queues []*destinationQueue
maxForwardQueueSize int
blockOnFull bool

metrics *forwardMetrics
debugDataPublisher livedebugging.DebugDataPublisher
}

// Backoff constants for blocking mode, similar to Prometheus remote write.
const (
minBackoff = 5 * time.Millisecond
maxBackoff = 5 * time.Second
)

// destinationQueue manages a buffered queue for a single destination to ensure
// FIFO ordering while preventing a slow destination from blocking others.
type destinationQueue struct {
receiver loki.LogsReceiver
buffer chan loki.Entry
stopCh chan struct{}
wg sync.WaitGroup
}

func newDestinationQueue(receiver loki.LogsReceiver, size int) *destinationQueue {
dq := &destinationQueue{
receiver: receiver,
buffer: make(chan loki.Entry, size),
stopCh: make(chan struct{}),
}
dq.wg.Add(1)
go dq.run()
return dq
}

func (dq *destinationQueue) run() {
defer dq.wg.Done()
for {
select {
case <-dq.stopCh:
return
case entry := <-dq.buffer:
select {
case <-dq.stopCh:
return
case dq.receiver.Chan() <- entry:
}
}
}
}

// send attempts to queue an entry for sending without blocking.
// Returns true if queued, false if buffer is full.
func (dq *destinationQueue) send(entry loki.Entry) bool {
select {
case dq.buffer <- entry:
return true
default:
return false
}
}

// sendWithBackoff attempts to queue an entry, retrying with exponential backoff
// if the buffer is full. Returns true if queued, false if stopped during retry.
// The metrics parameter is used to track retry attempts.
func (dq *destinationQueue) sendWithBackoff(entry loki.Entry, metrics *forwardMetrics) bool {
// First try without blocking
select {
case dq.buffer <- entry:
return true
default:
}

// Buffer is full, retry with backoff
backoff := minBackoff
for {
select {
case <-dq.stopCh:
return false
default:
}

metrics.enqueueRetriesTotal.Inc()

select {
case <-dq.stopCh:
return false
case <-time.After(backoff):
}

select {
case dq.buffer <- entry:
return true
default:
// Still full, increase backoff
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}

func (dq *destinationQueue) stop() {
close(dq.stopCh)
dq.wg.Wait()
}

// New creates a new loki.process component.
func New(o component.Options, args Arguments) (*Component, error) {
debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName)
@@ -79,6 +205,7 @@ func New(o component.Options, args Arguments) (*Component, error) {

c := &Component{
opts: o,
metrics: newForwardMetrics(o.Registerer),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}

@@ -110,6 +237,14 @@ func (c *Component) Run(ctx context.Context) error {
wgOut.Wait()
}
c.mut.RUnlock()

// Stop all destination queues
c.fanoutMut.Lock()
for _, q := range c.queues {
q.stop()
}
c.queues = nil
c.fanoutMut.Unlock()
}()
wgIn := &sync.WaitGroup{}
wgIn.Add(1)
Expand All @@ -125,11 +260,29 @@ func (c *Component) Run(ctx context.Context) error {
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)

// Update c.fanout first in case anything else fails.
// Update fanout and queues. Each destination gets its own queue to ensure
// FIFO ordering while preventing a slow destination from blocking others.
// See https://github.com/grafana/alloy/issues/2194
queueSize := newArgs.MaxForwardQueueSize
if queueSize <= 0 {
queueSize = DefaultArguments.MaxForwardQueueSize
}
c.fanoutMut.Lock()
oldQueues := c.queues
c.fanout = newArgs.ForwardTo
c.maxForwardQueueSize = queueSize
c.blockOnFull = newArgs.BlockOnFull
c.queues = make([]*destinationQueue, len(newArgs.ForwardTo))
for i, receiver := range newArgs.ForwardTo {
c.queues[i] = newDestinationQueue(receiver, queueSize)
}
c.fanoutMut.Unlock()

// Stop old queues after releasing the lock to avoid blocking
for _, q := range oldQueues {
q.stop()
}

// Then update the pipeline itself.
c.mut.Lock()
defer c.mut.Unlock()
@@ -200,7 +353,8 @@ func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) {
return
case entry := <-c.processOut:
c.fanoutMut.RLock()
fanout := c.fanout
queues := c.queues
blockOnFull := c.blockOnFull
c.fanoutMut.RUnlock()

// The log entry is the same for every fanout,
@@ -219,11 +373,20 @@ func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) {
},
))

for _, f := range fanout {
select {
case <-shutdownCh:
return
case f.Chan() <- entry:
// Send to each destination's queue. Each destination has its own
// buffered queue with a dedicated worker goroutine, ensuring FIFO
// ordering while preventing a slow destination from blocking others.
// See https://github.com/grafana/alloy/issues/2194
for _, q := range queues {
var sent bool
if blockOnFull {
sent = q.sendWithBackoff(entry, c.metrics)
} else {
sent = q.send(entry)
}
if !sent {
c.metrics.droppedEntriesTotal.Inc()
level.Warn(c.opts.Logger).Log("msg", "dropping log entry because destination queue is full", "labels", entry.Labels.String())
}
}
}
Expand Down