Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/json"
Expand Down Expand Up @@ -50,6 +51,7 @@ const (
LB_NLB_TYPE string = "net"
LB_ALB_TYPE string = "app"
WAF_LOG_TYPE string = "WAFLogs"
S3_ACCESS_LOG_TYPE string = "s3accesslogs"
)

var (
Expand Down Expand Up @@ -82,6 +84,8 @@ var (
cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
wafFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>WAFLogs)\/(?P<region>[\w-]+)\/(?P<src>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)\/(?P<minute>\d+)\/\d+\_waflogs\_[\w-]+_[\w-]+_\d+T\d+Z_\w+`)
wafTimestampRegex = regexp.MustCompile(`"timestamp":\s*(?P<timestamp>\d+),`)
s3AccessLogFilenameRegex = regexp.MustCompile(`(?:.*\/)?(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})-(?P<hour>\d{2})-(?P<minute>\d{2})-(?P<second>\d{2})-(?P<file_id>[A-F0-9]{16})$`)
s3AccessLogTimestampRegex = regexp.MustCompile(`\[(?P<timestamp>\d{2}\/[A-Za-z]{3}\/\d{4}:\d{2}:\d{2}:\d{2}\s[+\-]\d{4})\]`)
parsers = map[string]parserConfig{
FLOW_LOG_TYPE: {
logTypeLabel: "s3_vpc_flow",
Expand Down Expand Up @@ -122,6 +126,15 @@ var (
timestampRegex: wafTimestampRegex,
timestampType: "unix",
},
S3_ACCESS_LOG_TYPE: {
logTypeLabel: "s3_access",
filenameRegex: s3AccessLogFilenameRegex,
ownerLabelKey: "bucket",
timestampRegex: s3AccessLogTimestampRegex,
timestampFormat: "02/Jan/2006:15:04:05 -0700", // S3 access log format: [02/Jan/2006:15:04:05 -0700]
timestampType: "string",
skipHeaderCount: 0, // S3 access logs typically don't have headers
},
}
)

Expand Down Expand Up @@ -149,12 +162,29 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
}
return fmt.Errorf("could not find parser for type %s", labels["type"])
}
gzreader, err := gzip.NewReader(obj)
if err != nil {

defer obj.Close()
bReader := bufio.NewReader(obj)
peekBytes, err := bReader.Peek(2)
if err != nil && err != io.EOF {
return err
}

var reader io.Reader
// Check if gzipped (magic bytes: 0x1F 0x8B)
if len(peekBytes) >= 2 && peekBytes[0] == 0x1F && peekBytes[1] == 0x8B {
gzreader, err := gzip.NewReader(bReader)
if err != nil {
return err
}
defer gzreader.Close()
reader = gzreader
} else {
// Plain text - use the buffered reader as-is
reader = bReader
}

scanner := bufio.NewScanner(gzreader)
scanner := bufio.NewScanner(reader)

ls := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue(parser.logTypeLabel),
Expand All @@ -168,7 +198,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
if labels["type"] == CLOUDTRAIL_LOG_TYPE {
records := make(chan Record)
jsonStream := NewJSONStream(records)
go jsonStream.Start(gzreader, parser.skipHeaderCount)
go jsonStream.Start(reader, parser.skipHeaderCount)
// Stream json file
for record := range jsonStream.records {
if record.Error != nil {
Expand Down
55 changes: 55 additions & 0 deletions tools/lambda-promtail/lambda-promtail/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,40 @@ func Test_getLabels(t *testing.T) {
},
wantErr: false,
},
{
name: "s3_access_logs",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-west-2",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "mintel-core-frontend-app-shell-prod-replica-logs",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "replica-access-logs/2025-12-06-00-00-26-D9C9ED584E11F3F8",
},
},
},
},
want: map[string]string{
"bucket": "mintel-core-frontend-app-shell-prod-replica-logs",
"bucket_owner": "test",
"bucket_region": "us-west-2",
"day": "06",
"file_id": "D9C9ED584E11F3F8",
"hour": "00",
"key": "replica-access-logs/2025-12-06-00-00-26-D9C9ED584E11F3F8",
"minute": "00",
"month": "12",
"second": "26",
"type": S3_ACCESS_LOG_TYPE,
"year": "2025",
},
wantErr: false,
},
{
name: "missing_type",
args: args{
Expand Down Expand Up @@ -602,6 +636,27 @@ func Test_parseS3Log(t *testing.T) {
},
wantErr: false,
},
{
name: "s3_access_logs",
args: args{
batchSize: 131072, // Set large enough that we don't try to send to promtail
filename: "../testdata/s3accesslog.txt",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"type": S3_ACCESS_LOG_TYPE,
"bucket": "mintel-core-frontend-app-shell-prod-replica-logs",
"file_id": "D9C9ED584E11F3F8",
},
},
expectedLen: 1,
expectedStream: `{__aws_log_type="s3_access", __aws_s3_access="D9C9ED584E11F3F8", __aws_s3_access_owner="mintel-core-frontend-app-shell-prod-replica-logs"}`,
expectedTimestamps: []time.Time{
time.Date(2025, time.December, 5, 23, 49, 46, 0, time.FixedZone("UTC", 0)),
},
wantErr: false,
},
{
name: "missing_parser",
args: args{
Expand Down
1 change: 1 addition & 0 deletions tools/lambda-promtail/testdata/s3accesslog.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8a2b8c083616eedb641d19d09a3df1c58140281be5de78bd378053da98598f92 mintel-core-frontend-app-shell-prod-replica [05/Dec/2025:23:49:46 +0000] 64.252.71.196 - RWXJ337PHNKAXKAJ WEBSITE.GET.OBJECT gnpd-data-entry/projects/gnpd/image-sets/a38f9766-2dee-4f2d-bc44-e0c5965ee79a "GET /gnpd-data-entry/projects/gnpd/image-sets/a38f9766-2dee-4f2d-bc44-e0c5965ee79a?productInfo=Batch+Entry+175699%3A5+%7C+Iwatsuka+Seika+The+Hitotsumami+Pizza+Margarita+Flavoured+Rice+Cracker HTTP/1.1" 404 NoSuchKey 625 - 48 - "-" "Amazon CloudFront" - 8htnihf1feY67Nx1Yr6bflyfkVxBFtZmdiJTvsOo3P/DTqfkA6jsaT5IYoKdyIGJeORE+LtJM74LOoNS0joL4c7qVyr+aEdM - - - mintel-core-frontend-app-shell-prod-replica.s3-website-us-west-2.amazonaws.com - - -