Skip to content

Commit 04932e0

Browse files
committed
feat(loki.process): add regex field to logfmt stage
1 parent 9ec199b commit 04932e0

File tree

2 files changed

+83
-14
lines changed

2 files changed

+83
-14
lines changed

internal/component/loki/process/stages/logfmt.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"reflect"
7+
"regexp"
78
"strings"
89
"time"
910

@@ -15,26 +16,27 @@ import (
1516

1617
// Config Errors
1718
var (
18-
ErrMappingRequired = errors.New("logfmt mapping is required")
19+
ErrMappingOrRegexRequired = errors.New("logfmt mapping or regex is required")
1920
ErrEmptyLogfmtStageConfig = errors.New("empty logfmt stage configuration")
2021
)
2122

2223
// LogfmtConfig represents a logfmt Stage configuration
2324
type LogfmtConfig struct {
24-
Mapping map[string]string `alloy:"mapping,attr"`
25+
Mapping map[string]string `alloy:"mapping,attr,optional"`
2526
Source string `alloy:"source,attr,optional"`
27+
Regex string `alloy:"regex,attr,optional"`
2628
}
2729

2830
// validateLogfmtConfig validates a logfmt stage config and returns an inverse mapping of configured mapping.
2931
// Mapping inverse is done to make lookup easier. The key would be the key from parsed logfmt and
3032
// value would be the key with which the data in extracted map would be set.
31-
func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) {
33+
func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, *regexp.Regexp, error) {
3234
if c == nil {
33-
return nil, ErrEmptyLogfmtStageConfig
35+
return nil, nil, ErrEmptyLogfmtStageConfig
3436
}
3537

36-
if len(c.Mapping) == 0 {
37-
return nil, ErrMappingRequired
38+
if len(c.Mapping) == 0 && len(c.Regex) == 0 {
39+
return nil, nil, ErrMappingOrRegexRequired
3840
}
3941

4042
inverseMapping := make(map[string]string)
@@ -46,28 +48,35 @@ func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) {
4648
inverseMapping[v] = k
4749
}
4850

49-
return inverseMapping, nil
51+
re, err := regexp.Compile(c.Regex)
52+
if err != nil {
53+
return nil, nil, err
54+
}
55+
56+
return inverseMapping, re, nil
5057
}
5158

5259
// logfmtStage sets extracted data using logfmt parser
5360
type logfmtStage struct {
5461
cfg *LogfmtConfig
5562
inverseMapping map[string]string
63+
regex regexp.Regexp
5664
logger log.Logger
5765
}
5866

5967
// newLogfmtStage creates a new logfmt pipeline stage from a config.
6068
func newLogfmtStage(logger log.Logger, config LogfmtConfig) (Stage, error) {
6169
// inverseMapping would hold the mapping in inverse which would make lookup easier.
6270
// To explain it simply, the key would be the key from parsed logfmt and value would be the key with which the data in extracted map would be set.
63-
inverseMapping, err := validateLogfmtConfig(&config)
71+
inverseMapping, regex, err := validateLogfmtConfig(&config)
6472
if err != nil {
6573
return nil, err
6674
}
6775

6876
return toStage(&logfmtStage{
6977
cfg: &config,
7078
inverseMapping: inverseMapping,
79+
regex: *regex,
7180
logger: log.With(logger, "component", "stage", "type", "logfmt"),
7281
}), nil
7382
}
@@ -98,13 +107,22 @@ func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interf
98107
return
99108
}
100109
decoder := logfmt.NewDecoder(strings.NewReader(*input))
101-
extractedEntriesCount := 0
110+
mappingExtractedEntriesCount := 0
111+
regexExtractedEntriesCount := 0
102112
for decoder.ScanRecord() {
103113
for decoder.ScanKeyval() {
114+
// handle "mapping"
104115
mapKey, ok := j.inverseMapping[string(decoder.Key())]
105116
if ok {
106117
extracted[mapKey] = string(decoder.Value())
107-
extractedEntriesCount++
118+
mappingExtractedEntriesCount++
119+
} else if j.regex.String() != "" {
120+
// handle "regex"
121+
fmt.Println(j.regex.String(), string(decoder.Key()))
122+
if j.regex.MatchString(string(decoder.Key())) {
123+
extracted[string(decoder.Key())] = string(decoder.Value())
124+
regexExtractedEntriesCount++
125+
}
108126
}
109127
}
110128
}
@@ -114,8 +132,11 @@ func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interf
114132
return
115133
}
116134

117-
if extractedEntriesCount != len(j.inverseMapping) {
118-
level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", extractedEntriesCount, len(j.inverseMapping)))
135+
if mappingExtractedEntriesCount != len(j.inverseMapping) {
136+
level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", mappingExtractedEntriesCount, len(j.inverseMapping)))
137+
}
138+
if regexExtractedEntriesCount > 0 {
139+
level.Debug(j.logger).Log("msg", fmt.Sprintf("found %d mappings via regex in logfmt stage", regexExtractedEntriesCount))
119140
}
120141
level.Debug(j.logger).Log("msg", "extracted data debug in logfmt stage", "extracted data", fmt.Sprintf("%v", extracted))
121142
}

internal/component/loki/process/stages/logfmt_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,25 @@ stage.logfmt {
2626
source = "extra"
2727
}`
2828

29+
var testLogfmtAlloyRegex = `
30+
stage.logfmt {
31+
regex = "pod_.*"
32+
}
33+
`
34+
35+
var testLogfmtAlloyRegexAll = `
36+
stage.logfmt {
37+
regex = ".*"
38+
}
39+
`
40+
41+
var testLogfmtAlloyRegexAndMapping = `
42+
stage.logfmt {
43+
mapping = { "out" = "message", "app" = ""}
44+
regex = "(app|duration)"
45+
}
46+
`
47+
2948
func TestLogfmt(t *testing.T) {
3049
var testLogfmtLogLine = `
3150
time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="this is a log line" extra="user=foo""
@@ -54,6 +73,35 @@ func TestLogfmt(t *testing.T) {
5473
"user": "foo",
5574
},
5675
},
76+
"successfully extract regex values from logfmt": {
77+
testLogfmtAlloyRegex,
78+
`time=2012-11-01T22:08:41+00:00 pod_name=my-pod-123 pod_label=my-label`,
79+
map[string]interface{}{
80+
"pod_name": "my-pod-123",
81+
"pod_label": "my-label",
82+
},
83+
},
84+
"successfully extract all values via regex from logfmt": {
85+
testLogfmtAlloyRegexAll,
86+
testLogfmtLogLine,
87+
map[string]interface{}{
88+
"time": "2012-11-01T22:08:41+00:00",
89+
"app": "loki",
90+
"level": "WARN",
91+
"duration": "125",
92+
"message": "this is a log line",
93+
"extra": "user=foo",
94+
},
95+
},
96+
"successfully extract values with expressions and regex from logfmt": {
97+
testLogfmtAlloyRegexAndMapping,
98+
testLogfmtLogLine,
99+
map[string]interface{}{
100+
"out": "this is a log line",
101+
"app": "loki",
102+
"duration": "125",
103+
},
104+
},
57105
}
58106

59107
for testName, testData := range tests {
@@ -81,7 +129,7 @@ func TestLogfmtConfigValidation(t *testing.T) {
81129
"no mapping": {
82130
LogfmtConfig{},
83131
0,
84-
ErrMappingRequired,
132+
ErrMappingOrRegexRequired,
85133
},
86134
"valid without source": {
87135
LogfmtConfig{
@@ -108,7 +156,7 @@ func TestLogfmtConfigValidation(t *testing.T) {
108156
for tName, tt := range tests {
109157
tt := tt
110158
t.Run(tName, func(t *testing.T) {
111-
got, err := validateLogfmtConfig(&tt.config)
159+
got, _, err := validateLogfmtConfig(&tt.config)
112160
if tt.err != nil {
113161
assert.EqualError(t, err, tt.err.Error())
114162
} else {

0 commit comments

Comments
 (0)