Skip to content

Commit 32132b5

Browse files
[encoding/jsonlogencodingextension] Add array_mode config to jsonlogencodingextension (open-telemetry#40932)
#### Description This PR adds `array_mode` configuration option to the extension. This allows this extension to be used by multiple JSON payload types, - `array_mode: true` : This is the default mode which expects payload to be an json array (kept as default for backward compatiblity) - `array_mode: false`: This mode accepts the payload to contain single JSON document or multiple delimited documents (ex:- ndjson) Change includes implementation for both marshaling and unmarshaling. #### Link to tracking issue Fixes open-telemetry#40877 #### Testing I have validated the data flow for encoding with following scenarios, - With empty `array_mode` : Works similarly to current implementation where payload is expected as array - With `array_mode: true` : Default mode and expects array - With `array_mode: false` : Accepts single json document as well as new line delimited JSON document #### Documentation I have updated the extension's documentation to reflect the changes. --------- Signed-off-by: Kavindu Dodanduwa <[email protected]>
1 parent deff5b5 commit 32132b5

File tree

12 files changed

+368
-65
lines changed

12 files changed

+368
-65
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: jsonlogencodingextension
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add array_mode configuration option and add support to process arbitrary JSON inputs
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40877, 40545]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: "`array_mode` is default set to true to preserve backward compatibility. When set to `true`, extension accepts single or concatenated Json (ex:- NDJSON)"
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

extension/encoding/jsonlogencodingextension/README.md

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@
1313

1414
## Configuration
1515

16-
| Name | Description | Default |
17-
| ------------------------ | -------------------------------------------------- | -------------------------------------------- |
18-
| mode | What mode of the JSON encoding extension you want | body |
19-
20-
16+
| Name | Description | Default |
17+
|------------|---------------------------------------------------------------------------------------|---------|
18+
| mode | What mode of the JSON encoding extension you want | body |
19+
| array_mode | Set whether JSON payloads is extracted from an array(legacy mode). Accepts a boolean. | true |
2120

2221
### Mode
2322

@@ -50,4 +49,22 @@ The `body_with_inline_attributes` mode within the JSON encoding extension grabs
5049
}
5150
}
5251
]
53-
```
52+
```
53+
54+
### array_mode
55+
56+
Configuration accepts a boolean.
57+
58+
- `array_mode: true` : This is the default mode to preserve backward compatibility. JSON input is expected as an array
59+
60+
> [{"key": "value"}, {"key": "value"}]
61+
62+
- `array_mode: false` : Disable legacy mode and allow to accept a verity of JSON payloads. This includes single document or even a concatenated JSON payload
63+
64+
Single payload
65+
> {"key": "value"}
66+
67+
New line delimited JSON payload
68+
> {"key": "value"}\
69+
> {"key": "value"}
70+

extension/encoding/jsonlogencodingextension/config.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ const (
1414

1515
type Config struct {
1616
// Export raw log string instead of log wrapper
17-
Mode JSONEncodingMode `mapstructure:"mode,omitempty"`
17+
Mode JSONEncodingMode `mapstructure:"mode,omitempty"`
18+
ArrayMode bool `mapstructure:"array_mode,omitempty"`
19+
1820
// prevent unkeyed literal initialization
1921
_ struct{}
2022
}
2123

2224
func (c *Config) Validate() error {
25+
// validate marshaling mode
2326
switch c.Mode {
24-
case JSONEncodingModeBodyWithInlineAttributes:
25-
case JSONEncodingModeBody:
26-
default:
27-
return fmt.Errorf("invalid mode %q", c.Mode)
27+
case JSONEncodingModeBodyWithInlineAttributes, JSONEncodingModeBody:
28+
return nil
2829
}
29-
return nil
30+
31+
return fmt.Errorf("invalid mode %q", c.Mode)
3032
}

extension/encoding/jsonlogencodingextension/extension.go

Lines changed: 107 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
package jsonlogencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/jsonlogencodingextension"
55

66
import (
7+
"bytes"
78
"context"
9+
"errors"
810
"fmt"
11+
"io"
912

1013
"github.com/goccy/go-json"
1114
"go.opentelemetry.io/collector/component"
@@ -21,24 +24,44 @@ var (
2124
)
2225

2326
type jsonLogExtension struct {
24-
config component.Config
27+
config *Config
2528
}
2629

2730
func (e *jsonLogExtension) MarshalLogs(ld plog.Logs) ([]byte, error) {
28-
if e.config.(*Config).Mode == JSONEncodingModeBodyWithInlineAttributes {
29-
return e.logProcessor(ld)
30-
}
31-
logs := make([]map[string]any, 0, ld.LogRecordCount())
31+
var logs []map[string]any
3232

3333
rls := ld.ResourceLogs()
3434
for i := 0; i < rls.Len(); i++ {
3535
rl := rls.At(i)
36+
resourceAttrs := rl.Resource().Attributes().AsRaw()
3637
sls := rl.ScopeLogs()
3738
for j := 0; j < sls.Len(); j++ {
3839
sl := sls.At(j)
3940
logSlice := sl.LogRecords()
4041
for k := 0; k < logSlice.Len(); k++ {
4142
log := logSlice.At(k)
43+
if e.config.Mode == JSONEncodingModeBodyWithInlineAttributes {
44+
// special handling for inline attributes Mode
45+
entry := make(map[string]any)
46+
47+
body := log.Body().AsRaw()
48+
if body != nil {
49+
entry["body"] = body
50+
}
51+
52+
if len(resourceAttrs) != 0 {
53+
entry["resourceAttributes"] = resourceAttrs
54+
}
55+
56+
logAttribs := log.Attributes().AsRaw()
57+
if len(logAttribs) != 0 {
58+
entry["logAttributes"] = logAttribs
59+
}
60+
61+
logs = append(logs, entry)
62+
continue
63+
}
64+
4265
switch log.Body().Type() {
4366
case pcommon.ValueTypeMap:
4467
logs = append(logs, log.Body().Map().AsRaw())
@@ -48,23 +71,60 @@ func (e *jsonLogExtension) MarshalLogs(ld plog.Logs) ([]byte, error) {
4871
}
4972
}
5073
}
74+
75+
// check for processing mode so we can return the best format
76+
if !e.config.ArrayMode {
77+
var buf bytes.Buffer
78+
for i, log := range logs {
79+
m, err := json.Marshal(log)
80+
if err != nil {
81+
return nil, fmt.Errorf("marshaling error with ndjson log: %w", err)
82+
}
83+
84+
buf.Write(m)
85+
if i < len(logs)-1 {
86+
// if multiple logs, then consider exporting as ndjson
87+
buf.WriteByte('\n')
88+
}
89+
}
90+
91+
return buf.Bytes(), nil
92+
}
93+
94+
// default mode
5195
return json.Marshal(logs)
5296
}
5397

5498
func (e *jsonLogExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
5599
p := plog.NewLogs()
100+
sl := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
56101

57-
// get json logs from the buffer
58-
var jsonVal []map[string]any
59-
if err := json.Unmarshal(buf, &jsonVal); err != nil {
60-
return p, err
61-
}
102+
if e.config.ArrayMode {
103+
// Default mode to handle arrays having backward compatibility
104+
var jsonLogs []map[string]any
105+
var err error
62106

63-
sl := p.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty()
64-
for _, r := range jsonVal {
65-
if err := sl.LogRecords().AppendEmpty().Body().SetEmptyMap().FromRaw(r); err != nil {
107+
if err = json.Unmarshal(buf, &jsonLogs); err != nil {
66108
return p, err
67109
}
110+
111+
for _, r := range jsonLogs {
112+
if err := sl.LogRecords().AppendEmpty().Body().SetEmptyMap().FromRaw(r); err != nil {
113+
return p, err
114+
}
115+
}
116+
} else {
117+
reader := newStreamReader(bytes.NewReader(buf))
118+
for reader.next() {
119+
record, err := reader.value()
120+
if err != nil {
121+
return plog.Logs{}, err
122+
}
123+
124+
if err := sl.LogRecords().AppendEmpty().Body().SetEmptyMap().FromRaw(record); err != nil {
125+
return p, err
126+
}
127+
}
68128
}
69129

70130
return p, nil
@@ -78,35 +138,45 @@ func (e *jsonLogExtension) Shutdown(_ context.Context) error {
78138
return nil
79139
}
80140

81-
func (e *jsonLogExtension) logProcessor(ld plog.Logs) ([]byte, error) {
82-
logs := make([]logBody, 0, ld.LogRecordCount())
141+
// streamReader is a wrapper to process input stream and return processed JSON records one by one
142+
type streamReader struct {
143+
decoder *json.Decoder
144+
current map[string]any
145+
err error
146+
done bool
147+
}
83148

84-
rls := ld.ResourceLogs()
85-
for i := 0; i < rls.Len(); i++ {
86-
rl := rls.At(i)
87-
resourceAttrs := rl.Resource().Attributes().AsRaw()
149+
func newStreamReader(r io.Reader) *streamReader {
150+
return &streamReader{
151+
decoder: json.NewDecoder(r),
152+
}
153+
}
88154

89-
sls := rl.ScopeLogs()
90-
for j := 0; j < sls.Len(); j++ {
91-
sl := sls.At(j)
92-
logSlice := sl.LogRecords()
93-
for k := 0; k < logSlice.Len(); k++ {
94-
log := logSlice.At(k)
95-
logEvent := logBody{
96-
Body: log.Body().AsRaw(),
97-
ResourceAttributes: resourceAttrs,
98-
LogAttributes: log.Attributes().AsRaw(),
99-
}
100-
logs = append(logs, logEvent)
101-
}
155+
func (r *streamReader) next() bool {
156+
if r.done {
157+
return false
158+
}
159+
160+
var entry map[string]any
161+
err := r.decoder.Decode(&entry)
162+
if err != nil {
163+
if errors.Is(err, io.EOF) {
164+
// EOF signals the end
165+
r.done = true
166+
r.current = nil
167+
return false
102168
}
169+
170+
// Record error and let caller handles the result
171+
r.err = err
172+
r.current = nil
173+
return true
103174
}
104175

105-
return json.Marshal(logs)
176+
r.current = entry
177+
return true
106178
}
107179

108-
type logBody struct {
109-
Body any `json:"body,omitempty"`
110-
LogAttributes map[string]any `json:"logAttributes,omitempty"`
111-
ResourceAttributes map[string]any `json:"resourceAttributes,omitempty"`
180+
func (r *streamReader) value() (map[string]any, error) {
181+
return r.current, r.err
112182
}

extension/encoding/jsonlogencodingextension/factory.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ func NewFactory() extension.Factory {
2323

2424
func createExtension(_ context.Context, _ extension.Settings, config component.Config) (extension.Extension, error) {
2525
return &jsonLogExtension{
26-
config: config,
26+
config: config.(*Config),
2727
}, nil
2828
}
2929

3030
func createDefaultConfig() component.Config {
3131
return &Config{
32-
Mode: JSONEncodingModeBody,
32+
Mode: JSONEncodingModeBody,
33+
ArrayMode: true,
3334
}
3435
}

extension/encoding/jsonlogencodingextension/go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ go 1.23.0
55
require (
66
github.com/goccy/go-json v0.10.5
77
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.129.0
8+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.129.0
9+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.0.0-00010101000000-000000000000
810
github.com/stretchr/testify v1.10.0
911
go.opentelemetry.io/collector/component v1.35.1-0.20250708151327-74cb2f311035
1012
go.opentelemetry.io/collector/component/componenttest v0.129.1-0.20250708151327-74cb2f311035
@@ -16,6 +18,7 @@ require (
1618
)
1719

1820
require (
21+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1922
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
2023
github.com/go-logr/logr v1.4.3 // indirect
2124
github.com/go-logr/stdr v1.2.2 // indirect
@@ -32,6 +35,7 @@ require (
3235
github.com/mitchellh/reflectwalk v1.0.2 // indirect
3336
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3437
github.com/modern-go/reflect2 v1.0.2 // indirect
38+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.129.0 // indirect
3539
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
3640
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
3741
go.opentelemetry.io/collector/featuregate v1.35.1-0.20250708151327-74cb2f311035 // indirect
@@ -57,3 +61,9 @@ require (
5761
)
5862

5963
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../
64+
65+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
66+
67+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../../pkg/pdatatest
68+
69+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../../pkg/golden

extension/encoding/jsonlogencodingextension/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)