22The findings module handles transforming a TVM vulnerability finding into an
33AWS Security Hub finding.
44"""
5+
56from typing import Dict
67from restfly .utils import dict_flatten , dict_clean , trunc
78import arrow
1213 'OPEN' : 'ACTIVE' ,
1314 'NEW' : 'ACTIVE' ,
1415 'REOPENED' : 'ACTIVE' ,
15- 'FIXED' : 'ARCHIVED'
16+ 'FIXED' : 'ARCHIVED' ,
1617}
1718
1819
1920class Finding :
2021 """
2122 Security Hub Finding transformer
2223 """
24+
2325 region : str
2426 account_id : str
2527 start_date : str
@@ -45,16 +47,19 @@ def check_required_params(self, vuln: Dict):
4547 'asset.aws_ec2_instance_id' ,
4648 'asset.aws_owner_id' ,
4749 'asset.aws_ec2_instance_type' ,
48- 'asset.aws_ec2_instance_ami_id'
50+ 'asset.aws_ec2_instance_ami_id' ,
4951 ]
5052 failed = []
5153 for attr in required_attributes :
5254 if vuln .get (attr ) is None :
5355 failed .append (attr )
5456 if failed :
55- raise KeyError ((f'The required asset attributes { "," .join (failed )} '
56- f' were not set on asset { vuln ["asset.uuid" ]} '
57- ))
57+ raise KeyError (
58+ (
59+ f'The required asset attributes { "," .join (failed )} '
60+ f' were not set on asset { vuln ["asset.uuid" ]} '
61+ )
62+ )
5863
5964 def generate (self , vuln : Dict ) -> Dict :
6065 """
@@ -77,25 +82,27 @@ def generate(self, vuln: Dict) -> Dict:
7782 # and lastly fall back to the severity_default_id.
7883 # FIXME: I don't really like how this nested fallback looks, and I feel
7984 # there has to be a cleaner way to implement.
80- base_score = vuln .get ('plugin.cvss3_base_score' ,
81- vuln .get ('plugin.cvss_base_score' ,
82- SEV_MAP [vuln .get ('severity_default_id' ,
83- 0
84- )]))
85+ base_score = vuln .get (
86+ 'plugin.cvss3_base_score' ,
87+ vuln .get (
88+ 'plugin.cvss_base_score' , SEV_MAP [vuln .get ('severity_default_id' , 0 )]
89+ ),
90+ )
8591
8692 finding = {
8793 'SchemaVersion' : '2018-10-08' ,
8894 'FirstObservedAt' : vuln ['first_found' ],
8995 'LastObservedAt' : vuln ['last_found' ],
90- 'ProductArn' : (f'arn:aws:securityhub:'
91- f' { self .region } ::product/tenable/tenable-io'
92- ),
96+ 'ProductArn' : (
97+ f'arn:aws:securityhub: { self .region } ::product/tenable/tenable-io'
98+ ),
9399 'AwsAccountId' : self .account_id ,
94100 'GeneratorId' : f'tenable-plugin-{ vuln ["plugin.id" ]} ' ,
95- 'Id' : (f'{ vuln ["asset.aws_region" ]} /'
96- f'{ vuln ["asset.aws_ec2_instance_id" ]} /'
97- f'{ vuln ["plugin.id" ]} '
98- ),
101+ 'Id' : (
102+ f'{ vuln ["asset.aws_region" ]} /'
103+ f'{ vuln ["asset.aws_ec2_instance_id" ]} /'
104+ f'{ vuln ["plugin.id" ]} '
105+ ),
99106 'CreatedAt' : self .start_date ,
100107 'UpdatedAt' : self .start_date ,
101108 'Types' : ['Software and Configuration Checks/Vulnerabilities/CVE' ],
@@ -109,27 +116,29 @@ def generate(self, vuln: Dict) -> Dict:
109116 # Some plugin names run quite long, we will need to truncate to
110117 # the max string size that AWS supports.
111118 'Title' : trunc (vuln ['plugin.name' ], 256 ),
112-
113119 # The description cannot exceed 1024 characters in size.
114120 'Description' : trunc (vuln ['plugin.description' ], 1024 ),
115- 'Resources' : [{
116- 'Type' : 'AwsEc2Instance' ,
117- 'Id' : ('arn:aws:ec2:'
118- f'{ vuln ["asset.aws_region" ]} :'
119- f'{ vuln ["asset.aws_owner_id" ]} :'
120- 'instance:'
121- f'{ vuln ["asset.aws_ec2_instance_id" ]} '
122- ),
123- 'Region' : vuln ['asset.aws_region' ],
124- 'Details' : {
125- 'AwsEc2Instance' : {
126- 'Type' : vuln ['asset.aws_ec2_instance_type' ],
127- 'ImageId' : vuln ['asset.aws_ec2_instance_ami_id' ],
128- 'IpV4Addresses' : vuln ['asset.ipv4s' ],
129- 'IpV6Addresses' : vuln ['asset.ipv6s' ],
121+ 'Resources' : [
122+ {
123+ 'Type' : 'AwsEc2Instance' ,
124+ 'Id' : (
125+ 'arn:aws:ec2:'
126+ f'{ vuln ["asset.aws_region" ]} :'
127+ f'{ vuln ["asset.aws_owner_id" ]} :'
128+ 'instance:'
129+ f'{ vuln ["asset.aws_ec2_instance_id" ]} '
130+ ),
131+ 'Region' : vuln ['asset.aws_region' ],
132+ 'Details' : {
133+ 'AwsEc2Instance' : {
134+ 'Type' : vuln ['asset.aws_ec2_instance_type' ],
135+ 'ImageId' : vuln ['asset.aws_ec2_instance_ami_id' ],
136+ 'IpV4Addresses' : vuln ['asset.ipv4s' ],
137+ 'IpV6Addresses' : vuln ['asset.ipv6s' ],
138+ },
130139 },
131- },
132- } ],
140+ }
141+ ],
133142 'ProductFields' : {
134143 'CVE' : ', ' .join (vuln .get ('plugin.cve' , [])),
135144 'Plugin Family' : vuln ['plugin.family' ],
@@ -142,12 +151,11 @@ def generate(self, vuln: Dict) -> Dict:
142151 # 'SourceUrl': 'XXXXXXX',
143152 'Remediation' : {
144153 'Recommendation' : {
145-
146154 # The solution cannot exceed 1024 characters in length.
147155 'Text' : trunc (vuln ['plugin.solution' ], 512 ),
148- 'Url' : vuln .get ('plugin.see_also' , [])[0 ]
156+ 'Url' : vuln .get ('plugin.see_also' , [])[0 ],
149157 }
150158 },
151- 'RecordState' : STATE_MAP [vuln ['state' ]]
159+ 'RecordState' : STATE_MAP [vuln ['state' ]],
152160 }
153161 return dict_clean (finding )
0 commit comments