Skip to content

Commit 79dc762

Browse files
committed
cache: add expiration support and stats/evict methods
ResultCache entries now expire after max_age_seconds (default 7 days). Expired entries are automatically pruned on lookup and can also be bulk-evicted with evict_expired(). New stats() method returns entry count, total bytes, and oldest timestamp for monitoring cache health. Tests cover time-based expiry, eviction, and statistics.
1 parent f77aad2 commit 79dc762

File tree

2 files changed

+124
-2
lines changed

2 files changed

+124
-2
lines changed

src/ai_sec_scan/cache.py

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
CACHE_VERSION = 1
1414
DEFAULT_CACHE_DIR = ".ai-sec-scan-cache"
15+
DEFAULT_MAX_AGE = 7 * 24 * 3600 # 7 days
1516

1617

1718
def file_hash(path: Path) -> str:
@@ -39,12 +40,18 @@ def _cache_key(file_path: str, provider: str, model: str) -> str:
3940
class ResultCache:
4041
"""Disk-backed cache that maps (file, provider, model) to findings.
4142
42-
Cache entries are invalidated when the file content hash changes.
43+
Cache entries are invalidated when the file content hash changes or
44+
when they exceed ``max_age_seconds`` (default 7 days).
4345
"""
4446

45-
def __init__(self, cache_dir: Path | None = None) -> None:
47+
def __init__(
    self,
    cache_dir: Path | None = None,
    max_age_seconds: int | None = None,
) -> None:
    """Create the cache directory if needed and record the expiry window.

    Args:
        cache_dir: Directory that holds the cache entries. Falls back to
            ``DEFAULT_CACHE_DIR`` when omitted (or otherwise falsy).
        max_age_seconds: Maximum entry age in seconds. ``None`` selects
            ``DEFAULT_MAX_AGE``; any other value is stored as given.
    """
    self._dir = cache_dir or Path(DEFAULT_CACHE_DIR)
    # Nested locations are fine: missing parent directories are created too.
    self._dir.mkdir(parents=True, exist_ok=True)

    # Only ``None`` means "use the default"; an explicit 0 is kept as-is.
    if max_age_seconds is None:
        max_age_seconds = DEFAULT_MAX_AGE
    self._max_age = max_age_seconds
4855

4956
@property
5057
def cache_dir(self) -> Path:
@@ -76,6 +83,13 @@ def get(
7683
if data.get("content_hash") != content_hash:
7784
return None
7885

86+
# Check expiration
87+
ts = data.get("timestamp")
88+
if self._max_age > 0 and isinstance(ts, (int, float)):
89+
if time.time() - ts > self._max_age:
90+
entry_path.unlink(missing_ok=True)
91+
return None
92+
7993
try:
8094
return [Finding.model_validate(f) for f in data.get("findings", [])]
8195
except Exception:
@@ -111,3 +125,60 @@ def clear(self) -> int:
111125
entry.unlink()
112126
count += 1
113127
return count
128+
129+
def evict_expired(self) -> int:
    """Remove cache entries older than ``max_age_seconds``.

    Every ``*.json`` file in the cache directory is inspected. A file that
    cannot be read, is not valid UTF-8, is not valid JSON, or does not
    contain a JSON object is treated as corrupt: it is deleted and counted
    as evicted. Entries without a numeric ``timestamp`` are left in place.

    Returns:
        Number of entries evicted. Always ``0`` when the configured
        maximum age is zero or negative.
    """
    if self._max_age <= 0:
        return 0

    now = time.time()
    evicted = 0
    for entry_path in self._dir.glob("*.json"):
        try:
            data = json.loads(entry_path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError as well as the
            # UnicodeDecodeError raised for files that are not UTF-8.
            data = None

        if not isinstance(data, dict):
            # Unreadable, undecodable, or not a JSON object: without this
            # guard ``data.get`` below would raise AttributeError.
            entry_path.unlink(missing_ok=True)
            evicted += 1
            continue

        ts = data.get("timestamp")
        if isinstance(ts, (int, float)) and now - ts > self._max_age:
            entry_path.unlink(missing_ok=True)
            evicted += 1

    return evicted
154+
155+
def stats(self) -> dict[str, Any]:
    """Return cache statistics.

    Every ``*.json`` file in the cache directory counts as one entry, even
    when its contents cannot be parsed. Files that are unreadable, not
    valid UTF-8, not valid JSON, or not a JSON object are ignored when
    determining the oldest timestamp.

    Returns:
        Dict with ``total_entries``, ``total_bytes``, and ``oldest_timestamp``
        (``None`` when no entry carries a numeric ``timestamp``).
    """
    total_entries = 0
    total_bytes = 0
    oldest: float | None = None

    for entry_path in self._dir.glob("*.json"):
        total_entries += 1
        try:
            total_bytes += entry_path.stat().st_size
        except OSError:
            # The file could not be stat'ed; it was still counted above.
            continue

        try:
            data = json.loads(entry_path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError as well as the
            # UnicodeDecodeError raised for files that are not UTF-8.
            continue

        if not isinstance(data, dict):
            # Valid JSON but not an object, so there is no timestamp field.
            continue

        ts = data.get("timestamp")
        if isinstance(ts, (int, float)) and (oldest is None or ts < oldest):
            oldest = ts

    return {
        "total_entries": total_entries,
        "total_bytes": total_bytes,
        "oldest_timestamp": oldest,
    }

tests/test_cache.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
from __future__ import annotations
44

5+
import json
6+
import time
57
from pathlib import Path
8+
from unittest.mock import patch
69

710
import pytest
811

@@ -104,3 +107,51 @@ def test_cache_dir_created(self, tmp_path: Path) -> None:
104107
cache_dir = tmp_path / "deep" / "nested" / "cache"
105108
cache = ResultCache(cache_dir=cache_dir)
106109
assert cache_dir.exists()
110+
111+
def test_expired_entry_returns_none(
    self, tmp_path: Path, sample_finding: Finding
) -> None:
    """A lookup made after ``max_age_seconds`` has elapsed is a miss."""
    key = ("app.py", "hash1", "anthropic", "claude-3")
    store = ResultCache(cache_dir=tmp_path / "cache", max_age_seconds=60)
    store.put(*key, [sample_finding])

    # Swap the ``time`` name inside the cache module for a mock whose clock
    # runs two minutes ahead; this test module keeps the real ``time``.
    with patch("ai_sec_scan.cache.time") as fake_time:
        fake_time.time.return_value = time.time() + 120
        looked_up = store.get(*key)

    assert looked_up is None
122+
123+
def test_non_expired_entry_returned(
    self, tmp_path: Path, sample_finding: Finding
) -> None:
    """An entry read back immediately, well inside its max age, is a hit."""
    key = ("app.py", "hash1", "anthropic", "claude-3")
    store = ResultCache(cache_dir=tmp_path / "cache", max_age_seconds=300)
    store.put(*key, [sample_finding])

    hit = store.get(*key)

    assert hit is not None
    assert len(hit) == 1
131+
132+
def test_evict_expired(self, tmp_path: Path, sample_finding: Finding) -> None:
    """``evict_expired`` drops the backdated entry and keeps the fresh one."""
    store = ResultCache(cache_dir=tmp_path / "cache", max_age_seconds=60)
    store.put("old.py", "h1", "anthropic", "claude-3", [sample_finding])

    # Rewrite the stored timestamp of every entry written so far so that it
    # lies two minutes in the past, i.e. beyond the 60 second limit.
    for path in store.cache_dir.glob("*.json"):
        payload = json.loads(path.read_text())
        payload["timestamp"] = time.time() - 120
        path.write_text(json.dumps(payload))

    # This entry is written after the backdating, so it keeps a current timestamp.
    store.put("new.py", "h2", "anthropic", "claude-3", [])

    removed = store.evict_expired()

    assert removed == 1
    # The fresh entry must have survived the sweep.
    assert store.get("new.py", "h2", "anthropic", "claude-3") is not None
147+
148+
def test_stats(self, tmp_path: Path, sample_finding: Finding) -> None:
    """``stats`` reports zero entries when empty and both entries after two puts."""
    store = ResultCache(cache_dir=tmp_path / "cache")
    assert store.stats()["total_entries"] == 0

    entries = (
        ("a.py", "h1", [sample_finding]),
        ("b.py", "h2", []),
    )
    for file_name, digest, findings in entries:
        store.put(file_name, digest, "anthropic", "claude-3", findings)

    snapshot = store.stats()
    assert snapshot["total_entries"] == 2
    assert snapshot["total_bytes"] > 0
    assert snapshot["oldest_timestamp"] is not None

0 commit comments

Comments
 (0)