Skip to content

Commit 2e77c56

Browse files
committed
feat(loki.process): add regex field to json stage
1 parent 04932e0 commit 2e77c56

File tree

2 files changed

+103
-18
lines changed

2 files changed

+103
-18
lines changed

internal/component/loki/process/stages/json.go

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"reflect"
7+
"regexp"
78

89
"github.com/go-kit/log"
910
"github.com/grafana/alloy/internal/runtime/logging/level"
@@ -13,32 +14,33 @@ import (
1314

1415
// Config Errors
1516
const (
16-
ErrExpressionsRequired = "JMES expression is required"
17-
ErrCouldNotCompileJMES = "could not compile JMES expression"
18-
ErrEmptyJSONStageConfig = "empty json stage configuration"
19-
ErrEmptyJSONStageSource = "empty source"
20-
ErrMalformedJSON = "malformed json"
17+
ErrExpressionsOrRegexRequired = "JMES expressions or regex is required"
18+
ErrCouldNotCompileJMES = "could not compile JMES expression"
19+
ErrEmptyJSONStageConfig = "empty json stage configuration"
20+
ErrEmptyJSONStageSource = "empty source"
21+
ErrMalformedJSON = "malformed json"
2122
)
2223

2324
// JSONConfig represents a JSON Stage configuration
2425
type JSONConfig struct {
25-
Expressions map[string]string `alloy:"expressions,attr"`
26+
Expressions map[string]string `alloy:"expressions,attr,optional"`
27+
Regex string `alloy:"regex,attr,optional"`
2628
Source *string `alloy:"source,attr,optional"`
2729
DropMalformed bool `alloy:"drop_malformed,attr,optional"`
2830
}
2931

3032
// validateJSONConfig validates a json config and returns a map of necessary jmespath expressions.
31-
func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, error) {
33+
func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, *regexp.Regexp, error) {
3234
if c == nil {
33-
return nil, errors.New(ErrEmptyJSONStageConfig)
35+
return nil, nil, errors.New(ErrEmptyJSONStageConfig)
3436
}
3537

36-
if len(c.Expressions) == 0 {
37-
return nil, errors.New(ErrExpressionsRequired)
38+
if len(c.Expressions) == 0 && len(c.Regex) == 0 {
39+
return nil, nil, errors.New(ErrExpressionsOrRegexRequired)
3840
}
3941

4042
if c.Source != nil && *c.Source == "" {
41-
return nil, errors.New(ErrEmptyJSONStageSource)
43+
return nil, nil, errors.New(ErrEmptyJSONStageSource)
4244
}
4345

4446
expressions := map[string]jmespath.JMESPath{}
@@ -52,28 +54,36 @@ func validateJSONConfig(c *JSONConfig) (map[string]jmespath.JMESPath, error) {
5254
}
5355
expressions[n], err = jmespath.Compile(jmes)
5456
if err != nil {
55-
return nil, fmt.Errorf("%s: %w", ErrCouldNotCompileJMES, err)
57+
return nil, nil, fmt.Errorf("%s: %w", ErrCouldNotCompileJMES, err)
5658
}
5759
}
58-
return expressions, nil
60+
61+
re, err := regexp.Compile(c.Regex)
62+
if err != nil {
63+
return nil, nil, err
64+
}
65+
66+
return expressions, re, nil
5967
}
6068

6169
// jsonStage sets extracted data using JMESPath expressions
6270
type jsonStage struct {
6371
cfg *JSONConfig
6472
expressions map[string]jmespath.JMESPath
73+
regex regexp.Regexp
6574
logger log.Logger
6675
}
6776

6877
// newJSONStage creates a new json pipeline stage from a config.
6978
func newJSONStage(logger log.Logger, cfg JSONConfig) (Stage, error) {
70-
expressions, err := validateJSONConfig(&cfg)
79+
expressions, regex, err := validateJSONConfig(&cfg)
7180
if err != nil {
7281
return nil, err
7382
}
7483
return &jsonStage{
7584
cfg: &cfg,
7685
expressions: expressions,
86+
regex: *regex,
7787
logger: log.With(logger, "component", "stage", "type", "json"),
7888
}, nil
7989
}
@@ -164,6 +174,31 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
164174
extracted[n] = string(jm)
165175
}
166176
}
177+
if j.regex.String() != "" {
178+
for key, value := range data {
179+
if j.regex.MatchString(key) {
180+
switch value.(type) {
181+
case float64:
182+
extracted[key] = value
183+
case string:
184+
extracted[key] = value
185+
case bool:
186+
extracted[key] = value
187+
case nil:
188+
extracted[key] = nil
189+
default:
190+
jm, err := json.Marshal(value)
191+
if err != nil {
192+
if Debug {
193+
level.Debug(j.logger).Log("msg", "failed to marshal complex type back to string", "err", err)
194+
}
195+
continue
196+
}
197+
extracted[key] = string(jm)
198+
}
199+
}
200+
}
201+
}
167202
if Debug {
168203
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted_data", fmt.Sprintf("%v", extracted))
169204
}

internal/component/loki/process/stages/json_test.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,36 @@ stage.json {
2828
2929
stage.json {
3030
expressions = { "user" = "" }
31-
source = "extra"
31+
source = "extra"
3232
}`
3333

34+
var testJSONAlloyRegex = `
35+
stage.json {
36+
regex = "pod_.*"
37+
}
38+
`
39+
40+
var testJSONAlloyRegexAll = `
41+
stage.json {
42+
regex = ".*"
43+
}
44+
`
45+
46+
var testJSONAlloyExpressionsAndRegex = `
47+
stage.json {
48+
expressions = {"out" = "message", "app" = ""}
49+
regex = "(app|duration)"
50+
}
51+
`
52+
3453
var testJSONLogLine = `
3554
{
3655
"time":"2012-11-01T22:08:41+00:00",
3756
"app":"loki",
3857
"component": ["parser","type"],
3958
"level" : "WARN",
4059
"nested" : {"child":"value"},
41-
"duration" : 125,
60+
"duration" : 125,
4261
"message" : "this is a log line",
4362
"extra": "{\"user\":\"marco\"}"
4463
}
@@ -72,6 +91,37 @@ func TestPipeline_JSON(t *testing.T) {
7291
"user": "marco",
7392
},
7493
},
94+
"successfully extract regex values from json": {
95+
testJSONAlloyRegex,
96+
`{"time":"2012-11-01T22:08:41+00:00", "pod_name": "my-pod-123", "pod_label": "my-label"}`,
97+
map[string]interface{}{
98+
"pod_name": "my-pod-123",
99+
"pod_label": "my-label",
100+
},
101+
},
102+
"successfully extract all values from json via regex": {
103+
testJSONAlloyRegexAll,
104+
testJSONLogLine,
105+
map[string]interface{}{
106+
"time": "2012-11-01T22:08:41+00:00",
107+
"app": "loki",
108+
"component": `["parser","type"]`,
109+
"level": "WARN",
110+
"nested": `{"child":"value"}`,
111+
"duration": float64(125),
112+
"message": "this is a log line",
113+
"extra": "{\"user\":\"marco\"}",
114+
},
115+
},
116+
"successfully extract values with expressions and regex from json": {
117+
testJSONAlloyExpressionsAndRegex,
118+
testJSONLogLine,
119+
map[string]interface{}{
120+
"out": "this is a log line",
121+
"app": "loki",
122+
"duration": float64(125),
123+
},
124+
},
75125
}
76126

77127
for testName, testData := range tests {
@@ -132,7 +182,7 @@ func TestJSONConfig_validate(t *testing.T) {
132182
"no expressions": {
133183
&JSONConfig{},
134184
0,
135-
errors.New(ErrExpressionsRequired),
185+
errors.New(ErrExpressionsOrRegexRequired),
136186
},
137187
"invalid expression": {
138188
&JSONConfig{
@@ -180,7 +230,7 @@ func TestJSONConfig_validate(t *testing.T) {
180230
for tName, tt := range tests {
181231
tt := tt
182232
t.Run(tName, func(t *testing.T) {
183-
got, err := validateJSONConfig(tt.config)
233+
got, _, err := validateJSONConfig(tt.config)
184234
if tt.err != nil {
185235
assert.NotNil(t, err, "JSONConfig.validate() expected error = %v, but got nil", tt.err)
186236
}

0 commit comments

Comments
 (0)