|
6 | 6 | from typing import Any |
7 | 7 |
|
8 | 8 | import httpx |
9 | | -import pandas as pd |
10 | 9 | from skopt import Optimizer |
11 | 10 | from skopt.space import Real |
12 | 11 |
|
|
15 | 14 | from agentic_security.primitives import Scan, ScanResult |
16 | 15 | from agentic_security.probe_actor.cost_module import calculate_cost |
17 | 16 | from agentic_security.probe_actor.refusal import refusal_heuristic |
| 17 | +from agentic_security.probe_actor.state import FuzzerState |
18 | 18 | from agentic_security.probe_data import audio_generator, image_generator, msj_data |
19 | 19 | from agentic_security.probe_data.data import prepare_prompts |
20 | 20 |
|
|
26 | 26 | FAILURE_RATE_THRESHOLD = 0.5 |
27 | 27 |
|
28 | 28 |
|
29 | | -class FuzzerState: |
30 | | - """Container for tracking scan results""" |
31 | | - |
32 | | - def __init__(self): |
33 | | - self.errors = [] |
34 | | - self.refusals = [] |
35 | | - self.outputs = [] |
36 | | - |
37 | | - def add_error( |
38 | | - self, |
39 | | - module_name: str, |
40 | | - prompt: str, |
41 | | - status_code: int | str, |
42 | | - error_msg: str, |
43 | | - ): |
44 | | - """Add an error to the state""" |
45 | | - self.errors.append((module_name, prompt, status_code, error_msg)) |
46 | | - |
47 | | - def add_refusal( |
48 | | - self, module_name: str, prompt: str, status_code: int, response_text: str |
49 | | - ): |
50 | | - """Add a refusal to the state""" |
51 | | - self.refusals.append((module_name, prompt, status_code, response_text)) |
52 | | - |
53 | | - def add_output( |
54 | | - self, module_name: str, prompt: str, response_text: str, refused: bool |
55 | | - ): |
56 | | - """Add an output to the state""" |
57 | | - self.outputs.append((module_name, prompt, response_text, refused)) |
58 | | - |
59 | | - def get_last_output(self, prompt: str) -> str | None: |
60 | | - """Get the last output for a given prompt""" |
61 | | - for output in reversed(self.outputs): |
62 | | - if output[1] == prompt: |
63 | | - return output[2] |
64 | | - return None |
65 | | - |
66 | | - def export_failures(self, filename: str = "failures.csv"): |
67 | | - """Export failures to a CSV file""" |
68 | | - failure_data = self.errors + self.refusals |
69 | | - df = pd.DataFrame( |
70 | | - failure_data, columns=["module", "prompt", "status_code", "content"] |
71 | | - ) |
72 | | - df.to_csv(filename, index=False) |
73 | | - |
74 | | - |
75 | 29 | async def generate_prompts( |
76 | 30 | prompts: list[str] | AsyncGenerator, |
77 | 31 | ) -> AsyncGenerator[str, None]: |
|
0 commit comments