diff --git a/.gitignore b/.gitignore index a4ca88d..fd00b5c 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ venv/ # Coverage / test .pytest_cache/ +.cache/ .coverage htmlcov/ .hypothesis/ diff --git a/out/assertions.jsonl b/out/assertions.jsonl deleted file mode 100644 index 761fc41..0000000 --- a/out/assertions.jsonl +++ /dev/null @@ -1 +0,0 @@ -{"type": "validation_result", "timestamp": "2025-11-07T17:24:22.870943", "valid": true, "files_validated": 0, "errors": 0, "warnings": 0} diff --git a/out/assertions.xml b/out/assertions.xml deleted file mode 100644 index 838ab98..0000000 --- a/out/assertions.xml +++ /dev/null @@ -1,23 +0,0 @@ - - - -----BEGIN PUBLIC KEY----- -MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA77jI1n8ydDgUVXLVq5L2 -vcZnKSYwm4l461lwh9bB7Gc50whE7p6jqxN8FYbCWLRLnnQg09yZAerSPbYGsZSy -RdFy7HaGaKYEl1iFdINwHMZnfHxh+Iw5peObHe9eowtw1n8m/cLz9n5+1uxUq5Em -AJASalCyOwwWJXkb6wYQU781aoiw6/GR1Rxc2E6/4ZhCIE01wBcjPxl1Ac+ohKVY -8wEig6Qq7CWFN8X+Ju1G/7t31NYSXke9bbA9bBy5yOAxCVS/V9C4CxeJYkH3kUu8 -617wtoZB0HcGWPKX1ri1nG9Pq/xC3TRKzKjHlw47Jm3fO7ZeJtRst9k8CRrX9W95 -8wIDAQAB ------END PUBLIC KEY----- - - - - - 0 - 0 - 0 - - - - 7b8728707d8cc9f5a04ef5b1c99a9f6a1f603b86515329e50b103682ed5ee8379b2ded22481ea8f48da0eb6df28f08acc777ef4cffb3ee4790af3b0e583f5c8c174f1c0e3068cad502116ad2205831ace2e6eb5830d941746ed11ce812547df190ffcc2f653681ce642849ef2b521d9766f1b6f05fee4c403ed648e793a46604a795bdf9ec0db29ab984ef85113ae157c5cebd86ff8e05aef77d4f2651a69bcfee5914428915a4aa81731caf67266a5ee64c9592435d98a87051713eaf8b192bf6188a10a39a32ebcc073f7ab83efa28b35ab74039ba3599e2455ff53b4ed22bd3952d24d1aa2fe14a70b5ef6eae81c73bf2f3c1c46a6f2b5c46ad0227f5add0 - diff --git a/pyproject.toml b/pyproject.toml index 1c00d75..d7fd04d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ packages = [{include = "xml_lib"}] [tool.poetry.dependencies] python = "^3.11" -typer = {extras = ["all"], version = "^0.9.0"} +typer = {extras = ["all"], version = ">=0.9.0"} rich = "^13.7.0" lxml = 
"^5.0.0" xmlschema = "^2.5.0" diff --git a/tests/test_assertions.py b/tests/test_assertions.py new file mode 100644 index 0000000..aabee65 --- /dev/null +++ b/tests/test_assertions.py @@ -0,0 +1,555 @@ +"""Tests for assertion ledger with signing and JSON Lines output.""" + +import json +import tempfile +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from xml_lib.assertions import AssertionLedger, _get_crypto +from xml_lib.types import ValidationError, ValidationResult + + +class TestAssertionLedger: + """Tests for AssertionLedger class.""" + + def test_create_empty_ledger(self): + """Test creating an empty assertion ledger.""" + ledger = AssertionLedger() + assert ledger.assertions == [] + assert ledger.private_key is not None or ledger.private_key is None # depends on crypto + + def test_add_validation_result(self): + """Test adding validation results to ledger.""" + ledger = AssertionLedger() + + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=["test.xml"], + checksums={"test.xml": "abc123"}, + timestamp=datetime.now(), + ) + + ledger.add_validation_result(result) + + assert len(ledger.assertions) == 1 + assert ledger.assertions[0].is_valid is True + + def test_add_multiple_results(self): + """Test adding multiple validation results.""" + ledger = AssertionLedger() + + for i in range(3): + result = ValidationResult( + is_valid=i % 2 == 0, + errors=[], + warnings=[], + validated_files=[f"file{i}.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + assert len(ledger.assertions) == 3 + + def test_write_xml_creates_file(self): + """Test that write_xml creates output file.""" + ledger = AssertionLedger() + + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=["test.xml"], + checksums={"test.xml": "abc123def456"}, + timestamp=datetime.now(), + ) + 
ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(output_path) + + assert output_path.exists() + content = output_path.read_text() + assert "assertion-ledger" in content + assert "test.xml" in content + + def test_write_xml_with_errors(self): + """Test writing XML with validation errors.""" + ledger = AssertionLedger() + + error = ValidationError( + file="test.xml", + line=10, + column=5, + message="Invalid element", + type="error", + rule="GR1", + ) + + result = ValidationResult( + is_valid=False, + errors=[error], + warnings=[], + validated_files=["test.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert "Invalid element" in content + assert 'line="10"' in content + assert 'column="5"' in content + assert 'rule="GR1"' in content + + def test_write_xml_with_warnings(self): + """Test writing XML with validation warnings.""" + ledger = AssertionLedger() + + warning = ValidationError( + file="test.xml", + line=20, + column=None, + message="Deprecated element", + type="warning", + rule=None, + ) + + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[warning], + validated_files=["test.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert "Deprecated element" in content + + def test_write_xml_creates_parent_directories(self): + """Test that write_xml creates parent directories.""" + ledger = AssertionLedger() + ledger.add_validation_result( + ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=[], + 
checksums={}, + timestamp=datetime.now(), + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "nested" / "dir" / "ledger.xml" + ledger.write_xml(output_path) + + assert output_path.exists() + + def test_write_jsonl_creates_file(self): + """Test that write_jsonl creates output file.""" + ledger = AssertionLedger() + + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=["test.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.jsonl" + ledger.write_jsonl(output_path) + + assert output_path.exists() + + def test_write_jsonl_format(self): + """Test JSONL output format.""" + ledger = AssertionLedger() + + error = ValidationError( + file="error.xml", + line=5, + column=10, + message="Schema error", + type="error", + rule="SCHEMA1", + ) + + result = ValidationResult( + is_valid=False, + errors=[error], + warnings=[], + validated_files=["error.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.jsonl" + ledger.write_jsonl(output_path) + + lines = output_path.read_text().strip().split("\n") + assert len(lines) == 2 # summary + error + + # Check summary line + summary = json.loads(lines[0]) + assert summary["type"] == "validation_result" + assert summary["valid"] is False + assert summary["files_validated"] == 1 + assert summary["errors"] == 1 + assert summary["warnings"] == 0 + + # Check error line + error_line = json.loads(lines[1]) + assert error_line["type"] == "error" + assert error_line["file"] == "error.xml" + assert error_line["line"] == 5 + assert error_line["column"] == 10 + assert error_line["message"] == "Schema error" + assert error_line["rule"] == "SCHEMA1" + + def test_write_jsonl_with_warnings(self): + """Test JSONL output 
includes warnings.""" + ledger = AssertionLedger() + + warning = ValidationError( + file="warn.xml", + line=15, + column=None, + message="Style warning", + type="warning", + rule=None, + ) + + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[warning], + validated_files=["warn.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.jsonl" + ledger.write_jsonl(output_path) + + lines = output_path.read_text().strip().split("\n") + assert len(lines) == 2 # summary + warning + + warning_line = json.loads(lines[1]) + assert warning_line["type"] == "warning" + assert warning_line["message"] == "Style warning" + + def test_write_jsonl_multiple_results(self): + """Test JSONL with multiple validation results.""" + ledger = AssertionLedger() + + for i in range(2): + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=[f"file{i}.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.jsonl" + ledger.write_jsonl(output_path) + + lines = output_path.read_text().strip().split("\n") + assert len(lines) == 2 # 2 summary lines + + def test_write_jsonl_creates_parent_directories(self): + """Test that write_jsonl creates parent directories.""" + ledger = AssertionLedger() + ledger.add_validation_result( + ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=[], + checksums={}, + timestamp=datetime.now(), + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "nested" / "dir" / "ledger.jsonl" + ledger.write_jsonl(output_path) + + assert output_path.exists() + + +class TestCryptoOperations: + """Tests for cryptographic operations in assertions.""" + + def test_get_crypto_returns_modules_or_empty(self): + """Test that 
_get_crypto returns either crypto modules or empty dict.""" + try: + result = _get_crypto() + assert isinstance(result, dict) + except Exception: + # If crypto fails to load, it's handled gracefully + pass + + def test_ledger_key_generation(self): + """Test that ledger generates key pair.""" + ledger = AssertionLedger() + # Either crypto is available and we have keys, or it's not + if ledger.private_key is not None: + assert ledger.public_key is not None + else: + assert ledger.public_key is None + + def test_sign_data_with_crypto(self): + """Test signing data when crypto is available.""" + ledger = AssertionLedger() + + # Skip if no crypto + if ledger.private_key is None: + pytest.skip("Cryptography package not available") + + test_data = b"test data to sign" + signature = ledger._sign_data(test_data) + + assert signature is not None + assert isinstance(signature, bytes) + assert len(signature) > 0 + + def test_xml_includes_signature(self): + """Test that XML output includes signature when crypto is available.""" + ledger = AssertionLedger() + + # Skip if no crypto + if ledger.private_key is None: + pytest.skip("Cryptography package not available") + + ledger.add_validation_result( + ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=["test.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "signed.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert "signature" in content + assert "RSA-PSS-SHA256" in content + assert "public-key" in content + + def test_xml_includes_public_key(self): + """Test that XML includes public key for verification.""" + ledger = AssertionLedger() + + # Skip if no crypto + if ledger.private_key is None: + pytest.skip("Cryptography package not available") + + ledger.add_validation_result( + ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=[], + checksums={}, + 
timestamp=datetime.now(), + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "signed.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert "BEGIN PUBLIC KEY" in content + assert "END PUBLIC KEY" in content + + def test_xml_without_crypto_sets_signed_false(self): + """Test that XML indicates unsigned when crypto unavailable.""" + with patch("xml_lib.assertions._get_crypto", return_value={}): + ledger = AssertionLedger() + ledger.add_validation_result( + ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=[], + checksums={}, + timestamp=datetime.now(), + ) + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "unsigned.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert 'signed="false"' in content + + def test_sign_data_without_private_key_returns_none(self): + """Test that signing returns None when no private key.""" + ledger = AssertionLedger() + ledger.private_key = None # Simulate no private key + + result = ledger._sign_data(b"test") + assert result is None + + +class TestValidationErrorHandling: + """Tests for validation error edge cases.""" + + def test_error_without_line_number(self): + """Test error without line number.""" + ledger = AssertionLedger() + + error = ValidationError( + file="test.xml", + line=None, + column=None, + message="General error", + type="error", + rule=None, + ) + + result = ValidationResult( + is_valid=False, + errors=[error], + warnings=[], + validated_files=["test.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert "General error" in content + # Should not have line attribute when None + assert content.count('line="') == 0 + + def test_error_without_rule(self): + 
"""Test error without rule attribute.""" + ledger = AssertionLedger() + + error = ValidationError( + file="test.xml", + line=10, + column=5, + message="No rule error", + type="error", + rule=None, + ) + + result = ValidationResult( + is_valid=False, + errors=[error], + warnings=[], + validated_files=["test.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + # Should not have rule attribute when None + assert "No rule error" in content + + def test_empty_validation_result(self): + """Test handling empty validation result.""" + ledger = AssertionLedger() + + result = ValidationResult( + is_valid=True, + errors=[], + warnings=[], + validated_files=[], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(output_path) + + content = output_path.read_text() + assert "files-validated>0<" in content + + def test_multiple_errors_and_warnings(self): + """Test handling multiple errors and warnings.""" + ledger = AssertionLedger() + + errors = [ + ValidationError("file1.xml", 1, 1, "Error 1", "error", "R1"), + ValidationError("file2.xml", 2, 2, "Error 2", "error", "R2"), + ] + warnings = [ + ValidationError("file1.xml", 5, 5, "Warning 1", "warning", "R3"), + ] + + result = ValidationResult( + is_valid=False, + errors=errors, + warnings=warnings, + validated_files=["file1.xml", "file2.xml"], + checksums={}, + timestamp=datetime.now(), + ) + ledger.add_validation_result(result) + + with tempfile.TemporaryDirectory() as tmpdir: + # Test XML output + xml_path = Path(tmpdir) / "ledger.xml" + ledger.write_xml(xml_path) + + content = xml_path.read_text() + assert "Error 1" in content + assert "Error 2" in content + assert "Warning 1" in 
content + + # Test JSONL output + jsonl_path = Path(tmpdir) / "ledger.jsonl" + ledger.write_jsonl(jsonl_path) + + lines = jsonl_path.read_text().strip().split("\n") + assert len(lines) == 4 # 1 summary + 2 errors + 1 warning diff --git a/tests/test_cli_new.py b/tests/test_cli_new.py new file mode 100644 index 0000000..9e0776c --- /dev/null +++ b/tests/test_cli_new.py @@ -0,0 +1,518 @@ +"""Tests for the modern Typer-based CLI.""" + +import json +import tempfile +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from typer.testing import CliRunner + +from xml_lib.cli_new import ( + app, + print_command_result, +) +from xml_lib.types import CommandResult + + +runner = CliRunner() + + +class TestPrintCommandResult: + """Tests for print_command_result helper function.""" + + def test_print_success_result(self): + """Test printing successful command result.""" + result = CommandResult( + command="test command", + timestamp=datetime.now(), + duration_ms=123.45, + status="success", + summary={"files": 5, "processed": True}, + errors=[], + warnings=[], + ) + + # Should not raise + print_command_result(result) + + def test_print_result_with_errors(self): + """Test printing result with errors.""" + result = CommandResult( + command="test command", + timestamp=datetime.now(), + duration_ms=50.0, + status="failure", + summary={}, + errors=["Error 1", "Error 2"], + warnings=[], + ) + + print_command_result(result) + + def test_print_result_with_warnings(self): + """Test printing result with warnings.""" + result = CommandResult( + command="test command", + timestamp=datetime.now(), + duration_ms=100.0, + status="warning", + summary={}, + errors=[], + warnings=["Warning 1"], + ) + + print_command_result(result) + + def test_print_result_with_json_output(self): + """Test printing result with JSON output file.""" + with tempfile.TemporaryDirectory() as tmpdir: + json_path = Path(tmpdir) / "output.json" + + result = 
CommandResult( + command="test command", + timestamp=datetime.now(), + duration_ms=75.0, + status="success", + summary={"count": 10}, + errors=[], + warnings=["Minor issue"], + ) + + print_command_result(result, json_output=json_path) + + assert json_path.exists() + data = json.loads(json_path.read_text()) + assert data["command"] == "test command" + assert data["status"] == "success" + assert data["summary"]["count"] == 10 + assert "Minor issue" in data["warnings"] + + def test_print_result_creates_parent_directories(self): + """Test that JSON output creates parent directories.""" + with tempfile.TemporaryDirectory() as tmpdir: + json_path = Path(tmpdir) / "nested" / "dir" / "output.json" + + result = CommandResult( + command="test", + timestamp=datetime.now(), + duration_ms=10.0, + status="success", + summary={}, + ) + + print_command_result(result, json_output=json_path) + + assert json_path.exists() + + +class TestCLIBasicFunctionality: + """Basic CLI functionality tests.""" + + def test_app_exists(self): + """Test that app is defined.""" + assert app is not None + + def test_help_command(self): + """Test --help works.""" + result = runner.invoke(app, ["--help"]) + assert result.exit_code == 0 + assert "xml-lib" in result.stdout or "Usage" in result.stdout + + def test_lifecycle_subcommand_help(self): + """Test lifecycle subcommand help.""" + result = runner.invoke(app, ["lifecycle", "--help"]) + assert result.exit_code == 0 + + def test_guardrails_subcommand_help(self): + """Test guardrails subcommand help.""" + result = runner.invoke(app, ["guardrails", "--help"]) + assert result.exit_code == 0 + + def test_engine_subcommand_help(self): + """Test engine subcommand help.""" + result = runner.invoke(app, ["engine", "--help"]) + assert result.exit_code == 0 + + def test_schema_subcommand_help(self): + """Test schema subcommand help.""" + result = runner.invoke(app, ["schema", "--help"]) + assert result.exit_code == 0 + + def test_pptx_subcommand_help(self): + 
"""Test pptx subcommand help.""" + result = runner.invoke(app, ["pptx", "--help"]) + assert result.exit_code == 0 + + +class TestGuardrailsCommands: + """Tests for guardrails CLI commands.""" + + def test_guardrails_simulate_default(self): + """Test guardrails simulate with default options.""" + result = runner.invoke(app, ["guardrails", "simulate"]) + # Should complete without error + assert result.exit_code == 0 + + def test_guardrails_simulate_custom_steps(self): + """Test guardrails simulate with custom steps.""" + result = runner.invoke(app, ["guardrails", "simulate", "--steps", "10"]) + assert result.exit_code == 0 + + def test_guardrails_simulate_with_output(self): + """Test guardrails simulate with JSON output.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "sim.json" + result = runner.invoke( + app, + ["guardrails", "simulate", "--output", str(output_path)], + ) + + assert result.exit_code == 0 + if output_path.exists(): + data = json.loads(output_path.read_text()) + assert "command" in data + assert data["command"] == "guardrails simulate" + + def test_guardrails_check_help(self): + """Test guardrails check help.""" + result = runner.invoke(app, ["guardrails", "check", "--help"]) + assert result.exit_code == 0 + assert "checksum" in result.stdout.lower() or "file" in result.stdout.lower() + + +class TestEngineCommands: + """Tests for engine CLI commands.""" + + def test_engine_verify_contraction(self): + """Test engine verify with contraction operator.""" + result = runner.invoke(app, ["engine", "verify", "--type", "contraction"]) + assert result.exit_code == 0 + + def test_engine_verify_projection(self): + """Test engine verify with projection operator.""" + result = runner.invoke(app, ["engine", "verify", "--type", "projection"]) + assert result.exit_code == 0 + + def test_engine_verify_unknown_type(self): + """Test engine verify with unknown operator type.""" + result = runner.invoke(app, ["engine", "verify", 
"--type", "unknown"]) + assert result.exit_code != 0 + + def test_engine_verify_with_output(self): + """Test engine verify with JSON output.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "verify.json" + result = runner.invoke( + app, + ["engine", "verify", "--output", str(output_path)], + ) + + assert result.exit_code == 0 + + def test_engine_prove_help(self): + """Test engine prove help.""" + result = runner.invoke(app, ["engine", "prove", "--help"]) + assert result.exit_code == 0 + + +class TestSchemaCommands: + """Tests for schema CLI commands.""" + + def test_schema_derive_xsd(self): + """Test schema derive with XSD type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create example XML + example_path = tmpdir / "example.xml" + example_path.write_text("test") + + output_path = tmpdir / "derived.xsd" + + result = runner.invoke( + app, + [ + "schema", + "derive", + str(example_path), # comma-separated list + "--output", + str(output_path), + "--type", + "xsd", + ], + ) + + # May fail due to other issues, but should not fail on argument parsing + assert result.exit_code in [0, 1] + if result.exit_code == 0: + assert output_path.exists() + + def test_schema_derive_relaxng(self): + """Test schema derive with RELAX NG type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_path = tmpdir / "example.xml" + example_path.write_text("test") + + output_path = tmpdir / "derived.rng" + + result = runner.invoke( + app, + [ + "schema", + "derive", + str(example_path), # comma-separated list + "--output", + str(output_path), + "--type", + "relaxng", + ], + ) + + # May fail due to other issues, but should not fail on argument parsing + assert result.exit_code in [0, 1] + + def test_schema_derive_unknown_type(self): + """Test schema derive with unknown type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_path = tmpdir / "example.xml" + 
example_path.write_text("") + + output_path = tmpdir / "derived.txt" + + result = runner.invoke( + app, + [ + "schema", + "derive", + str(example_path), # comma-separated list + "--output", + str(output_path), + "--type", + "unknown", + ], + ) + + assert result.exit_code != 0 + + def test_schema_validate_valid_xml(self): + """Test schema validate with valid XML.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create XSD schema + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + # Create valid XML + xml_path = tmpdir / "valid.xml" + xml_path.write_text("test") + + result = runner.invoke( + app, + ["schema", "validate", str(xml_path), str(xsd_path)], + ) + + assert result.exit_code == 0 + + def test_schema_validate_invalid_xml(self): + """Test schema validate with invalid XML.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create XSD that requires specific element + xsd_content = ''' + + + + + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + # Create invalid XML (wrong root element) + xml_path = tmpdir / "invalid.xml" + xml_path.write_text("") + + result = runner.invoke( + app, + ["schema", "validate", str(xml_path), str(xsd_path)], + ) + + # May exit with 0 but show failure in output + assert result.exit_code == 0 or "failure" in result.stdout.lower() + + def test_schema_validate_with_output(self): + """Test schema validate with JSON output.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + output_path = tmpdir / "result.json" + + result = runner.invoke( + app, + [ + "schema", + "validate", + str(xml_path), + str(xsd_path), + "--output", + str(output_path), + ], + ) + + assert result.exit_code == 0 + if output_path.exists(): + 
data = json.loads(output_path.read_text()) + assert "command" in data + + +class TestDocsCommands: + """Tests for docs CLI commands.""" + + def test_docs_gen(self): + """Test docs gen command.""" + result = runner.invoke(app, ["docs", "gen"]) + assert result.exit_code == 0 + + +class TestExamplesCommands: + """Tests for examples CLI commands.""" + + def test_examples_run_nonexistent(self): + """Test running nonexistent example.""" + result = runner.invoke(app, ["examples", "run", "nonexistent"]) + # Should fail gracefully + assert result.exit_code != 0 or "not found" in result.stdout.lower() + + def test_examples_run_help(self): + """Test examples run help.""" + result = runner.invoke(app, ["examples", "run", "--help"]) + assert result.exit_code == 0 + + +class TestLifecycleCommands: + """Tests for lifecycle CLI commands.""" + + def test_lifecycle_validate_help(self): + """Test lifecycle validate help.""" + result = runner.invoke(app, ["lifecycle", "validate", "--help"]) + assert result.exit_code == 0 + + def test_lifecycle_visualize_help(self): + """Test lifecycle visualize help.""" + result = runner.invoke(app, ["lifecycle", "visualize", "--help"]) + assert result.exit_code == 0 + + def test_lifecycle_validate_nonexistent_path(self): + """Test lifecycle validate with nonexistent path.""" + result = runner.invoke(app, ["lifecycle", "validate", "/nonexistent/path"]) + # Should handle error gracefully + assert result.exit_code != 0 + + def test_lifecycle_visualize_nonexistent_path(self): + """Test lifecycle visualize with nonexistent path.""" + result = runner.invoke(app, ["lifecycle", "visualize", "/nonexistent/path"]) + # Should handle error gracefully + assert result.exit_code != 0 + + +class TestPPTXCommands: + """Tests for PPTX CLI commands.""" + + def test_pptx_build_help(self): + """Test pptx build help.""" + result = runner.invoke(app, ["pptx", "build", "--help"]) + assert result.exit_code == 0 + + def test_pptx_export_help(self): + """Test pptx export 
help.""" + result = runner.invoke(app, ["pptx", "export", "--help"]) + assert result.exit_code == 0 + + +class TestCommandResultJSONOutput: + """Tests for JSON output functionality.""" + + def test_json_output_includes_timestamp(self): + """Test that JSON output includes timestamp.""" + with tempfile.TemporaryDirectory() as tmpdir: + json_path = Path(tmpdir) / "output.json" + + result = CommandResult( + command="test", + timestamp=datetime.now(), + duration_ms=10.0, + status="success", + summary={}, + ) + + print_command_result(result, json_output=json_path) + + data = json.loads(json_path.read_text()) + assert "timestamp" in data + + def test_json_output_includes_duration(self): + """Test that JSON output includes duration.""" + with tempfile.TemporaryDirectory() as tmpdir: + json_path = Path(tmpdir) / "output.json" + + result = CommandResult( + command="test", + timestamp=datetime.now(), + duration_ms=123.456, + status="success", + summary={}, + ) + + print_command_result(result, json_output=json_path) + + data = json.loads(json_path.read_text()) + assert data["duration_ms"] == 123.456 + + def test_json_output_includes_all_fields(self): + """Test that JSON output includes all fields.""" + with tempfile.TemporaryDirectory() as tmpdir: + json_path = Path(tmpdir) / "output.json" + + result = CommandResult( + command="complete command", + timestamp=datetime.now(), + duration_ms=500.0, + status="warning", + summary={"processed": 100, "skipped": 5}, + errors=["Error A", "Error B"], + warnings=["Warning X"], + ) + + print_command_result(result, json_output=json_path) + + data = json.loads(json_path.read_text()) + assert data["command"] == "complete command" + assert data["status"] == "warning" + assert data["summary"]["processed"] == 100 + assert data["summary"]["skipped"] == 5 + assert "Error A" in data["errors"] + assert "Error B" in data["errors"] + assert "Warning X" in data["warnings"] diff --git a/tests/test_engine_wrapper.py b/tests/test_engine_wrapper.py new 
file mode 100644 index 0000000..f0c129d --- /dev/null +++ b/tests/test_engine_wrapper.py @@ -0,0 +1,556 @@ +"""Tests for engine wrapper integration.""" + +import json +import tempfile +from pathlib import Path + +import pytest + +from xml_lib.engine_wrapper import EngineWrapper +from xml_lib.guardrails import GuardrailRule + + +class TestEngineWrapper: + """Tests for EngineWrapper class.""" + + @pytest.fixture + def temp_dirs(self): + """Create temporary directories for engine and output.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + engine_dir = tmpdir / "engine" + engine_dir.mkdir() + output_dir = tmpdir / "output" + output_dir.mkdir() + + # Create basic engine spec + engine_spec = ''' + + + + + + + +''' + (engine_dir / "spec.xml").write_text(engine_spec) + + yield engine_dir, output_dir + + def test_create_engine_wrapper(self, temp_dirs): + """Test creating engine wrapper.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + assert wrapper.engine_dir == engine_dir + assert wrapper.output_dir == output_dir + assert wrapper.parser is not None + assert wrapper.proof_engine is not None + assert wrapper.integration is not None + + def test_engine_wrapper_creates_output_dir(self): + """Test that wrapper creates output directory if needed.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + engine_dir = tmpdir / "engine" + engine_dir.mkdir() + output_dir = tmpdir / "output" / "nested" + + wrapper = EngineWrapper(engine_dir, output_dir) + + assert output_dir.exists() + + def test_run_engine_checks_empty_rules(self, temp_dirs): + """Test running engine checks with no rules.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + proofs, proof_result, metrics = wrapper.run_engine_checks([]) + + assert len(proofs) == 0 + assert metrics.guardrail_count == 0 + + def test_run_engine_checks_single_rule(self, temp_dirs): + """Test running engine 
checks with single rule.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="TEST1", + name="Test Rule", + description="Test rule for engine", + priority="high", + constraint_type="xpath", + constraint="//test", + message="Test message", + provenance={"author": "Test"}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + + assert len(proofs) == 1 + assert proofs[0].rule_id == "TEST1" + assert proofs[0].rule_name == "Test Rule" + assert metrics.guardrail_count == 1 + + def test_run_engine_checks_multiple_rules(self, temp_dirs): + """Test running engine checks with multiple rules.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rules = [ + GuardrailRule( + id=f"RULE{i}", + name=f"Rule {i}", + description=f"Test rule {i}", + priority="medium", + constraint_type="xpath", + constraint=f"//element{i}", + message=None, + provenance={}, + ) + for i in range(3) + ] + + proofs, proof_result, metrics = wrapper.run_engine_checks(rules) + + assert len(proofs) == 3 + assert metrics.guardrail_count == 3 + + def test_run_engine_checks_generates_proofs(self, temp_dirs): + """Test that engine checks generate proof obligations.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Guardrail 1", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + + assert proof_result is not None + assert isinstance(proof_result.summary, dict) + + def test_run_engine_checks_metrics_structure(self, temp_dirs): + """Test metrics structure from engine checks.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rules = [ + GuardrailRule( + id="GR1", + name="Test", + description="Test", + 
priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + ] + + proofs, proof_result, metrics = wrapper.run_engine_checks(rules) + + assert hasattr(metrics, "guardrail_count") + assert hasattr(metrics, "proof_count") + assert hasattr(metrics, "verified_count") + assert hasattr(metrics, "failed_count") + assert hasattr(metrics, "convergence_metrics") + + def test_write_outputs_creates_files(self, temp_dirs): + """Test that write_outputs creates all output files.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test Rule", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + output_files = wrapper.write_outputs(proofs, proof_result, metrics) + + assert "xml" in output_files + assert "jsonl" in output_files + assert "metrics" in output_files + assert "artifact" in output_files + + # Check files exist + assert output_files["xml"].exists() + assert output_files["jsonl"].exists() + assert output_files["metrics"].exists() + assert output_files["artifact"].exists() + + def test_write_outputs_xml_format(self, temp_dirs): + """Test XML output format.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test Rule", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + output_files = wrapper.write_outputs(proofs, proof_result, metrics) + + xml_content = output_files["xml"].read_text() + assert "engine-proof-ledger" in xml_content + assert "GR1" in xml_content + assert "Test Rule" in xml_content + + def test_write_outputs_jsonl_format(self, temp_dirs): + """Test JSONL output 
format.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test Rule", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + output_files = wrapper.write_outputs(proofs, proof_result, metrics) + + jsonl_content = output_files["jsonl"].read_text() + lines = jsonl_content.strip().split("\n") + assert len(lines) >= 1 + + record = json.loads(lines[0]) + assert "rule_id" in record + assert record["rule_id"] == "GR1" + + def test_write_outputs_metrics_json_format(self, temp_dirs): + """Test metrics JSON output format.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + output_files = wrapper.write_outputs(proofs, proof_result, metrics) + + metrics_content = output_files["metrics"].read_text() + metrics_data = json.loads(metrics_content) + + assert "guardrail_count" in metrics_data + assert metrics_data["guardrail_count"] == 1 + + def test_write_outputs_artifact_format(self, temp_dirs): + """Test artifact JSON output format.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + output_files = wrapper.write_outputs(proofs, proof_result, metrics) + + artifact_content = output_files["artifact"].read_text() + artifact_data = json.loads(artifact_content) + + assert "timestamp" in 
artifact_data or "proofs" in artifact_data + + def test_export_proofs_json(self, temp_dirs): + """Test export_proofs_json method.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test Rule", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + export_data = wrapper.export_proofs_json(proofs, proof_result, metrics) + + assert "proofs" in export_data + assert "proof_result" in export_data + assert "metrics" in export_data + assert "checksum" in export_data + + assert len(export_data["proofs"]) == 1 + assert export_data["proofs"][0]["rule_id"] == "GR1" + + def test_export_proofs_json_multiple_rules(self, temp_dirs): + """Test export with multiple rules.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rules = [ + GuardrailRule( + id=f"GR{i}", + name=f"Rule {i}", + description="Test", + priority="medium", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + for i in range(5) + ] + + proofs, proof_result, metrics = wrapper.run_engine_checks(rules) + export_data = wrapper.export_proofs_json(proofs, proof_result, metrics) + + assert len(export_data["proofs"]) == 5 + assert export_data["metrics"]["guardrail_count"] == 5 + + def test_checksum_determinism(self, temp_dirs): + """Test that checksum is deterministic for same proofs.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", + name="Test", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + # Note: This test checks determinism within same proof objects + # The actual iteration may vary due to randomness in initial state + proofs1, proof_result1, metrics1 = 
wrapper.run_engine_checks([rule]) + export1 = wrapper.export_proofs_json(proofs1, proof_result1, metrics1) + + # Generate checksum should be consistent for same proofs + checksum1 = export1["checksum"] + checksum2 = wrapper.integration.generate_checksum(proofs1) + + assert checksum1 == checksum2 + + +class TestEngineWrapperWithTelemetry: + """Tests for engine wrapper with telemetry integration.""" + + @pytest.fixture + def temp_dirs(self): + """Create temporary directories.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + engine_dir = tmpdir / "engine" + engine_dir.mkdir() + output_dir = tmpdir / "output" + output_dir.mkdir() + + # Create basic engine spec + engine_spec = ''' + + + + +''' + (engine_dir / "spec.xml").write_text(engine_spec) + + yield engine_dir, output_dir + + def test_engine_wrapper_without_telemetry(self, temp_dirs): + """Test engine wrapper works without telemetry.""" + engine_dir, output_dir = temp_dirs + wrapper = EngineWrapper(engine_dir, output_dir, telemetry=None) + + assert wrapper.telemetry is None + + rule = GuardrailRule( + id="GR1", + name="Test", + description="Test", + priority="high", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ) + + # Should work without telemetry + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + assert len(proofs) == 1 + + +class TestEngineWrapperEdgeCases: + """Tests for edge cases and error handling.""" + + def test_wrapper_with_nonexistent_engine_dir(self): + """Test wrapper with nonexistent engine directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + engine_dir = tmpdir / "nonexistent" + output_dir = tmpdir / "output" + + # Should not crash on creation + wrapper = EngineWrapper(engine_dir, output_dir) + assert wrapper is not None + + def test_run_checks_with_high_priority_rules(self): + """Test running checks with different priority levels.""" + with tempfile.TemporaryDirectory() as tmpdir: 
+ tmpdir = Path(tmpdir) + engine_dir = tmpdir / "engine" + engine_dir.mkdir() + output_dir = tmpdir / "output" + + # Create engine spec + engine_spec = ''' + + + + +''' + (engine_dir / "spec.xml").write_text(engine_spec) + + wrapper = EngineWrapper(engine_dir, output_dir) + + rules = [ + GuardrailRule( + id="CRITICAL1", + name="Critical Rule", + description="Critical", + priority="critical", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ), + GuardrailRule( + id="LOW1", + name="Low Rule", + description="Low", + priority="low", + constraint_type="xpath", + constraint="//test", + message=None, + provenance={}, + ), + ] + + proofs, proof_result, metrics = wrapper.run_engine_checks(rules) + + assert len(proofs) == 2 + assert any(p.rule_id == "CRITICAL1" for p in proofs) + assert any(p.rule_id == "LOW1" for p in proofs) + + def test_write_outputs_empty_proofs(self): + """Test writing outputs with no proofs.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + engine_dir = tmpdir / "engine" + engine_dir.mkdir() + output_dir = tmpdir / "output" + + # Create engine spec + engine_spec = ''' + + + + +''' + (engine_dir / "spec.xml").write_text(engine_spec) + + wrapper = EngineWrapper(engine_dir, output_dir) + + proofs, proof_result, metrics = wrapper.run_engine_checks([]) + output_files = wrapper.write_outputs(proofs, proof_result, metrics) + + # Should still create files + assert output_files["xml"].exists() + assert output_files["jsonl"].exists() + assert output_files["metrics"].exists() + + def test_export_proofs_json_structure(self): + """Test JSON export structure is complete.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + engine_dir = tmpdir / "engine" + engine_dir.mkdir() + output_dir = tmpdir / "output" + + engine_spec = ''' + + + + +''' + (engine_dir / "spec.xml").write_text(engine_spec) + + wrapper = EngineWrapper(engine_dir, output_dir) + + rule = GuardrailRule( + id="GR1", 
+ name="Test Rule", + description="Test description", + priority="high", + constraint_type="xpath", + constraint="//element", + message="Test message", + provenance={"author": "Test Author"}, + ) + + proofs, proof_result, metrics = wrapper.run_engine_checks([rule]) + export = wrapper.export_proofs_json(proofs, proof_result, metrics) + + # Verify complete structure + assert isinstance(export["proofs"], list) + assert isinstance(export["proof_result"], dict) + assert isinstance(export["metrics"], dict) + assert isinstance(export["checksum"], str) + + # Check proof structure + if len(export["proofs"]) > 0: + proof_data = export["proofs"][0] + assert "rule_id" in proof_data + assert "rule_name" in proof_data + assert "operator_name" in proof_data diff --git a/tests/test_schema_comprehensive.py b/tests/test_schema_comprehensive.py new file mode 100644 index 0000000..a5b4519 --- /dev/null +++ b/tests/test_schema_comprehensive.py @@ -0,0 +1,723 @@ +"""Comprehensive tests for schema derivation and validation.""" + +import tempfile +from pathlib import Path + +import pytest + +from xml_lib.schema import ( + SchemaValidator, + derive_relaxng_from_examples, + derive_xsd_from_examples, + validate_with_schema, +) + + +class TestSchemaValidator: + """Tests for SchemaValidator class.""" + + def test_create_validator_without_cache(self): + """Test creating validator without cache directory.""" + validator = SchemaValidator() + assert validator is not None + assert validator.xsd_cache is not None + assert validator.rng_cache is not None + + def test_create_validator_with_cache(self): + """Test creating validator with cache directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + cache_dir = Path(tmpdir) + validator = SchemaValidator(cache_dir=cache_dir) + assert validator is not None + + def test_validate_with_xsd_valid_document(self): + """Test XSD validation with valid document.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create simple 
XSD schema + xsd_content = ''' + + + + + + + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + # Create valid XML + xml_content = ''' + + test +''' + xml_path = tmpdir / "valid.xml" + xml_path.write_text(xml_content) + + # Validate + validator = SchemaValidator() + result = validator.validate_with_xsd(xml_path, xsd_path) + + assert result.is_valid is True + assert len(result.errors) == 0 + assert result.metadata["schema_type"] == "xsd" + + def test_validate_with_xsd_invalid_document(self): + """Test XSD validation with invalid document.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create simple XSD schema that requires 'name' attribute + xsd_content = ''' + + + + + + + + + + + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + # Create invalid XML (missing required attribute) + xml_content = ''' + + +''' + xml_path = tmpdir / "invalid.xml" + xml_path.write_text(xml_content) + + # Validate + validator = SchemaValidator() + result = validator.validate_with_xsd(xml_path, xsd_path) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_validate_with_xsd_malformed_schema(self): + """Test XSD validation with malformed schema.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create malformed XSD + xsd_path = tmpdir / "bad.xsd" + xsd_path.write_text("not valid xsd") + + xml_path = tmpdir / "test.xml" + xml_path.write_text("") + + validator = SchemaValidator() + result = validator.validate_with_xsd(xml_path, xsd_path) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_validate_with_xsd_caching(self): + """Test that XSD schemas are cached.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create schema + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + # Create XML + xml_path = tmpdir / "test.xml" + 
xml_path.write_text("test") + + validator = SchemaValidator() + + # First validation - schema not in cache + result1 = validator.validate_with_xsd(xml_path, xsd_path) + assert result1.is_valid + + # Second validation - schema should be cached + result2 = validator.validate_with_xsd(xml_path, xsd_path) + assert result2.is_valid + + def test_validate_with_relaxng_valid_document(self): + """Test RELAX NG validation with valid document.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create RELAX NG schema + rng_content = ''' + + + + + + + + + + +''' + rng_path = tmpdir / "schema.rng" + rng_path.write_text(rng_content) + + # Create valid XML + xml_content = ''' + + test +''' + xml_path = tmpdir / "valid.xml" + xml_path.write_text(xml_content) + + # Validate + validator = SchemaValidator() + result = validator.validate_with_relaxng(xml_path, rng_path) + + assert result.is_valid is True + assert len(result.errors) == 0 + assert result.metadata["schema_type"] == "relaxng" + + def test_validate_with_relaxng_invalid_document(self): + """Test RELAX NG validation with invalid document.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create RELAX NG schema that requires specific structure + rng_content = ''' + + + + + + + + +''' + rng_path = tmpdir / "schema.rng" + rng_path.write_text(rng_content) + + # Create invalid XML (missing required element) + xml_content = ''' + + test +''' + xml_path = tmpdir / "invalid.xml" + xml_path.write_text(xml_content) + + # Validate + validator = SchemaValidator() + result = validator.validate_with_relaxng(xml_path, rng_path) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_validate_with_relaxng_malformed_schema(self): + """Test RELAX NG validation with malformed schema.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + rng_path = tmpdir / "bad.rng" + rng_path.write_text("not valid relaxng") + + xml_path = tmpdir / 
"test.xml" + xml_path.write_text("") + + validator = SchemaValidator() + result = validator.validate_with_relaxng(xml_path, rng_path) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_validate_with_relaxng_caching(self): + """Test that RELAX NG schemas are cached.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create schema + rng_content = ''' + + + + + + +''' + rng_path = tmpdir / "schema.rng" + rng_path.write_text(rng_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + validator = SchemaValidator() + + # First validation + result1 = validator.validate_with_relaxng(xml_path, rng_path) + assert result1.is_valid + + # Second validation - should use cache + result2 = validator.validate_with_relaxng(xml_path, rng_path) + assert result2.is_valid + + def test_validate_with_schema_autodetect_xsd(self): + """Test auto-detection of XSD schema type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create XSD schema + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + validator = SchemaValidator() + result = validator.validate_with_schema(xml_path, xsd_path) + + assert result.metadata["schema_type"] == "xsd" + + def test_validate_with_schema_autodetect_rng(self): + """Test auto-detection of RELAX NG schema type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create RELAX NG schema + rng_content = ''' + + + + +''' + rng_path = tmpdir / "schema.rng" + rng_path.write_text(rng_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + validator = SchemaValidator() + result = validator.validate_with_schema(xml_path, rng_path) + + assert result.metadata["schema_type"] == "relaxng" + + def test_validate_with_schema_unknown_extension(self): + """Test handling of unknown schema extension.""" + with 
tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + schema_path = tmpdir / "schema.unknown" + schema_path.write_text("something") + + xml_path = tmpdir / "test.xml" + xml_path.write_text("") + + validator = SchemaValidator() + result = validator.validate_with_schema(xml_path, schema_path) + + assert result.is_valid is False + assert "Unknown schema type" in str(result.errors) + + def test_validate_with_schema_explicit_type(self): + """Test explicit schema type specification.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create XSD schema with .xml extension + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xml" # Using .xml extension + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + validator = SchemaValidator() + result = validator.validate_with_schema(xml_path, xsd_path, schema_type="xsd") + + assert result.is_valid is True + + def test_validate_with_schema_unsupported_type(self): + """Test handling of unsupported schema type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + schema_path = tmpdir / "schema.txt" + schema_path.write_text("something") + + xml_path = tmpdir / "test.xml" + xml_path.write_text("") + + validator = SchemaValidator() + result = validator.validate_with_schema(xml_path, schema_path, schema_type="dtd") + + assert result.is_valid is False + assert "Unsupported schema type" in str(result.errors) + + +class TestDeriveXSDFromExamples: + """Tests for XSD schema derivation.""" + + def test_derive_xsd_single_example(self): + """Test deriving XSD from single example.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create example XML + example_content = ''' + + Test + Body +''' + example_path = tmpdir / "example.xml" + example_path.write_text(example_content) + + output_path = tmpdir / "derived.xsd" + + derive_xsd_from_examples([example_path], output_path) + + assert 
output_path.exists() + content = output_path.read_text() + assert "xs:schema" in content + assert 'name="document"' in content + assert 'name="title"' in content + assert 'name="content"' in content + + def test_derive_xsd_multiple_examples(self): + """Test deriving XSD from multiple examples.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + # Create first example + ex1_content = ''' + + Test 1 +''' + ex1_path = tmpdir / "example1.xml" + ex1_path.write_text(ex1_content) + + # Create second example with additional element + ex2_content = ''' + + John +''' + ex2_path = tmpdir / "example2.xml" + ex2_path.write_text(ex2_content) + + output_path = tmpdir / "derived.xsd" + + derive_xsd_from_examples([ex1_path, ex2_path], output_path) + + content = output_path.read_text() + # Should include elements from both examples + assert 'name="title"' in content + assert 'name="author"' in content + + def test_derive_xsd_with_root_element_override(self): + """Test deriving XSD with custom root element name.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_content = ''' + + value +''' + example_path = tmpdir / "example.xml" + example_path.write_text(example_content) + + output_path = tmpdir / "derived.xsd" + + derive_xsd_from_examples([example_path], output_path, root_element="document") + + content = output_path.read_text() + assert 'name="document"' in content + + def test_derive_xsd_creates_parent_directories(self): + """Test that derive creates parent directories.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_path = tmpdir / "example.xml" + example_path.write_text("test") + + output_path = tmpdir / "nested" / "dir" / "schema.xsd" + + derive_xsd_from_examples([example_path], output_path) + + assert output_path.exists() + + def test_derive_xsd_no_examples_raises_error(self): + """Test that empty examples list raises error.""" + with tempfile.TemporaryDirectory() as tmpdir: 
+ output_path = Path(tmpdir) / "schema.xsd" + + with pytest.raises(ValueError, match="No example files provided"): + derive_xsd_from_examples([], output_path) + + +class TestDeriveRelaxNGFromExamples: + """Tests for RELAX NG schema derivation.""" + + def test_derive_relaxng_single_example(self): + """Test deriving RELAX NG from single example.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_content = ''' + + Test + Body +''' + example_path = tmpdir / "example.xml" + example_path.write_text(example_content) + + output_path = tmpdir / "derived.rng" + + derive_relaxng_from_examples([example_path], output_path) + + assert output_path.exists() + content = output_path.read_text() + assert "relaxng.org/ns/structure" in content + assert 'name="document"' in content + assert 'name="title"' in content + assert 'name="content"' in content + + def test_derive_relaxng_multiple_examples(self): + """Test deriving RELAX NG from multiple examples.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + ex1_path = tmpdir / "ex1.xml" + ex1_path.write_text("val1") + + ex2_path = tmpdir / "ex2.xml" + ex2_path.write_text("val2") + + output_path = tmpdir / "derived.rng" + + derive_relaxng_from_examples([ex1_path, ex2_path], output_path) + + content = output_path.read_text() + assert 'name="field1"' in content + assert 'name="field2"' in content + + def test_derive_relaxng_with_root_element_override(self): + """Test deriving RELAX NG with custom root element.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_path = tmpdir / "example.xml" + example_path.write_text("test") + + output_path = tmpdir / "derived.rng" + + derive_relaxng_from_examples([example_path], output_path, root_element="document") + + content = output_path.read_text() + assert 'name="document"' in content + + def test_derive_relaxng_creates_parent_directories(self): + """Test that derive creates parent directories.""" + with 
tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + example_path = tmpdir / "example.xml" + example_path.write_text("test") + + output_path = tmpdir / "nested" / "dir" / "schema.rng" + + derive_relaxng_from_examples([example_path], output_path) + + assert output_path.exists() + + def test_derive_relaxng_no_examples_raises_error(self): + """Test that empty examples list raises error.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "schema.rng" + + with pytest.raises(ValueError, match="No example files provided"): + derive_relaxng_from_examples([], output_path) + + +class TestValidateWithSchemaFunction: + """Tests for the convenience function validate_with_schema.""" + + def test_validate_with_schema_xsd(self): + """Test convenience function with XSD schema.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + result = validate_with_schema(xml_path, xsd_path) + + assert result.is_valid is True + + def test_validate_with_schema_relaxng(self): + """Test convenience function with RELAX NG schema.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + rng_content = ''' + + + + +''' + rng_path = tmpdir / "schema.rng" + rng_path.write_text(rng_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + result = validate_with_schema(xml_path, rng_path) + + assert result.is_valid is True + + def test_validate_with_schema_with_cache(self): + """Test convenience function with cache directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + cache_dir = tmpdir / "cache" + result = 
validate_with_schema(xml_path, xsd_path, cache_dir=cache_dir) + + assert result.is_valid is True + + def test_validate_with_schema_explicit_type(self): + """Test convenience function with explicit schema type.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xml" # Non-standard extension + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("test") + + result = validate_with_schema(xml_path, xsd_path, schema_type="xsd") + + assert result.is_valid is True + + +class TestSchemaErrorHandling: + """Tests for error handling in schema operations.""" + + def test_validate_nonexistent_xml_file(self): + """Test validation with nonexistent XML file.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "nonexistent.xml" + + validator = SchemaValidator() + result = validator.validate_with_xsd(xml_path, xsd_path) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_validate_nonexistent_schema_file(self): + """Test validation with nonexistent schema file.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xml_path = tmpdir / "test.xml" + xml_path.write_text("") + + xsd_path = tmpdir / "nonexistent.xsd" + + validator = SchemaValidator() + result = validator.validate_with_xsd(xml_path, xsd_path) + + assert result.is_valid is False + + def test_validate_malformed_xml_document(self): + """Test validation with malformed XML document.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + xsd_content = ''' + + +''' + xsd_path = tmpdir / "schema.xsd" + xsd_path.write_text(xsd_content) + + xml_path = tmpdir / "malformed.xml" + xml_path.write_text("not closed") + + validator = SchemaValidator() + result = validator.validate_with_xsd(xml_path, 
xsd_path) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_relaxng_validation_error_includes_line_numbers(self): + """Test that RELAX NG errors include line numbers.""" + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + + rng_content = ''' + + + + + + +''' + rng_path = tmpdir / "schema.rng" + rng_path.write_text(rng_content) + + xml_content = ''' + + test +''' + xml_path = tmpdir / "invalid.xml" + xml_path.write_text(xml_content) + + validator = SchemaValidator() + result = validator.validate_with_relaxng(xml_path, rng_path) + + assert result.is_valid is False + # Check that errors mention line numbers + assert any("Line" in str(err) for err in result.errors) diff --git a/xml_lib/cli_new.py b/xml_lib/cli_new.py index c4f4264..b65244b 100644 --- a/xml_lib/cli_new.py +++ b/xml_lib/cli_new.py @@ -330,7 +330,9 @@ def engine_verify( raise typer.Exit(1) # Run fixed-point iteration - iterator = FixedPointIterator(op.apply, tolerance=1e-6, max_iterations=100) + iterator = FixedPointIterator( + operator=op, tolerance=1e-6, max_iterations=100, store_trajectory=True + ) x0 = np.array([1.0, 2.0]) with Progress( @@ -339,14 +341,14 @@ def engine_verify( console=console, ) as progress: progress.add_task("[cyan]Running fixed-point iteration...", total=None) - fp_result = iterator.iterate(x0, record_trace=True) + fp_result = iterator.iterate(x0) duration = (time.time() - start_time) * 1000 # Display results - console.print(f"[cyan]Converged:[/cyan] {fp_result.converged}") - console.print(f"[cyan]Iterations:[/cyan] {fp_result.iterations}") - console.print(f"[cyan]Final error:[/cyan] {fp_result.error:.2e}") + console.print(f"[cyan]Converged:[/cyan] {fp_result.is_converged()}") + console.print(f"[cyan]Iterations:[/cyan] {fp_result.metrics.iterations}") + console.print(f"[cyan]Final error:[/cyan] {fp_result.metrics.final_residual:.2e}") if fp_result.fixed_point is not None: console.print(f"[cyan]Fixed point:[/cyan] 
{fp_result.fixed_point}") @@ -355,12 +357,12 @@ def engine_verify( command="engine verify", timestamp=datetime.now(UTC), duration_ms=duration, - status="success" if fp_result.converged else "failure", + status="success" if fp_result.is_converged() else "failure", summary={ "operator_type": operator_type, - "converged": fp_result.converged, - "iterations": fp_result.iterations, - "error": fp_result.error, + "converged": fp_result.is_converged(), + "iterations": fp_result.metrics.iterations, + "error": fp_result.metrics.final_residual, }, ) @@ -429,13 +431,16 @@ def pptx_export( @schema_app.command("derive") def schema_derive( - examples: list[Path] = typer.Argument(..., help="Example XML files"), + example_files: str = typer.Argument(..., help="Example XML files (comma-separated)"), output: Path = typer.Option(..., "--output", "-o", help="Output schema file"), schema_type: str = typer.Option("relaxng", "--type", "-t", help="Schema type (xsd or relaxng)"), ) -> None: """Derive schema from example XML documents.""" start_time = time.time() + # Parse comma-separated file paths + examples = [Path(p.strip()) for p in example_files.split(",")] + with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), diff --git a/xml_lib/engine/operators.py b/xml_lib/engine/operators.py index 5455b76..d120006 100644 --- a/xml_lib/engine/operators.py +++ b/xml_lib/engine/operators.py @@ -243,3 +243,32 @@ class FunctionOperator(Operator): def apply(self, x: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """Apply function.""" return self.function(x) + + +# Helper factory functions for CLI +def contraction_operator(name: str, q: float = 0.9) -> ContractionOperator: + """Create a contraction operator with given contraction constant. 
+ + Args: + name: Operator name + q: Contraction constant in [0, 1) + + Returns: + ContractionOperator instance + """ + space = HilbertSpace(dimension=2, name="DefaultSpace") + return ContractionOperator(space=space, name=name, contraction_q=q) + + +def projection_operator(name: str, dimension: int = 2) -> ProjectionOperator: + """Create a projection operator. + + Args: + name: Operator name + dimension: Dimension of the Hilbert space + + Returns: + ProjectionOperator instance + """ + space = HilbertSpace(dimension=dimension, name="DefaultSpace") + return ProjectionOperator(space=space, name=name) diff --git a/xml_lib/engine/proofs.py b/xml_lib/engine/proofs.py index 3f96660..b3814ae 100644 --- a/xml_lib/engine/proofs.py +++ b/xml_lib/engine/proofs.py @@ -362,3 +362,146 @@ def batch_verify(self, guardrail_proofs: list[GuardrailProof]) -> ProofResult: } return ProofResult(obligations=all_obligations, summary=summary) + + +@dataclass +class Proof: + """Mathematical proof structure.""" + + title: str + assumptions: list[str] = field(default_factory=list) + steps: list[ProofStep] = field(default_factory=list) + conclusion: str = "" + + def to_latex(self) -> str: + """Export proof to LaTeX format.""" + lines = [ + f"\\textbf{{{self.title}}}", + "", + "\\textbf{Assumptions:}", + "\\begin{itemize}", + ] + for assumption in self.assumptions: + lines.append(f" \\item {assumption}") + lines.extend([ + "\\end{itemize}", + "", + "\\textbf{Proof:}", + "\\begin{enumerate}", + ]) + for step in self.steps: + lines.append(f" \\item {step.description} \\\\") + lines.append(f" \\textit{{{step.reasoning}}}") + lines.extend([ + "\\end{enumerate}", + "", + f"\\textbf{{Conclusion:}} {self.conclusion}", + ]) + return "\n".join(lines) + + def to_html(self) -> str: + """Export proof to HTML format.""" + lines = [ + f"
<h1>{self.title}</h1>",
+            "<h2>Assumptions:</h2>",
+            "<ul>",
+        ]
+        for assumption in self.assumptions:
+            lines.append(f"<li>{assumption}</li>")
+        lines.extend([
+            "</ul>",
+            "<h2>Proof:</h2>",
+            "<ol>",
+        ])
+        for step in self.steps:
+            lines.append(f"<li>{step.description}<br/>")
+            lines.append(f"<em>{step.reasoning}</em></li>")
+        lines.extend([
+            "</ol>",
+            f"<h2>Conclusion:</h2> <p>{self.conclusion}</p>",
+        ])
+        return "\n".join(lines)
+
+
+@dataclass
+class ProofGenerator:
+    """Generator for mathematical proofs from XML specifications."""
+
+    def generate_from_xml(self, xml_path) -> Proof:
+        """Generate proof from XML specification.
+
+        Args:
+            xml_path: Path to XML proof specification
+
+        Returns:
+            Proof instance
+        """
+        from lxml import etree
+        from pathlib import Path
+
+        if not Path(xml_path).exists():
+            raise FileNotFoundError(f"Proof specification not found: {xml_path}")
+
+        doc = etree.parse(str(xml_path))
+        root = doc.getroot()
+
+        # Extract title
+        title_elem = root.find("title")
+        title = title_elem.text if title_elem is not None else "Unnamed Proof"
+
+        # Extract assumptions
+        assumptions = []
+        assumptions_elem = root.find("assumptions")
+        if assumptions_elem is not None:
+            for assumption in assumptions_elem.findall("assumption"):
+                if assumption.text:
+                    assumptions.append(assumption.text)
+
+        # Extract steps
+        steps = []
+        steps_elem = root.find("steps")
+        if steps_elem is not None:
+            for i, step_elem in enumerate(steps_elem.findall("step")):
+                description = step_elem.text or ""
+                reasoning = step_elem.get("reasoning", "")
+                steps.append(
+                    ProofStep(
+                        step_id=f"step_{i+1}",
+                        description=description,
+                        reasoning=reasoning,
+                        result=True,
+                    )
+                )
+
+        # Extract conclusion
+        conclusion_elem = root.find("conclusion")
+        conclusion = conclusion_elem.text if conclusion_elem is not None else ""
+
+        return Proof(
+            title=title,
+            assumptions=assumptions,
+            steps=steps,
+            conclusion=conclusion,
+        )
+
+    def export_proof(self, proof: Proof, output_path, format: str = "latex") -> None:
+        """Export proof to file.
+ + Args: + proof: Proof instance + output_path: Path to output file + format: Output format ('latex' or 'html') + """ + from pathlib import Path + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + if format == "latex": + content = proof.to_latex() + elif format == "html": + content = proof.to_html() + else: + raise ValueError(f"Unsupported format: {format}") + + output_path.write_text(content) diff --git a/xml_lib/schema.py b/xml_lib/schema.py index 5e62d27..139128b 100644 --- a/xml_lib/schema.py +++ b/xml_lib/schema.py @@ -202,10 +202,11 @@ def derive_xsd_from_examples( root = parse_xml(examples[0]) # Build basic XSD structure + xs_ns = "http://www.w3.org/2001/XMLSchema" xsd_root = etree.Element( - "{http://www.w3.org/2001/XMLSchema}schema", + f"{{{xs_ns}}}schema", + nsmap={"xs": xs_ns}, attrib={ - "xmlns:xs": "http://www.w3.org/2001/XMLSchema", "elementFormDefault": "qualified", }, ) @@ -273,10 +274,11 @@ def derive_relaxng_from_examples( root = parse_xml(examples[0]) # Build RELAX NG structure + rng_ns = "http://relaxng.org/ns/structure/1.0" rng_root = etree.Element( - "{http://relaxng.org/ns/structure/1.0}grammar", + f"{{{rng_ns}}}grammar", + nsmap={None: rng_ns}, attrib={ - "xmlns": "http://relaxng.org/ns/structure/1.0", "datatypeLibrary": "http://www.w3.org/2001/XMLSchema-datatypes", }, ) diff --git a/xml_lib/types.py b/xml_lib/types.py index fd94509..d613b56 100644 --- a/xml_lib/types.py +++ b/xml_lib/types.py @@ -86,12 +86,13 @@ class ValidationResult: """ is_valid: bool - errors: list[ValidationError] = field(default_factory=list) - warnings: list[ValidationError] = field(default_factory=list) + errors: list[ValidationError] | list[str] = field(default_factory=list) + warnings: list[ValidationError] | list[str] = field(default_factory=list) validated_files: list[str] = field(default_factory=list) checksums: dict[str, str] = field(default_factory=dict) timestamp: datetime = field(default_factory=datetime.now) 
used_streaming: bool = False + metadata: dict[str, Any] = field(default_factory=dict) @dataclass diff --git a/xml_lib/utils/cache.py b/xml_lib/utils/cache.py index e1cb7c3..5001854 100644 --- a/xml_lib/utils/cache.py +++ b/xml_lib/utils/cache.py @@ -91,8 +91,8 @@ def put(self, schema_path: Path, schema: T) -> None: try: with open(cache_file, "wb") as f: pickle.dump(schema, f) - except pickle.PickleError: - # If pickling fails, just keep in memory + except (pickle.PickleError, TypeError): + # If pickling fails (e.g., lxml objects can't be pickled), just keep in memory pass def clear(self) -> None: