@@ -50,6 +50,7 @@ const (
5050 LB_NLB_TYPE string = "net"
5151 LB_ALB_TYPE string = "app"
5252 WAF_LOG_TYPE string = "WAFLogs"
53+ S3_ACCESS_LOG_TYPE string = "s3accesslogs"
5354)
5455
5556var (
8283 cloudfrontTimestampRegex = regexp .MustCompile (`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)` )
8384 wafFilenameRegex = regexp .MustCompile (`AWSLogs\/(?P<account_id>\d+)\/(?P<type>WAFLogs)\/(?P<region>[\w-]+)\/(?P<src>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)\/(?P<minute>\d+)\/\d+\_waflogs\_[\w-]+_[\w-]+_\d+T\d+Z_\w+` )
8485 wafTimestampRegex = regexp .MustCompile (`"timestamp":\s*(?P<timestamp>\d+),` )
86+ s3AccessLogFilenameRegex = regexp .MustCompile (`(?:.*\/)?(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})-(?P<hour>\d{2})-(?P<minute>\d{2})-(?P<second>\d{2})-(?P<file_id>[A-F0-9]{16})$` )
87+ s3AccessLogTimestampRegex = regexp .MustCompile (`\[(?P<timestamp>\d{2}\/[A-Za-z]{3}\/\d{4}:\d{2}:\d{2}:\d{2}\s[+\-]\d{4})\]` )
8588 parsers = map [string ]parserConfig {
8689 FLOW_LOG_TYPE : {
8790 logTypeLabel : "s3_vpc_flow" ,
@@ -122,6 +125,15 @@ var (
122125 timestampRegex : wafTimestampRegex ,
123126 timestampType : "unix" ,
124127 },
128+ S3_ACCESS_LOG_TYPE : {
129+ logTypeLabel : "s3_access" ,
130+ filenameRegex : s3AccessLogFilenameRegex ,
131+ ownerLabelKey : "bucket" ,
132+ timestampRegex : s3AccessLogTimestampRegex ,
133+ timestampFormat : "02/Jan/2006:15:04:05 -0700" , // S3 access log format: [02/Jan/2006:15:04:05 -0700]
134+ timestampType : "string" ,
135+ skipHeaderCount : 0 , // S3 access logs typically don't have headers
136+ },
125137 }
126138)
127139
@@ -149,12 +161,29 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
149161 }
150162 return fmt .Errorf ("could not find parser for type %s" , labels ["type" ])
151163 }
152- gzreader , err := gzip .NewReader (obj )
153- if err != nil {
164+
165+ defer obj .Close ()
166+ bReader := bufio .NewReader (obj )
167+ peekBytes , err := bReader .Peek (2 )
168+ if err != nil && err != io .EOF {
154169 return err
155170 }
171+
172+ var reader io.Reader
173+ // Check if gzipped (magic bytes: 0x1F 0x8B)
174+ if len (peekBytes ) >= 2 && peekBytes [0 ] == 0x1F && peekBytes [1 ] == 0x8B {
175+ gzreader , err := gzip .NewReader (bReader )
176+ if err != nil {
177+ return err
178+ }
179+ defer gzreader .Close ()
180+ reader = gzreader
181+ } else {
182+ // Plain text - use the buffered reader as-is
183+ reader = bReader
184+ }
156185
157- scanner := bufio .NewScanner (gzreader )
186+ scanner := bufio .NewScanner (reader )
158187
159188 ls := model.LabelSet {
160189 model .LabelName ("__aws_log_type" ): model .LabelValue (parser .logTypeLabel ),
@@ -168,7 +197,9 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
168197 if labels ["type" ] == CLOUDTRAIL_LOG_TYPE {
169198 records := make (chan Record )
170199 jsonStream := NewJSONStream (records )
171- go jsonStream .Start (gzreader , parser .skipHeaderCount )
200+ // CloudTrail logs are always gzipped, so reader is always a *gzip.Reader (io.ReadCloser)
201+ readCloser := reader .(io.ReadCloser )
202+ go jsonStream .Start (readCloser , parser .skipHeaderCount )
172203 // Stream json file
173204 for record := range jsonStream .records {
174205 if record .Error != nil {
0 commit comments