Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,14 @@ func decodeMsg(dec *msgpack.Decoder, tag string) (Message, error) {
return out, fmt.Errorf("msgpack unmarshal event time: %w", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to you but i dont understand the code above this at all. in the event of an error, it looks like we do a lot of stuff w/o any side effects and then return an error in all paths.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah that seems like a bug. It tries to decode the time, then has a couple fallback ways of decoding it but returns an error regardless of if the fallbacks succeed.


// Create a decoder that preserves number types (int64 vs float64)
// instead of converting all numbers to float64 like the default msgpack.Unmarshal
recordDecoder := msgpack.NewDecoder(bytes.NewReader(entry[1]))
// Configure decoder to preserve exact number types instead of converting to float64
recordDecoder.UseLooseInterfaceDecoding(false)

var record map[string]any
if err := msgpack.Unmarshal(entry[1], &record); err != nil {
if err := recordDecoder.Decode(&record); err != nil {
Comment on lines +523 to +530
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this only for Go output plugins?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm yeah upon further investigation this isn't the cause of the behavior I'm seeing when testing the json processing as http_sink doesn't use this. May as well fix this here tho.

return out, fmt.Errorf("msgpack unmarshal event record: %w", err)
}

Expand Down
76 changes: 76 additions & 0 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,79 @@ func TestToSnakeCase(t *testing.T) {
})
}
}

// TestNumberTypePreservation tests that the decodeMsg function preserves
// number types (int64 vs float64) instead of converting all numbers to float64.
func TestNumberTypePreservation(t *testing.T) {
now := time.Now()

// Create a message with various number types that should be preserved
originalRecord := map[string]any{
"integer_positive": int64(42),
"integer_negative": int64(-123),
"integer_zero": int64(0),
"integer_large": int64(9223372036854775807), // max int64
"float_simple": float64(3.14),
"float_negative": float64(-2.71),
"float_zero": float64(0.0),
"float_scientific": float64(1.23e-4),
"string_value": "test",
"boolean_value": true,
"mixed_in_array": []any{int64(1), float64(2.5), int64(3)},
"nested_map": map[string]any{
"nested_int": int64(456),
"nested_float": float64(7.89),
},
}

msg := Message{
Time: now,
Record: originalRecord,
}

// Marshal the message as it would be done by the input plugin
marshaledData, err := msgpack.Marshal([]any{
&EventTime{msg.Time},
msg.Record,
})
assert.NoError(t, err)

// Create a decoder and decode the message using our fixed decodeMsg function
decoder := msgpack.NewDecoder(bytes.NewReader(marshaledData))
decodedMsg, err := decodeMsg(decoder, "test-tag")
assert.NoError(t, err)

// Verify the decoded message structure
if decodedMsg.Record == nil {
t.Fatal("Record should not be nil")
}

record, ok := decodedMsg.Record.(map[string]any)
if !ok {
t.Fatal("Record should be a map[string]any")
}

// Compare each field from original record with decoded record
// assert.Equal checks both value and type, so this ensures type preservation
for k, v := range originalRecord {
decodedValue := record[k]
switch originalValue := v.(type) {
case []any:
decodedSlice := assertType[[]any](t, decodedValue)
assert.Equal(t, len(originalValue), len(decodedSlice), "Array length for field %q", k)
for i, originalItem := range originalValue {
assert.Equal(t, originalItem, decodedSlice[i], "Field %q[%d]", k, i)
}
case map[string]any:
decodedMap := assertType[map[string]any](t, decodedValue)
for nestedKey, nestedOriginal := range originalValue {
assert.Equal(t, nestedOriginal, decodedMap[nestedKey], "Field %q.%q", k, nestedKey)
}
default:
assert.Equal(t, v, decodedValue, "Field %q", k)
}
}

// Verify that tag is preserved
assert.Equal(t, "test-tag", decodedMsg.Tag())
}