Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions agentic_security/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import agentic_security.test_spec_assets as test_spec_assets
from agentic_security.lib import AgenticSecurity
import asyncio
import json


def has_module(module_name):
Expand Down Expand Up @@ -206,3 +208,116 @@ def test_load_generated_tmp_config(self):
assert (
config["modules"]["AgenticBackend"]["dataset_name"] == "AgenticBackend"
), "Dataset name should be 'AgenticBackend'"

class TestCfgMixinListChecks:
def test_get_config_value_default(self):
"""
Test that get_config_value returns the correct value for existing keys and defaults
for missing keys.
"""
# Set a temporary config manually
AgenticSecurity.config = {
"general": {"maxBudget": 1000000, "nested": {"value": 42}}
}
assert AgenticSecurity.get_config_value("general.maxBudget") == 1000000
assert AgenticSecurity.get_config_value("general.nested.value") == 42
assert AgenticSecurity.get_config_value("general.nonexistent", "default") == "default"
assert AgenticSecurity.get_config_value("general.nested.nonexistent", 0) == 0

def test_load_config_invalid(self, tmp_path):
"""
Test that loading an invalid TOML configuration file raises an exception.
"""
invalid_file = tmp_path / "invalid.toml"
invalid_file.write_text("invalid toml content ::::")
with pytest.raises(Exception):
AgenticSecurity.load_config(str(invalid_file))

def test_has_local_config_true(self, tmp_path):
"""
Test that has_local_config returns True when a configuration file exists.
"""
config_file = tmp_path / "agesec.toml"
config_file.write_text("[general]\nmaxBudget = 1000000")
agent = AgenticSecurity()
agent.default_path = str(config_file)
assert agent.has_local_config() is True

def test_has_local_config_false(self, tmp_path):
"""
Test that has_local_config returns False when a configuration file does not exist.
"""
agent = AgenticSecurity()
agent.default_path = str(tmp_path / "nonexistent.toml")
assert agent.has_local_config() is False

def test_list_checks_output(self, monkeypatch, capsys):
"""
Test that list_checks outputs a table containing registry entries.
"""
# Override the REGISTRY in the lib module with a fake dataset entry.
from agentic_security import lib
fake_registry = [
{
"dataset_name": "TestDS",
"num_prompts": 1,
"tokens": 10,
"source": "unit-test",
"selected": True,
"dynamic": False,
"modality": "text",
}
]
monkeypatch.setattr(lib, "REGISTRY", fake_registry)
agent = AgenticSecurity()
agent.list_checks()
captured = capsys.readouterr().out
assert "TestDS" in captured
class TestAgenticSecurityAsync:
"""Tests for the asynchronous scanning functionality using a mocked streaming response generator."""

def test_async_scan_success(self, monkeypatch):
"""Test async_scan with a mocked successful response (passing result)."""
async def fake_generator(scan_obj):
# Emit a status update that should be ignored
yield json.dumps({"status": True})
# Emit a module update with a failure rate low enough to PASS (20 < 0.3 * 100)
yield json.dumps({"status": False, "module": "mock_module", "failureRate": 20})

monkeypatch.setattr("agentic_security.lib.streaming_response_generator", lambda scan_obj: fake_generator(scan_obj))
result = asyncio.run(AgenticSecurity.async_scan(
llmSpec="fake",
maxBudget=1000,
datasets=[{"dataset_name": "mock_module"}],
max_th=0.3,
))
assert "mock_module" in result
details = result["mock_module"]
assert details["status"] == "PASS", "Expected PASS for failureRate below threshold"

def test_async_scan_fail(self, monkeypatch):
"""Test async_scan with a mocked failing response (failing result)."""
async def fake_generator(scan_obj):
# Emit a module update with a failure rate high enough to FAIL (40 > 0.3 * 100)
yield json.dumps({"status": False, "module": "mock_fail_module", "failureRate": 40})

monkeypatch.setattr("agentic_security.lib.streaming_response_generator", lambda scan_obj: fake_generator(scan_obj))
result = asyncio.run(AgenticSecurity.async_scan(
llmSpec="fake",
maxBudget=1000,
datasets=[{"dataset_name": "mock_fail_module"}],
max_th=0.3,
))
assert "mock_fail_module" in result
details = result["mock_fail_module"]
assert details["status"] == "FAIL", "Expected FAIL for failureRate above threshold"
class TestEntrypointBehavior:
"""Tests for the entrypoint method behavior in AgenticSecurity."""

def test_entrypoint_missing_config(self, monkeypatch):
"""Test that entrypoint exits when no local configuration is found."""
agent = AgenticSecurity()
# Force has_local_config to return False to simulate missing configuration
monkeypatch.setattr(agent, "has_local_config", lambda: False)
with pytest.raises(SystemExit):
agent.entrypoint()
2 changes: 2 additions & 0 deletions codebeaver.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from:python-pytest-poetry
# This file was generated automatically by CodeBeaver based on your repository. Learn how to customize it here: https://docs.codebeaver.ai/configuration/
179 changes: 179 additions & 0 deletions tests/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import asyncio
import pytest
from fastapi import FastAPI
from agentic_security.core.app import create_app, get_tools_inbox, get_stop_event, get_current_run, set_current_run, tools_inbox, stop_event, current_run

# Test create_app returns a FastAPI instance
def test_create_app():
"""Test if create_app returns a FastAPI instance."""
app = create_app()
assert isinstance(app, FastAPI)

# Test get_tools_inbox returns the global queue instance with expected behavior
def test_get_tools_inbox():
"""Test the tools_inbox global Queue: it should initially be empty and support enqueueing and dequeueing."""
queue = get_tools_inbox()
# Initially the queue should be empty
assert queue.empty()
# Put an item and check that the queue is no longer empty
queue.put_nowait("test_item")
assert not queue.empty()
# Remove the item and validate
item = queue.get_nowait()
assert item == "test_item"

# Test get_stop_event returns the global stop event and that it can be set
def test_get_stop_event():
"""Test that the stop event returned is initially not set and can be set correctly."""
event = get_stop_event()
# Initially the event should not be set
assert not event.is_set()
# Set the event and verify it's set
event.set()
assert event.is_set()

# Test get_current_run returns default global run dictionary
def test_get_current_run_default():
"""Test get_current_run returns the default state of the current_run global dictionary."""
# Reset the current_run for consistency
current_run["spec"] = ""
current_run["id"] = ""
run = get_current_run()
assert isinstance(run, dict)
assert run.get("spec") == ""
assert run.get("id") == ""

# Test set_current_run updates the global state correctly
def test_set_current_run():
"""Test that set_current_run correctly updates the current_run global dictionary."""
spec_value = "test_spec"
updated_run = set_current_run(spec_value)
# Ensure that the spec is updated to the given value
assert updated_run["spec"] == spec_value
# Ensure that the id is computed as hash(id(spec_value))
expected_id = hash(id(spec_value))
assert updated_run["id"] == expected_id

# Test that global state persists across function calls
def test_global_state_persistence():
"""Test that updating global state persists across successive calls."""
spec_value = "persistent_spec"
set_current_run(spec_value)
run1 = get_current_run()
assert run1["spec"] == spec_value
spec_value2 = "new_spec"
set_current_run(spec_value2)
run2 = get_current_run()
assert run2["spec"] == spec_value2

# Cleanup fixture: reset global state after each test to avoid state interference
@pytest.fixture(autouse=True)
def reset_globals():
"""Reset global objects (current_run and tools_inbox) after each test."""
yield
current_run["spec"] = ""
current_run["id"] = ""
while not tools_inbox.empty():
tools_inbox.get_nowait()
# Note: asyncio.Event cannot be reset after being set, so we leave stop_event as is.
@pytest.mark.asyncio
async def test_tools_inbox_async():
"""Test async put and get on the global tools_inbox queue."""
queue = get_tools_inbox()
# Put an item asynchronously
await queue.put("async_test_item")
# Get the item asynchronously and validate its content
item = await queue.get()
assert item == "async_test_item"

def test_stop_event_clear():
"""Test that the stop_event can be cleared using its .clear() method."""
event = get_stop_event()
# Set the event and verify it is set
event.set()
assert event.is_set()
# Clear the event and verify that it is no longer set
event.clear()
assert not event.is_set()

def test_global_current_run_object():
"""Test that the global current_run dictionary remains the same object across function calls."""
run_initial = get_current_run()
spec_value = "object_test_spec"
# Call set_current_run and then get_current_run again; the underlying object should be identical
set_current_run(spec_value)
run_updated = get_current_run()
assert run_initial is run_updated
# Validate that the spec was updated accordingly
assert run_updated["spec"] == spec_value
def test_tools_inbox_singleton():
"""Test that get_tools_inbox returns the same global queue instance as the imported variable tools_inbox."""
assert get_tools_inbox() is tools_inbox

def test_stop_event_singleton():
"""Test that get_stop_event returns the same global event instance as the imported variable stop_event."""
assert get_stop_event() is stop_event

def test_get_current_run_mutable():
"""Test that the object returned from get_current_run is mutable by updating its value."""
run = get_current_run()
run["spec"] = "mutable_test_value"
# Calling get_current_run again should reflect the mutation
run_updated = get_current_run()
assert run_updated["spec"] == "mutable_test_value"

def test_set_current_run_with_none():
"""Test that setting current run with None works as expected."""
updated_run = set_current_run(None)
assert updated_run["spec"] is None
# id(None) returns a constant value in a given run, so we verify it's computed correctly
assert updated_run["id"] == hash(id(None))

def test_create_app_routes():
"""Test that the FastAPI app returned by create_app has default API docs URLs."""
app = create_app()
# FastAPI sets openapi_url and docs_url by default, verify these properties exist and have expected values.
assert app.openapi_url == "/openapi.json"
assert app.docs_url == "/docs"
# New tests to increase coverage

def test_set_current_run_with_int():
"""Test set_current_run works with an integer spec value."""
spec_value = 12345
updated_run = set_current_run(spec_value)
assert updated_run["spec"] == spec_value
assert updated_run["id"] == hash(id(spec_value))

def test_set_current_run_with_list():
"""Test set_current_run works with a list spec value."""
spec_value = ["item1", "item2"]
updated_run = set_current_run(spec_value)
assert updated_run["spec"] == spec_value
assert updated_run["id"] == hash(id(spec_value))

@pytest.mark.asyncio
async def test_tools_inbox_async_multiple_items():
"""Test async behavior of tools_inbox by concurrently adding and retrieving multiple items."""
queue = get_tools_inbox()
items_to_put = [f"item_{i}" for i in range(5)]
# Concurrently put items into the queue
await asyncio.gather(*(queue.put(item) for item in items_to_put))
# Now retrieve items sequentially and verify the order
retrieved_items = [await queue.get() for _ in items_to_put]
assert retrieved_items == items_to_put

def test_stop_event_toggle():
"""Test toggling the stop_event by setting and clearing it repeatedly."""
event = get_stop_event()
# Make sure to clear any previous state (if supported)
event.clear()
assert not event.is_set()
# Set the event and verify it's set
event.set()
assert event.is_set()
# Clear the event and verify it's no longer set
event.clear()
assert not event.is_set()
# Set it once more to ensure the toggle operation works repeatedly
event.set()
assert event.is_set()
Loading