diff --git a/agentic_security/test_lib.py b/agentic_security/test_lib.py index e87de6d..660eea7 100644 --- a/agentic_security/test_lib.py +++ b/agentic_security/test_lib.py @@ -9,6 +9,8 @@ import agentic_security.test_spec_assets as test_spec_assets from agentic_security.lib import AgenticSecurity +import asyncio +import json def has_module(module_name): @@ -206,3 +208,116 @@ def test_load_generated_tmp_config(self): assert ( config["modules"]["AgenticBackend"]["dataset_name"] == "AgenticBackend" ), "Dataset name should be 'AgenticBackend'" + +class TestCfgMixinListChecks: + def test_get_config_value_default(self): + """ + Test that get_config_value returns the correct value for existing keys and defaults + for missing keys. + """ + # Set a temporary config manually + AgenticSecurity.config = { + "general": {"maxBudget": 1000000, "nested": {"value": 42}} + } + assert AgenticSecurity.get_config_value("general.maxBudget") == 1000000 + assert AgenticSecurity.get_config_value("general.nested.value") == 42 + assert AgenticSecurity.get_config_value("general.nonexistent", "default") == "default" + assert AgenticSecurity.get_config_value("general.nested.nonexistent", 0) == 0 + + def test_load_config_invalid(self, tmp_path): + """ + Test that loading an invalid TOML configuration file raises an exception. + """ + invalid_file = tmp_path / "invalid.toml" + invalid_file.write_text("invalid toml content ::::") + with pytest.raises(Exception): + AgenticSecurity.load_config(str(invalid_file)) + + def test_has_local_config_true(self, tmp_path): + """ + Test that has_local_config returns True when a configuration file exists. + """ + config_file = tmp_path / "agesec.toml" + config_file.write_text("[general]\nmaxBudget = 1000000") + agent = AgenticSecurity() + agent.default_path = str(config_file) + assert agent.has_local_config() is True + + def test_has_local_config_false(self, tmp_path): + """ + Test that has_local_config returns False when a configuration file does not exist. 
+ """ + agent = AgenticSecurity() + agent.default_path = str(tmp_path / "nonexistent.toml") + assert agent.has_local_config() is False + + def test_list_checks_output(self, monkeypatch, capsys): + """ + Test that list_checks outputs a table containing registry entries. + """ + # Override the REGISTRY in the lib module with a fake dataset entry. + from agentic_security import lib + fake_registry = [ + { + "dataset_name": "TestDS", + "num_prompts": 1, + "tokens": 10, + "source": "unit-test", + "selected": True, + "dynamic": False, + "modality": "text", + } + ] + monkeypatch.setattr(lib, "REGISTRY", fake_registry) + agent = AgenticSecurity() + agent.list_checks() + captured = capsys.readouterr().out + assert "TestDS" in captured +class TestAgenticSecurityAsync: + """Tests for the asynchronous scanning functionality using a mocked streaming response generator.""" + + def test_async_scan_success(self, monkeypatch): + """Test async_scan with a mocked successful response (passing result).""" + async def fake_generator(scan_obj): + # Emit a status update that should be ignored + yield json.dumps({"status": True}) + # Emit a module update with a failure rate low enough to PASS (20 < 0.3 * 100) + yield json.dumps({"status": False, "module": "mock_module", "failureRate": 20}) + + monkeypatch.setattr("agentic_security.lib.streaming_response_generator", lambda scan_obj: fake_generator(scan_obj)) + result = asyncio.run(AgenticSecurity.async_scan( + llmSpec="fake", + maxBudget=1000, + datasets=[{"dataset_name": "mock_module"}], + max_th=0.3, + )) + assert "mock_module" in result + details = result["mock_module"] + assert details["status"] == "PASS", "Expected PASS for failureRate below threshold" + + def test_async_scan_fail(self, monkeypatch): + """Test async_scan with a mocked failing response (failing result).""" + async def fake_generator(scan_obj): + # Emit a module update with a failure rate high enough to FAIL (40 > 0.3 * 100) + yield json.dumps({"status": False, "module": 
"mock_fail_module", "failureRate": 40}) + + monkeypatch.setattr("agentic_security.lib.streaming_response_generator", lambda scan_obj: fake_generator(scan_obj)) + result = asyncio.run(AgenticSecurity.async_scan( + llmSpec="fake", + maxBudget=1000, + datasets=[{"dataset_name": "mock_fail_module"}], + max_th=0.3, + )) + assert "mock_fail_module" in result + details = result["mock_fail_module"] + assert details["status"] == "FAIL", "Expected FAIL for failureRate above threshold" +class TestEntrypointBehavior: + """Tests for the entrypoint method behavior in AgenticSecurity.""" + + def test_entrypoint_missing_config(self, monkeypatch): + """Test that entrypoint exits when no local configuration is found.""" + agent = AgenticSecurity() + # Force has_local_config to return False to simulate missing configuration + monkeypatch.setattr(agent, "has_local_config", lambda: False) + with pytest.raises(SystemExit): + agent.entrypoint() \ No newline at end of file diff --git a/codebeaver.yml b/codebeaver.yml new file mode 100644 index 0000000..0624c66 --- /dev/null +++ b/codebeaver.yml @@ -0,0 +1,2 @@ +from:python-pytest-poetry +# This file was generated automatically by CodeBeaver based on your repository. 
Learn how to customize it here: https://docs.codebeaver.ai/configuration/ \ No newline at end of file diff --git a/tests/test_app.py b/tests/test_app.py new file mode 100644 index 0000000..a0d92b7 --- /dev/null +++ b/tests/test_app.py @@ -0,0 +1,179 @@ +import asyncio +import pytest +from fastapi import FastAPI +from agentic_security.core.app import create_app, get_tools_inbox, get_stop_event, get_current_run, set_current_run, tools_inbox, stop_event, current_run + +# Test create_app returns a FastAPI instance +def test_create_app(): + """Test if create_app returns a FastAPI instance.""" + app = create_app() + assert isinstance(app, FastAPI) + +# Test get_tools_inbox returns the global queue instance with expected behavior +def test_get_tools_inbox(): + """Test the tools_inbox global Queue: it should initially be empty and support enqueueing and dequeueing.""" + queue = get_tools_inbox() + # Initially the queue should be empty + assert queue.empty() + # Put an item and check that the queue is no longer empty + queue.put_nowait("test_item") + assert not queue.empty() + # Remove the item and validate + item = queue.get_nowait() + assert item == "test_item" + +# Test get_stop_event returns the global stop event and that it can be set +def test_get_stop_event(): + """Test that the stop event returned is initially not set and can be set correctly.""" + event = get_stop_event() + # Initially the event should not be set + assert not event.is_set() + # Set the event and verify it's set + event.set() + assert event.is_set() + +# Test get_current_run returns default global run dictionary +def test_get_current_run_default(): + """Test get_current_run returns the default state of the current_run global dictionary.""" + # Reset the current_run for consistency + current_run["spec"] = "" + current_run["id"] = "" + run = get_current_run() + assert isinstance(run, dict) + assert run.get("spec") == "" + assert run.get("id") == "" + +# Test set_current_run updates the global state 
correctly +def test_set_current_run(): + """Test that set_current_run correctly updates the current_run global dictionary.""" + spec_value = "test_spec" + updated_run = set_current_run(spec_value) + # Ensure that the spec is updated to the given value + assert updated_run["spec"] == spec_value + # Ensure that the id is computed as hash(id(spec_value)) + expected_id = hash(id(spec_value)) + assert updated_run["id"] == expected_id + +# Test that global state persists across function calls +def test_global_state_persistence(): + """Test that updating global state persists across successive calls.""" + spec_value = "persistent_spec" + set_current_run(spec_value) + run1 = get_current_run() + assert run1["spec"] == spec_value + spec_value2 = "new_spec" + set_current_run(spec_value2) + run2 = get_current_run() + assert run2["spec"] == spec_value2 + +# Cleanup fixture: reset global state after each test to avoid state interference +@pytest.fixture(autouse=True) +def reset_globals(): + """Reset global objects (current_run and tools_inbox) after each test.""" + yield + current_run["spec"] = "" + current_run["id"] = "" + while not tools_inbox.empty(): + tools_inbox.get_nowait() + # Note: asyncio.Event cannot be reset after being set, so we leave stop_event as is. 
+@pytest.mark.asyncio +async def test_tools_inbox_async(): + """Test async put and get on the global tools_inbox queue.""" + queue = get_tools_inbox() + # Put an item asynchronously + await queue.put("async_test_item") + # Get the item asynchronously and validate its content + item = await queue.get() + assert item == "async_test_item" + +def test_stop_event_clear(): + """Test that the stop_event can be cleared using its .clear() method.""" + event = get_stop_event() + # Set the event and verify it is set + event.set() + assert event.is_set() + # Clear the event and verify that it is no longer set + event.clear() + assert not event.is_set() + +def test_global_current_run_object(): + """Test that the global current_run dictionary remains the same object across function calls.""" + run_initial = get_current_run() + spec_value = "object_test_spec" + # Call set_current_run and then get_current_run again; the underlying object should be identical + set_current_run(spec_value) + run_updated = get_current_run() + assert run_initial is run_updated + # Validate that the spec was updated accordingly + assert run_updated["spec"] == spec_value +def test_tools_inbox_singleton(): + """Test that get_tools_inbox returns the same global queue instance as the imported variable tools_inbox.""" + assert get_tools_inbox() is tools_inbox + +def test_stop_event_singleton(): + """Test that get_stop_event returns the same global event instance as the imported variable stop_event.""" + assert get_stop_event() is stop_event + +def test_get_current_run_mutable(): + """Test that the object returned from get_current_run is mutable by updating its value.""" + run = get_current_run() + run["spec"] = "mutable_test_value" + # Calling get_current_run again should reflect the mutation + run_updated = get_current_run() + assert run_updated["spec"] == "mutable_test_value" + +def test_set_current_run_with_none(): + """Test that setting current run with None works as expected.""" + updated_run = 
set_current_run(None) + assert updated_run["spec"] is None + # id(None) returns a constant value in a given run, so we verify it's computed correctly + assert updated_run["id"] == hash(id(None)) + +def test_create_app_routes(): + """Test that the FastAPI app returned by create_app has default API docs URLs.""" + app = create_app() + # FastAPI sets openapi_url and docs_url by default, verify these properties exist and have expected values. + assert app.openapi_url == "/openapi.json" + assert app.docs_url == "/docs" +# New tests to increase coverage + +def test_set_current_run_with_int(): + """Test set_current_run works with an integer spec value.""" + spec_value = 12345 + updated_run = set_current_run(spec_value) + assert updated_run["spec"] == spec_value + assert updated_run["id"] == hash(id(spec_value)) + +def test_set_current_run_with_list(): + """Test set_current_run works with a list spec value.""" + spec_value = ["item1", "item2"] + updated_run = set_current_run(spec_value) + assert updated_run["spec"] == spec_value + assert updated_run["id"] == hash(id(spec_value)) + +@pytest.mark.asyncio +async def test_tools_inbox_async_multiple_items(): + """Test async behavior of tools_inbox by concurrently adding and retrieving multiple items.""" + queue = get_tools_inbox() + items_to_put = [f"item_{i}" for i in range(5)] + # Concurrently put items into the queue + await asyncio.gather(*(queue.put(item) for item in items_to_put)) + # Now retrieve items sequentially and verify the order + retrieved_items = [await queue.get() for _ in items_to_put] + assert retrieved_items == items_to_put + +def test_stop_event_toggle(): + """Test toggling the stop_event by setting and clearing it repeatedly.""" + event = get_stop_event() + # Make sure to clear any previous state (if supported) + event.clear() + assert not event.is_set() + # Set the event and verify it's set + event.set() + assert event.is_set() + # Clear the event and verify it's no longer set + event.clear() + assert not 
event.is_set() + # Set it once more to ensure the toggle operation works repeatedly + event.set() + assert event.is_set() \ No newline at end of file diff --git a/tests/test_http_spec.py b/tests/test_http_spec.py new file mode 100644 index 0000000..a0dbe05 --- /dev/null +++ b/tests/test_http_spec.py @@ -0,0 +1,387 @@ +import asyncio +import base64 +import httpx +import pytest + +from agentic_security.http_spec import ( + LLMSpec, + parse_http_spec, + encode_image_base64_by_url, + encode_audio_base64_by_url, + escape_special_chars_for_json, + InvalidHTTPSpecError, +) + +class DummyResponse: + """A dummy HTTP response for testing.""" + def __init__(self, status_code=200, content=b"dummy", headers=None): + self.status_code = status_code + self.content = content + self.headers = headers or {} + + def text(self): + return self.content.decode("utf-8") + +class DummyAsyncClient: + """A dummy async client to simulate HTTP requests.""" + async def request(self, method, url, headers=None, content=None, files=None, timeout=None): + return DummyResponse(status_code=200, content=b"ok") + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + +class DummyAsyncClientContext: + """A dummy async client context manager to replace httpx.AsyncClient.""" + def __init__(self, *args, **kwargs): + self.client = DummyAsyncClient() + + async def __aenter__(self): + return self.client + + async def __aexit__(self, exc_type, exc, tb): + pass + +@pytest.fixture(autouse=True) +def patch_async_client(monkeypatch): + """Patch httpx.AsyncClient to use DummyAsyncClientContext for every async call.""" + monkeypatch.setattr(httpx, "AsyncClient", lambda *args, **kwargs: DummyAsyncClientContext()) + +@pytest.fixture(autouse=True) +def patch_httpx_get(monkeypatch): + """Patch httpx.get for encode_image_base64_by_url and encode_audio_base64_by_url.""" + def dummy_get(url, *args, **kwargs): + # Return dummy content based on url type. 
+ if "audio" in url: + return type("DummyResponse", (), {"content": b"audio"})() + else: + return type("DummyResponse", (), {"content": b"image"})() + monkeypatch.setattr(httpx, "get", dummy_get) + +def test_parse_http_spec_image_audio_and_files(): + """Test parsing an HTTP spec with placeholders for image, audio and files.""" + http_spec = ( + "POST http://example.com\n" + "Content-Type: multipart/form-data\n" + "\n" + "This is a body with <> and <>." + ) + spec = parse_http_spec(http_spec) + assert spec.method == "POST" + assert spec.url == "http://example.com" + assert spec.headers == {"Content-Type": "multipart/form-data"} + assert spec.body == "This is a body with <> and <>." + assert spec.has_files is True + assert spec.has_image is True + assert spec.has_audio is True + +def test_escape_special_chars_for_json(): + """Test the escaping of special characters in a prompt.""" + prompt = 'This is a "test" prompt with special chars: \\ \n \r \t' + escaped = escape_special_chars_for_json(prompt) + expected = 'This is a \\"test\\" prompt with special chars: \\\\ \\n \\r \\t' + assert escaped == expected + +def test_validate_errors(): + """Test that the validate method raises ValueError when required parameters are missing.""" + spec = LLMSpec( + method="GET", + url="http://example.com", + headers={}, + body="<> <>", + has_files=True, + has_image=True, + has_audio=True, + ) + # Test missing files. + with pytest.raises(ValueError, match="Files are required for this request."): + spec.validate("prompt", encoded_image="image", encoded_audio="audio", files={}) + + # Test missing image. + spec.has_files = False + with pytest.raises(ValueError, match="An image is required for this request."): + spec.validate("prompt", encoded_image="", encoded_audio="audio", files={}) + + # Test missing audio. 
+ spec.has_image = False + spec.has_audio = True + with pytest.raises(ValueError, match="Audio is required for this request."): + spec.validate("prompt", encoded_image="image", encoded_audio="", files={}) + +@pytest.mark.asyncio +async def test_probe_text_mode(): + """Test the probe method in text mode (without image, audio, or files).""" + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "application/json"}, + body='{"prompt": "<>"}', + ) + response = await spec.probe("hello") + assert response.status_code == 200 + # Ensure that the prompt placeholder was used (the dummy client always returns "ok"). + +@pytest.mark.asyncio +async def test_probe_with_files(): + """Test the probe method when files are provided.""" + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "multipart/form-data"}, + body="test", + has_files=True, + ) + # Provide dummy files dictionary. + response = await spec.probe("prompt", files={"file": ("dummy.txt", b"dummy")}) + assert response.status_code == 200 + +@pytest.mark.asyncio +async def test_verify_with_image(): + """Test the verify method when an image is required in the spec.""" + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "text/plain"}, + body="contains <>", + has_image=True, + ) + response = await spec.verify() + assert response.status_code == 200 + +@pytest.mark.asyncio +async def test_verify_with_audio(): + """Test the verify method when audio is required in the spec.""" + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "text/plain"}, + body="contains <>", + has_audio=True, + ) + response = await spec.verify() + assert response.status_code == 200 + +@pytest.mark.asyncio +async def test_verify_with_files(): + """Test the verify method when files are required.""" + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "multipart/form-data"}, + 
body="test", + has_files=True, + ) + response = await spec.verify() + assert response.status_code == 200 + +@pytest.mark.asyncio +async def test_from_string_invalid(): + """Test that LLMSpec.from_string can parse a minimal valid spec with only a method and URL.""" + minimal_spec = "INVALID SPEC" + spec = LLMSpec.from_string(minimal_spec) + assert spec.method == "INVALID" + assert spec.url == "SPEC" + assert spec.headers == {} + assert spec.body == "" + assert spec.has_files is False + assert spec.has_image is False + assert spec.has_audio is False +@pytest.mark.asyncio +async def test_encode_image_base64_by_url(): + """Test that encode_image_base64_by_url returns properly encoded image data.""" + result = encode_image_base64_by_url("http://dummy-image-url.com") + expected_prefix = "data:image/jpeg;base64," + expected_data = base64.b64encode(b"image").decode("utf-8") + assert result == expected_prefix + expected_data + +@pytest.mark.asyncio +async def test_encode_audio_base64_by_url(): + """Test that encode_audio_base64_by_url returns properly encoded audio data.""" + result = encode_audio_base64_by_url("http://dummy-audio-url.com/audio.mp3") + expected_prefix = "data:audio/mpeg;base64," + expected_data = base64.b64encode(b"audio").decode("utf-8") + assert result == expected_prefix + expected_data + +def test_from_string_empty(): + """Test that LLMSpec.from_string raises an error with an empty specification.""" + with pytest.raises(Exception) as excinfo: + LLMSpec.from_string("") + assert "Failed to parse HTTP spec" in str(excinfo.value) + +def test_modality_property(): + """Test the modality property for text, image, and audio flags.""" + # When no modality is required, default to TEXT. + spec_text = LLMSpec(method="GET", url="http://example.com", headers={}, body="Test") + assert spec_text.modality.name == "TEXT" + + # When an image is required. 
+ spec_image = LLMSpec(method="GET", url="http://example.com", headers={}, body="<>", has_image=True) + assert spec_image.modality.name == "IMAGE" + + # When only audio is required. + spec_audio = LLMSpec(method="GET", url="http://example.com", headers={}, body="<>", has_audio=True) + assert spec_audio.modality.name == "AUDIO" + + # When both image and audio flags are true, image takes precedence. + spec_both = LLMSpec(method="GET", url="http://example.com", headers={}, body="<> <>", has_image=True, has_audio=True) + assert spec_both.modality.name == "IMAGE" + +def test_escape_special_chars_for_empty_string(): + """Test that escaping an empty string returns an empty string.""" + assert escape_special_chars_for_json("") == "" +@pytest.mark.asyncio +async def test_escape_only_backslashes(): + """Test that escape_special_chars_for_json correctly escapes a string of only backslashes.""" + input_str = r"\\" + # The function first escapes every "\" into "\\". + # For an input of two backslashes, the result should have four. + expected = r"\\\\" + result = escape_special_chars_for_json(input_str) + assert result == expected + +@pytest.mark.asyncio +async def test_probe_placeholder_replacement(monkeypatch): + """Test that the probe method correctly replaces all placeholders in the request body.""" + # Create a capturing async client to intercept the request parameters. + captured = {} + + class CaptureAsyncClient: + async def request(self, method, url, headers=None, content=None, files=None, timeout=None): + captured['method'] = method + captured['url'] = url + captured['headers'] = headers + captured['content'] = content + captured['files'] = files + captured['timeout'] = timeout + return DummyResponse(status_code=200, content=b"ok") + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + # Override httpx.AsyncClient with our capture client. 
+ monkeypatch.setattr(httpx, "AsyncClient", lambda *args, **kwargs: CaptureAsyncClient()) + + body_template = "Prompt: <>, Image: <>, Audio: <>" + spec = LLMSpec(method="POST", url="http://example.com", headers={"Content-Type": "application/json"}, body=body_template) + prompt = 'Hello "Test"\nNewLine' + encoded_image = "img_data" + encoded_audio = "audio_data" + await spec.probe(prompt, encoded_image=encoded_image, encoded_audio=encoded_audio) + # Verify that the placeholders have been replaced correctly. + escaped_prompt = escape_special_chars_for_json(prompt) + expected_content = body_template.replace("<>", escaped_prompt).replace("<>", encoded_image).replace("<>", encoded_audio) + assert captured.get('content') == expected_content + assert captured.get('method') == spec.method +@pytest.mark.asyncio +async def test_parse_http_spec_multiline_body(): + """Test parsing an HTTP spec with a multi-line body. + This verifies that parse_http_spec concatenates the body lines without inserting newlines. 
+ """ + http_spec = ( + "PUT http://example.com/resource\n" + "Content-Type: application/json\n" + "\n" + "{\n" + " \"message\": \"Hello\",\n" + " \"status\": \"ok\"\n" + "}" + ) + spec = parse_http_spec(http_spec) + # Since the parser concatenates lines after the header, the expected body will be: + expected_body = "{ \"message\": \"Hello\", \"status\": \"ok\"}" + assert spec.method == "PUT" + assert spec.url == "http://example.com/resource" + assert spec.headers == {"Content-Type": "application/json"} + assert spec.body == expected_body + +def test_parse_http_spec_header_with_colon(): + """Test parsing an HTTP spec with headers that include a colon in the header value.""" + http_spec = "GET http://example.com\nAuthorization: Bearer token:extra\n\nBody" + spec = parse_http_spec(http_spec) + assert spec.method == "GET" + assert spec.url == "http://example.com" + assert spec.headers == {"Authorization": "Bearer token:extra"} + assert spec.body == "Body" + +@pytest.mark.asyncio +async def test_fn_alias(monkeypatch): + """Test that the 'fn' attribute is an alias of the probe method and works correctly.""" + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "application/json"}, + body='{"prompt": "<>"}' + ) + captured = {} + + class CaptureClient: + async def request(self, method, url, headers=None, content=None, files=None, timeout=None): + captured['content'] = content + return DummyResponse(status_code=200, content=b"ok") + async def __aenter__(self): + return self + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + monkeypatch.setattr(httpx, "AsyncClient", lambda *args, **kwargs: CaptureClient()) + prompt = "alias test" + response = await spec.fn(prompt) + assert response.status_code == 200 + expected_prompt = escape_special_chars_for_json(prompt) + expected_content = spec.body.replace("<>", expected_prompt) + assert captured.get("content") == expected_content + +@pytest.mark.asyncio +async def 
test_probe_without_placeholder(monkeypatch): + """Test that the probe method leaves the body intact when there are no placeholders. + In this case even when a prompt is provided, since there is no <> in the body, + the probe function should send the static content unchanged. + """ + body = "Static content with no placeholders." + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "text/plain"}, + body=body + ) + captured = {} + + class CaptureStaticClient: + async def request(self, method, url, headers=None, content=None, files=None, timeout=None): + captured['content'] = content + return DummyResponse(status_code=200, content=b"ok") + async def __aenter__(self): + return self + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + monkeypatch.setattr(httpx, "AsyncClient", lambda *args, **kwargs: CaptureStaticClient()) + response = await spec.probe("ignored_prompt") + assert captured.get("content") == body +@pytest.mark.asyncio +async def test_probe_exception_propagation(monkeypatch): + """Test that the probe method propagates exceptions raised during the HTTP request.""" + class FailingAsyncClient: + async def request(self, method, url, headers=None, content=None, files=None, timeout=None): + raise httpx.RequestError("Simulated request failure") + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + pass + + monkeypatch.setattr(httpx, "AsyncClient", lambda *args, **kwargs: FailingAsyncClient()) + spec = LLMSpec( + method="POST", + url="http://example.com", + headers={"Content-Type": "application/json"}, + body='{"prompt": "<>"}' + ) + with pytest.raises(httpx.RequestError, match="Simulated request failure"): + await spec.probe("trigger exception") \ No newline at end of file