11package json_extract
22
33import (
4- "bytes"
5-
64 "github.com/go-faster/jx"
75 "github.com/ozontech/file.d/cfg"
86 "github.com/ozontech/file.d/fd"
@@ -11,7 +9,7 @@ import (
119)
1210
1311/*{ introduction
14- It extracts a field from JSON-encoded event field and adds extracted field to the event root.
12+ It extracts fields from a JSON-encoded event field and adds the extracted fields to the event root.
1513> If an extracted field already exists in the event root, it will be overwritten.
1614}*/
1715
@@ -23,42 +21,65 @@ pipelines:
2321 actions:
2422 - type: json_extract
2523 field: log
26- extract_field: error.code
24+ extract_fields:
25+ - error.code
26+ - level
27+ - meta
28+ - flags
2729 ...
2830```
2931The original event:
3032```json
3133{
32- "log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error \":{\"code\":2 ,\"args \":[]} }",
34+ "log": "{\"level\":\"error\",\"message\":\"error occurred\",\"error\":{\"code\":2,\"args\":[]},\"meta\":{\"service\":\"my-service\",\"pod\":\"my-service-5c4dfcdcd4-4v5zw\"},\"flags\":[\"flag1\",\"flag2\"]}",
3335 "time": "2024-03-01T10:49:28.263317941Z"
3436}
3537```
3638The resulting event:
3739```json
3840{
39- "log": "{\"level\":\"error\",\"message\":\"error occurred\",\"service\":\"my-service\",\"error \":{\"code\":2 ,\"args \":[]} }",
41+ "log": "{\"level\":\"error\",\"message\":\"error occurred\",\"error\":{\"code\":2,\"args\":[]},\"meta\":{\"service\":\"my-service\",\"pod\":\"my-service-5c4dfcdcd4-4v5zw\"},\"flags\":[\"flag1\",\"flag2\"]}",
4042 "time": "2024-03-01T10:49:28.263317941Z",
41- "code": 2
43+ "code": 2,
44+ "level": "error",
45+ "meta": {
46+ "service": "my-service",
47+ "pod": "my-service-5c4dfcdcd4-4v5zw"
48+ },
49+ "flags": ["flag1", "flag2"]
4250}
4351```
4452}*/
4553
4654/*{ benchmarks
4755Performance comparison of `json_extract` and `json_decode` plugins.
48- `json_extract` on average 3 times faster than `json_decode`.
56+ `json_extract` is on average 2.5 times faster than `json_decode` and
57+ doesn't allocate memory during extraction.
58+
59+ ### Extract 1 field
60+ | json (length) | json_extract (time ns) | json_decode (time ns) |
61+ |---------------|------------------------|-----------------------|
62+ | 309 | 300 | 560 |
63+ | 2109 | 2570 | 7250 |
64+ | 10909 | 13550 | 34250 |
65+ | 21909 | 26000 | 67940 |
66+ | 237909 | 262500 | 741530 |
4967
68+ ### Extract 5 fields
5069| json (length) | json_extract (time ns) | json_decode (time ns) |
5170|---------------|------------------------|-----------------------|
52- | 129 | 33 | 176 |
53- | 309 | 264 | 520 |
54- | 2109 | 2263 | 6778 |
55- | 10909 | 11289 | 32205 |
56- | 21909 | 23277 | 62819 |
71+ | 309 | 450 | 685 |
72+ | 2109 | 2990 | 7410 |
73+ | 10909 | 14540 | 35000 |
74+ | 21909 | 28340 | 69950 |
75+ | 237909 | 286600 | 741600 |
5776}*/
5877
5978type Plugin struct {
60- config * Config
61- decoder * jx.Decoder
79+ config * Config
80+
81+ extractFields * pathTree
82+ decoder * jx.Decoder
6283}
6384
6485// ! config-params
@@ -73,8 +94,14 @@ type Config struct {
7394 // > @3@4@5@6
7495 // >
7596 // > Field to extract.
76- ExtractField cfg.FieldSelector `json:"extract_field" parse:"selector" required:"true"` // *
97+ // >> ⚠ DEPRECATED. Use `extract_fields` instead.
98+ ExtractField cfg.FieldSelector `json:"extract_field" parse:"selector"` // *
7799 ExtractField_ []string
100+
101+ // > @3@4@5@6
102+ // >
103+ // > Fields to extract.
104+ ExtractFields []cfg.FieldSelector `json:"extract_fields" slice:"true"` // *
78105}
79106
80107func init () {
@@ -88,9 +115,25 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
88115 return & Plugin {}, & Config {}
89116}
90117
91- func (p * Plugin ) Start (config pipeline.AnyConfig , _ * pipeline.ActionPluginParams ) {
118+ func (p * Plugin ) Start (config pipeline.AnyConfig , params * pipeline.ActionPluginParams ) {
92119 p .config = config .(* Config )
93120 p .decoder = & jx.Decoder {}
121+
122+ p .extractFields = newPathTree ()
123+ dupl := false
124+ for _ , f := range p .config .ExtractFields {
125+ if f == p .config .ExtractField {
126+ dupl = true
127+ }
128+ p .extractFields .add (cfg .ParseFieldSelector (string (f )))
129+ }
130+ if ! dupl {
131+ p .extractFields .add (p .config .ExtractField_ )
132+ }
133+
134+ if len (p .extractFields .root .children ) == 0 {
135+ params .Logger .Fatal ("extract fields are empty" )
136+ }
94137}
95138
96139func (p * Plugin ) Stop () {}
@@ -102,36 +145,52 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
102145 }
103146
104147 p .decoder .ResetBytes (jsonNode .AsBytes ())
105- extract (event .Root , p .decoder , p .config . ExtractField_ , 0 , false )
148+ extract (event .Root , p .decoder , p .extractFields . root . children , false )
106149 return pipeline .ActionPass
107150}
108151
109- // extract extracts field from decoder and adds it to the root.
110- // `skipAddField` flag is required for proper benchmarking.
111- func extract (root * insaneJSON.Root , d * jx.Decoder , field []string , depth int , skipAddField bool ) {
152+ // extract extracts fields from decoder and adds them to the root.
153+ //
154+ // [skipAddField] flag is required for proper benchmarking.
155+ func extract (root * insaneJSON.Root , d * jx.Decoder , fields pathNodes , skipAddField bool ) {
112156 objIter , err := d .ObjIter ()
113157 if err != nil {
114158 return
115159 }
116160
161+ processed := len (fields )
117162 for objIter .Next () {
118- if bytes .Equal (objIter .Key (), pipeline .StringToByteUnsafe (field [depth ])) {
119- if depth == len (field )- 1 { // add field
120- if skipAddField {
121- _ = d .Skip ()
122- } else {
123- addField (root , field [depth ], d )
124- }
125- } else { // go deep
126- raw , err := d .Raw ()
127- if err != nil {
128- break
129- }
130- d .ResetBytes (raw )
131- extract (root , d , field , depth + 1 , skipAddField )
163+ // find the field at the current depth
164+ n := fields .find (string (objIter .Key ()))
165+ if n == nil {
166+ if err = d .Skip (); err != nil {
167+ break
132168 }
133- break
134- } else if err = d .Skip (); err != nil {
169+ continue
170+ }
171+
172+ if len (n .children ) == 0 { // last field in path, add to root
173+ if skipAddField {
174+ _ = d .Skip ()
175+ } else {
176+ addField (root , n .data , d )
177+ }
178+ } else { // go deep
179+ // Capture calls f and then rolls back to state before call
180+ _ = d .Capture (func (d * jx.Decoder ) error {
181+ // recursively extract child fields
182+ extract (root , d , n .children , skipAddField )
183+ return nil
184+ })
185+ // skip the current field because we have processed it
186+ // and rolled back the state of the decoder
187+ if err = d .Skip (); err != nil {
188+ break
189+ }
190+ }
191+
192+ processed --
193+ if processed == 0 {
135194 break
136195 }
137196 }
@@ -154,6 +213,7 @@ func addField(root *insaneJSON.Root, field string, d *jx.Decoder) {
154213 s , _ := d .StrBytes ()
155214 root .AddFieldNoAlloc (root , field ).MutateToBytesCopy (root , s )
156215 case jx .Null :
216+ _ = d .Null ()
157217 root .AddFieldNoAlloc (root , field ).MutateToNull ()
158218 case jx .Bool :
159219 b , _ := d .Bool ()
0 commit comments