|
1 | 1 | import itertools |
| 2 | +import json |
2 | 3 | import subprocess |
3 | 4 | from pathlib import Path |
4 | 5 | from tempfile import NamedTemporaryFile |
5 | 6 | from typing import Iterable, Optional |
6 | 7 |
|
| 8 | +from typing_extensions import Self |
| 9 | + |
7 | 10 | from codemodder.context import CodemodExecutionContext |
8 | 11 | from codemodder.logging import logger |
9 | | -from codemodder.sarifs import SarifResultSet |
| 12 | +from codemodder.result import LineInfo, Location, Result, ResultSet |
| 13 | +from codemodder.sarifs import AbstractSarifToolDetector |
| 14 | + |
| 15 | + |
class SemgrepSarifToolDetector(AbstractSarifToolDetector):
    """Detector that recognises SARIF runs whose tool driver is Semgrep."""

    @classmethod
    def detect(cls, run_data: dict) -> bool:
        """Return True when the run's tool driver name contains "semgrep".

        The match is case-insensitive. A run without a ``tool`` entry is
        never treated as a Semgrep run.
        """
        if "tool" not in run_data:
            return False
        driver_name = run_data["tool"]["driver"]["name"]
        return "semgrep" in driver_name.lower()
| 23 | + |
| 24 | + |
def extract_rule_id(result, sarif_run) -> Optional[str]:
    """Return the rule id for a single SARIF result, or None if it has none.

    Args:
        result: One entry of a SARIF run's ``results`` list.
        sarif_run: The SARIF run that contains ``result``; only consulted
            when the id has to be resolved through a ``rule`` reference.

    Returns:
        The base name of ``ruleId`` when that key is present; otherwise the
        ``id`` of the rule descriptor the ``rule`` reference points at;
        otherwise None.

    Raises:
        KeyError, IndexError: if a ``rule`` reference points at a component
            or rule index that does not exist in ``sarif_run``.
    """
    if "ruleId" in result:
        # semgrep prepends the folders into the rule-id, we want the base name only
        return result["ruleId"].rsplit(".", 1)[-1]

    # it may be contained in the 'rule' field through the tool component in the sarif file
    if "rule" in result:
        rule_ref = result["rule"]
        rule_index = rule_ref["index"]
        if "toolComponent" in rule_ref:
            tool_index = rule_ref["toolComponent"]["index"]
            component = sarif_run["tool"]["extensions"][tool_index]
        else:
            # Without an explicit tool component the reference resolves
            # against the driver's own rule list.
            component = sarif_run["tool"]["driver"]
        return component["rules"][rule_index]["id"]

    return None
| 37 | + |
| 38 | + |
class SemgrepLocation(Location):
    """Location built from a SARIF location entry."""

    @classmethod
    def from_sarif(cls, sarif_location) -> Self:
        """Build a location from one SARIF ``locations`` entry.

        Reads the artifact URI and the region's start/end line and column
        from ``physicalLocation``. The region's snippet text is attached to
        both the start and the end ``LineInfo``.
        """
        physical = sarif_location["physicalLocation"]
        file = Path(physical["artifactLocation"]["uri"])
        region = physical["region"]
        start = LineInfo(
            line=region["startLine"],
            column=region["startColumn"],
            snippet=region["snippet"]["text"],
        )
        end = LineInfo(
            line=region["endLine"],
            column=region["endColumn"],
            snippet=region["snippet"]["text"],
        )
        return cls(file=file, start=start, end=end)
| 55 | + |
| 56 | + |
class SemgrepResult(Result):
    """Result built from a SARIF result entry."""

    @classmethod
    def from_sarif(cls, sarif_result, sarif_run) -> Self:
        """Build a result from one SARIF result and the run that contains it.

        Raises:
            ValueError: if no rule id can be extracted from ``sarif_result``.
        """
        rule_id = extract_rule_id(sarif_result, sarif_run)
        if not rule_id:
            raise ValueError("Could not extract rule id from sarif result.")

        # One SemgrepLocation per entry in the result's "locations" list.
        locations: list[Location] = [
            SemgrepLocation.from_sarif(entry) for entry in sarif_result["locations"]
        ]
        return cls(rule_id=rule_id, locations=locations)
| 69 | + |
| 70 | + |
class SemgrepResultSet(ResultSet):
    """Result set populated from a SARIF file."""

    @classmethod
    def from_sarif(cls, sarif_file: str | Path) -> Self:
        """Load a SARIF file and collect every result of every run.

        The file is read as UTF-8 JSON. Each entry under ``runs[*].results``
        is converted with ``SemgrepResult.from_sarif`` and added to a new
        result set, which is returned.
        """
        with open(sarif_file, "r", encoding="utf-8") as sarif_fp:
            sarif_data = json.load(sarif_fp)

        collected = cls()
        for run_data in sarif_data["runs"]:
            for raw_result in run_data["results"]:
                collected.add_result(SemgrepResult.from_sarif(raw_result, run_data))

        return collected
10 | 84 |
|
11 | 85 |
|
12 | 86 | def run( |
13 | 87 | execution_context: CodemodExecutionContext, |
14 | 88 | yaml_files: Iterable[Path], |
15 | 89 | files_to_analyze: Optional[Iterable[Path]] = None, |
16 | | -) -> SarifResultSet: |
| 90 | +) -> SemgrepResultSet: |
17 | 91 | """ |
18 | 92 | Runs Semgrep and outputs a dict with the results organized by rule_id. |
19 | 93 | """ |
@@ -49,5 +123,5 @@ def run( |
49 | 123 | if not execution_context.verbose: |
50 | 124 | logger.error("captured semgrep stderr: %s", call.stderr) |
51 | 125 | raise subprocess.CalledProcessError(call.returncode, command) |
52 | | - results = SarifResultSet.from_sarif(temp_sarif_file.name) |
| 126 | + results = SemgrepResultSet.from_sarif(temp_sarif_file.name) |
53 | 127 | return results |
0 commit comments