|
require 'csv'
require 'json'
require 'pathname'
require 'heimdall_tools/hdf'
| 4 | + |
# Directory two path components above this file, then `data`; built with
# Pathname, so the standard `pathname` library must be loaded before this line.
RESOURCE_DIR = Pathname.new(__FILE__).join('../../data')

# Location of the CSV consulted to translate CWE ids into NIST tags.
CWE_NIST_MAPPING_FILE = File.join(RESOURCE_DIR, 'cwe-nist-mapping.csv')

# Impact score assigned to each SARIF result level.
IMPACT_MAPPING = {
  error: 0.7,
  warning: 0.5,
  note: 0.3,
  none: 0.0
}.freeze

# NIST tags applied when no CWE-derived NIST tag is found.
DEFAULT_NIST_TAG = %w[SA-11 RA-5].freeze
| 17 | + |
# Loading spinner sign: an endless enumerator over the four spinner frames.
# Each `$spinner.next` returns the following frame and wraps around after the
# last one. Kept as a global because the mapper below reads `$spinner` by name.
$spinner = ['|', '/', '-', '\\'].cycle
| 27 | + |
module HeimdallTools
  # Converts a SARIF log into the structure expected by HeimdallDataFormat:
  # one control per distinct `ruleId`, one finding per SARIF result, and NIST
  # tags derived from the CWE relationships declared on each rule.
  class SarifMapper
    # Impact used when a result level is absent or not listed in IMPACT_MAPPING.
    FALLBACK_IMPACT = 0.1

    # @param sarif_json [String] raw SARIF document
    # @param _name [Object] accepted for signature compatibility; unused here
    # @param verbose [Boolean] stored in @verbose (not read inside this class)
    # @raise [RuntimeError] when the CWE/NIST mapping cannot be loaded or the
    #   JSON cannot be parsed (both failures share the same message)
    def initialize(sarif_json, _name = nil, verbose = false)
      @sarif_json = sarif_json
      @verbose = verbose
      @cwe_nist_mapping = parse_mapper
      @sarif_log = JSON.parse(@sarif_json)
    rescue StandardError => e
      raise "Invalid SARIF JSON file provided\n\nException: #{e}"
    end

    # Builds the profile-level metadata for the scan.
    #
    # @param sarif_log [Hash] parsed SARIF document
    # @return [Hash] with 'policy', 'version', 'projectName' and 'summary' keys
    # @raise [RuntimeError] when the information cannot be extracted
    def extract_scaninfo(sarif_log)
      {
        'policy' => 'SARIF',
        'version' => sarif_log['version'],
        'projectName' => 'Static Analysis Results Interchange Format',
        'summary' => NA_STRING
      }
    rescue StandardError => e
      raise "Error extracting project info from SARIF JSON file provided Exception: #{e}"
    end

    # Builds a single (always 'failed') finding for a SARIF result.
    #
    # @param result [Hash] one entry of a run's `results`
    # @return [Hash] with 'status', 'code_desc', 'run_time' and 'start_time'
    def finding(result)
      # Resolve the location once instead of once per field.
      location = get_location(result)
      parts = []
      parts << "URL : #{location['uri']}" if location['uri']
      parts << "LINE : #{location['start_line']}" if location['start_line']
      parts << "COLUMN : #{location['start_column']}" if location['start_column']
      {
        'status' => 'failed',
        'code_desc' => parts.join(' ').strip,
        'run_time' => NA_FLOAT,
        'start_time' => NA_STRING
      }
    end

    # Adds the NIST tags mapped from the given CWE ids to `tags_node`.
    # Falls back to DEFAULT_NIST_TAG when no mapping row matches.
    #
    # @param cweid [Array<String>] CWE numbers as strings (e.g. ['297'])
    # @param taxonomy_name [String] key under which the tags are stored
    # @param tags_node [Hash, nil] existing tags; mutated in place when given
    # @return [Hash] the tags hash including the NIST tags
    def add_nist_tag_from_cwe(cweid, taxonomy_name, tags_node)
      nist_ids = @cwe_nist_mapping
                 .select { |row| !row[:nistid].nil? && cweid.include?(row[:cweid].to_s) }
                 .map { |row| row[:nistid] }
                 .flatten
                 .uniq
      nist_ids = DEFAULT_NIST_TAG if nist_ids.empty?

      tags_node ||= {}
      # Array() also covers a key that is present but nil.
      tags_node[taxonomy_name] = Array(tags_node[taxonomy_name]) | nist_ids
      tags_node
    end

    # Reads the first location of a SARIF result.
    #
    # @param result [Hash] one entry of a run's `results`
    # @return [Hash] 'uri', 'start_line' and 'start_column' (nil when absent)
    def get_location(result)
      physical = result.dig('locations', 0, 'physicalLocation') || {}
      {
        'uri' => physical.dig('artifactLocation', 'uri'),
        'start_line' => physical.dig('region', 'startLine'),
        'start_column' => physical.dig('region', 'startColumn')
      }
    end

    # Collects driver and rule details for a result.
    #
    # The rule name falls back to the rule's default message string and
    # finally to the result's own message text.
    #
    # @param run [Hash] the SARIF run containing the result
    # @param result [Hash] the SARIF result
    # @param rule_id [String, nil] id of the rule to look up in the driver
    # @return [Hash] 'driver_name', 'driver_version', 'rule_name' and, when the
    #   rule is found, 'rule_short_description' and 'rule_tags'
    def get_rule_info(run, result, rule_id)
      # A missing tool/driver yields nil driver fields instead of raising.
      driver = run.dig('tool', 'driver') || {}
      info = {
        'driver_name' => driver['name'],
        'driver_version' => driver['version']
      }

      rule = Array(driver['rules']).find { |candidate| candidate['id'].eql?(rule_id) }
      if rule
        info['rule_name'] = rule['name'] || rule.dig('messageStrings', 'default', 'text')
        info['rule_short_description'] = rule.dig('shortDescription', 'text')
        info['rule_tags'] = get_tags(rule)
      end

      info['rule_name'] ||= result&.dig('message', 'text')
      info
    end

    # Groups a rule's relationship target ids by lower-cased taxonomy name.
    # Relationships whose target has no toolComponent name are skipped.
    #
    # @param rule [Hash, nil] SARIF rule descriptor
    # @return [Hash{String => Array}] taxonomy name => unique target ids
    def get_tags(rule)
      relationships = Array(rule && rule['relationships'])
      relationships.each_with_object({}) do |relationship, tags|
        target = relationship['target'] || {}
        taxonomy_name = target.dig('toolComponent', 'name')
        next if taxonomy_name.nil?

        key = taxonomy_name.to_s.downcase
        tags[key] = Array(tags[key]) | [target['id']]
      end
    end

    # Extracts the id part from reference-style tags, e.g. 'CWE-297' => '297'.
    # Tags that do not carry the "<ref>-" prefix are dropped.
    #
    # @param rule_tags [Hash, nil] tags as returned by #get_tags
    # @param ref [String] taxonomy reference, e.g. 'CWE'
    # @return [Array<String>] extracted ids; empty when nothing matches
    def parse_identifiers(rule_tags, ref)
      return [] unless rule_tags.is_a?(Hash)

      taxonomy = ref.to_s.downcase
      prefix = "#{taxonomy}-"
      Array(rule_tags[taxonomy])
        .map { |tag| tag.to_s.downcase.split(prefix)[1] }
        .compact
    end

    # Maps a SARIF level onto an impact score.
    #
    # @param severity [String, Symbol, nil] SARIF result level
    # @return [Float] IMPACT_MAPPING value, or FALLBACK_IMPACT when unmapped
    def impact(severity)
      IMPACT_MAPPING.fetch(severity.to_s.to_sym, FALLBACK_IMPACT)
    end

    # Loads the CWE => NIST mapping CSV.
    #
    # @return [Array<Hash>] one hash per CSV row, keyed by symbolized header
    def parse_mapper
      CSV.read(CWE_NIST_MAPPING_FILE,
               encoding: 'UTF-8',
               headers: true,
               header_converters: :symbol,
               converters: :all).map(&:to_hash)
    end

    # @param data [Object, nil] description text
    # @param label [Object, nil] description label
    # @return [Hash] :data and :label, each defaulting to NA_STRING
    def desc_tags(data, label)
      { data: data || NA_STRING, label: label || NA_STRING }
    end

    # Adds one SARIF result to `controls`: appended as a finding to the control
    # with the same rule id, or turned into a new control when none exists yet.
    # Prints one spinner frame to stdout per call.
    #
    # @param run [Hash] the SARIF run containing the result
    # @param result [Hash] the SARIF result
    # @param controls [Array<Hash>] accumulator, mutated in place
    def process_item(run, result, controls)
      printf("\rProcessing: %s", $spinner.next)
      rule_id = result['ruleId']
      control = controls.find { |existing| existing['id'].eql?(rule_id) }

      if control
        control['results'] << finding(result)
      else
        location = get_location(result)
        rule_info = get_rule_info(run, result, rule_id)
        cwe_ids = parse_identifiers(rule_info['rule_tags'], 'CWE')
        controls << {
          'tags' => add_nist_tag_from_cwe(cwe_ids, 'nist', rule_info['rule_tags']),
          'descriptions' => NA_ARRAY,
          'refs' => NA_ARRAY,
          'source_location' => { ref: location['uri'], line: location['start_line'] },
          'title' => rule_info['rule_name'].to_s,
          'id' => rule_id.to_s,
          'desc' => rule_info['rule_short_description'].to_s,
          'impact' => impact(result['level'].to_s),
          'code' => NA_STRING,
          'results' => [finding(result)]
        }
      end
    end

    # Converts the parsed SARIF log and returns whatever
    # HeimdallDataFormat#to_hdf produces for the collected controls.
    def to_hdf
      controls = []
      # Array() tolerates a log without runs and runs without results.
      Array(@sarif_log['runs']).each do |run|
        Array(run['results']).each { |result| process_item(run, result, controls) }
      end

      scaninfo = extract_scaninfo(@sarif_log)
      HeimdallDataFormat.new(profile_name: scaninfo['policy'],
                             version: scaninfo['version'],
                             title: scaninfo['projectName'],
                             summary: scaninfo['summary'],
                             controls: controls,
                             target_id: scaninfo['projectName']).to_hdf
    end
  end
end
0 commit comments