
mmjsontransform: Add YAML-driven schema and transformation policy support #6251

@rgerhards

Description


Summary

Extend mmjsontransform with a reloadable external YAML policy that defines JSON schema-like validation and transformation rules.
The goal is to make data normalization, field renaming, and lightweight validation easy to configure without extra modules or custom scripting.

Motivation

Users frequently need to:

  • Validate required fields and types before ingestion.
  • Parse embedded JSON strings (e.g. message fields).
  • Normalize timestamps and apply canonical field names.
  • Add default values and drop unwanted keys.
  • Perform simple type coercion, enum checks, and value ranges.
  • Redact sensitive data before forwarding.

Currently these tasks require complex rulesets or external preprocessors.
A YAML-based policy, reloadable at runtime, provides a compact and familiar workflow consistent with other modern log pipelines. Reload is triggered by HUP, as is already done e.g. for lookup tables.

Proposed Functionality

Module parameter

module(load="mmjsontransform" policy="/etc/rsyslog/mmjsontransform-policy.yaml")
  • policy points to a YAML file describing transformation and validation rules.
  • File is reloaded automatically on HUP or timestamp change.
  • Invalid YAML reverts to the last known good version with an error log.
  • The current flatten and unflatten modes shall be integrated into the policy; whether the old-style definition is kept as well will be decided during implementation.
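The "revert to last known good" reload behavior can be sketched as follows. This is a hypothetical illustration, not the module's actual code: the real implementation would parse YAML (e.g. via libyaml) inside rsyslog's HUP handler, while this sketch uses Python's stdlib JSON parser as a stand-in so it stays self-contained.

```python
# Hypothetical sketch of reload-with-fallback: an invalid policy file
# leaves the previously loaded policy active and logs an error.
import json


class PolicyHolder:
    def __init__(self):
        self.policy = None  # last known good policy

    def reload(self, text):
        """Parse new policy text; on error keep the previous policy."""
        try:
            new_policy = json.loads(text)  # stand-in for a YAML parser
        except ValueError as err:
            # in rsyslog this would go to the error log
            print(f"policy reload failed, keeping previous version: {err}")
            return False
        self.policy = new_policy
        return True


holder = PolicyHolder()
holder.reload('{"version": 1}')  # valid -> becomes the active policy
holder.reload('{broken')         # invalid -> previous policy is retained
print(holder.policy)             # -> {'version': 1}
```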

Example Policy File

version: 1
description: Basic normalization policy

map:
  rename:
    "usr": "user.name"
    "fields.client.ip": "client.ip"
  drop:
    - "debug"
    - "trace"

coerce:
  types:
    "http.status_code": int
    "user.id": int

timestamp:
  candidates: ["@timestamp", "time", "logtime"]
  formats: ["rfc3339", "epoch_ms", "%d/%b/%Y:%H:%M:%S %z"]
  output: "!ts"
  fallback: "timereported"

defaults:
  "event.dataset": "unknown"
  "schema_version": 1

validate:
  required: ["@timestamp", "event.kind", "host.name"]
  enum:
    "event.kind": ["event", "alert", "metric"]
  range:
    "http.status_code": [100, 599]
  on_fail:
    action: "tag"
    tag: "json_invalid"

redact:
  keys: ["user.password", "auth.token"]

json:
  parse_embedded:
    - "message"
    - "log.json"
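To make the coerce and validate sections above concrete, here is a hypothetical Python sketch of their semantics, including the on_fail "tag" action. Function names and the `tags` field are illustrative assumptions, not the final implementation.

```python
# Hypothetical sketch of the coerce: and validate: policy sections.
def coerce_types(event, types):
    """Apply simple type coercion; only 'int' is sketched here."""
    for key, typ in types.items():
        if key in event and typ == "int":
            try:
                event[key] = int(event[key])
            except (TypeError, ValueError):
                pass  # leave value as-is; validation may flag it later
    return event


def validate(event, rules):
    """Check required/enum/range rules; on failure apply on_fail action."""
    ok = all(f in event for f in rules.get("required", []))
    for key, allowed in rules.get("enum", {}).items():
        ok = ok and event.get(key) in allowed
    for key, (lo, hi) in rules.get("range", {}).items():
        v = event.get(key)
        ok = ok and isinstance(v, int) and lo <= v <= hi
    if not ok and rules.get("on_fail", {}).get("action") == "tag":
        event.setdefault("tags", []).append(rules["on_fail"]["tag"])
    return event


event = coerce_types({"http.status_code": "200", "event.kind": "event"},
                     {"http.status_code": "int"})
event = validate(event, {"required": ["event.kind"],
                         "enum": {"event.kind": ["event", "alert", "metric"]},
                         "range": {"http.status_code": [100, 599]},
                         "on_fail": {"action": "tag", "tag": "json_invalid"}})
print(event)  # -> {'http.status_code': 200, 'event.kind': 'event'}
```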

Example Behavior

Input JSON

{
  "usr": "alice",
  "message": "{\"user\":\"alice\",\"action\":\"login\"}",
  "time": "2025-10-15T12:00:00Z"
}

After mmjsontransform with policy

{
  "user": {
    "name": "alice"
  },
  "message": {
    "user": "alice",
    "action": "login"
  },
  "ts": "2025-10-15T12:00:00Z",
  "event": {
    "dataset": "unknown"
  },
  "schema_version": 1
}
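The rename step shown above, where a dotted target name like "user.name" produces a nested object, could be sketched as follows. This is an illustrative assumption about the semantics; helper names are hypothetical.

```python
# Hypothetical sketch of the map: rename/drop step with dotted target names
# creating nested objects, as in the example behavior above.
def set_path(obj, dotted, value):
    """Create nested dicts along a dotted path and set the leaf value."""
    parts = dotted.split(".")
    for p in parts[:-1]:
        obj = obj.setdefault(p, {})
    obj[parts[-1]] = value


def apply_map(event, policy_map):
    for src, dst in policy_map.get("rename", {}).items():
        if src in event:
            set_path(event, dst, event.pop(src))
    for key in policy_map.get("drop", []):
        event.pop(key, None)
    return event


event = apply_map({"usr": "alice", "debug": "x"},
                  {"rename": {"usr": "user.name"}, "drop": ["debug"]})
print(event)  # -> {'user': {'name': 'alice'}}
```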

Implementation Notes for AI Agent / Developer

  1. Add a policy parameter to mmjsontransform.
  2. Implement YAML loader (libyaml or in-tree parser) → internal normalized structure.
  3. Apply transformations and validation in doAction():
    • mapping/renames
    • type coercion
    • timestamp normalization
    • parse_embedded fields
    • defaults injection
    • validation + on_fail handling
    • redaction
  4. Provide reload mechanism (policy re-read on HUP / possibly mtime change).
  5. Ensure backward compatibility when policy is unset (current behavior unchanged).
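The timestamp normalization in step 3 (candidates, formats, fallback) might look roughly like this sketch. The "rfc3339" and "epoch_ms" handling is approximated with stdlib parsing; names and the output key are taken from the example policy, everything else is an assumption.

```python
# Hypothetical sketch of timestamp: candidates/formats/fallback handling.
from datetime import datetime, timezone


def normalize_timestamp(event, candidates, fallback):
    """Try candidate fields in order; write normalized value to 'ts'."""
    for key in candidates:
        raw = event.get(key)
        if raw is None:
            continue
        try:
            if isinstance(raw, (int, float)):   # epoch_ms
                ts = datetime.fromtimestamp(raw / 1000, tz=timezone.utc)
            else:                               # rfc3339-style string
                ts = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
            event["ts"] = ts.isoformat().replace("+00:00", "Z")
            return event
        except ValueError:
            continue  # try the next candidate field
    event["ts"] = fallback  # e.g. rsyslog's timereported
    return event


out = normalize_timestamp({"time": "2025-10-15T12:00:00Z"},
                          ["@timestamp", "time", "logtime"],
                          "2025-10-15T00:00:00Z")
print(out["ts"])  # -> 2025-10-15T12:00:00Z
```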

Expected Benefits

  • Single, readable configuration for JSON transformations.
  • Simplified ingestion pipelines.
  • Easier troubleshooting and reproducibility.
  • Alignment with YAML-driven approaches used in Logstash, Fluent Bit, Vector, etc.

References

Broader log pipeline community feedback shows frequent demand for:

  • Required-field checks and type enforcement.
  • Timestamp normalization.
  • Embedded JSON parsing.
  • Canonical renaming and data pruning.

These are the same operations commonly provided by Logstash mutate, Fluent Bit modify, and Kafka Connect ReplaceField.
