Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/agent/subcommands/analyzelogs/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Auto-discovery IDs:
err := json.Unmarshal(msg.GetContent(), &parsedMessage)
assert.NoError(t, err)

assert.Equal(t, parsedMessage.Message, expectedOutput[i])
assert.Equal(t, parsedMessage.Message.String(), expectedOutput[i])
}

launcher.Stop()
Expand All @@ -178,7 +178,7 @@ func TestRunAnalyzeLogsInvalidConfig(t *testing.T) {
- type: exclude_at_match
name: exclude_random
pattern: "datadog-agent"

`, tempLogFile.Name())
tempConfigFile := CreateTestFile(tempDir, "config.yaml", invalidConfig)
assert.NotNil(t, tempConfigFile)
Expand Down
31 changes: 31 additions & 0 deletions pkg/logs/processor/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package processor

import (
"bytes"
"strings"
"unicode/utf8"

Expand All @@ -18,6 +19,36 @@ type Encoder interface {
Encode(msg *message.Message, hostname string) error
}

// ValidUtf8Bytes is a byte slice that marshals to valid UTF-8 text: its
// MarshalText method replaces every invalid byte with the Unicode replacement
// character (U+FFFD) and returns already-valid content unchanged.
type ValidUtf8Bytes []byte
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: it's unfortunate that #30924 exported the jsonPayload type. Maybe this type could be unexported, given that it is only used as a fmt.Stringer:

fmt.Println(parsedMessage.Message)


// MarshalText returns the content as valid UTF-8 text.
//
// Content that is already valid UTF-8 is returned as-is, without copying.
// Otherwise a sanitized copy is built in which each invalid byte is replaced
// by the Unicode replacement character (U+FFFD). The returned error is
// always nil.
func (msg ValidUtf8Bytes) MarshalText() (text []byte, err error) {
	if !utf8.Valid(msg) {
		var sanitized bytes.Buffer
		sanitized.Grow(len(msg))

		// For an invalid sequence utf8.DecodeRune yields (utf8.RuneError, 1);
		// utf8.RuneError is U+FFFD, so writing the decoded rune unconditionally
		// substitutes the replacement character for each bad byte.
		for pos := 0; pos < len(msg); {
			decoded, width := utf8.DecodeRune(msg[pos:])
			sanitized.WriteRune(decoded)
			pos += width
		}
		return sanitized.Bytes(), nil
	}

	return msg, nil
}

// UnmarshalText stores a private copy of text in the receiver, so later
// changes to the caller's slice do not affect it. The input is not validated
// and the returned error is always nil.
func (msg *ValidUtf8Bytes) UnmarshalText(text []byte) error {
	copied := bytes.Clone(text)
	*msg = ValidUtf8Bytes(copied)
	return nil
}

// String returns the raw content converted to a string. No UTF-8
// sanitization is applied here; bytes are converted as they are.
func (msg ValidUtf8Bytes) String() string {
	content := string(msg)
	return content
}

// toValidUtf8 ensures all characters are UTF-8.
func toValidUtf8(msg []byte) string {
if utf8.Valid(msg) {
Expand Down
127 changes: 108 additions & 19 deletions pkg/logs/processor/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/DataDog/agent-payload/v5/pb"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/sources"
Expand Down Expand Up @@ -195,31 +196,77 @@ func TestJsonEncoder(t *testing.T) {

source := sources.NewLogSource("", logsConfig)

content := []byte("message")
msg := newMessage(content, source, message.StatusError)
msg.State = message.StateRendered // we can only encode rendered message
msg.Origin.LogSource = source
msg.Origin.SetTags([]string{"a", "b:c"})
assert.Equal(t, msg.GetContent(), content) // before encoding, content should be the raw message
type payload struct {
Message string `json:"message"`
Comment on lines +199 to +200
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: use a test-local payload type to parse the message as a string.

Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
Hostname string `json:"hostname"`
Service string `json:"service"`
Source string `json:"ddsource"`
Tags string `json:"ddtags"`
}

err := JSONEncoder.Encode(msg, "unknown")
assert.Nil(t, err)
t.Run("valid", func(t *testing.T) {
content := []byte("valid utf-8 message content")

log := &jsonPayload{}
err = json.Unmarshal(msg.GetContent(), log)
assert.Nil(t, err)
msg := newMessage(content, source, message.StatusError)
msg.State = message.StateRendered // we can only encode rendered message
msg.Origin.LogSource = source
msg.Origin.SetTags([]string{"a", "b:c"})
assert.Equal(t, msg.GetContent(), content) // before encoding, content should be the raw message

assert.NotEmpty(t, log.Hostname)
err := JSONEncoder.Encode(msg, "unknown")
assert.Nil(t, err)

assert.Equal(t, logsConfig.Service, log.Service)
assert.Equal(t, logsConfig.Source, log.Source)
assert.Equal(t, "a,b:c,sourcecategory:"+logsConfig.SourceCategory+",foo:bar,baz", log.Tags)
log := &payload{}

json, _ := json.Marshal(log)
assert.Equal(t, msg.GetContent(), json)
err = json.Unmarshal(msg.GetContent(), log)
assert.Nil(t, err)

assert.Equal(t, message.StatusError, log.Status)
assert.NotEmpty(t, log.Timestamp)
assert.Equal(t, "valid utf-8 message content", log.Message)
assert.NotEmpty(t, log.Hostname)

assert.Equal(t, logsConfig.Service, log.Service)
assert.Equal(t, logsConfig.Source, log.Source)
assert.Equal(t, "a,b:c,sourcecategory:"+logsConfig.SourceCategory+",foo:bar,baz", log.Tags)

json, _ := json.Marshal(log)
assert.Equal(t, msg.GetContent(), json)

assert.Equal(t, message.StatusError, log.Status)
assert.NotEmpty(t, log.Timestamp)
})

t.Run("invalid", func(t *testing.T) {
content := []byte("invalid utf-8 message content a\xf0\x8f\xbf\xbfz")

msg := newMessage(content, source, message.StatusError)
msg.State = message.StateRendered // we can only encode rendered message
msg.Origin.LogSource = source
msg.Origin.SetTags([]string{"a", "b:c"})
assert.Equal(t, msg.GetContent(), content) // before encoding, content should be the raw message

err := JSONEncoder.Encode(msg, "unknown")
assert.Nil(t, err)

log := &payload{}

err = json.Unmarshal(msg.GetContent(), log)
assert.Nil(t, err)

assert.Equal(t, "invalid utf-8 message content a����z", log.Message)
assert.NotEmpty(t, log.Hostname)

assert.Equal(t, logsConfig.Service, log.Service)
assert.Equal(t, logsConfig.Source, log.Source)
assert.Equal(t, "a,b:c,sourcecategory:"+logsConfig.SourceCategory+",foo:bar,baz", log.Tags)

json, _ := json.Marshal(log)
assert.Equal(t, msg.GetContent(), json)

assert.Equal(t, message.StatusError, log.Status)
assert.NotEmpty(t, log.Timestamp)
})
}

func TestEncoderToValidUTF8(t *testing.T) {
Expand All @@ -237,3 +284,45 @@ func TestEncoderToValidUTF8(t *testing.T) {
assert.Equal(t, "a����z", toValidUtf8([]byte("a\xf0\x8f\xbf\xbfz")))
assert.Equal(t, "世界����z 世界", toValidUtf8([]byte("世界\xf0\x8f\xbf\xbfz 世界")))
}

// BenchmarkJSONEncoder_Encode measures JSONEncoder.Encode for two kinds of
// message content: content that is already valid UTF-8 ("valid") and content
// that contains a byte which is not valid UTF-8 ("invalid").
func BenchmarkJSONEncoder_Encode(b *testing.B) {
	logsConfig := &config.LogsConfig{
		Service:        "Service",
		Source:         "Source",
		SourceCategory: "SourceCategory",
		Tags:           []string{"foo:bar", "baz"},
	}
	source := sources.NewLogSource("", logsConfig)

	cases := []struct {
		name    string
		content []byte
	}{
		{name: "valid", content: []byte(strings.Repeat("x", 100))},
		// A "\uFFFD" escape is encoded by Go as the well-formed sequence
		// EF BF BD, for which utf8.Valid reports true, so it would not
		// benchmark invalid input at all. A lone 0xff byte is never valid UTF-8.
		{name: "invalid", content: []byte(strings.Repeat("x", 100) + "\xff")},
	}

	for _, tc := range cases {
		b.Run(tc.name, func(b *testing.B) {
			var msg *message.Message

			b.ResetTimer()
			b.ReportAllocs()
			for range b.N {
				msg = newMessage(tc.content, source, message.StatusError)
				msg.State = message.StateRendered // we can only encode rendered message
				msg.Origin.LogSource = source
				msg.Origin.SetTags([]string{"a", "b:c"})

				assert.Nil(b, JSONEncoder.Encode(msg, "unknown"))
			}
		})
	}
}
16 changes: 8 additions & 8 deletions pkg/logs/processor/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ type jsonEncoder struct{}

// JSON representation of a message.
type jsonPayload struct {
Message string `json:"message"`
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
Hostname string `json:"hostname"`
Service string `json:"service"`
Source string `json:"ddsource"`
Tags string `json:"ddtags"`
Message ValidUtf8Bytes `json:"message"`
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
Hostname string `json:"hostname"`
Service string `json:"service"`
Source string `json:"ddsource"`
Tags string `json:"ddtags"`
}

// Encode encodes a message into a JSON byte array.
Expand All @@ -48,7 +48,7 @@ func (j *jsonEncoder) Encode(msg *message.Message, hostname string) error {
}

encoded, err := json.Marshal(jsonPayload{
Message: toValidUtf8(msg.GetContent()),
Message: ValidUtf8Bytes(msg.GetContent()),
Status: msg.GetStatus(),
Timestamp: ts.UnixNano() / nanoToMillis,
Hostname: hostname,
Expand Down
16 changes: 8 additions & 8 deletions pkg/logs/processor/json_serverless_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ type jsonServerlessInitEncoder struct {

// JSON representation of a message for serverless-init.
type jsonServerlessInitPayload struct {
Message string `json:"message"`
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
Hostname string `json:"hostname"`
Service string `json:"service,omitempty"`
Source string `json:"ddsource"`
Tags string `json:"ddtags"`
Message ValidUtf8Bytes `json:"message"`
Status string `json:"status"`
Timestamp int64 `json:"timestamp"`
Hostname string `json:"hostname"`
Service string `json:"service,omitempty"`
Source string `json:"ddsource"`
Tags string `json:"ddtags"`
}

// Encode encodes a message into a JSON byte array.
Expand All @@ -52,7 +52,7 @@ func (j *jsonServerlessInitEncoder) Encode(msg *message.Message, hostname string
}

encoded, err := json.Marshal(jsonServerlessInitPayload{
Message: toValidUtf8(msg.GetContent()),
Message: ValidUtf8Bytes(msg.GetContent()),
Status: msg.GetStatus(),
Timestamp: ts.UnixNano() / nanoToMillis,
Hostname: hostname,
Expand Down