Skip to content

Latest commit

 

History

History

README.md

Lookup Processor

Status
Stability alpha: logs
Distributions []

Description

The lookup processor enriches telemetry signals by performing external lookups to retrieve additional data. It reads an attribute value, uses it as a key to query a lookup source, and sets the result as a new attribute. Lookup processor needs a lookup source, available sources are listed in Built-in Sources section.

Currently supports logs, with metrics and traces support planned.

Full Configuration

Field Description Default
source.type The source type identifier (noop, yaml, dns) noop
attributes List of attribute enrichment rules (required) -
cache.enabled Enable caching of lookup results source specific
cache.size Max number of entries in the cache source specific
cache.ttl TTL for cached results (0 = none) source specific
cache.negative_ttl TTL for not-found (0 = none) source specific

Attribute Configuration

Each entry in attributes defines a lookup rule:

Field Description Default
key Name of the attribute to set with result (required) -
from_attribute Name of the attribute with lookup key (required) -
default Value to use when lookup returns no result -
action How to handle result: insert, update, upsert upsert
context Where to read/write: record, resource record

Actions

  • insert: Only set the attribute if it doesn't already exist
  • update: Only set the attribute if it already exists
  • upsert: Always set the attribute (default)

Context

  • record: Read from and write to record-level attributes (log records, spans, metric data points) (default)
  • resource: Read from and write to resource attributes

Example Configuration

processors:
  lookup:
    source:
      type: dns
      record_type: A
      timeout: 5s
    cache:
      enabled: true
      size: 1000
      ttl: 5m
      negative_ttl: 1m
    attributes:
      - key: server.ip
        from_attribute: server.hostname
        default: "unknown"
        action: upsert
        context: record

Built-in Sources

noop

A no-operation source that always returns "not found". Useful for testing.

processors:
  lookup:
    source:
      type: noop
    attributes:
      - key: result
        from_attribute: key
        default: "not-found"

yaml

Loads key-value mappings from a YAML file. The file should contain a flat map of string keys to values.

Field Description Default
path Path to the YAML file (required) -
processors:
  lookup:
    source:
      type: yaml
      path: /etc/otel/mappings.yaml
    attributes:
      - key: service.display_name
        from_attribute: service.name

Example mappings file (mappings.yaml):

svc-frontend: "Frontend Web App"
svc-backend: "Backend API Service"
svc-worker: "Background Worker"

dns

Performs DNS lookups to resolve hostnames to IPs or IPs to hostnames based on DNS record type.

Field Description Default
record_type DNS record type: A, AAAA, or PTR PTR
timeout Maximum time to wait for DNS resolution 5s
resolver Custom DNS server (format: "host:port") -
multiple_results Return all results or first only false

A record lookup (hostname to IPv4):

processors:
  lookup:
    source:
      type: dns
      record_type: A
      timeout: 5s
    cache:
      enabled: true
      size: 1000
      ttl: 5m
    attributes:
      - key: server.ip
        from_attribute: server.hostname

Detailed documentation for sources can be found under the respective source's readme.

Caching

All sources support caching to improve performance and reduce load on external systems. Configure caching at the processor level:

processors:
  lookup:
    source:
      type: dns
      record_type: A
    cache:
      enabled: true
      size: 1000 # Max entries
      ttl: 5m # Cache successful lookups for 5 minutes
      negative_ttl: 1m # Cache failed lookups for 1 minute
    attributes:
      - key: ip
        from_attribute: hostname

Cache Configuration:

  • enabled: Enable/disable caching
  • size: Maximum number of entries
  • ttl: Time-to-live for successful lookups. Use 0 for no expiration
  • negative_ttl: Time-to-live for not-found results. Use 0 to not cache failures

The cache uses an LRU (Least Recently Used) eviction policy when it reaches the size limit.

Note: Default values and caching behavior depend on the source implementation. For source-specific cache configuration details and recommendations, refer to the respective source's documentation.

Benchmarks

Run benchmarks with:

make benchmark

Processor Performance

Measures the full processing pipeline including pdata operations, attribute iteration, value conversion, and telemetry. Uses noop source to isolate processor overhead from source implementation (Apple M4 Pro):

Scenario ns/op B/op allocs/op
1 log, 1 attribute 323 696 20
10 logs, 1 attribute 1,274 3,216 74
100 logs, 1 attribute 11,076 28,512 614
100 logs, 3 attributes 21,604 54,113 1,014
1000 logs, 1 attribute 122,447 280,617 6,014

Source-specific benchmarks are available in each source's README.

Custom Sources

Custom lookup sources can be added using WithSources:

import (
    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/lookupprocessor"
    "github.com/example/httplookup"
)

factories.Processors[lookupprocessor.Type] = lookupprocessor.NewFactoryWithOptions(
    lookupprocessor.WithSources(httplookup.NewFactory()),
)

Implementing a Source

package mysource

import (
    "context"
    "errors"
    "time"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/lookupprocessor/lookupsource"
)

type Config struct {
    Endpoint string        `mapstructure:"endpoint"`
    Timeout  time.Duration `mapstructure:"timeout"`
}

func (c *Config) Validate() error {
    if c.Endpoint == "" {
        return errors.New("endpoint is required")
    }
    return nil
}

func NewFactory() lookupsource.SourceFactory {
    return lookupsource.NewSourceFactory(
        "mysource",
        func() lookupsource.SourceConfig {
            return &Config{Timeout: 5 * time.Second}
        },
        createSource,
    )
}

func createSource(
    ctx context.Context,
    settings lookupsource.CreateSettings,
    cfg lookupsource.SourceConfig,
) (lookupsource.Source, error) {
    c := cfg.(*Config)

    // Define the lookup function
    lookupFunc := func(ctx context.Context, key string) (any, bool, error) {
        // Perform lookup - return (value, found, error)
        return "result", true, nil
    }

    // Wrap with cache if enabled
    if settings.Cache.Enabled {
        cache := lookupsource.NewCache(settings.Cache)
        cache.SetLogger(settings.TelemetrySettings.Logger)
        lookupFunc = lookupsource.WrapWithCache(cache, lookupFunc)
    }

    return lookupsource.NewSource(
        lookupFunc,
        func() string { return "mysource" },
        nil, // start function (optional)
        nil, // shutdown function (optional)
    ), nil
}