Skip to content

Commit 268f9ec

Browse files
authored
Merge pull request #275 from msoedov/poc-concurrency-reporting-unified
Poc concurrency reporting, general improvements
2 parents 9a4fb05 + 5238d67 commit 268f9ec

32 files changed

+1968
-73
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ agentic_security.toml
2020
/venv
2121
*.csv
2222
agentic_security/agents/operator_agno.py
23+
.claude/
24+
plan.md
25+
auto_loop.sh

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ repos:
2020
- id: flake8
2121
language_version: python3.11
2222
additional_dependencies: [flake8-docstrings]
23+
exclude: '^(tests)/'
2324

2425
# - repo: https://github.com/PyCQA/isort
2526
# rev: 7.0.0

agentic_security/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1-
from .lib import SecurityScanner
1+
from agentic_security.cache_config import ensure_cache_dir
22

3-
__all__ = ["SecurityScanner"]
3+
ensure_cache_dir()
4+
5+
from .lib import SecurityScanner # noqa: E402
6+
7+
__all__ = ["SecurityScanner", "ensure_cache_dir"]

agentic_security/cache_config.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Utilities to keep cache-to-disk storage in a writable, predictable location."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
from pathlib import Path
7+
8+
9+
def ensure_cache_dir(base_dir: Path | None = None) -> Path:
10+
"""Ensure ``DISK_CACHE_DIR`` points to a writable directory and create it if needed."""
11+
env_var = "DISK_CACHE_DIR"
12+
configured_path = os.environ.get(env_var) or os.environ.get(
13+
"AGENTIC_SECURITY_CACHE_DIR"
14+
)
15+
cache_dir = Path(
16+
configured_path or base_dir or Path.cwd() / ".cache" / "agentic_security"
17+
).expanduser()
18+
cache_dir.mkdir(parents=True, exist_ok=True)
19+
os.environ[env_var] = str(cache_dir)
20+
return cache_dir
21+
22+
23+
__all__ = ["ensure_cache_dir"]

agentic_security/core/app.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
11
import os
22
from asyncio import Event, Queue
3+
from typing import TypedDict
34

45
from fastapi import FastAPI
56
from fastapi.responses import ORJSONResponse
67

78
from agentic_security.http_spec import LLMSpec
89

10+
11+
class CurrentRun(TypedDict):
12+
id: int | None
13+
spec: LLMSpec | None
14+
15+
916
tools_inbox: Queue = Queue()
1017
stop_event: Event = Event()
11-
current_run: str = {"spec": "", "id": ""}
18+
current_run: CurrentRun = {"spec": None, "id": None}
1219
_secrets: dict[str, str] = {}
1320

14-
current_run: dict[str, int | LLMSpec] = {"spec": "", "id": ""}
15-
1621

1722
def create_app() -> FastAPI:
1823
"""Create and configure the FastAPI application."""
@@ -30,13 +35,13 @@ def get_stop_event() -> Event:
3035
return stop_event
3136

3237

33-
def get_current_run() -> dict[str, int | LLMSpec]:
38+
def get_current_run() -> CurrentRun:
3439
"""Get the current run id."""
3540
return current_run
3641

3742

38-
def set_current_run(spec: LLMSpec) -> dict[str, int | LLMSpec]:
39-
"""Set the current run id."""
43+
def set_current_run(spec: LLMSpec) -> CurrentRun:
44+
"""Set the current run metadata based on a spec instance."""
4045
current_run["id"] = hash(id(spec))
4146
current_run["spec"] = spec
4247
return current_run
@@ -56,4 +61,8 @@ def expand_secrets(secrets: dict[str, str]) -> None:
5661
for key in secrets:
5762
val = secrets[key]
5863
if val.startswith("$"):
59-
secrets[key] = os.getenv(val.strip("$"))
64+
env_value = os.getenv(val.strip("$"))
65+
if env_value is not None:
66+
secrets[key] = env_value
67+
else:
68+
secrets[key] = None
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
"""Advanced concurrent execution package for security scanning."""
2+
3+
from agentic_security.executor.rate_limiter import TokenBucketRateLimiter
4+
from agentic_security.executor.circuit_breaker import CircuitBreaker
5+
from agentic_security.executor.concurrent import ConcurrentExecutor, ExecutorMetrics
6+
7+
__all__ = [
8+
"TokenBucketRateLimiter",
9+
"CircuitBreaker",
10+
"ConcurrentExecutor",
11+
"ExecutorMetrics",
12+
]
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""Circuit breaker pattern for fault tolerance."""
2+
3+
import time
4+
from typing import Literal
5+
6+
7+
CircuitState = Literal["closed", "open", "half_open"]
8+
9+
10+
class CircuitBreaker:
11+
"""Circuit breaker to prevent cascading failures.
12+
13+
Implements the circuit breaker pattern with three states:
14+
- closed: Normal operation, requests pass through
15+
- open: Failure threshold exceeded, requests fail fast
16+
- half_open: Recovery attempt, limited requests allowed
17+
18+
Example:
19+
>>> breaker = CircuitBreaker(failure_threshold=0.5, recovery_timeout=30)
20+
>>> if breaker.is_open():
21+
... raise Exception("Circuit breaker is open")
22+
>>> try:
23+
... result = make_request()
24+
... breaker.record_success()
25+
>>> except Exception:
26+
... breaker.record_failure()
27+
"""
28+
29+
def __init__(self, failure_threshold: float = 0.5, recovery_timeout: int = 30):
30+
"""Initialize circuit breaker.
31+
32+
Args:
33+
failure_threshold: Failure rate (0.0-1.0) that triggers open state
34+
recovery_timeout: Seconds to wait before attempting recovery
35+
"""
36+
self.failure_threshold = failure_threshold
37+
self.recovery_timeout = recovery_timeout
38+
self.failures = 0
39+
self.successes = 0
40+
self.state: CircuitState = "closed"
41+
self.last_failure_time: float | None = None
42+
43+
def record_success(self):
44+
"""Record a successful request."""
45+
self.successes += 1
46+
47+
# If in half_open state and we have enough successes, close the circuit
48+
if self.state == "half_open" and self.successes >= 3:
49+
self.state = "closed"
50+
self.failures = 0
51+
self.successes = 0
52+
53+
def record_failure(self):
54+
"""Record a failed request."""
55+
self.failures += 1
56+
self.last_failure_time = time.monotonic()
57+
58+
total = self.failures + self.successes
59+
60+
# Need minimum sample size before opening circuit
61+
if total >= 10:
62+
failure_rate = self.failures / total
63+
if failure_rate >= self.failure_threshold:
64+
self.state = "open"
65+
66+
def is_open(self) -> bool:
67+
"""Check if circuit breaker is open.
68+
69+
Returns:
70+
bool: True if circuit is open and requests should be blocked
71+
"""
72+
if self.state == "open":
73+
# Check if we should attempt recovery
74+
if self.last_failure_time is not None:
75+
if time.monotonic() - self.last_failure_time > self.recovery_timeout:
76+
self.state = "half_open"
77+
# Reset counters for half-open state
78+
self.failures = 0
79+
self.successes = 0
80+
return False
81+
return True
82+
83+
return False
84+
85+
def get_state(self) -> CircuitState:
86+
"""Get current circuit breaker state.
87+
88+
Returns:
89+
CircuitState: Current state (closed, open, or half_open)
90+
"""
91+
return self.state
92+
93+
def get_failure_rate(self) -> float:
94+
"""Get current failure rate.
95+
96+
Returns:
97+
float: Failure rate (0.0-1.0), or 0.0 if no requests recorded
98+
"""
99+
total = self.failures + self.successes
100+
if total == 0:
101+
return 0.0
102+
return self.failures / total
103+
104+
def reset(self):
105+
"""Reset circuit breaker to initial state."""
106+
self.failures = 0
107+
self.successes = 0
108+
self.state = "closed"
109+
self.last_failure_time = None

0 commit comments

Comments
 (0)