Skip to content

Commit 683f853

Browse files
committed
feat(scripts): Coverage script
1 parent baa44fa commit 683f853

File tree

1 file changed

+381
-0
lines changed

1 file changed

+381
-0
lines changed

scripts/coverage.py

Lines changed: 381 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Coverage script to load .coverage.json and integrate CodeQL query resolution data.
4+
5+
This script:
6+
1. Loads the existing .coverage.json file from the project root
7+
2. Runs 'codeql resolve queries --format=json ./ql/src' to get available queries
8+
3. Integrates the query data into the coverage file
9+
"""
10+
11+
import json
12+
import subprocess
13+
import sys
14+
import argparse
15+
from pathlib import Path
16+
from typing import Dict, List, Any
17+
18+
19+
def find_project_root() -> Path:
    """Locate the project root by searching upward for ``.coverage.json``.

    The search starts in the directory that contains this script and then
    visits each ancestor directory in turn, including the filesystem root.

    Returns:
        The first directory that contains ``.coverage.json``. If no directory
        does, the parent of the script's directory is returned instead.
    """
    # Resolve first: a relative ``__file__`` (e.g. ``scripts/coverage.py``)
    # collapses to ``.`` after a single ``.parent`` step, which would end the
    # walk before any real ancestor directory had been inspected.
    script_dir = Path(__file__).resolve().parent

    for candidate in (script_dir, *script_dir.parents):
        if (candidate / ".coverage.json").exists():
            return candidate

    # If not found, assume project root is one level up from scripts directory
    return script_dir.parent
32+
33+
34+
def load_coverage_file(project_root: Path) -> Dict[str, Any]:
    """Load and parse the existing ``.coverage.json`` file.

    Args:
        project_root: Directory expected to contain ``.coverage.json``.

    Returns:
        The parsed top-level JSON object.

    Raises:
        SystemExit: With status 1 (after printing a message to stderr) when
            the file is missing, unreadable, not valid JSON, or its top-level
            value is not a JSON object.
    """
    coverage_file = project_root / ".coverage.json"

    # Open directly instead of testing existence first; diagnostics go to
    # stderr so they never mix with report output written to stdout.
    try:
        with open(coverage_file, 'r', encoding='utf-8') as f:
            coverage_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: .coverage.json not found at {coverage_file}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in .coverage.json: {e}", file=sys.stderr)
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error: Could not read {coverage_file}: {e}", file=sys.stderr)
        sys.exit(1)

    # The rest of the script assigns string keys on this value, so anything
    # other than a JSON object would fail later with an opaque TypeError.
    if not isinstance(coverage_data, dict):
        print("Error: .coverage.json must contain a JSON object at the top level", file=sys.stderr)
        sys.exit(1)

    return coverage_data
48+
49+
50+
def run_codeql_resolve_queries(project_root: Path) -> List[str]:
    """Run ``codeql resolve queries`` on ``ql/src`` and return the query paths.

    Args:
        project_root: Project directory containing ``ql/src``; also used as
            the working directory of the ``codeql`` process.

    Returns:
        The list decoded from the command's JSON output.

    Raises:
        SystemExit: With status 1 (after printing a message to stderr) when
            ``ql/src`` is missing, ``codeql`` cannot be launched or exits with
            a non-zero status, or its output is not a JSON list.
    """
    ql_src_path = project_root / "ql" / "src"

    if not ql_src_path.exists():
        print(f"Error: ql/src directory not found at {ql_src_path}", file=sys.stderr)
        sys.exit(1)

    cmd = ["codeql", "resolve", "queries", "--format=json", str(ql_src_path)]
    try:
        result = subprocess.run(
            cmd,
            cwd=project_root,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error running codeql command: {e}", file=sys.stderr)
        print(f"stderr: {e.stderr}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # Raised when the executable is absent from PATH or cannot be run.
        print(f"Error: could not launch 'codeql' (is the CodeQL CLI on PATH?): {e}", file=sys.stderr)
        sys.exit(1)

    # Parse the JSON output
    try:
        queries = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        print(f"Error parsing codeql output as JSON: {e}", file=sys.stderr)
        sys.exit(1)

    # Guard against any other shape so downstream code can rely on a list.
    if not isinstance(queries, list):
        print("Error: unexpected codeql output: expected a JSON list of query paths", file=sys.stderr)
        sys.exit(1)

    return queries
79+
80+
81+
def process_query_paths(queries: List[str], project_root: Path) -> List[Dict[str, Any]]:
    """Build one coverage entry per resolved query path.

    Each entry records the path relative to ``project_root`` (or the path as
    given when it does not lie under ``project_root``), the original path,
    the file stem as the name, the derived category and CWE, and
    not-yet-covered defaults.
    """

    def describe(raw_path: str) -> Dict[str, Any]:
        location = Path(raw_path)
        try:
            shown = location.relative_to(project_root)
        except ValueError:
            # Outside the project root: keep the path exactly as supplied.
            shown = location
        return {
            "path": str(shown),
            "absolute_path": raw_path,
            "name": location.stem,
            "category": extract_category_from_path(shown),
            "cwe": extract_cwe_from_path(shown),
            "covered": False,  # Default to not covered
            "test_files": [],  # Will be populated with test file paths if any
        }

    return [describe(raw_path) for raw_path in queries]
107+
108+
109+
def extract_category_from_path(path: Path) -> str:
    """Extract category from query path (e.g., 'security', 'diagnostics').

    Args:
        path: Query file path, expected to be relative to the project root
            (``ql/src/<category>/.../<query file>``).

    Returns:
        The directory name directly under ``ql/src``, or ``"unknown"`` when
        the path does not start with ``ql/src`` or has no category directory.
    """
    parts = path.parts
    # Require at least one component after the category: a query file sitting
    # directly in ql/src would otherwise have its file name used as category.
    if len(parts) > 3 and parts[:2] == ("ql", "src"):
        return parts[2]
    return "unknown"
115+
116+
117+
def extract_cwe_from_path(path: Path) -> str:
    """Return the first path component that starts with ``CWE-``.

    An empty string is returned when no component has that prefix.
    """
    return next(
        (segment for segment in path.parts if segment.startswith("CWE-")),
        "",
    )
124+
125+
126+
def update_coverage_file(coverage_data: Dict[str, Any], queries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Update the coverage data with query information.

    Args:
        coverage_data: Coverage mapping to update; it is modified in place.
        queries: Query entries, each providing ``"covered"``, ``"category"``
            and ``"cwe"`` keys.

    Returns:
        The same ``coverage_data`` object, with ``"queries"`` replaced by
        ``queries`` and ``"metadata"`` rebuilt from them.
    """
    # NOTE(review): any existing coverage_data["queries"] is replaced
    # wholesale, so previously recorded "covered"/"test_files" values are not
    # carried over — confirm this is intended.
    coverage_data["queries"] = queries

    total = len(queries)
    covered = sum(1 for q in queries if q["covered"])

    coverage_data["metadata"] = {
        "total_queries": total,
        "covered_queries": covered,
        # Sorted so that repeated runs produce identical output; plain set
        # iteration order for strings varies between interpreter runs.
        "categories": sorted({q["category"] for q in queries}),
        "cwes": sorted({q["cwe"] for q in queries if q["cwe"]}),
        "coverage_percentage": (covered / total * 100) if total > 0 else 0,
    }

    return coverage_data
145+
146+
147+
def save_coverage_file(coverage_data: Dict[str, Any], project_root: Path) -> None:
    """Save the updated coverage data back to ``.coverage.json``.

    Args:
        coverage_data: JSON-serializable coverage mapping to write.
        project_root: Directory in which ``.coverage.json`` is written.

    Raises:
        SystemExit: With status 1 (after printing a message to stderr) when
            the data cannot be serialized or the file cannot be written.
    """
    coverage_file = project_root / ".coverage.json"

    try:
        # Serialize before opening: mode 'w' truncates the file immediately,
        # so a serialization failure part-way through a streaming dump would
        # destroy the existing coverage data.
        serialized = json.dumps(coverage_data, indent=2, ensure_ascii=False)
        with open(coverage_file, 'w', encoding='utf-8') as f:
            f.write(serialized)
    except (OSError, TypeError, ValueError) as e:
        print(f"Error saving coverage file: {e}", file=sys.stderr)
        sys.exit(1)

    print(f"Successfully updated {coverage_file}")
158+
159+
160+
# Short descriptions for the CWE identifiers shown in the report.
_CWE_DESCRIPTIONS = {
    "CWE-200": "Information Exposure",
    "CWE-284": "Improper Access Control",
    "CWE-306": "Missing Authentication",
    "CWE-319": "Cleartext Transmission",
    "CWE-327": "Broken/Risky Crypto Algorithm",
    "CWE-352": "Cross-Site Request Forgery",
    "CWE-272": "Least Privilege Violation",
    "CWE-311": "Missing Encryption",
    "CWE-400": "Resource Exhaustion",
    "CWE-942": "Overly Permissive CORS",
    "CWE-693": "Protection Mechanism Failure",
    "CWE-295": "Improper Certificate Validation",
    "CWE-798": "Hard-coded Credentials",
    "CWE-404": "Improper Resource Shutdown",
}


def _badge_color(coverage_pct: float) -> str:
    """Map a coverage percentage to the badge color used in the report."""
    if coverage_pct >= 80:
        return "brightgreen"
    if coverage_pct >= 60:
        return "yellow"
    if coverage_pct >= 40:
        return "orange"
    return "red"


def _tally_coverage(queries: List[Dict[str, Any]], key: str) -> Dict[str, Dict[str, int]]:
    """Count total and covered queries for each distinct value of ``query[key]``."""
    stats: Dict[str, Dict[str, int]] = {}
    for query in queries:
        bucket = stats.setdefault(query[key], {"total": 0, "covered": 0})
        bucket["total"] += 1
        if query["covered"]:
            bucket["covered"] += 1
    return stats


def _percentage(covered: int, total: int) -> float:
    """Return ``covered / total`` as a percentage, or 0 when ``total`` is 0."""
    return (covered / total * 100) if total > 0 else 0


def generate_coverage_markdown(coverage_data: Dict[str, Any]) -> str:
    """Generate markdown coverage report from coverage data.

    Args:
        coverage_data: Mapping with a ``"metadata"`` entry (providing
            ``total_queries``, ``covered_queries``, ``coverage_percentage``,
            ``categories`` and ``cwes``) and a ``"queries"`` list whose items
            provide ``category``, ``cwe`` and ``covered``.

    Returns:
        The report as one newline-joined string: a coverage badge, a summary
        table, per-category and per-CWE tables when there is data for them,
        and a "last updated" line.
    """
    from datetime import datetime, timezone

    metadata = coverage_data["metadata"]
    queries = coverage_data["queries"]
    coverage_pct = metadata["coverage_percentage"]
    badge_color = _badge_color(coverage_pct)

    md_content = [
        # Coverage badge ("%25" is the URL-encoded percent sign)
        f"![Coverage](https://img.shields.io/badge/Query_Coverage-{coverage_pct:.1f}%25-{badge_color})",
        "",
        # Summary statistics
        "| Metric | Value |",
        "|--------|-------|",
        f"| Total Queries | {metadata['total_queries']} |",
        f"| Covered Queries | {metadata['covered_queries']} |",
        f"| Coverage Percentage | {coverage_pct:.1f}% |",
        f"| Categories | {len(metadata['categories'])} |",
        f"| CWE Categories | {len(metadata['cwes'])} |",
        "",
    ]

    # Coverage by category
    if queries:
        category_stats = _tally_coverage(queries, "category")

        md_content.append("### Coverage by Category")
        md_content.append("")
        md_content.append("| Category | Covered | Total | Percentage |")
        md_content.append("|----------|---------|-------|------------|")

        for category in sorted(category_stats):
            stats = category_stats[category]
            pct = _percentage(stats["covered"], stats["total"])
            md_content.append(f"| {category.title()} | {stats['covered']} | {stats['total']} | {pct:.1f}% |")

        md_content.append("")

    # CWE coverage breakdown (queries without a CWE are left out)
    if metadata["cwes"]:
        cwe_stats = _tally_coverage([q for q in queries if q["cwe"]], "cwe")

        if cwe_stats:
            md_content.append("### Coverage by CWE")
            md_content.append("")
            md_content.append("| CWE | Description | Covered | Total | Percentage |")
            md_content.append("|-----|-------------|---------|-------|------------|")

            for cwe in sorted(cwe_stats):
                stats = cwe_stats[cwe]
                pct = _percentage(stats["covered"], stats["total"])
                description = _CWE_DESCRIPTIONS.get(cwe, "Security Vulnerability")
                md_content.append(f"| {cwe} | {description} | {stats['covered']} | {stats['total']} | {pct:.1f}% |")

            md_content.append("")

    # Last updated timestamp: the label says UTC, so take the time in UTC
    # rather than the machine's local zone.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    md_content.append(f"*Last updated: {timestamp}*")

    return "\n".join(md_content)
268+
269+
270+
def update_readme_coverage(project_root: Path, coverage_markdown: str) -> None:
    """Update the README.md file with the coverage report.

    The text between the ``<!-- COVERAGE-REPORT -->`` and
    ``<!-- COVERAGE-REPORT:END -->`` markers is replaced by
    ``coverage_markdown`` surrounded by blank lines; the markers themselves
    are kept. This is best-effort: a missing README, missing markers, or an
    I/O error is reported and the function returns without raising.

    Args:
        project_root: Directory containing ``README.md``.
        coverage_markdown: Markdown text to place between the markers.
    """
    readme_file = project_root / "README.md"

    if not readme_file.exists():
        print(f"Warning: README.md not found at {readme_file}")
        return

    start_marker = "<!-- COVERAGE-REPORT -->"
    end_marker = "<!-- COVERAGE-REPORT:END -->"

    try:
        with open(readme_file, 'r', encoding='utf-8') as f:
            content = f.read()

        start_idx = content.find(start_marker)
        # Look for the end marker only after the start marker. An end marker
        # that precedes the start marker would make the slices below overlap
        # and duplicate or corrupt the README content.
        end_idx = -1
        if start_idx != -1:
            end_idx = content.find(end_marker, start_idx + len(start_marker))

        if start_idx == -1 or end_idx == -1:
            print(f"Warning: Coverage report markers not found in {readme_file}")
            print("Please add the following markers to your README.md where you want the coverage report:")
            print(f"  {start_marker}")
            print(f"  {end_marker}")
            return

        # Replace the content between markers
        new_content = (
            content[:start_idx + len(start_marker)] +
            "\n\n" + coverage_markdown + "\n\n" +
            content[end_idx:]
        )

        with open(readme_file, 'w', encoding='utf-8') as f:
            f.write(new_content)

        print(f"Successfully updated coverage report in {readme_file}")

    except (OSError, UnicodeError) as e:
        print(f"Error updating README.md: {e}", file=sys.stderr)
310+
311+
312+
def main():
    """Drive the coverage update from the command line.

    Resolves the project's CodeQL queries, rebuilds the coverage data and
    renders the markdown report. With ``--markdown-only`` the report is
    printed and nothing is written; otherwise ``.coverage.json`` is saved and,
    unless ``--no-readme-update`` is given, ``README.md`` is refreshed.
    """
    cli = argparse.ArgumentParser(description="Generate CodeQL query coverage report")
    cli.add_argument(
        "--markdown-only",
        action="store_true",
        help="Generate only the markdown report and print to stdout (don't update files)",
    )
    cli.add_argument(
        "--no-readme-update",
        action="store_true",
        help="Don't update the README.md file with the coverage report",
    )
    options = cli.parse_args()

    # Progress messages are suppressed in markdown-only mode so that stdout
    # carries nothing but the report.
    verbose = not options.markdown_only

    def progress(message: str) -> None:
        if verbose:
            print(message)

    progress("Loading CodeQL query coverage data...")

    root = find_project_root()
    progress(f"Project root: {root}")

    existing = load_coverage_file(root)
    progress("Loaded existing coverage data")

    progress("Running codeql resolve queries...")
    resolved = run_codeql_resolve_queries(root)
    progress(f"Found {len(resolved)} queries")

    entries = process_query_paths(resolved, root)
    refreshed = update_coverage_file(existing, entries)
    report = generate_coverage_markdown(refreshed)

    if options.markdown_only:
        # Just print the markdown and exit
        print(report)
        return

    save_coverage_file(refreshed, root)

    if not options.no_readme_update:
        print("Generating coverage report...")
        update_readme_coverage(root, report)

    summary = refreshed["metadata"]
    print("\nCoverage Summary:")
    print(f"  Total queries: {summary['total_queries']}")
    print(f"  Covered queries: {summary['covered_queries']}")
    print(f"  Coverage percentage: {summary['coverage_percentage']:.1f}%")
    print(f"  Categories: {', '.join(summary['categories'])}")
    print(f"  CWEs covered: {len(summary['cwes'])}")
378+
379+
380+
# Run the coverage update only when this file is executed as a script.
if __name__ == "__main__":
    sys.exit(main())

0 commit comments

Comments
 (0)