Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions aws/logs_monitoring/steps/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def is_cloudtrail(key):
return bool(match)


def is_vpc_flowlog(key):
    """Return True when the S3 object key points at a VPC Flow Log file.

    AWS writes S3-delivered flow logs under a path segment named
    "vpcflowlogs", so a substring match on the key is sufficient.
    """
    return key.find("vpcflowlogs") >= 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 thought: I'm not a fan of this, as it puts some source identification — and some business logic — back into the forwarder.

While this check is the current method for S3-based VPC Flow Logs, we also support CloudWatch logs for VPC Flow Logs (they seem not to face this issue). Is there a way we can implement this on the backend side, so that if we update the source identification we don't have two places to maintain it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is doable from the backend, but it requires a change in the logic, as we won't know which line comes first on the intake side. For that we would maybe need to perform a keyword check? As for the file key, it's already included in the log payload, so we could use that instead of doing the check here.

The only downside is that we're going to still send the log and filter it out in the backend.

Regarding the source identification, I agree that this brings back some business logic, but it doesn't add any extra fields to the log itself. We're still going to keep some business logic eventually (the same goes for CloudTrail), but I think it should be fine as long as we don't use it to amend the log's payload.

I'm fine with either choice, by the way; this one seemed more logical, as we avoid sending the log at the source rather than filtering it out on the backend side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You got me at "we don't know the order on the backend side," so OK for this.



def find_cloudwatch_source(log_group):
for prefix in AwsCwEventSourcePrefix:
if log_group.startswith(str(prefix)):
Expand Down
21 changes: 18 additions & 3 deletions aws/logs_monitoring/steps/handlers/s3_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
DD_USE_VPC,
GOV_STRING,
)
from steps.common import add_service_tag, is_cloudtrail, merge_dicts, parse_event_source
from steps.common import (
add_service_tag,
is_cloudtrail,
is_vpc_flowlog,
merge_dicts,
parse_event_source,
)


class S3EventDataStore:
Expand Down Expand Up @@ -63,6 +69,7 @@ def handle(self, event):
add_service_tag(self.metadata)

self._extract_data()

yield from self._get_structured_lines_for_s3_handler()

def _extract_event(self, event):
Expand Down Expand Up @@ -178,6 +185,9 @@ def _extract_cloudtrail_logs(self):
self.logger.debug("Unable to parse cloudtrail log: %s" % e)

def _extract_other_logs(self):
# VPC flow logs have a header line that should be skipped
skip_first_line = is_vpc_flowlog(self.data_store.key)

# Check if using multiline log regex pattern
# and determine whether line or pattern separated logs
if self.multiline_regex_start_pattern and self.multiline_regex_pattern:
Expand All @@ -197,7 +207,9 @@ def _extract_other_logs(self):
)
self.data_store.data = self.data_store.data.splitlines()

for line in self.data_store.data:
for i, line in enumerate(self.data_store.data):
if skip_first_line and i == 0:
continue
yield self._format_event(line)

else:
Expand All @@ -206,7 +218,10 @@ def _extract_other_logs(self):
#
# https://docs.python.org/3/library/stdtypes.html#str.splitlines
# https://docs.python.org/3/library/stdtypes.html#bytes.splitlines
for line in self.data_store.data.splitlines():
for i, line in enumerate(self.data_store.data.splitlines()):
if skip_first_line and i == 0:
continue

line = line.decode("utf-8", errors="ignore").strip()
if len(line) == 0:
continue
Expand Down
25 changes: 25 additions & 0 deletions aws/logs_monitoring/tests/test_s3_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,31 @@ def test_set_source_cloudfront(self):
"s3",
)

def test_vpc_flowlog_skips_header_line(self):
    """A VPC flow log file must have its leading header row dropped."""
    flow_log_key = (
        "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/01/"
        "123456789012_vpcflowlogs_us-east-1_fl-abc123.log.gz"
    )
    header = "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status"
    records = [
        "2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 443 49152 6 10 840 1620000000 1620000060 ACCEPT OK",
        "2 123456789012 eni-abc123 10.0.0.2 10.0.0.1 49152 443 6 8 640 1620000000 1620000060 ACCEPT OK",
    ]
    payload = "\n".join([header] + records)

    parsed = self.parse_lines(payload, flow_log_key, "vpc")

    # Only the two data records survive; the header never reaches the output.
    self.assertEqual(len(parsed), 2)
    self.assertIn("10.0.0.1", parsed[0]["message"])
    self.assertNotIn("version account-id", parsed[0]["message"])

def test_non_vpc_flowlog_includes_first_line(self):
    """Files outside the vpcflowlogs path keep every line, first one included."""
    elb_key = "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2024/01/01/log.gz"
    lines = ["first line of data", "second line of data", "third line of data"]

    parsed = self.parse_lines("\n".join(lines), elb_key, "elb")

    # Nothing is skipped for non-VPC sources.
    self.assertEqual(len(parsed), 3)
    self.assertEqual(parsed[0]["message"], "first line of data")


# Allow running this test module directly (e.g. `python test_s3_handler.py`)
# in addition to discovery via a test runner.
if __name__ == "__main__":
    unittest.main()
Loading