Skip to content

Commit 1cc6c48

Browse files
joecomputeaxw
andauthored
[encoding/googlecloudlogentry] Add encoding format to gcp encoding ext (open-telemetry#43759)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Add generic `encoding.format` to GCP encoding extension per the discussion at open-telemetry#43320 | **GCP Log Type** | **`cloud.resource_id`** | **new `encoding.format` value** | | ---|---|---| | Activity Logs | one of `cloudaudit.googleapis.com%2Factivity`, `cloudaudit.googleapis.com%2Fdata_access`, `cloudaudit.googleapis.com%2Fpolicy`, or `cloudaudit.googleapis.com%2Fsystem_event` | `gcp.auditlog` | | VPC Flow Logs | one of `networkmanagement.googleapis.com%2Fvpc_flows` or `compute.googleapis.com%2Fvpc_flows` | `gcp.vpcflow` | <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#43320 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tests added and updated <!--Describe the documentation added.--> #### Documentation README updated --------- Co-authored-by: Andrew Wilkins <[email protected]>
1 parent 3a380ab commit 1cc6c48

17 files changed

+330
-24
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: extension/googlecloudlogentry_encoding
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add encoding.format attribute to GCP encoding extension to identify the source format.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43320]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

extension/encoding/googlecloudlogentryencodingextension/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,41 @@ The severity is mapped from [Google Cloud Log Severity](https://cloud.google.com
114114

115115
Currently, these are the log types that are specifically parsed into log record attributes.
116116

117+
## Log Format Identification
118+
119+
A subset of logs processed by this extension are automatically tagged with an `encoding.format` attribute at the scope level to identify the source format. This allows you to easily filter and route logs based on their Google Cloud service origin.
120+
121+
The pattern used is `gcp.<format_name>`.
122+
123+
Examples:
124+
- Audit Logs: `encoding.format: "gcp.auditlog"`
125+
- VPC Flow Logs: `encoding.format: "gcp.vpcflow"`
126+
127+
### How encoding.format is determined
128+
129+
The `encoding.format` attribute is automatically determined based on the log type extracted from the `logName` field. The extension uses the following logic:
130+
131+
1. **Parse the logName**: The extension extracts the log type from the `logName` field.
132+
133+
For example, `projects/my-project/logs/cloudaudit.googleapis.com%2Fsystem_event` is identified as a system event log via the log type suffix `cloudaudit.googleapis.com%2Fsystem_event`.
134+
135+
2. **Map log type to format**: The extension maps specific log types to their corresponding encoding formats (`encoding.format`):
136+
- Audit logs (activity, data access, system event, policy): `gcp.auditlog`
137+
- VPC flow logs (network management-sourced and compute-sourced VPC flow logs): `gcp.vpcflow`
138+
139+
3. **Set the attribute**: For recognized log types, the `encoding.format` attribute is set as an attribute of the `scope` field in the OTEL output log, allowing for flexible filtering and routing.
140+
141+
For unrecognized log types, no `encoding.format` attribute is set.
142+
143+
## Format Values
144+
145+
The following format values are supported in the `googlecloudlogentryencodingextension` to identify different Google Cloud log types:
146+
147+
| **GCP Log Type** | **Format Value** | **Description** |
148+
|------------------|------------------|-----------------|
149+
| Audit Logs | `auditlog` | Google Cloud audit logs (activity, data access, system event, policy) |
150+
| VPC Flow Logs | `vpcflow` | Virtual Private Cloud flow log records |
151+
117152
### Cloud Audit Logs
118153

119154
See the struct of the Cloud Audit Log payload in [AuditLog](https://cloud.google.com/logging/docs/reference/audit/auditlog/rest/Shared.Types/AuditLog). The fields are mapped this way in the extension:

extension/encoding/googlecloudlogentryencodingextension/extension.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ func (ex *ext) handleLogLine(logs plog.Logs, logLine []byte) error {
6161

6262
rl := logs.ResourceLogs().AppendEmpty()
6363
r := rl.Resource()
64-
logRecord := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
65-
if err := handleLogEntryFields(r.Attributes(), logRecord, log, ex.config); err != nil {
64+
scopeLogs := rl.ScopeLogs().AppendEmpty()
65+
66+
if err := handleLogEntryFields(r.Attributes(), scopeLogs, log, ex.config); err != nil {
6667
return fmt.Errorf("failed to handle log entry: %w", err)
6768
}
6869

extension/encoding/googlecloudlogentryencodingextension/extension_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package googlecloudlogentryencodingextension
55

66
import (
77
"bytes"
8+
"fmt"
89
"os"
910
"testing"
1011

@@ -13,6 +14,9 @@ import (
1314
"go.opentelemetry.io/collector/component/componenttest"
1415
"go.opentelemetry.io/collector/pdata/plog"
1516

17+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/auditlog"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/constants"
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/vpcflowlog"
1620
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
1721
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
1822
)
@@ -212,3 +216,101 @@ func TestPayloads(t *testing.T) {
212216
})
213217
}
214218
}
219+
220+
func TestEncodingFormatScopeAttributeUnknownLogs(t *testing.T) {
221+
// note that testing for known log types is already done in TestPayloads,
222+
// so we don't have tests here for known log types.
223+
tests := []struct {
224+
name string
225+
logName string
226+
expectedFormat string
227+
}{
228+
{
229+
name: "unknown log type",
230+
logName: "projects/test-project/logs/unknown-log-type",
231+
expectedFormat: "",
232+
},
233+
{
234+
name: "generic log type",
235+
logName: "projects/test-project/logs/generic-log",
236+
expectedFormat: "",
237+
},
238+
}
239+
240+
extension := newTestExtension(t, Config{})
241+
242+
for _, tt := range tests {
243+
t.Run(tt.name, func(t *testing.T) {
244+
t.Parallel()
245+
246+
// Create a minimal log entry that won't trigger payload processing errors
247+
logLine := fmt.Appendf(nil, `{"logName": "%s", "timestamp": "2024-05-05T10:31:19.45570687Z", "textPayload": "test message"}`, tt.logName)
248+
249+
logs, err := extension.UnmarshalLogs(logLine)
250+
require.NoError(t, err)
251+
252+
require.Equal(t, 1, logs.ResourceLogs().Len())
253+
resourceLogs := logs.ResourceLogs().At(0)
254+
require.Equal(t, 1, resourceLogs.ScopeLogs().Len())
255+
scopeLogs := resourceLogs.ScopeLogs().At(0)
256+
257+
scopeAttrs := scopeLogs.Scope().Attributes()
258+
259+
_, exists := scopeAttrs.Get(constants.FormatIdentificationTag)
260+
require.False(t, exists, "encoding.format attribute should not exist for unknown log types")
261+
})
262+
}
263+
}
264+
265+
func TestGetEncodingFormatFunction(t *testing.T) {
266+
tests := []struct {
267+
name string
268+
logType string
269+
expectedFormat string
270+
}{
271+
{
272+
name: "audit log activity",
273+
logType: auditlog.ActivityLogNameSuffix,
274+
expectedFormat: constants.GCPFormatAuditLog,
275+
},
276+
{
277+
name: "audit log data access",
278+
logType: auditlog.DataAccessLogNameSuffix,
279+
expectedFormat: constants.GCPFormatAuditLog,
280+
},
281+
{
282+
name: "audit log system event",
283+
logType: auditlog.SystemEventLogNameSuffix,
284+
expectedFormat: constants.GCPFormatAuditLog,
285+
},
286+
{
287+
name: "audit log policy",
288+
logType: auditlog.PolicyLogNameSuffix,
289+
expectedFormat: constants.GCPFormatAuditLog,
290+
},
291+
{
292+
name: "vpc flow log network management",
293+
logType: vpcflowlog.NetworkManagementNameSuffix,
294+
expectedFormat: constants.GCPFormatVPCFlowLog,
295+
},
296+
{
297+
name: "vpc flow log compute",
298+
logType: vpcflowlog.ComputeNameSuffix,
299+
expectedFormat: constants.GCPFormatVPCFlowLog,
300+
},
301+
{
302+
name: "unknown log type",
303+
logType: "unknown-log-type",
304+
expectedFormat: "",
305+
},
306+
}
307+
308+
for _, tt := range tests {
309+
t.Run(tt.name, func(t *testing.T) {
310+
t.Parallel()
311+
312+
result := getEncodingFormat(tt.logType)
313+
require.Equal(t, tt.expectedFormat, result)
314+
})
315+
}
316+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package constants // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/constants"
5+
6+
const (
7+
FormatAuditLog = "auditlog"
8+
FormatVPCFlowLog = "vpcflow"
9+
10+
FormatIdentificationTag = "encoding.format"
11+
12+
// GCP-specific format prefixes
13+
GCPFormatPrefix = "gcp."
14+
GCPFormatAuditLog = GCPFormatPrefix + FormatAuditLog
15+
GCPFormatVPCFlowLog = GCPFormatPrefix + FormatVPCFlowLog
16+
)

extension/encoding/googlecloudlogentryencodingextension/log_entry.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
ltype "google.golang.org/genproto/googleapis/logging/type"
2020

2121
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/auditlog"
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/constants"
2223
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/shared"
2324
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/vpcflowlog"
2425
)
@@ -62,6 +63,22 @@ const (
6263
gcpAppHubWorkloadCriticalityTypeField = "workload.criticality_type"
6364
)
6465

66+
// getEncodingFormat maps GCP log types to encoding format values
67+
func getEncodingFormat(logType string) string {
68+
switch logType {
69+
case auditlog.ActivityLogNameSuffix,
70+
auditlog.DataAccessLogNameSuffix,
71+
auditlog.SystemEventLogNameSuffix,
72+
auditlog.PolicyLogNameSuffix:
73+
return constants.GCPFormatAuditLog
74+
case vpcflowlog.NetworkManagementNameSuffix,
75+
vpcflowlog.ComputeNameSuffix:
76+
return constants.GCPFormatVPCFlowLog
77+
default:
78+
return ""
79+
}
80+
}
81+
6582
// See: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
6683
type logEntry struct {
6784
ProtoPayload gojson.RawMessage `json:"protoPayload"`
@@ -457,18 +474,18 @@ func handleLogNameField(logName string, resourceAttr pcommon.Map) (string, error
457474
}
458475
}
459476

460-
func handlePayload(logType string, log logEntry, logRecord plog.LogRecord, cfg Config) error {
461-
switch logType {
462-
case auditlog.ActivityLogNameSuffix,
463-
auditlog.DataAccessLogNameSuffix,
464-
auditlog.SystemEventLogNameSuffix,
465-
auditlog.PolicyLogNameSuffix:
477+
func handlePayload(encodingFormat string, log logEntry, logRecord plog.LogRecord, scope pcommon.InstrumentationScope, cfg Config) error {
478+
switch encodingFormat {
479+
case constants.GCPFormatAuditLog:
480+
// Add encoding.format to scope attributes for audit logs
481+
scope.Attributes().PutStr(constants.FormatIdentificationTag, encodingFormat)
466482
if err := auditlog.ParsePayloadIntoAttributes(log.ProtoPayload, logRecord.Attributes()); err != nil {
467483
return fmt.Errorf("failed to parse audit log proto payload: %w", err)
468484
}
469485
return nil
470-
case vpcflowlog.NetworkManagementNameSuffix,
471-
vpcflowlog.ComputeNameSuffix:
486+
case constants.GCPFormatVPCFlowLog:
487+
// Add encoding.format to scope attributes for VPC flow logs
488+
scope.Attributes().PutStr(constants.FormatIdentificationTag, encodingFormat)
472489
if err := vpcflowlog.ParsePayloadIntoAttributes(log.JSONPayload, logRecord.Attributes()); err != nil {
473490
return fmt.Errorf("failed to parse VPC flow log JSON payload: %w", err)
474491
}
@@ -496,7 +513,10 @@ func handlePayload(logType string, log logEntry, logRecord plog.LogRecord, cfg C
496513

497514
// handleLogEntryFields will place each entry of logEntry as either an attribute of the log,
498515
// or as part of the log body, in case of payload.
499-
func handleLogEntryFields(resourceAttributes pcommon.Map, logRecord plog.LogRecord, log logEntry, cfg Config) error {
516+
func handleLogEntryFields(resourceAttributes pcommon.Map, scopeLogs plog.ScopeLogs, log logEntry, cfg Config) error {
517+
logRecord := scopeLogs.LogRecords().AppendEmpty()
518+
scope := scopeLogs.Scope()
519+
500520
ts := log.Timestamp
501521
if ts == nil {
502522
return errors.New("missing timestamp")
@@ -509,11 +529,14 @@ func handleLogEntryFields(resourceAttributes pcommon.Map, logRecord plog.LogReco
509529

510530
shared.PutStr(string(semconv.LogRecordUIDKey), log.InsertID, logRecord.Attributes())
511531

532+
// Handle log name, get type and encoding format
512533
logType, errLogName := handleLogNameField(log.LogName, resourceAttributes)
513534
if errLogName != nil {
514535
return fmt.Errorf("failed to handle log name field: %w", errLogName)
515536
}
516-
if err := handlePayload(logType, log, logRecord, cfg); err != nil {
537+
encodingFormat := getEncodingFormat(logType)
538+
539+
if err := handlePayload(encodingFormat, log, logRecord, scope, cfg); err != nil {
517540
return fmt.Errorf("failed to handle payload field: %w", err)
518541
}
519542

extension/encoding/googlecloudlogentryencodingextension/log_entry_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
1515
ltype "google.golang.org/genproto/googleapis/logging/type"
1616

17+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/auditlog"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/constants"
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/googlecloudlogentryencodingextension/internal/vpcflowlog"
1720
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
1821
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
1922
)
@@ -382,17 +385,76 @@ func TestHandleLogEntryFields(t *testing.T) {
382385
logs := plog.NewLogs()
383386
resourceLogs := logs.ResourceLogs().AppendEmpty()
384387
resource := resourceLogs.Resource()
385-
logRecord := resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
388+
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
386389
cfg := *createDefaultConfig().(*Config)
387390

388-
err = handleLogEntryFields(resource.Attributes(), logRecord, l, cfg)
391+
// add the rest of the log entry fields
392+
err = handleLogEntryFields(resource.Attributes(), scopeLogs, l, cfg)
389393
require.NoError(t, err)
390394

391395
expected, err := golden.ReadLogs("testdata/log_entry_expected.yaml")
392396
require.NoError(t, err)
393397
require.NoError(t, plogtest.CompareLogs(expected, logs))
394398
}
395399

400+
func TestGetEncodingFormat(t *testing.T) {
401+
tests := []struct {
402+
name string
403+
logType string
404+
expectedFormat string
405+
}{
406+
{
407+
name: "audit log activity",
408+
logType: auditlog.ActivityLogNameSuffix,
409+
expectedFormat: constants.GCPFormatAuditLog,
410+
},
411+
{
412+
name: "audit log data access",
413+
logType: auditlog.DataAccessLogNameSuffix,
414+
expectedFormat: constants.GCPFormatAuditLog,
415+
},
416+
{
417+
name: "audit log system event",
418+
logType: auditlog.SystemEventLogNameSuffix,
419+
expectedFormat: constants.GCPFormatAuditLog,
420+
},
421+
{
422+
name: "audit log policy",
423+
logType: auditlog.PolicyLogNameSuffix,
424+
expectedFormat: constants.GCPFormatAuditLog,
425+
},
426+
{
427+
name: "vpc flow log network management",
428+
logType: vpcflowlog.NetworkManagementNameSuffix,
429+
expectedFormat: constants.GCPFormatVPCFlowLog,
430+
},
431+
{
432+
name: "vpc flow log compute",
433+
logType: vpcflowlog.ComputeNameSuffix,
434+
expectedFormat: constants.GCPFormatVPCFlowLog,
435+
},
436+
{
437+
name: "unknown log type",
438+
logType: "unknown-log-type",
439+
expectedFormat: "",
440+
},
441+
{
442+
name: "empty log type",
443+
logType: "",
444+
expectedFormat: "",
445+
},
446+
}
447+
448+
for _, tt := range tests {
449+
t.Run(tt.name, func(t *testing.T) {
450+
t.Parallel()
451+
452+
result := getEncodingFormat(tt.logType)
453+
require.Equal(t, tt.expectedFormat, result)
454+
})
455+
}
456+
}
457+
396458
func TestHandleJSONPayload(t *testing.T) {
397459
tests := []struct {
398460
name string

0 commit comments

Comments
 (0)