From ca24c7bce3aab65afa77bca7abc0529b2c245eaf Mon Sep 17 00:00:00 2001 From: Fiery-Fenix Date: Wed, 26 Nov 2025 14:13:49 +0200 Subject: [PATCH] [extension/azure_encoding] Implement general Azure Resource Log parsing functionality --- .../feat_azureencoding-logs-general.yaml | 27 ++ .../encoding/azureencodingextension/README.md | 2 + .../azureencodingextension/extension.go | 7 +- .../azureencodingextension/factory.go | 9 +- .../encoding/azureencodingextension/go.mod | 18 +- .../encoding/azureencodingextension/go.sum | 6 + .../internal/unmarshaler/helpers.go | 97 +++++ .../internal/unmarshaler/helpers_test.go | 368 ++++++++++++++++++ .../internal/unmarshaler/logs/README.md | 44 +++ .../unmarshaler/logs/benchmark_test.go | 138 +++++++ .../internal/unmarshaler/logs/category.go | 235 +++++++++++ .../internal/unmarshaler/logs/config.go | 2 +- .../internal/unmarshaler/logs/helpers.go | 29 ++ .../internal/unmarshaler/logs/helpers_test.go | 30 ++ .../logs/testdata/general/maximum.json | 85 ++++ .../testdata/general/maximum_expected.yaml | 210 ++++++++++ .../logs/testdata/general/minimum-2.json | 16 + .../testdata/general/minimum-2_expected.yaml | 35 ++ .../logs/testdata/general/minimum.json | 10 + .../testdata/general/minimum_expected.yaml | 26 ++ .../internal/unmarshaler/logs/unmarshaler.go | 264 +++++++++++++ .../unmarshaler/logs/unmarshaler_test.go | 287 ++++++++++++++ 22 files changed, 1937 insertions(+), 8 deletions(-) create mode 100644 .chloggen/feat_azureencoding-logs-general.yaml create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/helpers_test.go create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/README.md create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/benchmark_test.go create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/category.go create mode 100644 
extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers.go create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers_test.go create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum.json create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum_expected.yaml create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2.json create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2_expected.yaml create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum.json create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum_expected.yaml create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler.go create mode 100644 extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler_test.go diff --git a/.chloggen/feat_azureencoding-logs-general.yaml b/.chloggen/feat_azureencoding-logs-general.yaml new file mode 100644 index 0000000000000..ad2de8e7f9344 --- /dev/null +++ b/.chloggen/feat_azureencoding-logs-general.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: extension/azure_encoding + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement general Azure Resource Log parsing functionality + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [41725] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/encoding/azureencodingextension/README.md b/extension/encoding/azureencodingextension/README.md index 12727433f0b0c..f674f74f96bd8 100644 --- a/extension/encoding/azureencodingextension/README.md +++ b/extension/encoding/azureencodingextension/README.md @@ -142,6 +142,8 @@ Currently supported following Azure Resource Logs export formats: Currently only subset of available Azure Resource Logs Categories properly translated using OpenTelemetry SemConv. +[Transformation rules from Azure Resource Logs fields to OpenTelemetry](./internal/unmarshaler/logs/README.md). + Unsupported Categories simply copies attributes from "properties" field of incoming Azure Log Record to OpenTelemetry Log Attributes as a strings. 
***time_formats (Optional)*** diff --git a/extension/encoding/azureencodingextension/extension.go b/extension/encoding/azureencodingextension/extension.go index 60df7abc3d1d9..9c752b776a222 100644 --- a/extension/encoding/azureencodingextension/extension.go +++ b/extension/encoding/azureencodingextension/extension.go @@ -22,15 +22,16 @@ var ( ) type azureExtension struct { - config *Config + config *Config + logUnmarshaler plog.Unmarshaler } func (*azureExtension) UnmarshalTraces(_ []byte) (ptrace.Traces, error) { return ptrace.Traces{}, errors.New("not implemented yet") } -func (*azureExtension) UnmarshalLogs(_ []byte) (plog.Logs, error) { - return plog.Logs{}, errors.New("not implemented yet") +func (ex *azureExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) { + return ex.logUnmarshaler.UnmarshalLogs(buf) } func (*azureExtension) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) { diff --git a/extension/encoding/azureencodingextension/factory.go b/extension/encoding/azureencodingextension/factory.go index 9bf6cc619651c..d9463d31b1035 100644 --- a/extension/encoding/azureencodingextension/factory.go +++ b/extension/encoding/azureencodingextension/factory.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler/logs" ) func NewFactory() extension.Factory { @@ -22,11 +23,17 @@ func NewFactory() extension.Factory { ) } -func createExtension(_ context.Context, _ extension.Settings, cfg component.Config) (extension.Extension, error) { +func createExtension(_ context.Context, settings extension.Settings, cfg component.Config) (extension.Extension, error) { config := cfg.(*Config) return &azureExtension{ config: config, + 
logUnmarshaler: logs.NewAzureResourceLogsUnmarshaler( + settings.BuildInfo, + settings.Logger, + config.Format, + config.Logs, + ), }, nil } diff --git a/extension/encoding/azureencodingextension/go.mod b/extension/encoding/azureencodingextension/go.mod index e9f4aafceeb1f..a6efd9abad477 100644 --- a/extension/encoding/azureencodingextension/go.mod +++ b/extension/encoding/azureencodingextension/go.mod @@ -3,7 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/encod go 1.24.0 require ( + github.com/goccy/go-json v0.10.5 + github.com/json-iterator/go v1.1.12 github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.140.1 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1 + github.com/relvacode/iso8601 v1.7.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.46.1-0.20251120204106-2e9c82787618 go.opentelemetry.io/collector/component/componenttest v0.140.1-0.20251120204106-2e9c82787618 @@ -12,10 +17,13 @@ require ( go.opentelemetry.io/collector/extension v1.46.1-0.20251120204106-2e9c82787618 go.opentelemetry.io/collector/extension/extensiontest v0.140.1-0.20251120204106-2e9c82787618 go.opentelemetry.io/collector/pdata v1.46.1-0.20251120204106-2e9c82787618 + go.opentelemetry.io/otel v1.38.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -23,7 +31,6 @@ require ( github.com/gobwas/glob v0.2.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/knadh/koanf/providers/confmap v1.0.0 // indirect 
github.com/knadh/koanf/v2 v2.3.0 // indirect @@ -31,20 +38,25 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.140.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/featuregate v1.46.1-0.20251120204106-2e9c82787618 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.140.1-0.20251120204106-2e9c82787618 // indirect - go.opentelemetry.io/otel v1.38.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/sdk v1.38.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/sys v0.37.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../ + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../../pkg/golden diff --git a/extension/encoding/azureencodingextension/go.sum b/extension/encoding/azureencodingextension/go.sum index 1aea1bd9471d4..371fd9668d118 100644 --- a/extension/encoding/azureencodingextension/go.sum +++ b/extension/encoding/azureencodingextension/go.sum @@ -1,3 +1,5 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -11,6 +13,8 @@ github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9L github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -43,6 +47,8 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWu github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/relvacode/iso8601 v1.7.0 h1:BXy+V60stMP6cpswc+a93Mq3e65PfXCgDFfhvNNGrdo= +github.com/relvacode/iso8601 v1.7.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 
diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/helpers.go b/extension/encoding/azureencodingextension/internal/unmarshaler/helpers.go index 0dce60ca6bfc3..c03c64f07f38a 100644 --- a/extension/encoding/azureencodingextension/internal/unmarshaler/helpers.go +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/helpers.go @@ -3,9 +3,106 @@ package unmarshaler // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler" +import ( + "encoding/json" + "fmt" + "time" + + "github.com/relvacode/iso8601" + "go.opentelemetry.io/collector/pdata/pcommon" +) + type RecordsBatchFormat string +// Supported wrapper formats of Azure Logs Records batch const ( FormatEventHub RecordsBatchFormat = "eventhub" FormatBlobStorage RecordsBatchFormat = "blobstorage" ) + +// JSON Path expressions that matches specific wrapper format +const ( + // As exported to Azure Event Hub, e.g. `{"records": [ {...}, {...} ]}` + JSONPathEventHubLogRecords = "$.records[*]" + // As exported to Azure Blob Storage, e.g. `[ {...}, {...} ]` + JSONPathBlobStorageLogRecords = "$[*]" +) + +// Commonly used attributes non-SemConv attributes across all telemetry signals +const ( + AttributeAzureCategory = "azure.category" + AttributeAzureOperationName = "azure.operation.name" +) + +const originalSuffix = ".original" + +// AsTimestamp tries to parse a string with timestamp into OpenTelemetry +// using provided list of formats layouts. +// If not formats provided or parsing using them failed - will use an ISO8601 parser. +// If the string cannot be parsed, it will return zero and the error. 
+func AsTimestamp(s string, formats ...string) (pcommon.Timestamp, error) { + var err error + var t time.Time + + // Try parsing with provided formats first + for _, format := range formats { + if t, err = time.Parse(format, s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } + } + + // Fallback to ISO 8601 parsing if no format matches + if t, err = iso8601.ParseString(s); err == nil { + return pcommon.Timestamp(t.UnixNano()), nil + } + + return 0, err +} + +// AttrPutStrIf is a helper function to set a string attribute +// only if the value is not empty +func AttrPutStrIf(attrs pcommon.Map, attrKey, attrValue string) { + if attrValue != "" { + attrs.PutStr(attrKey, attrValue) + } +} + +// AttrPutStrPtrIf is a helper function to set a string attribute +// only if the value exists and is not empty +func AttrPutStrPtrIf(attrs pcommon.Map, attrKey string, attrValue *string) { + if attrValue != nil && *attrValue != "" { + attrs.PutStr(attrKey, *attrValue) + } +} + +// AttrPutIntNumberIf is a helper function to set an int64 attribute with defined key, +// trying to parse it from json.Number value +// If parsing failed - no attribute will be set +func AttrPutIntNumberIf(attrs pcommon.Map, attrKey string, attrValue json.Number) { + if i, err := attrValue.Int64(); err == nil { + attrs.PutInt(attrKey, i) + } +} + +// AttrPutIntNumberPtrIf is the same function as AttrPutIntNumberIf but +// accepts a pointer to json.Number instead of a value +func AttrPutIntNumberPtrIf(attrs pcommon.Map, attrKey string, attrValue *json.Number) { + if attrValue != nil { + AttrPutIntNumberIf(attrs, attrKey, *attrValue) + } +} + +// AttrPutMapIf is a helper function to set a map attribute with defined key, +// trying to parse it from raw value +// If parsing failed - no attribute will be set +func AttrPutMapIf(attrs pcommon.Map, attrKey string, attrValue any) { + if attrKey == "" || attrValue == nil { + return + } + + if err := attrs.PutEmpty(attrKey).FromRaw(attrValue); err != nil { + 
// Failed to parse - put string representation of the attrValue + attrs.Remove(attrKey) + attrs.PutStr(attrKey+originalSuffix, fmt.Sprintf("%v", attrValue)) + } +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/helpers_test.go b/extension/encoding/azureencodingextension/internal/unmarshaler/helpers_test.go new file mode 100644 index 0000000000000..0a0b0a11ecd20 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/helpers_test.go @@ -0,0 +1,368 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package unmarshaler + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestAsTimestamp(t *testing.T) { + type args struct { + s string + formats []string + } + tests := []struct { + name string + args args + want pcommon.Timestamp + wantErr bool + }{ + { + name: "ISO8601 format", + args: args{ + s: "2025-01-02T12:34:56.000000Z", + }, + want: pcommon.NewTimestampFromTime(time.Date(2025, time.January, 2, 12, 34, 56, 0, time.UTC)), + wantErr: false, + }, + { + name: "Custom format", + args: args{ + s: "02/03/2025 12:34:56", + formats: []string{"02/01/2006 15:04:05"}, + }, + want: pcommon.NewTimestampFromTime(time.Date(2025, time.March, 2, 12, 34, 56, 0, time.UTC)), + wantErr: false, + }, + { + name: "Multiple formats", + args: args{ + s: "2025-03-04T12:34:56Z", + formats: []string{"01/02/2006 15:04:05", "2006-01-02T15:04:05Z"}, + }, + want: pcommon.NewTimestampFromTime(time.Date(2025, time.March, 4, 12, 34, 56, 0, time.UTC)), + wantErr: false, + }, + { + name: "Fallback format", + args: args{ + s: "2025-04-05T12:34:56.000000Z", + formats: []string{"01/02/2006 15:04:05"}, + }, + want: pcommon.NewTimestampFromTime(time.Date(2025, time.April, 5, 12, 34, 56, 0, time.UTC)), + wantErr: false, + }, + { + name: "Invalid format", + args: args{ + s: "string", 
+ }, + want: 0, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := AsTimestamp(tt.args.s, tt.args.formats...) + if tt.wantErr { + require.Error(t, err) + } + assert.Equal(t, tt.want, got) + }) + } +} + +func TestAttrPutStrIf(t *testing.T) { + type args struct { + key string + value string + } + tests := []struct { + name string + attrs pcommon.Map + args args + wantRaw map[string]any + }{ + { + name: "Empty string", + attrs: pcommon.NewMap(), + args: args{ + key: "attr1", + value: "", + }, + wantRaw: map[string]any{}, + }, + { + name: "Value string", + attrs: pcommon.NewMap(), + args: args{ + key: "attr2", + value: "value", + }, + wantRaw: map[string]any{"attr2": "value"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + AttrPutStrIf(tt.attrs, tt.args.key, tt.args.value) + want := pcommon.NewMap() + err := want.FromRaw(tt.wantRaw) + require.NoError(t, err) + assert.Equal(t, want.AsRaw(), tt.attrs.AsRaw()) + }) + } +} + +func TestAttrPutStrPtrIf(t *testing.T) { + emptyStr := "" + valueStr := "value" + + type args struct { + key string + value *string + } + tests := []struct { + name string + args args + wantRaw map[string]any + }{ + { + name: "No value", + args: args{ + key: "attr1", + value: nil, + }, + wantRaw: map[string]any{}, + }, + { + name: "Empty string", + args: args{ + key: "attr2", + value: &emptyStr, + }, + wantRaw: map[string]any{}, + }, + { + name: "Value string", + args: args{ + key: "attr3", + value: &valueStr, + }, + wantRaw: map[string]any{"attr3": "value"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := pcommon.NewMap() + AttrPutStrPtrIf(got, tt.args.key, tt.args.value) + want := pcommon.NewMap() + err := want.FromRaw(tt.wantRaw) + require.NoError(t, err) + assert.Equal(t, want.AsRaw(), got.AsRaw()) + }) + } +} + +func TestAttrPutIntNumberIf(t *testing.T) { + t.Parallel() + + type testNumStruct struct { + Value json.Number 
`json:"value"` + } + + tests := []struct { + name string + inputJSON string + wantRaw map[string]any + }{ + { + name: "string value", + inputJSON: `{"value": "3"}`, + wantRaw: map[string]any{ + "value": int64(3), + }, + }, + { + name: "int value", + inputJSON: `{"value": 10}`, + wantRaw: map[string]any{ + "value": int64(10), + }, + }, + { + name: "float value", + inputJSON: `{"value": 3.14}`, + wantRaw: map[string]any{}, + }, + { + name: "no value", + inputJSON: `{"novalue": 15}`, + wantRaw: map[string]any{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ts testNumStruct + err := json.Unmarshal([]byte(tt.inputJSON), &ts) + require.NoError(t, err) + + got := pcommon.NewMap() + AttrPutIntNumberIf(got, "value", ts.Value) + want := pcommon.NewMap() + err = want.FromRaw(tt.wantRaw) + require.NoError(t, err) + assert.Equal(t, want.AsRaw(), got.AsRaw()) + }) + } +} + +func TestAttrPutIntNumberPtrIf(t *testing.T) { + t.Parallel() + + type testNumStruct struct { + Value *json.Number `json:"value"` + } + + tests := []struct { + name string + inputJSON string + wantRaw map[string]any + }{ + { + name: "string value", + inputJSON: `{"value": "3"}`, + wantRaw: map[string]any{ + "value": int64(3), + }, + }, + { + name: "int value", + inputJSON: `{"value": 10}`, + wantRaw: map[string]any{ + "value": int64(10), + }, + }, + { + name: "float value", + inputJSON: `{"value": 3.14}`, + wantRaw: map[string]any{}, + }, + { + name: "nil value", + inputJSON: `{"novalue": 15}`, + wantRaw: map[string]any{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ts testNumStruct + err := json.Unmarshal([]byte(tt.inputJSON), &ts) + require.NoError(t, err) + + got := pcommon.NewMap() + AttrPutIntNumberPtrIf(got, "value", ts.Value) + want := pcommon.NewMap() + err = want.FromRaw(tt.wantRaw) + require.NoError(t, err) + assert.Equal(t, want.AsRaw(), got.AsRaw()) + }) + } +} + +func TestAttrPutMapIf(t *testing.T) { + t.Parallel() + + 
type testStruct struct { + name string + value string + } + + type args struct { + attrKey string + attrValue any + } + tests := []struct { + name string + args args + wantRaw map[string]any + }{ + { + name: "string value", + args: args{ + attrKey: "foo", + attrValue: "bar", + }, + wantRaw: map[string]any{ + "foo": "bar", + }, + }, + { + name: "int value", + args: args{ + attrKey: "num", + attrValue: 42, + }, + wantRaw: map[string]any{ + "num": 42, + }, + }, + { + name: "nil value", + args: args{ + attrKey: "nil", + attrValue: nil, + }, + wantRaw: map[string]any{}, + }, + { + name: "slice value", + args: args{ + attrKey: "slice", + attrValue: []any{1, "a"}, + }, + wantRaw: map[string]any{ + "slice": []any{1, "a"}, + }, + }, + { + name: "map value", + args: args{ + attrKey: "map", + attrValue: map[string]any{"x": 1}, + }, + wantRaw: map[string]any{ + "map": map[string]any{"x": 1}, + }, + }, + { + name: "struct value", + args: args{ + attrKey: "map", + attrValue: testStruct{name: "test", value: "value"}, + }, + wantRaw: map[string]any{ + "map.original": "{test value}", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := pcommon.NewMap() + AttrPutMapIf(got, tt.args.attrKey, tt.args.attrValue) + want := pcommon.NewMap() + err := want.FromRaw(tt.wantRaw) + require.NoError(t, err) + assert.Equal(t, want.AsRaw(), got.AsRaw()) + }) + } +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/README.md b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/README.md new file mode 100644 index 0000000000000..88fd2bb75b8c3 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/README.md @@ -0,0 +1,44 @@ +# Azure Resource Logs transformation rules + +## General transformation rules + +Transformation of Azure Resource Log records happened based on Category defined in incoming log record (`category` or `type` field) using mappings described in this document. 
Mappings are defined to be OpenTelemetry SemConv compatible as much as possible. + +### Unknown/Unsupported Azure Resource Log record Category + +For logs Categories that conform to the [common Azure Resource Logs schema](https://learn.microsoft.com/en-us/azure/azure-monitor/platform/resource-logs-schema), +but don't have a mapping for a specific Category in this extension, the following rules will be applied: + +* Common known fields are parsed according to the [common map below](#common-fields-available-in-all-categories) +* If the `properties` field is parsable JSON - all parsed attributes are put as is into Log Attributes (except for `message` - goes to Body, `correlationId` and `duration` - go to Log Attributes according to the map below) +* If the `properties` field is a primitive value (string, number, bool, etc.) - it will be stored into the `azure.properties` Log Attribute +* If none of the above is possible - `properties` will be stored as-is to the Log Body + +### Unparsable Azure Resource Log record + +In case of a parsing or transformation failure - the original Azure Resource Log record +will be saved as-is (original JSON string representation) into the OpenTelemetry log.Body and an error will be logged. + +This approach allows you to try to parse or transform the Azure Resource Log record later +in the OpenTelemetry Collector pipeline (for example, using `transformprocessor`) or in the log Storage if applicable. 
+ +## Common fields, available in all Categories + +| Azure | Open Telemetry | +|-----------------------|----------------| +| `time`, `timestamp` | `log.timestamp` | +| `resourceId` | `cloud.resource_id` (resource attribute) | +| `tenantId` | `azure.tenant.id` (resource attribute) | +| `location` | `cloud.region` (resource attribute) | +| `operationName` | `azure.operation.name` (log attribute) | +| `operationVersion` | `azure.operation.version` (log attribute) | +| `category`, `type` | `azure.category` (log attribute) | +| `resultType` | `azure.result.type` (log attribute) | +| `resultSignature` | `azure.result.signature` (log attribute) | +| `resultDescription` | `azure.result.description` (log attribute) | +| `durationMs` | `azure.duration` (log attribute) | +| `callerIpAddress` | `network.peer.address` (log attribute) | +| `correlationId` | `azure.correlation_id` (log attribute) | +| `identity` | `azure.identity` (log attribute) | +| `Level` | `log.SeverityNumber` | +| `properties` | see mapping for each Category below | diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/benchmark_test.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/benchmark_test.go new file mode 100644 index 0000000000000..e570c62ea25f0 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/benchmark_test.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + "testing" + + gojson "github.com/goccy/go-json" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler" +) + +func BenchmarkUnmarshalLogs(b *testing.B) { + tests := map[string]struct { + nRecords int + }{ + "1_record": { + nRecords: 1, + }, + "100_record": { + 
nRecords: 100, + }, + "1000_record": { + nRecords: 1_000, + }, + } + + u := &ResourceLogsUnmarshaler{ + buildInfo: component.BuildInfo{ + Version: "test", + }, + logger: zap.NewNop(), + } + + for name, test := range tests { + buf := newBuf(test.nRecords) + b.Run(name, func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for b.Loop() { + _, err := u.UnmarshalLogs(buf) + require.NoError(b, err) + } + }) + } +} + +func BenchmarkUnmarshalLogsByCategory(b *testing.B) { + testCategoriesDirs, err := os.ReadDir(testFilesDirectory) + require.NoError(b, err) + rootPath, err := gojson.CreatePath(unmarshaler.JSONPathEventHubLogRecords) + require.NoError(b, err) + u := &ResourceLogsUnmarshaler{ + buildInfo: component.BuildInfo{ + Version: "test", + }, + logger: zap.NewNop(), + } + + for _, dir := range testCategoriesDirs { + if !dir.IsDir() { + continue + } + + category := dir.Name() + + testFiles, err := filepath.Glob(filepath.Join(testFilesDirectory, category, "*.json")) + require.NoError(b, err) + + for _, n := range []int{1, 100, 1_000} { + b.Run(category+"_records_"+fmt.Sprintf("%d", n), func(b *testing.B) { + data := getTestRecords(b, rootPath, testFiles, n) + b.ResetTimer() + b.ReportAllocs() + for b.Loop() { + _, err := u.UnmarshalLogs(data) + require.NoError(b, err) + } + }) + } + } +} + +// newBuf creates an azure resource log with +// as many records as defined in nRecords +func newBuf(nRecords int) []byte { + data := []byte(`{ + "time": "2024-04-24T12:06:12.0000000Z", + "resourceId": "/test", + "category": "AppServiceAppLogs", + "operationName": "AppLog" + }`) + + buf := bytes.NewBuffer(make([]byte, 0, nRecords*(len(data)))) + buf.WriteString(`{"records": [`) + for i := range nRecords { + if i > 0 { + buf.WriteByte(',') + } + buf.Write(data) + } + buf.WriteString(`]}`) + + return buf.Bytes() +} + +// getTestRecords generates set of test records +// from available testdata files +// as many copies as defined in "n" +func getTestRecords(b *testing.B, rootPath 
*gojson.Path, files []string, n int) []byte { + buf := bytes.NewBuffer(nil) + buf.WriteString(`{"records": [`) + for _, file := range files { + data, err := os.ReadFile(file) + require.NoError(b, err) + + records, err := rootPath.Extract(data) + require.NoError(b, err) + for _, record := range records { + for range n { + buf.Write(record) + buf.WriteString(",") + } + } + } + buf.Truncate(buf.Len() - 1) + buf.WriteString(`]}`) + + return buf.Bytes() +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/category.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/category.go new file mode 100644 index 0000000000000..f47820357f528 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/category.go @@ -0,0 +1,235 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler/logs" + +import ( + "encoding/json" + "errors" + "fmt" + + gojson "github.com/goccy/go-json" + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + conventions "go.opentelemetry.io/otel/semconv/v1.34.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler" +) + +// Non-SemConv attributes that are used for common Azure Log Record fields +const ( + // OpenTelemetry attribute name for Azure Correlation ID, + // from `correlationId` field in Azure Log Record + attributeAzureCorrelationID = "azure.correlation_id" + + // OpenTelemetry attribute name for Azure Operation Version, + // from `operationVersion` field in Azure Log Record + attributeAzureOperationVersion = "azure.operation.version" + + // OpenTelemetry attribute name for Azure Duration, + // from `durationMs` field in Azure Log Record + 
attributeAzureDuration = "azure.duration" + + // OpenTelemetry attribute name for Azure Identity, + // from `identity` field in Azure Log Record + attributeAzureIdentity = "azure.identity" + + // OpenTelemetry attribute name for Azure Log Record properties, + // from `properties` field in Azure Log Record + // Used when we cannot parse "properties" field or + // cannot map parsed "properties" to attributes directly + attributesAzureProperties = "azure.properties" + + // OpenTelemetry attribute name for Azure Result Type, + // from `resultType` field in Azure Log Record + attributeAzureResultType = "azure.result.type" + + // OpenTelemetry attribute name for Azure Result Signature, + // from `resultSignature` field in Azure Log Record + attributeAzureResultSignature = "azure.result.signature" + + // OpenTelemetry attribute name for Azure Result Description, + // from `resultDescription` field in Azure Log Record + attributesAzureResultDescription = "azure.result.description" +) + +var errNoTimestamp = errors.New("no valid time fields are set on Log record ('time' or 'timestamp')") + +// azureLogRecord is a common interface for all category-specific structures +type azureLogRecord interface { + GetResource() logsResourceAttributes + GetTimestamp(formats ...string) (pcommon.Timestamp, error) + GetLevel() (plog.SeverityNumber, string, bool) + PutCommonAttributes(attrs pcommon.Map, body pcommon.Value) + PutProperties(attrs pcommon.Map, body pcommon.Value) error +} + +// azureLogRecordBase represents a single Azure log following the common schema: +// https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema +// This schema is applicable to most Resource Logs and +// can be extended with additional fields for specific Log Categories +type azureLogRecordBase struct { + Time string `json:"time"` // most Categories use this field for timestamp + TimeStamp string `json:"timestamp"` // some Categories use this field for timestamp + ResourceID
string `json:"resourceId"` + TenantID string `json:"tenantId"` + OperationName string `json:"operationName"` + OperationVersion *string `json:"operationVersion"` + ResultType *string `json:"resultType"` + ResultSignature *string `json:"resultSignature"` + ResultDescription *string `json:"resultDescription"` + DurationMs *json.Number `json:"durationMs"` // int + CallerIPAddress *string `json:"callerIpAddress"` + CorrelationID *string `json:"correlationId"` + Identity *any `json:"identity"` + Level *string `json:"level"` + Location string `json:"location"` +} + +// GetResource returns resource attributes for the parsed Log Record +// As for now it includes ResourceID, TenantID and Location +func (r *azureLogRecordBase) GetResource() logsResourceAttributes { + return logsResourceAttributes{ + ResourceID: r.ResourceID, + TenantID: r.TenantID, + Location: r.Location, + } +} + +// GetTimestamp tries to parse timestamp from either `time` or `timestamp` fields +// using provided list of time formats. +// If both fields are empty (undefined), or parsing failed - return an error +func (r *azureLogRecordBase) GetTimestamp(formats ...string) (pcommon.Timestamp, error) { + if r.Time == "" && r.TimeStamp == "" { + return pcommon.Timestamp(0), errNoTimestamp + } + + time := r.Time + if time == "" { + time = r.TimeStamp + } + + nanos, err := unmarshaler.AsTimestamp(time, formats...) 
+ if err != nil { + return pcommon.Timestamp(0), fmt.Errorf("unable to convert value %q as timestamp: %w", time, err) + } + + return nanos, nil +} + +// GetLevel tries to convert the Log Level into OpenTelemetry SeverityNumber +// If level is not set - return SeverityNumberUnspecified and flag that level is not set +// If level is set, but invalid - return SeverityNumberUnspecified and flag that level is set +func (r *azureLogRecordBase) GetLevel() (plog.SeverityNumber, string, bool) { + if r.Level == nil { + return plog.SeverityNumberUnspecified, "", false + } + + severity := asSeverity(*r.Level) + // Saving original log.Level text, + // not the internal OpenTelemetry SeverityNumber -> SeverityText mapping + return severity, *r.Level, true +} + +// PutCommonAttributes puts already parsed common attributes into provided Attributes Map/Body +func (r *azureLogRecordBase) PutCommonAttributes(attrs pcommon.Map, _ pcommon.Value) { + // Common fields for all Azure Resource Log Categories should be + // placed as attributes, no matter if we can map the category or not + unmarshaler.AttrPutStrIf(attrs, unmarshaler.AttributeAzureOperationName, r.OperationName) + unmarshaler.AttrPutStrPtrIf(attrs, attributeAzureOperationVersion, r.OperationVersion) + unmarshaler.AttrPutStrPtrIf(attrs, attributeAzureResultType, r.ResultType) + unmarshaler.AttrPutStrPtrIf(attrs, attributeAzureResultSignature, r.ResultSignature) + unmarshaler.AttrPutStrPtrIf(attrs, attributesAzureResultDescription, r.ResultDescription) + unmarshaler.AttrPutStrPtrIf(attrs, string(conventions.NetworkPeerAddressKey), r.CallerIPAddress) + unmarshaler.AttrPutStrPtrIf(attrs, attributeAzureCorrelationID, r.CorrelationID) + unmarshaler.AttrPutIntNumberPtrIf(attrs, attributeAzureDuration, r.DurationMs) + if r.Identity != nil { + unmarshaler.AttrPutMapIf(attrs, attributeAzureIdentity, *r.Identity) + } +} + +// PutProperties puts already attributes from "properties" field into provided Attributes Map/Body +// MUST be 
implemented by each specific logCategory structure if "properties" field is expected there +func (*azureLogRecordBase) PutProperties(_ pcommon.Map, _ pcommon.Value) error { + // By default - no "properties", so nothing to do here + return nil +} + +// azureLogRecordGeneric represents a single Azure log following the common schema, +// but has a Category unknown to us +// In this case we couldn't correctly map properties to attributes and simply copy them +// as-is to the attributes +type azureLogRecordGeneric struct { + azureLogRecordBase + + Properties json.RawMessage `json:"properties"` +} + +func (r *azureLogRecordGeneric) PutProperties(attrs pcommon.Map, body pcommon.Value) error { + var properties map[string]any + var errs []error + + if len(r.Properties) == 0 { + // Nothing to parse + return nil + } + + // Trying to parse "properties" as correct JSON object... + if err := gojson.Unmarshal(r.Properties, &properties); err != nil { + errs = append(errs, err) + // If failed - trying to parse using a primitive value + var val any + if err = gojson.Unmarshal(r.Properties, &val); err == nil { + // Parsed, put primitive value as "properties" attribute + if err = attrs.PutEmpty(attributesAzureProperties).FromRaw(val); err != nil { + errs = append(errs, err) + // All attempts above - failed, put unparsable properties to log.Body + body.SetStr(string(r.Properties)) + } + } + return fmt.Errorf("failed to parse Azure Logs 'properties' field as JSON: %w", errors.Join(errs...)) + } + + // Put everything into attributes + for k, v := range properties { + switch k { + case "Message", "message": + if err := body.FromRaw(v); err != nil { + body.SetStr(fmt.Sprintf("%v", v)) + } + case "correlationId": + value := attrs.PutEmpty(attributeAzureCorrelationID) + if err := value.FromRaw(v); err != nil { + value.SetStr(fmt.Sprintf("%v", v)) + } + case "duration": + value := attrs.PutEmpty(attributeAzureDuration) + if err := value.FromRaw(v); err != nil { + value.SetStr(fmt.Sprintf("%v",
v)) + } + default: + // Keep all other fields as-is + value := attrs.PutEmpty(k) + if err := value.FromRaw(v); err != nil { + value.SetStr(fmt.Sprintf("%v", v)) + } + } + } + + return nil +} + +// processLogRecord tries to parse incoming record based of provided logCategory +func processLogRecord(_ string, record []byte) (azureLogRecord, error) { + parsed := new(azureLogRecordGeneric) + + // Unfortunately, "goccy/go-json" has a bug with case-insensitive key matching + // for nested structures, so we have to use jsoniter here + // see https://github.com/goccy/go-json/issues/470 + if err := jsoniter.ConfigFastest.Unmarshal(record, parsed); err != nil { + return nil, fmt.Errorf("JSON parse failed: %w", err) + } + + return parsed, nil +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/config.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/config.go index 4a771c8df1e71..9e6eefb97771c 100644 --- a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/config.go +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/config.go @@ -4,7 +4,7 @@ package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler/logs" type LogsConfig struct { - // TimeFormats is a list of time formats parsing layouts for Azure Traces Records + // TimeFormats is a list of time formats parsing layouts for Azure Resource Log Records TimeFormats []string `mapstructure:"time_formats"` IncludeCategories []string `mapstructure:"include_categories"` ExcludeCategories []string `mapstructure:"exclude_categories"` diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers.go new file mode 100644 index 0000000000000..be7f81f53cc35 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers.go @@ 
-0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler/logs" + +import ( + "go.opentelemetry.io/collector/pdata/plog" +) + +// asSeverity converts the Azure log level to equivalent +// OpenTelemetry severity numbers. If the log level is not +// valid, then the 'Unspecified' value is returned. +// According to the documentation, the level Must be one of: +// `Informational`, `Warning`, `Error` or `Critical`. +// see https://learn.microsoft.com/en-us/azure/azure-monitor/platform/resource-logs-schema +func asSeverity(input string) plog.SeverityNumber { + switch input { + case "Informational": + return plog.SeverityNumberInfo + case "Warning": + return plog.SeverityNumberWarn + case "Error": + return plog.SeverityNumberError + case "Critical": + return plog.SeverityNumberFatal + default: + return plog.SeverityNumberUnspecified + } +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers_test.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers_test.go new file mode 100644 index 0000000000000..76c62730db266 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/helpers_test.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/plog" +) + +func TestAsSeverity(t *testing.T) { + t.Parallel() + + tests := map[string]plog.SeverityNumber{ + "Informational": plog.SeverityNumberInfo, + "Warning": plog.SeverityNumberWarn, + "Error": plog.SeverityNumberError, + "Critical": plog.SeverityNumberFatal, + "unknown": plog.SeverityNumberUnspecified, + "9": plog.SeverityNumberUnspecified, + } + + for input, expected := range tests { + t.Run(input, 
func(t *testing.T) { + assert.Equal(t, expected, asSeverity(input)) + }) + } +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum.json b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum.json new file mode 100644 index 0000000000000..b105c6b168bc4 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum.json @@ -0,0 +1,85 @@ +{ + "records": [ + { + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID-1", + "tenantId": "/TENANT_ID", + "operationName": "SecretGet", + "operationVersion": "7.0", + "category": "AuditEvent", + "resultType": "Success", + "resultSignature": "Signature", + "resultDescription": "Description", + "durationMs": "1234", + "callerIpAddress": "127.0.0.1", + "correlationId": "607964b6-41a5-4e24-a5db-db7aab3b9b34", + "Level": "Warning", + "location": "ukso", + "identity": { + "claim": { + "oid": "607964b6-41a5-4e24-a5db-db7aab3b9b34" + } + }, + "properties": { + "string": "string", + "int": 429, + "float": 3.14, + "bool": false + } + }, + { + "time": "2022-11-11T04:48:29.6767145Z", + "resourceId": "/RESOURCE_ID-2", + "tenantId": "/TENANT_ID", + "operationName": "SecretSet", + "operationVersion": "7.0", + "category": "AuditEvent", + "resultType": "Success", + "resultSignature": "Signature", + "resultDescription": "Description", + "durationMs": "4321", + "callerIpAddress": "127.0.0.1", + "correlationId": "96317703-2132-4a8d-a5d7-e18d2f486783", + "Level": "Warning", + "location": "ukso", + "identity": { + "claim": { + "oid": "96317703-2132-4a8d-a5d7-e18d2f486783" + } + }, + "properties": { + "string": "string", + "int": 924, + "float": 41.3, + "bool": true + } + }, + { + "time": "2022-11-11T04:48:31.6767145Z", + "resourceId": "/RESOURCE_ID-2", + "tenantId": "/TENANT_ID", + "operationName": "SecretGet", + "operationVersion": "7.0", + "category": "AuditEvent", + "resultType": 
"Success", + "resultSignature": "Signature", + "resultDescription": "Description", + "durationMs": "321", + "callerIpAddress": "127.0.0.1", + "correlationId": "4ae807da-39d9-4327-b5b4-0ab685a57f9a", + "Level": "Warning", + "location": "ukso", + "identity": { + "claim": { + "oid": "4ae807da-39d9-4327-b5b4-0ab685a57f9a" + } + }, + "properties": { + "string": "string", + "int": 925, + "float": 41.4, + "bool": false + } + } + ] +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum_expected.yaml b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum_expected.yaml new file mode 100644 index 0000000000000..36039050186d7 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/maximum_expected.yaml @@ -0,0 +1,210 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: azure + - key: cloud.resource_id + value: + stringValue: /RESOURCE_ID-1 + - key: cloudevents.event_source + value: + stringValue: azure.resource.log + - key: cloud.region + value: + stringValue: ukso + - key: azure.tenant.id + value: + stringValue: /TENANT_ID + scopeLogs: + - logRecords: + - attributes: + - key: azure.operation.name + value: + stringValue: SecretGet + - key: azure.operation.version + value: + stringValue: "7.0" + - key: azure.category + value: + stringValue: AuditEvent + - key: azure.result.type + value: + stringValue: Success + - key: azure.result.signature + value: + stringValue: Signature + - key: azure.result.description + value: + stringValue: Description + - key: network.peer.address + value: + stringValue: 127.0.0.1 + - key: azure.correlation_id + value: + stringValue: 607964b6-41a5-4e24-a5db-db7aab3b9b34 + - key: azure.duration + value: + intValue: "1234" + - key: azure.identity + value: + kvlistValue: + values: + - key: claim + value: + kvlistValue: + values: + - key: oid + value: + stringValue: 
607964b6-41a5-4e24-a5db-db7aab3b9b34 + - key: string + value: + stringValue: string + - key: int + value: + doubleValue: 429 + - key: float + value: + doubleValue: 3.14 + - key: bool + value: + boolValue: false + body: {} + severityNumber: 13 + severityText: Warning + timeUnixNano: "1668142107676714500" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension + version: test-version + - resource: + attributes: + - key: cloud.provider + value: + stringValue: azure + - key: cloud.resource_id + value: + stringValue: /RESOURCE_ID-2 + - key: cloudevents.event_source + value: + stringValue: azure.resource.log + - key: cloud.region + value: + stringValue: ukso + - key: azure.tenant.id + value: + stringValue: /TENANT_ID + scopeLogs: + - logRecords: + - attributes: + - key: azure.operation.name + value: + stringValue: SecretSet + - key: azure.operation.version + value: + stringValue: "7.0" + - key: azure.category + value: + stringValue: AuditEvent + - key: azure.result.type + value: + stringValue: Success + - key: azure.result.signature + value: + stringValue: Signature + - key: azure.result.description + value: + stringValue: Description + - key: network.peer.address + value: + stringValue: 127.0.0.1 + - key: azure.correlation_id + value: + stringValue: 96317703-2132-4a8d-a5d7-e18d2f486783 + - key: azure.duration + value: + intValue: "4321" + - key: azure.identity + value: + kvlistValue: + values: + - key: claim + value: + kvlistValue: + values: + - key: oid + value: + stringValue: 96317703-2132-4a8d-a5d7-e18d2f486783 + - key: string + value: + stringValue: string + - key: int + value: + doubleValue: 924 + - key: float + value: + doubleValue: 41.3 + - key: bool + value: + boolValue: true + body: {} + severityNumber: 13 + severityText: Warning + timeUnixNano: "1668142109676714500" + - attributes: + - key: azure.operation.name + value: + stringValue: SecretGet + - key: azure.operation.version + value: + stringValue: 
"7.0" + - key: azure.category + value: + stringValue: AuditEvent + - key: azure.result.type + value: + stringValue: Success + - key: azure.result.signature + value: + stringValue: Signature + - key: azure.result.description + value: + stringValue: Description + - key: network.peer.address + value: + stringValue: 127.0.0.1 + - key: azure.correlation_id + value: + stringValue: 4ae807da-39d9-4327-b5b4-0ab685a57f9a + - key: azure.duration + value: + intValue: "321" + - key: azure.identity + value: + kvlistValue: + values: + - key: claim + value: + kvlistValue: + values: + - key: oid + value: + stringValue: 4ae807da-39d9-4327-b5b4-0ab685a57f9a + - key: string + value: + stringValue: string + - key: int + value: + doubleValue: 925 + - key: float + value: + doubleValue: 41.4 + - key: bool + value: + boolValue: false + body: {} + severityNumber: 13 + severityText: Warning + timeUnixNano: "1668142111676714500" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension + version: test-version diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2.json b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2.json new file mode 100644 index 0000000000000..6eac63fa0389d --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2.json @@ -0,0 +1,16 @@ +{ + "records": [ + { + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + }, + { + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + } + ] +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2_expected.yaml 
b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2_expected.yaml new file mode 100644 index 0000000000000..b7cc13d75c205 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum-2_expected.yaml @@ -0,0 +1,35 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: azure + - key: cloud.resource_id + value: + stringValue: /RESOURCE_ID + - key: cloudevents.event_source + value: + stringValue: azure.resource.log + scopeLogs: + - logRecords: + - attributes: + - key: azure.operation.name + value: + stringValue: SecretGet + - key: azure.category + value: + stringValue: AuditEvent + body: {} + timeUnixNano: "1668142107676714500" + - attributes: + - key: azure.operation.name + value: + stringValue: SecretGet + - key: azure.category + value: + stringValue: AuditEvent + body: {} + timeUnixNano: "1668142107676714500" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension + version: test-version diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum.json b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum.json new file mode 100644 index 0000000000000..16d4f2e71177d --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum.json @@ -0,0 +1,10 @@ +{ + "records": [ + { + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + } + ] +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum_expected.yaml b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum_expected.yaml new file mode 100644 index 0000000000000..312f60b0a9a96 --- /dev/null +++ 
b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/testdata/general/minimum_expected.yaml @@ -0,0 +1,26 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: azure + - key: cloud.resource_id + value: + stringValue: /RESOURCE_ID + - key: cloudevents.event_source + value: + stringValue: azure.resource.log + scopeLogs: + - logRecords: + - attributes: + - key: azure.operation.name + value: + stringValue: SecretGet + - key: azure.category + value: + stringValue: AuditEvent + body: {} + timeUnixNano: "1668142107676714500" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension + version: test-version diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler.go new file mode 100644 index 0000000000000..8305d4508c061 --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler.go @@ -0,0 +1,264 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler/logs" + +import ( + "fmt" + "time" + + gojson "github.com/goccy/go-json" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + conventions "go.opentelemetry.io/otel/semconv/v1.34.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler" +) + +// Commonly used non-SemConv attributes +const ( + // Predefined value for `cloudevents.event_source` resource attribute + 
attributeCloudEventSourceValue = "azure.resource.log" + // OpenTelemetry resource attribute name for Azure Tenant ID + attributeAzureTenantID = "azure.tenant.id" +) + +// logsResourceAttributes is a helper struct to hold resource attributes for specific log records +// Each Category Parser decides which Resource Attributes to populate +type logsResourceAttributes struct { + ResourceID string + TenantID string + SubscriptionID string + Location string + SeviceNamespace string + ServiceName string + ServiceInstanceID string + Environment string +} + +// categoryHolder is a small helper struct to get `category`/`type` fields +// from each Log Record +type categoryHolder struct { + Category string `json:"category"` + Type string `json:"type"` // Used in AppInsights logs instead of `category` +} + +type ResourceLogsUnmarshaler struct { + buildInfo component.BuildInfo + logger *zap.Logger + batchFormat unmarshaler.RecordsBatchFormat + timeFormat []string + includeCategories map[string]bool + excludeCategories map[string]bool +} + +func (r ResourceLogsUnmarshaler) UnmarshalLogs(buf []byte) (plog.Logs, error) { + jsonPath := unmarshaler.JSONPathEventHubLogRecords + if r.batchFormat == unmarshaler.FormatBlobStorage { + jsonPath = unmarshaler.JSONPathBlobStorageLogRecords + } + + // This will allow us to parse Azure Log Records in both formats: + // 1) As exported to Azure Event Hub, e.g. `{"records": [ {...}, {...} ]}` + // 2) As exported to Azure Blob Storage, e.g. `[ {...}, {...} ]` + rootPath, err := gojson.CreatePath(jsonPath) + if err != nil { + // This should never happen, but still... + return plog.NewLogs(), fmt.Errorf("failed to create JSON Path %q: %w", jsonPath, err) + } + + records, err := rootPath.Extract(buf) + if err != nil { + // This should never happen, but still... 
+ return plog.NewLogs(), fmt.Errorf("failed to extract Azure Log Records: %w", err) + } + + hasIncludes := len(r.includeCategories) > 0 + allResourceScopeLogs := map[logsResourceAttributes]plog.ScopeLogs{} + + for _, record := range records { + // Despite the fact that official Azure documentation states that a common Logs schema exists + // (see https://learn.microsoft.com/en-us/azure/azure-monitor/platform/resource-logs-schema), + // in reality - it's not true, some Resources expose Logs in totally different formats. + // For example, Azure Service Bus logs don't conform to the schema above + // (see examples here - https://github.com/noakup/AzMonLogsAgent/blob/main/NGSchema/AzureServiceBus/SampleInputRecords/ServiceBusOperationLogSample.json) + // The only 2 fields that SHOULD be present in most Log schemas - `category` and `resourceId` + // So, the proper way to correctly decode incoming Log Record - first get value from `category` field + // and Unmarshal record into category-specific struct.
+ // That's actually double-unmarshaling, but there is no other way to parse variety of Azure Logs schemas + + var ch categoryHolder + if err := gojson.Unmarshal(record, &ch); err != nil { + r.logger.Error("JSON unmarshal failed for Azure Log Record", zap.Error(err)) + continue + } + logCategory := ch.Category + if logCategory == "" { + logCategory = ch.Type + } + + if logCategory == "" { + // We couldn't do any SemConv conversion as it's an unknown Log Schema for us, + // because it doesn't have a "category" field which we rely on + // So we will save incoming Log Record as a JSON string into Body just + // not to loose data + r.logger.Warn( + "No Category field are set on Log Record, couldn't parse SemConv way, will save it as-is", + ) + r.storeRawLog(allResourceScopeLogs, record) + continue + } + + // Filter out categories based on provided configuration + if _, exclude := r.excludeCategories[logCategory]; exclude { + continue + } + if hasIncludes { + if _, include := r.includeCategories[logCategory]; !include { + continue + } + } + + // Let's parse it + log, err := processLogRecord(logCategory, record) + if err != nil { + r.storeRawLog(allResourceScopeLogs, record) + r.logger.Warn( + "Unable to parse Log Record", + zap.String("category", logCategory), + zap.Error(err), + ) + continue + } + + // Get timestamp for any of the possible fields + nanos, err := log.GetTimestamp(r.timeFormat...) 
+ if err != nil { + r.storeRawLog(allResourceScopeLogs, record) + r.logger.Warn( + "Unable to convert timestamp from log", + zap.String("category", logCategory), + zap.Error(err), + ) + continue + } + + rs := log.GetResource() + if rs.ResourceID == "" { + r.logger.Warn( + "No ResourceID set on Log record", + zap.String("category", logCategory), + ) + } + scopeLogs := r.getScopeLog(allResourceScopeLogs, rs) + + lr := scopeLogs.LogRecords().AppendEmpty() + lr.SetTimestamp(nanos) + + severity, severityName, isSet := log.GetLevel() + // Do not set Log Severity if it's not provided in the Log Record + // to avoid confusion with actual SeverityNumberUnspecified value + if isSet { + lr.SetSeverityNumber(severity) + lr.SetSeverityText(severityName) + } + + attrs := lr.Attributes() + // Put Log Category anyway + unmarshaler.AttrPutStrIf(attrs, unmarshaler.AttributeAzureCategory, logCategory) + // Parse Common Attributes + Properties (if applicable) + body := lr.Body() + log.PutCommonAttributes(attrs, body) + if err := log.PutProperties(attrs, body); err != nil { + r.logger.Warn( + "Unable to parse Azure Log Properties into OpenTelemetry Attributes", + zap.String("category", logCategory), + zap.String("resourceId", rs.ResourceID), + zap.Error(err), + ) + } + } + + l := plog.NewLogs() + for resourceID, scopeLogs := range allResourceScopeLogs { + rl := l.ResourceLogs().AppendEmpty() + ra := rl.Resource().Attributes() + ra.EnsureCapacity(10) + // Set SemConv attributes + unmarshaler.AttrPutStrIf(ra, string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString()) + unmarshaler.AttrPutStrIf(ra, string(conventions.CloudEventsEventSourceKey), attributeCloudEventSourceValue) + // Resource attributes parsed by Category Parser + unmarshaler.AttrPutStrIf(ra, string(conventions.CloudResourceIDKey), resourceID.ResourceID) + unmarshaler.AttrPutStrIf(ra, string(conventions.CloudRegionKey), resourceID.Location) + unmarshaler.AttrPutStrIf(ra, 
string(conventions.ServiceNamespaceKey), resourceID.SeviceNamespace) + unmarshaler.AttrPutStrIf(ra, string(conventions.ServiceNameKey), resourceID.ServiceName) + unmarshaler.AttrPutStrIf(ra, string(conventions.ServiceInstanceIDKey), resourceID.ServiceInstanceID) + unmarshaler.AttrPutStrIf(ra, string(conventions.DeploymentEnvironmentNameKey), resourceID.Environment) + unmarshaler.AttrPutStrIf(ra, attributeAzureTenantID, resourceID.TenantID) + // In Azure - Subscription is the closest analog to the Account, + // so we'll transform SubscriptionID into `cloud.account.id` + unmarshaler.AttrPutStrIf(ra, string(conventions.CloudAccountIDKey), resourceID.SubscriptionID) + scopeLogs.MoveTo(rl.ScopeLogs().AppendEmpty()) + } + + return l, nil +} + +// getScopeLog gets current ScopeLog based on provided set of ResourceAttributes +// If ScopeLog doesn't exist - create a new one and append to the list +func (r ResourceLogsUnmarshaler) getScopeLog(allResourceScopeLogs map[logsResourceAttributes]plog.ScopeLogs, rs logsResourceAttributes) plog.ScopeLogs { + scopeLogs, found := allResourceScopeLogs[rs] + if !found { + scopeLogs = plog.NewScopeLogs() + scopeLogs.Scope().SetName(metadata.ScopeName) + scopeLogs.Scope().SetVersion(r.buildInfo.Version) + allResourceScopeLogs[rs] = scopeLogs + } + + return scopeLogs +} + +// storeRawLog stores incoming Azure Resource Log Record as a string into log.Body +// It's used when we couldn't do any SemConv conversion, for example: +// * In case when there is no "category" field +// * In case when JSON unmarshaling failed +// Stored record can then be used for debugging and fixing purposes +func (r ResourceLogsUnmarshaler) storeRawLog(allResourceScopeLogs map[logsResourceAttributes]plog.ScopeLogs, record []byte) { + // We couldn't do any SemConv conversion as it's an unknown Log Schema for us, + // because it doesn't have a "category" field which we rely on + // So we will save incoming Log Record as a JSON string into Body just + // not to lose data
+ scopeLogs := r.getScopeLog(allResourceScopeLogs, logsResourceAttributes{}) + lr := scopeLogs.LogRecords().AppendEmpty() + // We couldn't get timestamp from incoming Record, so to keep the Log + // we will set timestamp to current time + lr.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + // Set unspecified log level + lr.SetSeverityNumber(plog.SeverityNumberUnspecified) + lr.SetSeverityText(plog.SeverityNumberUnspecified.String()) + // Put record to Body as-is + lr.Body().SetStr(string(record)) +} + +func NewAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, format unmarshaler.RecordsBatchFormat, cfg LogsConfig) ResourceLogsUnmarshaler { + includeCategories := make(map[string]bool, len(cfg.IncludeCategories)) + for _, icat := range cfg.IncludeCategories { + includeCategories[icat] = true + } + excludeCategories := make(map[string]bool, len(cfg.ExcludeCategories)) + for _, ecat := range cfg.ExcludeCategories { + excludeCategories[ecat] = true + } + + return ResourceLogsUnmarshaler{ + buildInfo: buildInfo, + logger: logger, + batchFormat: format, + timeFormat: cfg.TimeFormats, + includeCategories: includeCategories, + excludeCategories: excludeCategories, + } +} diff --git a/extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler_test.go b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler_test.go new file mode 100644 index 0000000000000..3e9eff6a32fdd --- /dev/null +++ b/extension/encoding/azureencodingextension/internal/unmarshaler/logs/unmarshaler_test.go @@ -0,0 +1,287 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + 
	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/azureencodingextension/internal/unmarshaler"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

// testFilesDirectory is the root of the golden test data; each sub-directory
// is an Azure log category holding <case>.json inputs with matching
// <case>_expected.yaml golden files.
const testFilesDirectory = "testdata"

// testBuildInfo is the fixed BuildInfo used when constructing unmarshalers in
// the tests below.
var testBuildInfo = component.BuildInfo{
	Version: "test-version",
}

// TestResourceLogsUnmarshaler_UnmarshalLogs_Golden walks every category
// directory under testdata, unmarshals each *.json payload, and compares the
// result against the matching *_expected.yaml golden file. Any warn-or-above
// zap log emitted during unmarshaling fails the test.
func TestResourceLogsUnmarshaler_UnmarshalLogs_Golden(t *testing.T) {
	t.Parallel()

	testCategoriesDirs, err := os.ReadDir(testFilesDirectory)
	require.NoError(t, err)

	for _, dir := range testCategoriesDirs {
		if !dir.IsDir() {
			continue
		}

		category := dir.Name()
		t.Run(category, func(t *testing.T) {
			t.Parallel()

			testFiles, err := filepath.Glob(filepath.Join(testFilesDirectory, category, "*.json"))
			require.NoError(t, err)

			for _, testFile := range testFiles {
				testName := strings.TrimSuffix(filepath.Base(testFile), ".json")

				t.Run(testName, func(t *testing.T) {
					t.Parallel()

					observedZapCore, observedLogs := observer.New(zap.WarnLevel)
					// NOTE: this local shadows the imported "unmarshaler"
					// package for the rest of the closure; the package
					// reference on the right-hand side is still valid because
					// a short-variable declaration is not in scope until the
					// statement ends.
					unmarshaler := NewAzureResourceLogsUnmarshaler(
						testBuildInfo,
						zap.New(observedZapCore),
						unmarshaler.FormatEventHub,
						LogsConfig{
							TimeFormats: []string{
								"01/02/2006 15:04:05",
								"2006-01-02T15:04:05Z",
								"1/2/2006 3:04:05.000 PM -07:00",
								"1/2/2006 3:04:05 PM -07:00",
							},
						},
					)

					data, err := os.ReadFile(testFile)
					require.NoError(t, err)

					logs, err := unmarshaler.UnmarshalLogs(data)
					require.NoError(t, err)

					// Golden-path inputs must convert cleanly — no warnings.
					require.Equal(t, 0, observedLogs.Len(), "Unexpected error/warn logs: %+v", observedLogs.All())

					expectedLogs, err := golden.ReadLogs(filepath.Join(testFilesDirectory, category, fmt.Sprintf("%s_expected.yaml", testName)))
					require.NoError(t, err)

					// Ordering is an implementation detail; compare content only.
					compareOptions := []plogtest.CompareLogsOption{
						plogtest.IgnoreResourceLogsOrder(),
						plogtest.IgnoreLogRecordsOrder(),
					}
					require.NoError(t, plogtest.CompareLogs(expectedLogs, logs, compareOptions...))
				})
			}
		})
+ } +} + +func TestResourceLogsUnmarshaler_UnmarshalLogs(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cfg LogsConfig + data []byte + expectedError string + expectedLogLine string + expectedLogRecords int + }{ + { + name: "Empty Records", + cfg: LogsConfig{}, + data: []byte(`{"records": []}`), + expectedLogRecords: 0, + }, + { + name: "Incorrect Root Element", + cfg: LogsConfig{}, + data: []byte(`{"other-root": []}`), + expectedLogRecords: 0, + }, + { + name: "Invalid JSON", + cfg: LogsConfig{}, + data: []byte(`{invalid-json}`), + expectedLogRecords: 0, + expectedError: "failed to extract Azure Log Records", + }, + { + name: "Empty Category", + cfg: LogsConfig{}, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "", + "type": "" + }] + }`), + expectedLogRecords: 1, + }, + { + name: "Category from Type field", + cfg: LogsConfig{}, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "type": "AuditEvent" + }] + }`), + expectedLogRecords: 1, + }, + { + name: "Empty ResourceID", + cfg: LogsConfig{}, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "", + "operationName": "SecretGet", + "category": "AuditEvent" + }] + }`), + expectedLogRecords: 1, + expectedLogLine: "No ResourceID set on Log record", + }, + { + name: "No Timestamp", + cfg: LogsConfig{}, + data: []byte(`{ + "records": [{ + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + }] + }`), + expectedLogRecords: 1, + expectedLogLine: "Unable to convert timestamp from log", + }, + { + name: "Invalid Timestamp", + cfg: LogsConfig{}, + data: []byte(`{ + "records": [{ + "time": "invalid-timestamp", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + }] + }`), + expectedLogRecords: 
1, + expectedLogLine: "Unable to convert timestamp from log", + }, + { + name: "Exclude Category Filter", + cfg: LogsConfig{ + ExcludeCategories: []string{"AuditEvent"}, + }, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + }] + }`), + expectedLogRecords: 0, + }, + { + name: "Exclude Category Filter No Match", + cfg: LogsConfig{ + ExcludeCategories: []string{"ApplicationGatewayAccess"}, + }, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + }] + }`), + expectedLogRecords: 1, + }, + { + name: "Include Category Filter", + cfg: LogsConfig{ + IncludeCategories: []string{"ApplicationGatewayAccess"}, + }, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "ApplicationGatewayAccess" + }] + }`), + expectedLogRecords: 1, + }, + { + name: "Include Category Filter No Match", + cfg: LogsConfig{ + IncludeCategories: []string{"ApplicationGatewayAccess"}, + }, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "AuditEvent" + }] + }`), + expectedLogRecords: 0, + }, + { + name: "Exclude Category Filter Priority Over Include", + cfg: LogsConfig{ + ExcludeCategories: []string{"ApplicationGatewayAccess"}, + IncludeCategories: []string{"ApplicationGatewayAccess"}, + }, + data: []byte(`{ + "records": [{ + "time": "2022-11-11T04:48:27.6767145Z", + "resourceId": "/RESOURCE_ID", + "operationName": "SecretGet", + "category": "ApplicationGatewayAccess" + }] + }`), + expectedLogRecords: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + unmarshaler := 
NewAzureResourceLogsUnmarshaler( + testBuildInfo, + zap.New(observedZapCore), + unmarshaler.FormatEventHub, + tt.cfg, + ) + logs, err := unmarshaler.UnmarshalLogs(tt.data) + if tt.expectedError != "" { + require.ErrorContains(t, err, tt.expectedError) + } else { + require.NoError(t, err) + } + if tt.expectedLogLine != "" { + require.Equal(t, 1, observedLogs.FilterMessage(tt.expectedLogLine).Len()) + } + require.Equal(t, tt.expectedLogRecords, logs.LogRecordCount()) + }) + } +}