Implement security mode aligned with the MCP Server Security Standard#157
Implement security mode aligned with the MCP Server Security Standard#157Agent-Hellboy wants to merge 2 commits intomainfrom
Conversation
WalkthroughAdds a Security Mode feature: policy-driven pre/post-call analysis, side-effect oracles (filesystem, network, process), security-focused mutators/sequences and payloads, session-replay and negative-auth probes, and reporting/output plumbing to emit an aggregated security_summary. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ToolClient
participant SecurityEngine as SecurityModeEngine
participant Oracles as SideEffectOracles
participant Transport as StreamHttpDriver
participant RPC as Tool RPC
participant Reporter
Client->>ToolClient: start fuzzing (security_policy)
ToolClient->>SecurityEngine: pre_call_expectations(tool,args)
SecurityEngine-->>ToolClient: expectations
ToolClient->>Oracles: pre_call() -> pre_snapshots
ToolClient->>Transport: send RPC (tool call)
Transport-->>ToolClient: response (may include session-id)
ToolClient->>RPC: receive result
ToolClient->>Oracles: post_call(pre_snapshots) -> oracle_findings, side_effects
ToolClient->>SecurityEngine: post_call_verdicts(success, exception, oracle_findings, expectations)
SecurityEngine-->>ToolClient: policy_violations, semantic_mismatch
ToolClient->>Reporter: emit result (oracle_findings, side_effects, policy_violations, semantic_mismatch)
Reporter->>Reporter: aggregate security_summary
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #157 +/- ##
==========================================
- Coverage 95.25% 94.01% -1.25%
==========================================
Files 164 173 +9
Lines 10189 11178 +989
==========================================
+ Hits 9706 10509 +803
- Misses 483 669 +186 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Implement security mode aligned with the MCP Server Security Standard (MSSS): https://github.com/mcp-security-standard/mcp-server-security-standard Phase 0 – Policy model - CLI: --security-mode (off|minimal|full), --fs-allow-root, --fs-deny-root, --repo-root, --workspace-root, --net-allow-host, --net-deny-by-default, --proc-allow, --proc-ignore - mcp_fuzzer/security_mode/policy.py: SecurityPolicy, path/host/command normalization, build_security_policy() - Result schema: oracle_findings, policy_violations, side_effects, semantic_mismatch (result_builder, types) Phase 1 – Side-effect oracles - ProcessSideEffectOracle: pre/post call snapshots (psutil), allowlist, shell detection (MCP-EXEC-01/02/03) - FilesystemSideEffectOracle: snapshot-diff on allow roots, symlink escape (MCP-FS-01/02) - NetworkSideEffectOracle: net_connections snapshot, allowlist (MCP-NET-01/02) - Wired in tool_client._call_tool_with_security() per tool call Phase 2 – Policy-violation checks - security_mode/engine.py: SecurityModeEngine, pre_call_expectations(), post_call_verdicts(), SecurityVerdict with policy_violations and semantic_mismatch (forbidden_side_effect / side_effect_on_error) - Control mapping: filesystem→MCP-FS-02, process→MCP-EXEC-01, network→MCP-NET-01, authz→MCP-AUTHZ-01 Phase 3 – Payload dictionaries - interesting_values: PATH_TRAVERSAL_UNC, arg_injection in get_payload_within_length, SECURITY_DICTIONARY path/command/arg_injection - aggressive/tool_strategy: _prefer_security_dictionary for command/path args - corpus: SECURITY_MODE_CORPUS, get_security_mode_corpus() Phase 4 – Stateful sequences - SequenceMutator, SequenceExecutor; FuzzerContext created_files, repo_paths, symlink_targets - Sequences: create_file→read_file, symlink escape, repo init→diff/log - Run when security engine enabled and tool capabilities detected Phase 5 – Auth and session - AuthManager.get_negative_auth_variants(): missing, invalid_token - tool_client: _run_negative_auth_probes, auth_required_but_missing as violation - stream_http_driver: _session_ids, get_stored_session_ids(), try_session_replay() Phase 6 – Response semantics - semantic_mismatch in engine (success + forbidden side effects; error + side effects) Phase 7 – Reporting - collect_security_summary() in reports/formatters/common.py - Security mode summary in console: oracle_findings_by_type, policy_violations_by_domain, policy_controls (MCP-*) Phase 8 – Tests - Unit tests: security_mode (policy, oracles, engine), auth negative variants, stream_http_driver session replay, collect_security_summary, interesting_values PATH_TRAVERSAL_UNC/arg_injection, sequence README: document security mode and link to MSSS.
cb8b4f8 to
c7d2129
Compare
Documentation PreviewYour documentation changes have been built successfully! Preview URL: https://Agent-Hellboy.github.io/mcp-server-fuzzer/ The documentation will be automatically deployed to the main site when this PR is merged. This comment was automatically generated by the documentation workflow. |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@README.md`:
- Line 42: Update the "Security Mode" README line to clarify that the MSSS
control IDs shown are illustrative: explicitly mark the control IDs (e.g.,
MCP-EXEC-01, MCP-FS-02, MCP-NET-01) as examples or add a parenthetical like
"(example control IDs)" so readers don’t assume a strict mapping; modify the
phrase around "Audit server security against the MCP Server Security Standard
(MSSS) ..." to include this example/illustrative qualifier.
- Around line 53-55: The README's "Security mode (MSSS auditing)" section
documents CLI flags that don't exist; update that section to reflect the actual
implemented flags in mcp_fuzzer/cli/parser.py (symbols: --enable-safety-system,
--no-safety, --safety-report, --no-network, --allow-host) or remove the
fictional flags (--security-mode, --fs-allow-root, --net-allow-host,
--proc-allow). Edit the text to describe how the real flags control MSSS-related
behavior (mapping of safety toggles, network/host allowances, and reporting) and
adjust examples and the "Configuration" anchor to reference the real flag names
so documentation matches parser.py.
| ### Security mode (MSSS auditing) | ||
|
|
||
| With `--security-mode minimal` or `--security-mode full`, the fuzzer audits server behavior against the [MCP Server Security Standard (MSSS)](https://github.com/mcp-security-standard/mcp-server-security-standard). It uses side-effect oracles (process, filesystem, network), policy-violation checks, and optional auth/session probes. Results include oracle findings and policy violations mapped to MSSS control IDs (e.g. MCP-EXEC-01, MCP-FS-02, MCP-NET-01). See [Configuration](#configuration) for `--security-mode`, `--fs-allow-root`, `--net-allow-host`, `--proc-allow`, and related options. |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for the mentioned CLI flags
rg -n --type=py --type=md -C3 '\bsecurity-mode\b|\bfs-allow-root\b|\bnet-allow-host\b|\bproc-allow\b'Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 817
🏁 Script executed:
#!/bin/bash
# Find the main entry point and CLI argument definitions
find . -name "*.py" -type f | head -20
echo "---"
# Search for argparse or click usage
rg -n "argparse|click|add_argument" --type=py | head -30Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 3332
🏁 Script executed:
#!/bin/bash
# Read the CLI parser to see what flags are actually implemented
wc -l ./mcp_fuzzer/cli/parser.py
echo "---"
cat -n ./mcp_fuzzer/cli/parser.py | head -100Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 3862
🏁 Script executed:
#!/bin/bash
# Read lines 100-250 of parser.py to find security-mode and related flags
sed -n '100,250p' ./mcp_fuzzer/cli/parser.pyRepository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 4440
🏁 Script executed:
#!/bin/bash
# Read remaining lines of parser.py
sed -n '250,461p' ./mcp_fuzzer/cli/parser.pyRepository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 5714
Remove or correct misleading CLI flag documentation—flags do not exist in implementation.
The README documents --security-mode minimal|full, --fs-allow-root, --net-allow-host, and --proc-allow, but these flags are not implemented in the CLI parser. The actual security-related flags are --enable-safety-system, --no-safety, --safety-report, --no-network, and --allow-host. Update the README to either remove the fictional flags or align it with the actual implemented options in mcp_fuzzer/cli/parser.py.
🤖 Prompt for AI Agents
In `@README.md` around lines 53 - 55, The README's "Security mode (MSSS auditing)"
section documents CLI flags that don't exist; update that section to reflect the
actual implemented flags in mcp_fuzzer/cli/parser.py (symbols:
--enable-safety-system, --no-safety, --safety-report, --no-network,
--allow-host) or remove the fictional flags (--security-mode, --fs-allow-root,
--net-allow-host, --proc-allow). Edit the text to describe how the real flags
control MSSS-related behavior (mapping of safety toggles, network/host
allowances, and reporting) and adjust examples and the "Configuration" anchor to
reference the real flag names so documentation matches parser.py.
- common.py: shorten docstring line (oracle type list) - test_auth.py: shorten docstring, split long assert across lines - test_common.py: split long dict literal for ruff E501 - test_stream_http_driver.py: shorten docstring - test_security_mode.py: add new process between pre_call and post_call so allowlist test sees new_process effect (fix failing test)
Documentation PreviewYour documentation changes have been built successfully! Preview URL: https://Agent-Hellboy.github.io/mcp-server-fuzzer/ The documentation will be automatically deployed to the main site when this PR is merged. This comment was automatically generated by the documentation workflow. |
There was a problem hiding this comment.
Actionable comments posted: 25
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
mcp_fuzzer/fuzz_engine/mutators/strategies/aggressive/tool_strategy.py (1)
22-60: 🧹 Nitpick | 🔵 TrivialArg‑injection hint matching is likely too literal for schema keys.
ARG_INJECTION_HINTSare CLI flags (e.g.,--output=), butkeyis a schema property name, so this branch may rarely trigger and arg‑injection payloads won’t be selected. Consider normalizing hints (strip dashes/=) or adding generic tokens (arg,option,flag) to improve hit rate.♻️ Possible normalization tweak
def _prefer_security_dictionary(key: str | None, max_size: int) -> str | None: if not key: return None lowered = key.lower() command_hints = ("cmd", "command", "exec", "shell", "run", "script") path_hints = ("path", "file", "dir", "folder", "repo", "root") if any(hint in lowered for hint in command_hints): return get_security_payload(max_size, "command") if any(hint in lowered for hint in path_hints): return get_security_payload(max_size, "path") - if any(hint in lowered for hint in ARG_INJECTION_HINTS): + normalized_hints = {hint.lstrip("-").split("=")[0] for hint in ARG_INJECTION_HINTS} + arg_tokens = ("arg", "args", "option", "options", "flag", "flags") + if any(hint in lowered for hint in normalized_hints) or any(t in lowered for t in arg_tokens): return get_security_payload(max_size, "arg_injection") return Nonemcp_fuzzer/fuzz_engine/mutators/strategies/interesting_values.py (1)
262-305: 🧹 Nitpick | 🔵 TrivialAvoid duplicating security payload mappings.
get_payload_within_lengthredefines path/arg‑injection payloads that already exist inSECURITY_DICTIONARY, which can drift. Consider reusing the dictionary to keep sources of truth aligned.♻️ Consolidate payload maps
def get_payload_within_length(max_length: int, category: str = "sql") -> str: """Get an attack payload that fits within the length constraint.""" payloads_map: dict[str, list[str]] = { "sql": SQL_INJECTION, "nosql": NOSQL_INJECTION, "xss": XSS_PAYLOADS, - "path": PATH_TRAVERSAL + PATH_TRAVERSAL_EXTENDED + PATH_TRAVERSAL_UNC, - "command": COMMAND_INJECTION, - "arg_injection": [ - f"{h}$(curl http://169.254.169.254)" - for h in ARG_INJECTION_HINTS - ], "ssrf": SSRF_PAYLOADS, + **SECURITY_DICTIONARY, }mcp_fuzzer/client/base.py (1)
29-56:⚠️ Potential issue | 🟡 MinorDocument the new
security_policyparameter.
The initializer docstring omits the new argument, which makes the public API harder to discover.📝 Suggested docstring update
Args: transport: Transport protocol for server communication auth_manager: Authentication manager for tool authentication tool_timeout: Default timeout for tool calls reporter: Reporter for fuzzing results safety_system: Safety system for filtering operations max_concurrency: Maximum number of concurrent operations tool_client: Optional pre-created ToolClient protocol_client: Optional pre-created ProtocolClient + security_policy: Optional policy enforced by the tool clientmcp_fuzzer/reports/output/protocol.py (1)
92-105: 🧹 Nitpick | 🔵 TrivialConsider renaming local variable
summaryto avoid ambiguity.The variable
summary(line 95) could be confused with other summary-related concepts in this file (e.g.,spec_summary,SummaryStats). A more explicit name likesec_summarywould improve clarity.♻️ Suggested rename for clarity
- security_summary: dict[str, Any] | None = None, + security_summary: dict[str, Any] | None = None, ) -> dict[str, Any]: """Create fuzzing results output.""" - summary = security_summary or {} + sec_summary = security_summary or {} data = { ... "spec_summary": spec_summary or {}, - "security_summary": summary, + "security_summary": sec_summary, }
🤖 Fix all issues with AI agents
In `@docs/development/standardized-output.md`:
- Around line 450-451: Update the description of fuzzing_results to clearly
state that the security_summary field is optional and may be omitted in the JSON
examples for brevity; specifically modify the line referencing `fuzzing_results`
to read something like "`security_summary` (optional): oracle findings/policy
violation counts aggregated per control domain; may be omitted in examples" and
ensure `error_report` remains described as "list of errors + warnings, execution
context" so readers understand the examples intentionally leave out
`security_summary`.
In `@mcp_fuzzer/client/tool_client.py`:
- Around line 364-373: The snapshots list currently uses a generic annotation
list[tuple[object, object | None]]; update it to a more descriptive typed
annotation by declaring a Snapshot type (e.g., SnapshotT = Any or a Union of the
concrete snapshot return types) or a Protocol/ABC for oracle snapshot results,
then annotate snapshots as list[tuple[FilesystemSideEffectOracle |
SecurityOracleType, Optional[SnapshotT]]] (referencing self._security_oracles,
FilesystemSideEffectOracle and the pre_call method) so the tuple element types
reflect the actual oracle and its optional snapshot return value; import typing
primitives (Optional, Union, Any, TypeVar or Protocol) as needed and apply the
new annotation to the snapshots variable.
- Around line 461-472: The inner async function _call_probe currently swallows
all exceptions, making auth failures indistinguishable from other errors; modify
_call_probe (and its use of self._temporary_transport_attr and
self._rpc.call_tool) to catch and handle auth-related exceptions separately
(e.g., AuthenticationError/AuthorizationError specific to your RPC layer) and
return False for those, while logging or re-raising unexpected exceptions: for
non-auth exceptions call logging.exception (or include the exception details)
before returning False or rethrowing as appropriate so network/other errors
aren't silently ignored.
In `@mcp_fuzzer/fuzz_engine/executor/sequence_executor.py`:
- Around line 37-62: In _update_context (in sequence_executor.py) the values
pulled from step.args for create_file/create_symlink/init_repo can be Path
objects but FuzzerContext stores tuple[str, ...]; convert those values to str
before appending to context tuples: when handling step.label == "create_file"
coerce path to str(path) before adding to context.created_files, for
"create_symlink" coerce both link and target to str(...) before adding to
context.symlink_targets, and for "init_repo" coerce repo_path to str(repo_path)
before adding to context.repo_paths; update only within the replace calls in
_update_context to ensure stored values are always strings.
- Around line 20-35: In execute_sequence, avoid unconditionally updating context
after calling _rpc.call_tool; wrap the call in a try/except and only call
_update_context when the tool invocation succeeded (i.e., the call_tool did not
raise and the returned response is not an error envelope—check for an "error"
key or equivalent success indicator on the response). On exception or an error
envelope, record the error in results (include label, tool, and the error) and
skip calling _update_context so non-existent files/repos are not added to
current_context. Ensure you reference execute_sequence, _rpc.call_tool, and
_update_context when making the change.
In `@mcp_fuzzer/fuzz_engine/mutators/sequence.py`:
- Around line 37-49: The build_sequences method constructs paths under seq_dir
but never ensures that directory exists, causing sequences (created_path,
symlink_path, repo_path) to fail; modify build_sequences to create seq_dir
before using it (e.g., call seq_dir.mkdir(parents=True, exist_ok=True)) or fall
back to base_dir if creation fails, so references to
seq_dir/created_sequence_file.txt, seq_dir/escape_link.txt and
seq_dir/security_repo are safe; update any logic that assumes the directory
exists (in build_sequences and any helpers that reference seq_dir) to handle
creation errors gracefully.
- Around line 131-137: The _match_tool implementation currently uses substring
matching which causes false positives (e.g., "CatalogTool" matching "log");
update _match_tool to tokenize tool names and keywords and perform token-equals
matching: in _match_tool, for each tool from self.tools get name via
tool.get("name",""), normalize to lowercase, split into tokens on
non-alphanumeric characters (or whitespace) to produce name_tokens, likewise
normalize and tokenize the input keywords, then check if any keyword token
exactly equals any name token (instead of using keyword in name); return
tool.get("name") when a token match is found and otherwise return None. Ensure
you only change the matching logic inside _match_tool and keep the returned
value the same.
In `@mcp_fuzzer/reports/core/collector.py`:
- Around line 176-195: In _update_security_summary, avoid using unhashable
control items as dict keys: when iterating controls (variable controls in
_update_security_summary), validate each control before incrementing
security_summary["policy_controls"] (control_counter). Implement a quick
hashability check (e.g., try: hash(control) except TypeError: continue) or
coerce safe types to a string/ID and skip nested dicts/lists so
control_counter[control] = control_counter.get(control, 0) + 1 only runs for
hashable/safe control values.
In `@mcp_fuzzer/reports/formatters/common.py`:
- Around line 135-175: The docstring in collect_security_summary has bullet
lines that exceed the 88-character lint limit; wrap those bullet lines so each
line is <=88 chars (for example split the long return description lines into
multiple shorter lines), preserving the same content and indentation and keeping
keys like "oracle_findings_by_type", "policy_violations_by_domain", and
"policy_controls" intact; update only the docstring text in the
collect_security_summary function to use shorter wrapped lines.
In `@mcp_fuzzer/security_mode/engine.py`:
- Around line 130-145: The loops that inspect command-like args use args.items()
which can raise AttributeError because args may be None; use the
already-initialized payload variable (payload = args or {}) instead of args in
both branches so the checks in the proc_allow branch (self.policy.proc_allow,
self.policy.normalize_command) and the else branch that sets cmd_violation and
appends to suspicious_commands operate on payload safely.
- Around line 175-207: The semantic_mismatch dict construction has inconsistent
indentation and misaligned nested elements (inside the block that checks
policy_violations and sets status); fix it by reformatting the entire
semantic_mismatch dictionary so all top-level keys ("status", "violations",
"expectations") are aligned consistently, and ensure the nested expectation
fields (path_violation_expected, network_violation_expected,
command_violation_expected, suspicious_paths, suspicious_hosts,
suspicious_commands) and their list() calls are indented uniformly and correctly
closed; update the block that references semantic_mismatch, expectations,
policy_violations, and success to use the corrected, consistently-indented dict
literal.
- Around line 83-90: The function _map_domain contains redundant checks for
"process" and "network" which are already handled by the initial membership test
against _CONTROL_MAPPING; remove those unreachable branches and simplify the
function to check identifier in _CONTROL_MAPPING (or handle None) and otherwise
return the default "filesystem". Update _map_domain to only reference
_CONTROL_MAPPING and the default return so the logic is concise and correct.
In `@mcp_fuzzer/security_mode/oracles/filesystem.py`:
- Around line 149-158: In the loop over "for path in common" comparing
pre_snapshot and post_snapshot (using before/after and
before.link_target/after.link_target) you only mark entries as "modified" but
don't validate symlink retargets; update the "modified" handling to detect
symlink status changes (e.g., before.link_target != after.link_target or a
change in symlink vs regular file) and then resolve the new target and run the
existing escape/policy check (the same check used elsewhere to detect path
escapes) on the resolved path so retargeted symlinks are validated against
policy and possible escapes are reported.
In `@mcp_fuzzer/security_mode/policy.py`:
- Around line 166-213: The net_deny_by_default value read from
config.get("net_deny_by_default") can be a string like "false" which
bool("false") treats as True; change the coercion so net_deny_by_default is
normalized explicitly: read config.get("net_deny_by_default") into
net_deny_by_default, if it is None fall back to SAFETY_NO_NETWORK_DEFAULT, if
it's a bool keep it, if it's a string casefold/strip and map common truthy
values ("true","1","yes","y","on") to True and common falsy values
("false","0","no","n","off") to False (optionally raise/log for unknown
strings), and for numeric types coerce non-zero to True; then pass the
normalized boolean into SecurityPolicy via
net_deny_by_default=net_deny_by_default. Ensure you update the references around
net_deny_by_default, _normalize_host, and SecurityPolicy instantiation
accordingly.
- Around line 117-124: The deny_roots() implementation currently returns only
self.fs_deny_roots when present, dropping _DEFAULT_DENY_ROOTS and creating a
security gap; change deny_roots() to merge normalized defaults with normalized
custom entries (use _normalize_path on each), de-duplicate (e.g., via set while
preserving order) and return a tuple of the combined normalized paths so default
critical directories from _DEFAULT_DENY_ROOTS are always included alongside any
user-specified fs_deny_roots; reference deny_roots, self.fs_deny_roots,
_DEFAULT_DENY_ROOTS, and _normalize_path when making the change.
In `@mcp_fuzzer/transport/drivers/stream_http_driver.py`:
- Around line 524-543: try_session_replay currently mutates driver state and can
trigger initialization via send_raw/_do_initialize, causing false positives;
modify try_session_replay (and send_raw as needed) to avoid auto-init: before
calling send_raw, capture and restore protocol_version and _initialized (and any
other init-related fields), and either bail out immediately if self._initialized
is False (return False) or call send_raw with a new flag like
skip_initialize=True that prevents _do_initialize from running; ensure
session_id is still restored in finally and update send_raw/_do_initialize to
honor skip_initialize to avoid changing protocol_version/_initialized during
replay.
In `@tests/unit/auth/test_auth.py`:
- Around line 295-314: The failing tests have E501 line-length violations in
test_auth_manager_get_negative_auth_variants_when_tool_has_auth; shorten the
long assertions by extracting expressions into short variables and using them in
assertions: for the final check, assign a small variable like possible_keys =
{"token","api_key","Authorization"} and assert that any(k in invalid_params for
k in possible_keys) (or assert
possible_keys.intersection(invalid_params.keys())), and for any other long
assertion split into a variable before asserting; update function
test_auth_manager_get_negative_auth_variants_when_tool_has_auth accordingly.
In `@tests/unit/client/test_tool_client_security.py`:
- Around line 49-70: The test relies on private internals (ToolClient._rpc and
ToolClient._run_negative_auth_probes); add a small testable injection point or
public test hook to avoid touching privates: modify ToolClient to accept an
optional rpc parameter (or provide a public setter like set_rpc) and expose a
public wrapper method (e.g., run_negative_auth_probes) that calls the internal
_run_negative_auth_probes, then update the test to use
client.run_negative_auth_probes(...) and inject the dummy RPC via the new
constructor parameter or setter; reference ToolClient,
_run_negative_auth_probes, and _rpc when making these changes.
- Around line 73-87: The test test_session_replay_detection_records_violation
contains a redundant explicit assignment to client.transport.session_id after
constructing _DummyTransport which already sets session_id to "session-new";
remove the extra assignment to avoid duplication (or alternatively add a short
comment explaining why the explicit override is kept) and keep references to
_DummyTransport.__init__ and client.transport.session_id so the change is
applied where the redundant line occurs.
In `@tests/unit/reports/test_common.py`:
- Line 136: The failing test line contains a long dict entry with
"oracle_findings": [{"oracle": "process", "type": "unexpected_process"}] that
exceeds the 88-char limit; edit the test to break this literal across multiple
lines (for example, place the list and/or inner dict on separate lines or format
the dict with each key/value on its own line) so the line length is under 88
characters while preserving the exact keys/values and test semantics in
test_common.py.
In `@tests/unit/reports/test_formatters.py`:
- Around line 171-189: Add docstrings to both test_print_security_summary_empty
and test_print_security_summary_with_counts to match style used by other tests
(e.g., test_print_tool_summary_empty_results), and add an additional unit test
that calls console_formatter.print_security_summary(None) to assert behavior
with None input (similar to the empty dict test). Locate the tests by the method
names test_print_security_summary_empty, test_print_security_summary_with_counts
and the target under test print_security_summary to update/extend them so they
include descriptive docstrings and the extra None-case assertion.
In `@tests/unit/reports/test_security_summary.py`:
- Around line 1-11: Add the pytest unit marker to this test module by importing
pytest and defining pytestmark = [pytest.mark.unit] near the top of the file
(e.g., after the existing imports); ensure the module-level variable pytestmark
is present so the test suite categorizes these tests consistently with other
files (look for the module-level pytestmark and the import of pytest to modify).
In `@tests/unit/security_mode/test_security_mode.py`:
- Around line 386-393: The failure is caused by adding allowed_child to the
process tree before taking the pre-call snapshot, so oracle.post_call won't see
it as new; move the tree.add(allowed_child) call to after pre_snapshot (i.e.,
call tree.add(allowed_child) after oracle.pre_call(1) and before
oracle.post_call(1)), keeping the monkeypatch of psutil.Process and references
to oracle.pre_call and oracle.post_call unchanged so the test detects the
new_process effect.
- Around line 260-262: The test's assertion is too weak; update the test that
monkeypatches policy_module.Path.resolve to raise via _raise_resolve so it
explicitly verifies the fallback path and that resolve was invoked: (1) assert
normalized equals policy_module.Path(str(path)).expanduser() to confirm the
function returned the expanded fallback path rather than any arbitrary
component, and (2) add a simple spy/flag around policy_module.Path.resolve (or
wrap _raise_resolve to flip a boolean) and assert that the resolve call occurred
so you prove the exception path was exercised in _normalize_path.
In `@tests/unit/transport/test_stream_http_driver.py`:
- Around line 896-903: The test docstring in
test_get_stored_session_ids_and_extract is too long and violates E501; edit the
docstring (the triple-quoted string immediately under the
test_get_stored_session_ids_and_extract function) to wrap to <=88 characters per
line (e.g., split into two short sentences like "Return IDs received via
_maybe_extract_session_headers." or similar), leaving the rest of the test code
(creation of StreamHttpDriver, FakeResponse usage, calls to
_maybe_extract_session_headers and get_stored_session_ids) unchanged.
| - `fuzzing_results`: mode, protocol, endpoint, tool/protocol summaries, spec summary, and `security_summary` (oracle findings/policy violation counts aggregated per control domain). | ||
| - `error_report`: list of errors + warnings, execution context. |
There was a problem hiding this comment.
Clarify that security_summary is optional in examples.
The JSON examples earlier don’t include this field, so this line should call out that it’s optional/omitted for brevity to avoid confusion.
✏️ Suggested clarification
-- `fuzzing_results`: mode, protocol, endpoint, tool/protocol summaries, spec summary, and `security_summary` (oracle findings/policy violation counts aggregated per control domain).
+- `fuzzing_results`: mode, protocol, endpoint, tool/protocol summaries, spec summary, and optional `security_summary` (oracle findings/policy violation counts aggregated per control domain; omitted from examples for brevity).📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| - `fuzzing_results`: mode, protocol, endpoint, tool/protocol summaries, spec summary, and `security_summary` (oracle findings/policy violation counts aggregated per control domain). | |
| - `error_report`: list of errors + warnings, execution context. | |
| - `fuzzing_results`: mode, protocol, endpoint, tool/protocol summaries, spec summary, and optional `security_summary` (oracle findings/policy violation counts aggregated per control domain; omitted from examples for brevity). | |
| - `error_report`: list of errors + warnings, execution context. |
🤖 Prompt for AI Agents
In `@docs/development/standardized-output.md` around lines 450 - 451, Update the
description of fuzzing_results to clearly state that the security_summary field
is optional and may be omitted in the JSON examples for brevity; specifically
modify the line referencing `fuzzing_results` to read something like
"`security_summary` (optional): oracle findings/policy violation counts
aggregated per control domain; may be omitted in examples" and ensure
`error_report` remains described as "list of errors + warnings, execution
context" so readers understand the examples intentionally leave out
`security_summary`.
| snapshots: list[tuple[object, object | None]] = [] | ||
| for oracle in self._security_oracles: | ||
| try: | ||
| if isinstance(oracle, FilesystemSideEffectOracle): | ||
| snapshots.append((oracle, oracle.pre_call())) | ||
| else: | ||
| snapshots.append((oracle, oracle.pre_call(server_pid))) | ||
| except Exception as exc: | ||
| self._logger.debug("Security oracle pre_call failed: %s", exc) | ||
| snapshots.append((oracle, None)) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider adding type annotation for snapshot variable.
The snapshots list stores tuples of (oracle, snapshot) where snapshot types vary by oracle type. The current annotation list[tuple[object, object | None]] is correct but could be more descriptive.
🤖 Prompt for AI Agents
In `@mcp_fuzzer/client/tool_client.py` around lines 364 - 373, The snapshots list
currently uses a generic annotation list[tuple[object, object | None]]; update
it to a more descriptive typed annotation by declaring a Snapshot type (e.g.,
SnapshotT = Any or a Union of the concrete snapshot return types) or a
Protocol/ABC for oracle snapshot results, then annotate snapshots as
list[tuple[FilesystemSideEffectOracle | SecurityOracleType,
Optional[SnapshotT]]] (referencing self._security_oracles,
FilesystemSideEffectOracle and the pre_call method) so the tuple element types
reflect the actual oracle and its optional snapshot return value; import typing
primitives (Optional, Union, Any, TypeVar or Protocol) as needed and apply the
new annotation to the snapshots variable.
| async def _call_probe(kwargs: dict[str, Any], header_override=None): | ||
| try: | ||
| if header_override is not None: | ||
| with self._temporary_transport_attr( | ||
| "auth_headers", header_override | ||
| ): | ||
| await self._rpc.call_tool(tool_name, kwargs) | ||
| else: | ||
| await self._rpc.call_tool(tool_name, kwargs) | ||
| return True | ||
| except Exception: | ||
| return False |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Inner async function _call_probe swallows all exceptions.
While this is intentional for probing (to detect if the call succeeds without valid auth), catching all exceptions makes it impossible to distinguish between authentication failures and other errors like network issues.
♻️ Consider logging or distinguishing exception types
async def _call_probe(kwargs: dict[str, Any], header_override=None):
try:
if header_override is not None:
with self._temporary_transport_attr(
"auth_headers", header_override
):
await self._rpc.call_tool(tool_name, kwargs)
else:
await self._rpc.call_tool(tool_name, kwargs)
return True
- except Exception:
+ except Exception as exc:
+ self._logger.debug("Auth probe for %s failed: %s", tool_name, exc)
return False🤖 Prompt for AI Agents
In `@mcp_fuzzer/client/tool_client.py` around lines 461 - 472, The inner async
function _call_probe currently swallows all exceptions, making auth failures
indistinguishable from other errors; modify _call_probe (and its use of
self._temporary_transport_attr and self._rpc.call_tool) to catch and handle
auth-related exceptions separately (e.g., AuthenticationError/AuthorizationError
specific to your RPC layer) and return False for those, while logging or
re-raising unexpected exceptions: for non-auth exceptions call logging.exception
(or include the exception details) before returning False or rethrowing as
appropriate so network/other errors aren't silently ignored.
| async def execute_sequence( | ||
| self, sequence: SequenceDefinition, context: FuzzerContext | ||
| ) -> tuple[FuzzerContext, list[dict[str, Any]]]: | ||
| current_context = context | ||
| results: list[dict[str, Any]] = [] | ||
| for step in sequence.steps: | ||
| response = await self._rpc.call_tool(step.tool_name, step.args) | ||
| results.append( | ||
| { | ||
| "label": step.label, | ||
| "tool": step.tool_name, | ||
| "response": response, | ||
| } | ||
| ) | ||
| current_context = self._update_context(current_context, step) | ||
| return current_context, results |
There was a problem hiding this comment.
Guard context updates on successful tool execution.
call_tool can return error envelopes (or raise), but the context is updated unconditionally. This can record non-existent files/repos and mislead later steps.
🛠️ Suggested fix
for step in sequence.steps:
- response = await self._rpc.call_tool(step.tool_name, step.args)
+ try:
+ response = await self._rpc.call_tool(step.tool_name, step.args)
+ except Exception as exc:
+ results.append(
+ {"label": step.label, "tool": step.tool_name, "error": str(exc)}
+ )
+ continue
results.append(
{
"label": step.label,
"tool": step.tool_name,
"response": response,
}
)
- current_context = self._update_context(current_context, step)
+ if not (isinstance(response, dict) and "error" in response):
+ current_context = self._update_context(current_context, step)🤖 Prompt for AI Agents
In `@mcp_fuzzer/fuzz_engine/executor/sequence_executor.py` around lines 20 - 35,
In execute_sequence, avoid unconditionally updating context after calling
_rpc.call_tool; wrap the call in a try/except and only call _update_context when
the tool invocation succeeded (i.e., the call_tool did not raise and the
returned response is not an error envelope—check for an "error" key or
equivalent success indicator on the response). On exception or an error
envelope, record the error in results (include label, tool, and the error) and
skip calling _update_context so non-existent files/repos are not added to
current_context. Ensure you reference execute_sequence, _rpc.call_tool, and
_update_context when making the change.
| def _update_context( | ||
| self, context: FuzzerContext, step: SequenceStep | ||
| ) -> FuzzerContext: | ||
| if step.label == "create_file": | ||
| path = step.args.get("path") | ||
| if path: | ||
| return replace( | ||
| context, | ||
| created_files=context.created_files + (path,), | ||
| ) | ||
| if step.label == "create_symlink": | ||
| link = step.args.get("link") or step.args.get("path") | ||
| target = step.args.get("target") | ||
| if link and target: | ||
| return replace( | ||
| context, | ||
| symlink_targets=context.symlink_targets + ((link, target),), | ||
| ) | ||
| if step.label == "init_repo": | ||
| repo_path = step.args.get("path") | ||
| if repo_path: | ||
| return replace( | ||
| context, | ||
| repo_paths=context.repo_paths + (repo_path,), | ||
| ) | ||
| return context |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Normalize stored paths to strings.
FuzzerContext defines tuple[str, ...], but step.args may carry Path objects. Coercing to str avoids type drift.
🛠️ Suggested fix
if step.label == "create_file":
path = step.args.get("path")
if path:
return replace(
context,
- created_files=context.created_files + (path,),
+ created_files=context.created_files + (str(path),),
)
if step.label == "create_symlink":
link = step.args.get("link") or step.args.get("path")
target = step.args.get("target")
if link and target:
return replace(
context,
- symlink_targets=context.symlink_targets + ((link, target),),
+ symlink_targets=context.symlink_targets
+ + ((str(link), str(target)),),
)
if step.label == "init_repo":
repo_path = step.args.get("path")
if repo_path:
return replace(
context,
- repo_paths=context.repo_paths + (repo_path,),
+ repo_paths=context.repo_paths + (str(repo_path),),
)🤖 Prompt for AI Agents
In `@mcp_fuzzer/fuzz_engine/executor/sequence_executor.py` around lines 37 - 62,
In _update_context (in sequence_executor.py) the values pulled from step.args
for create_file/create_symlink/init_repo can be Path objects but FuzzerContext
stores tuple[str, ...]; convert those values to str before appending to context
tuples: when handling step.label == "create_file" coerce path to str(path)
before adding to context.created_files, for "create_symlink" coerce both link
and target to str(...) before adding to context.symlink_targets, and for
"init_repo" coerce repo_path to str(repo_path) before adding to
context.repo_paths; update only within the replace calls in _update_context to
ensure stored values are always strings.
| def test_print_security_summary_empty(self, console_formatter): | ||
| console_formatter.console.print.reset_mock() | ||
| console_formatter.print_security_summary({}) | ||
| console_formatter.console.print.assert_not_called() | ||
|
|
||
| def test_print_security_summary_with_counts(self, console_formatter): | ||
| console_formatter.console.print.reset_mock() | ||
| summary = { | ||
| "oracle_findings_by_type": {"filesystem": 2}, | ||
| "policy_violations_by_domain": {"filesystem": 1}, | ||
| "policy_controls": {"MCP-EXEC-01": 1}, | ||
| } | ||
| console_formatter.print_security_summary(summary) | ||
| calls = console_formatter.console.print.call_args_list | ||
| assert calls[0][0][0] == "\n[bold]Security Mode Summary:[/bold]" | ||
| table_titles = [getattr(call[0][0], "title", "") for call in calls[1:]] | ||
| assert "Oracle findings by type" in table_titles | ||
| assert "Policy violations by domain" in table_titles | ||
| assert "Policy controls" in table_titles |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add docstrings for consistency with other test methods.
Other test methods in this class include docstrings (e.g., test_print_tool_summary_empty_results). Consider adding them here for consistency.
Additionally, you might want to add a test case for None input to print_security_summary since the implementation handles falsy values.
📝 Suggested docstrings
def test_print_security_summary_empty(self, console_formatter):
+ """Test printing security summary with empty data produces no output."""
console_formatter.console.print.reset_mock()
console_formatter.print_security_summary({})
console_formatter.console.print.assert_not_called()
def test_print_security_summary_with_counts(self, console_formatter):
+ """Test printing security summary with findings displays header and tables."""
console_formatter.console.print.reset_mock()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def test_print_security_summary_empty(self, console_formatter): | |
| console_formatter.console.print.reset_mock() | |
| console_formatter.print_security_summary({}) | |
| console_formatter.console.print.assert_not_called() | |
| def test_print_security_summary_with_counts(self, console_formatter): | |
| console_formatter.console.print.reset_mock() | |
| summary = { | |
| "oracle_findings_by_type": {"filesystem": 2}, | |
| "policy_violations_by_domain": {"filesystem": 1}, | |
| "policy_controls": {"MCP-EXEC-01": 1}, | |
| } | |
| console_formatter.print_security_summary(summary) | |
| calls = console_formatter.console.print.call_args_list | |
| assert calls[0][0][0] == "\n[bold]Security Mode Summary:[/bold]" | |
| table_titles = [getattr(call[0][0], "title", "") for call in calls[1:]] | |
| assert "Oracle findings by type" in table_titles | |
| assert "Policy violations by domain" in table_titles | |
| assert "Policy controls" in table_titles | |
| def test_print_security_summary_empty(self, console_formatter): | |
| """Test printing security summary with empty data produces no output.""" | |
| console_formatter.console.print.reset_mock() | |
| console_formatter.print_security_summary({}) | |
| console_formatter.console.print.assert_not_called() | |
| def test_print_security_summary_with_counts(self, console_formatter): | |
| """Test printing security summary with findings displays header and tables.""" | |
| console_formatter.console.print.reset_mock() | |
| summary = { | |
| "oracle_findings_by_type": {"filesystem": 2}, | |
| "policy_violations_by_domain": {"filesystem": 1}, | |
| "policy_controls": {"MCP-EXEC-01": 1}, | |
| } | |
| console_formatter.print_security_summary(summary) | |
| calls = console_formatter.console.print.call_args_list | |
| assert calls[0][0][0] == "\n[bold]Security Mode Summary:[/bold]" | |
| table_titles = [getattr(call[0][0], "title", "") for call in calls[1:]] | |
| assert "Oracle findings by type" in table_titles | |
| assert "Policy violations by domain" in table_titles | |
| assert "Policy controls" in table_titles |
🤖 Prompt for AI Agents
In `@tests/unit/reports/test_formatters.py` around lines 171 - 189, Add docstrings
to both test_print_security_summary_empty and
test_print_security_summary_with_counts to match style used by other tests
(e.g., test_print_tool_summary_empty_results), and add an additional unit test
that calls console_formatter.print_security_summary(None) to assert behavior
with None input (similar to the empty dict test). Locate the tests by the method
names test_print_security_summary_empty, test_print_security_summary_with_counts
and the target under test print_security_summary to update/extend them so they
include descriptive docstrings and the extra None-case assertion.
| #!/usr/bin/env python3 | ||
| """ | ||
| Unit tests for security summary aggregation in reports. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime | ||
|
|
||
| from mcp_fuzzer.reports.core.collector import ReportCollector | ||
| from mcp_fuzzer.reports.core.models import FuzzingMetadata |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add pytest marker for consistency with other test files.
Other test files in the same directory (e.g., test_common.py) include pytestmark = [pytest.mark.unit] for test categorization. This file is missing the marker.
♻️ Proposed fix to add pytest marker
from mcp_fuzzer.reports.core.collector import ReportCollector
from mcp_fuzzer.reports.core.models import FuzzingMetadata
+
+import pytest
+
+pytestmark = [pytest.mark.unit]📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #!/usr/bin/env python3 | |
| """ | |
| Unit tests for security summary aggregation in reports. | |
| """ | |
| from __future__ import annotations | |
| from datetime import datetime | |
| from mcp_fuzzer.reports.core.collector import ReportCollector | |
| from mcp_fuzzer.reports.core.models import FuzzingMetadata | |
| #!/usr/bin/env python3 | |
| """ | |
| Unit tests for security summary aggregation in reports. | |
| """ | |
| from __future__ import annotations | |
| from datetime import datetime | |
| from mcp_fuzzer.reports.core.collector import ReportCollector | |
| from mcp_fuzzer.reports.core.models import FuzzingMetadata | |
| import pytest | |
| pytestmark = [pytest.mark.unit] |
🤖 Prompt for AI Agents
In `@tests/unit/reports/test_security_summary.py` around lines 1 - 11, Add the
pytest unit marker to this test module by importing pytest and defining
pytestmark = [pytest.mark.unit] near the top of the file (e.g., after the
existing imports); ensure the module-level variable pytestmark is present so the
test suite categorizes these tests consistently with other files (look for the
module-level pytestmark and the import of pytest to modify).
| monkeypatch.setattr(policy_module.Path, "resolve", _raise_resolve) | ||
| normalized = policy_module._normalize_path(str(path)) | ||
| assert normalized.name == "data" |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the _normalize_path function implementation
ast-grep --pattern $'def _normalize_path($_):
$$$
'Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 57
🏁 Script executed:
#!/bin/bash
# Search for _normalize_path function definition
rg "def _normalize_path" -A 20Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 1440
🏁 Script executed:
#!/bin/bash
# Read the test file around lines 260-262
sed -n '240,275p' tests/unit/security_mode/test_security_mode.pyRepository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 1392
Test assertion validates the fallback behavior but is somewhat weak.
The monkeypatch correctly triggers the exception handler in _normalize_path. When resolve() raises OSError, the function falls back to Path(path).expanduser(), which the assertion normalized.name == "data" does verify. However, this assertion is trivial—it will pass for any path component, not specifically validating the fallback behavior. Consider strengthening the test to explicitly verify the fallback behavior (e.g., by checking that resolve() was actually called and the exception was caught, or by validating properties that distinguish the fallback path from the strict resolution).
🤖 Prompt for AI Agents
In `@tests/unit/security_mode/test_security_mode.py` around lines 260 - 262, The
test's assertion is too weak; update the test that monkeypatches
policy_module.Path.resolve to raise via _raise_resolve so it explicitly verifies
the fallback path and that resolve was invoked: (1) assert normalized equals
policy_module.Path(str(path)).expanduser() to confirm the function returned the
expanded fallback path rather than any arbitrary component, and (2) add a simple
spy/flag around policy_module.Path.resolve (or wrap _raise_resolve to flip a
boolean) and assert that the resolve call occurred so you prove the exception
path was exercised in _normalize_path.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@mcp_fuzzer/reports/formatters/common.py`:
- Around line 140-143: Update the docstring that describes the returned dict to
use the actual oracle type names used in tests and runtime: replace "fs" with
"filesystem" in the oracle_findings_by_type list (so it reads something like
"process, filesystem, network, authz") while leaving the other return entries
(policy_violations_by_domain, policy_controls) unchanged; locate the docstring
that documents oracle_findings_by_type in this file (referencing the symbol
oracle_findings_by_type) and make the textual replacement for consistency.
In `@tests/unit/security_mode/test_security_mode.py`:
- Around line 132-147: The test test_filesystem_oracle_detects_symlink_escape
should guard os.symlink() against permission errors: replace the direct
os.symlink(outside, tmp_path / "escape_link") call with a try/except around
os.symlink that catches OSError and PermissionError, and call
pytest.skip("symlink not permitted") when symlink creation fails; ensure the
rest of the test (invoking FilesystemSideEffectOracle.pre_call()/post_call() and
assertions) only runs if the symlink was successfully created so the test is
skipped gracefully on platforms without symlink privileges.
In `@tests/unit/transport/test_stream_http_driver.py`:
- Around line 929-940: The test should also assert that driver.session_id is
restored after a failed replay: before calling try_session_replay in
test_try_session_replay_failure set a known original_session_id on the
StreamHttpDriver instance (driver.session_id = "orig"), trigger the failure via
monkeypatched send_raw (fake_send_raw_fail), await try_session_replay("bad-sid")
and assert it returns False, then assert driver.session_id == "orig" to verify
the finally block restored the session id; reference StreamHttpDriver,
try_session_replay, send_raw, and test_try_session_replay_failure when making
the change.
| Returns a dict with: | ||
| - oracle_findings_by_type: counts by oracle type (process, fs, network, authz) | ||
| - policy_violations_by_domain: counts by domain (FS, EXEC, NET, AUTHZ, INPUT) | ||
| - policy_controls: counts by control id (e.g. MCP-EXEC-01, MCP-FS-02) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Minor docstring inconsistency with test data.
The docstring lists oracle types as (process, fs, network, authz), but the corresponding test uses "filesystem" as an oracle type, not "fs". Since the function is generic and doesn't validate oracle values, consider updating the docstring to list the actual expected values (e.g., filesystem instead of fs) for consistency with real usage patterns.
📝 Suggested docstring update
- - oracle_findings_by_type: counts by oracle type (process, fs, network, authz)
+ - oracle_findings_by_type: counts by oracle type (process, filesystem,
+ network, authz)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Returns a dict with: | |
| - oracle_findings_by_type: counts by oracle type (process, fs, network, authz) | |
| - policy_violations_by_domain: counts by domain (FS, EXEC, NET, AUTHZ, INPUT) | |
| - policy_controls: counts by control id (e.g. MCP-EXEC-01, MCP-FS-02) | |
| Returns a dict with: | |
| - oracle_findings_by_type: counts by oracle type (process, filesystem, | |
| network, authz) | |
| - policy_violations_by_domain: counts by domain (FS, EXEC, NET, AUTHZ, INPUT) | |
| - policy_controls: counts by control id (e.g. MCP-EXEC-01, MCP-FS-02) |
🤖 Prompt for AI Agents
In `@mcp_fuzzer/reports/formatters/common.py` around lines 140 - 143, Update the
docstring that describes the returned dict to use the actual oracle type names
used in tests and runtime: replace "fs" with "filesystem" in the
oracle_findings_by_type list (so it reads something like "process, filesystem,
network, authz") while leaving the other return entries
(policy_violations_by_domain, policy_controls) unchanged; locate the docstring
that documents oracle_findings_by_type in this file (referencing the symbol
oracle_findings_by_type) and make the textual replacement for consistency.
| def test_filesystem_oracle_detects_symlink_escape(tmp_path): | ||
| policy = build_security_policy( | ||
| {"security_mode": "minimal", "fs_allow_roots": [str(tmp_path)]} | ||
| ) | ||
| assert policy | ||
| oracle = FilesystemSideEffectOracle(policy) | ||
| snapshot = oracle.pre_call() | ||
| (tmp_path / "allowed_file").write_text("ok") | ||
| outside = tmp_path.parent / "outside.txt" | ||
| outside.write_text("secret") | ||
| if not hasattr(os, "symlink"): | ||
| pytest.skip("symlink not available") | ||
| os.symlink(outside, tmp_path / "escape_link") | ||
| findings, effects = oracle.post_call(snapshot) | ||
| assert any(f["type"] == "symlink_escape" for f in findings) | ||
| assert any(e["type"] == "created" for e in effects) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, find and read the test file
find . -path "*tests/unit/security_mode/test_security_mode.py" -type fRepository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 123
🏁 Script executed:
# Check if the file exists and read the specific section
head -150 tests/unit/security_mode/test_security_mode.py | tail -30Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 1090
🏁 Script executed:
# Get the exact lines mentioned in the review
sed -n '132,147p' tests/unit/security_mode/test_security_mode.pyRepository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 777
🏁 Script executed:
# Search for other symlink-related tests in the codebase
rg "symlink" tests/ --type py -B 2 -A 5Repository: Agent-Hellboy/mcp-server-fuzzer
Length of output: 4552
Guard os.symlink() against permission restrictions.
On Windows without elevated privileges, os.symlink() exists but raises OSError when called. The current hasattr() check only verifies existence, not usability. Wrap the call in try-except to skip the test gracefully when symlink creation fails due to permissions.
🛠️ Suggested fix
if not hasattr(os, "symlink"):
pytest.skip("symlink not available")
- os.symlink(outside, tmp_path / "escape_link")
+ try:
+ os.symlink(outside, tmp_path / "escape_link")
+ except (OSError, NotImplementedError) as exc:
+ pytest.skip(f"symlink not permitted: {exc}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def test_filesystem_oracle_detects_symlink_escape(tmp_path): | |
| policy = build_security_policy( | |
| {"security_mode": "minimal", "fs_allow_roots": [str(tmp_path)]} | |
| ) | |
| assert policy | |
| oracle = FilesystemSideEffectOracle(policy) | |
| snapshot = oracle.pre_call() | |
| (tmp_path / "allowed_file").write_text("ok") | |
| outside = tmp_path.parent / "outside.txt" | |
| outside.write_text("secret") | |
| if not hasattr(os, "symlink"): | |
| pytest.skip("symlink not available") | |
| os.symlink(outside, tmp_path / "escape_link") | |
| findings, effects = oracle.post_call(snapshot) | |
| assert any(f["type"] == "symlink_escape" for f in findings) | |
| assert any(e["type"] == "created" for e in effects) | |
| def test_filesystem_oracle_detects_symlink_escape(tmp_path): | |
| policy = build_security_policy( | |
| {"security_mode": "minimal", "fs_allow_roots": [str(tmp_path)]} | |
| ) | |
| assert policy | |
| oracle = FilesystemSideEffectOracle(policy) | |
| snapshot = oracle.pre_call() | |
| (tmp_path / "allowed_file").write_text("ok") | |
| outside = tmp_path.parent / "outside.txt" | |
| outside.write_text("secret") | |
| if not hasattr(os, "symlink"): | |
| pytest.skip("symlink not available") | |
| try: | |
| os.symlink(outside, tmp_path / "escape_link") | |
| except (OSError, NotImplementedError) as exc: | |
| pytest.skip(f"symlink not permitted: {exc}") | |
| findings, effects = oracle.post_call(snapshot) | |
| assert any(f["type"] == "symlink_escape" for f in findings) | |
| assert any(e["type"] == "created" for e in effects) |
🤖 Prompt for AI Agents
In `@tests/unit/security_mode/test_security_mode.py` around lines 132 - 147, The
test test_filesystem_oracle_detects_symlink_escape should guard os.symlink()
against permission errors: replace the direct os.symlink(outside, tmp_path /
"escape_link") call with a try/except around os.symlink that catches OSError and
PermissionError, and call pytest.skip("symlink not permitted") when symlink
creation fails; ensure the rest of the test (invoking
FilesystemSideEffectOracle.pre_call()/post_call() and assertions) only runs if
the symlink was successfully created so the test is skipped gracefully on
platforms without symlink privileges.
| @pytest.mark.asyncio | ||
| async def test_try_session_replay_failure(monkeypatch): | ||
| """try_session_replay returns False when request fails.""" | ||
| driver = StreamHttpDriver("http://localhost", safety_enabled=False) | ||
| driver._initialized = True | ||
|
|
||
| async def fake_send_raw_fail(_payload): | ||
| raise TransportError("session invalid") | ||
|
|
||
| monkeypatch.setattr(driver, "send_raw", fake_send_raw_fail) | ||
| result = await driver.try_session_replay("bad-sid") | ||
| assert result is False |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Add assertion to verify session_id restoration after failure.
The test verifies False is returned on failure but doesn't confirm that session_id is properly restored via the finally block. The success test includes this check (line 926), and the failure path should verify the same behavior to ensure consistent driver state after errors.
🧪 Proposed fix to verify session restoration
`@pytest.mark.asyncio`
async def test_try_session_replay_failure(monkeypatch):
"""try_session_replay returns False when request fails."""
driver = StreamHttpDriver("http://localhost", safety_enabled=False)
driver._initialized = True
+ driver.session_id = "original-sid"
async def fake_send_raw_fail(_payload):
raise TransportError("session invalid")
monkeypatch.setattr(driver, "send_raw", fake_send_raw_fail)
result = await driver.try_session_replay("bad-sid")
assert result is False
+ assert driver.session_id == "original-sid"📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @pytest.mark.asyncio | |
| async def test_try_session_replay_failure(monkeypatch): | |
| """try_session_replay returns False when request fails.""" | |
| driver = StreamHttpDriver("http://localhost", safety_enabled=False) | |
| driver._initialized = True | |
| async def fake_send_raw_fail(_payload): | |
| raise TransportError("session invalid") | |
| monkeypatch.setattr(driver, "send_raw", fake_send_raw_fail) | |
| result = await driver.try_session_replay("bad-sid") | |
| assert result is False | |
| `@pytest.mark.asyncio` | |
| async def test_try_session_replay_failure(monkeypatch): | |
| """try_session_replay returns False when request fails.""" | |
| driver = StreamHttpDriver("http://localhost", safety_enabled=False) | |
| driver._initialized = True | |
| driver.session_id = "original-sid" | |
| async def fake_send_raw_fail(_payload): | |
| raise TransportError("session invalid") | |
| monkeypatch.setattr(driver, "send_raw", fake_send_raw_fail) | |
| result = await driver.try_session_replay("bad-sid") | |
| assert result is False | |
| assert driver.session_id == "original-sid" |
🤖 Prompt for AI Agents
In `@tests/unit/transport/test_stream_http_driver.py` around lines 929 - 940, The
test should also assert that driver.session_id is restored after a failed
replay: before calling try_session_replay in test_try_session_replay_failure set
a known original_session_id on the StreamHttpDriver instance (driver.session_id
= "orig"), trigger the failure via monkeypatched send_raw (fake_send_raw_fail),
await try_session_replay("bad-sid") and assert it returns False, then assert
driver.session_id == "orig" to verify the finally block restored the session id;
reference StreamHttpDriver, try_session_replay, send_raw, and
test_try_session_replay_failure when making the change.
Document that --security-mode enables auditing against the MCP Server Security Standard (MSSS). Link to mcp-security-standard/mcp-server-security-standard and describe side-effect oracles, policy checks, and MSSS control mapping.
PR Summary: Implement Security Mode (MSSS) — README + full implementation
Overview
What changed (high-level)
Design changes & patterns
SOLID and maintainability assessment
Code/design smells, risks, and review focus
Documentation gaps
Backward-compatibility and migration
Recommended reviewer action items
Review effort estimate
Conclusion