Skip to content

Commit c0dad9b

Browse files
authored
ci: add e2e tests (#27)
1 parent 8be7a8e commit c0dad9b

File tree

10 files changed

+365
-11
lines changed

10 files changed

+365
-11
lines changed

.github/workflows/test.yaml

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,17 @@ jobs:
2121
test:
2222
name: Test
2323
runs-on: ubuntu-latest
24+
defaults:
25+
run:
26+
shell: nix develop --command bash {0}
2427
permissions:
2528
contents: read # required for actions/checkout
2629
steps:
2730
- name: Check out the repo
2831
uses: actions/checkout@v4
2932

30-
- name: Setup python
31-
uses: actions/setup-python@v5
32-
with:
33-
python-version: "3.10"
34-
35-
- name: Install uv
36-
uses: astral-sh/setup-uv@v5
37-
with:
38-
version: "0.7.17"
33+
- name: Install nix
34+
uses: DeterminateSystems/nix-installer-action@main
3935

4036
- name: Download dependencies
4137
run: make init
@@ -45,3 +41,6 @@ jobs:
4541

4642
- name: Run Unit Tests
4743
run: make test
44+
env:
45+
SYSDIG_MCP_API_HOST: ${{ vars.SYSDIG_MCP_API_HOST }}
46+
SYSDIG_MCP_API_SECURE_TOKEN: ${{ secrets.SYSDIG_MCP_API_SECURE_TOKEN }}

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ fmt:
1818
uvx ruff format --config ruff.toml
1919

2020
test:
21-
uv run pytest --capture=tee-sys --junitxml=pytest.xml
21+
uv run pytest --junitxml=pytest.xml
2222

2323
test-coverage:
2424
uv run pytest --cov=. --cov-report=xml

flake.nix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
uv
2727
ruff
2828
basedpyright
29+
sysdig-cli-scanner
2930
];
3031
};
3132

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ sysdig-mcp-server = "main:main"
2323

2424
[tool.uv]
2525
dev-dependencies = [
26+
"pytest-asyncio>=1.2.0",
2627
"pytest-cov~=6.2",
2728
"pytest~=8.4",
2829
"ruff~=0.12.1",
@@ -43,3 +44,7 @@ testpaths = [
4344
"tests",
4445
"integration",
4546
]
47+
asyncio_mode = "auto"
48+
markers = [
49+
"e2e: marks tests as end-to-end tests",
50+
]

tests/e2e/data/test.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
apiVersion: v1
2+
kind: Pod
3+
metadata:
4+
name: my-pod
5+
spec:
6+
containers:
7+
- name: my-container
8+
image: nginx
9+
securityContext:
10+
privileged: true
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
apiVersion: apps/v1
2+
kind: Deployment
3+
metadata:
4+
name: nginx-deployment
5+
spec:
6+
replicas: 1
7+
selector:
8+
matchLabels:
9+
app: nginx
10+
template:
11+
metadata:
12+
labels:
13+
app: nginx
14+
spec:
15+
containers:
16+
- name: nginx
17+
image: nginx:1.14.2
18+
ports:
19+
- containerPort: 80
20+
securityContext:
21+
allowPrivilegeEscalation: true

tests/e2e/test_tools.py

Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
from __future__ import annotations
2+
import os
3+
import json
4+
import pytest
5+
from typing import Callable, cast
6+
from fastmcp.client import Client
7+
from fastmcp.client.transports import StdioTransport
8+
9+
# Define a type for JSON-like objects to avoid using Any
10+
JsonValue = str | int | float | bool | None | dict[str, "JsonValue"] | list["JsonValue"]
11+
JsonObject = dict[str, JsonValue]
12+
13+
14+
# E2E tests for the Sysdig MCP Server tools.
15+
#
16+
# This script is designed to run in a CI/CD environment and requires the following prerequisites:
17+
# - Docker installed and running.
18+
# - The `sysdig-cli-scanner` binary installed and available in the system's PATH.
19+
# - The following environment variables set with valid Sysdig credentials:
20+
# - SYSDIG_MCP_API_SECURE_TOKEN
21+
# - SYSDIG_MCP_API_HOST
22+
#
23+
# The script will start the MCP server in a separate process, run a series of tests against it,
24+
# and then shut it down. If any of the tests fail, the script will exit with a non-zero status code.
25+
26+
27+
async def run_test(tool_name: str, tool_args: JsonObject, check: str | Callable[[JsonObject], None]):
28+
"""
29+
Runs a test by starting the MCP server, sending a request to it, and checking its stdout.
30+
"""
31+
transport = StdioTransport(
32+
"uv",
33+
["run", "sysdig-mcp-server"],
34+
env=dict(os.environ, **{"SYSDIG_MCP_LOGLEVEL": "DEBUG"}),
35+
)
36+
client = Client(transport)
37+
38+
async with client:
39+
result = await client.call_tool(tool_name, tool_args)
40+
41+
# Extract text content from the result
42+
output = ""
43+
if result.content:
44+
for content_block in result.content:
45+
output += getattr(content_block, "text", "")
46+
47+
print(f"--- STDOUT ---\n{output}")
48+
49+
if isinstance(check, str):
50+
assert check in output
51+
elif callable(check):
52+
try:
53+
json_output = cast(JsonObject, json.loads(output))
54+
check(json_output)
55+
except json.JSONDecodeError:
56+
pytest.fail(f"Output is not a valid JSON: {output}")
57+
58+
59+
@pytest.mark.e2e
60+
async def test_cli_scanner_tool_vulnerability_scan():
61+
"""
62+
Tests the CliScannerTool's vulnerability scan.
63+
"""
64+
def assert_vulns(output: JsonObject):
65+
assert "exit_code" in output
66+
output_str = output.get("output", "")
67+
assert isinstance(output_str, str)
68+
assert "vulnerabilities found" in output_str
69+
70+
await run_test(
71+
"run_sysdig_cli_scanner",
72+
{"image": "ubuntu:18.04"},
73+
assert_vulns,
74+
)
75+
76+
@pytest.mark.e2e
77+
async def test_cli_scanner_tool_vulnerability_scan_full_table():
78+
"""
79+
Tests the CliScannerTool's vulnerability scan with the full_vulnerability_table parameter.
80+
"""
81+
def assert_full_table(output: JsonObject):
82+
assert "exit_code" in output
83+
output_str = output.get("output", "")
84+
assert isinstance(output_str, str)
85+
# Check for a generic success message instead of the full table header
86+
assert "Execution logs written to" in output_str
87+
88+
await run_test(
89+
"run_sysdig_cli_scanner",
90+
{
91+
"image": "ubuntu:18.04",
92+
"mode": "vulnerability",
93+
"standalone": True,
94+
"offline_analyser": True,
95+
"full_vulnerability_table": True,
96+
},
97+
assert_full_table,
98+
)
99+
100+
101+
@pytest.mark.e2e
102+
async def test_cli_scanner_tool_iac_scan():
103+
"""
104+
Tests the CliScannerTool's IaC scan.
105+
"""
106+
def assert_iac(output: JsonObject):
107+
assert "exit_code" in output
108+
output_str = output.get("output", "")
109+
assert isinstance(output_str, str)
110+
assert "OK: no resources found" in output_str
111+
112+
await run_test(
113+
"run_sysdig_cli_scanner",
114+
{"path_to_scan": "tests/e2e/data/", "mode": "iac"},
115+
assert_iac,
116+
)
117+
118+
119+
@pytest.mark.e2e
120+
async def test_cli_scanner_tool_iac_scan_with_violations():
121+
"""
122+
Tests the CliScannerTool's IaC scan with a file containing violations.
123+
"""
124+
def assert_iac_violations(output: JsonObject):
125+
# The exit code might be 1 (fail) or 0 if only low/medium severity issues are found.
126+
# The important part is that the violation text is present.
127+
output_str = output.get("output", "")
128+
assert isinstance(output_str, str)
129+
assert "Container allowing privileged sub processes" in output_str
130+
131+
await run_test(
132+
"run_sysdig_cli_scanner",
133+
{"path_to_scan": "tests/e2e/iac_violations/", "mode": "iac"},
134+
assert_iac_violations,
135+
)
136+
137+
138+
@pytest.mark.e2e
139+
async def test_cli_scanner_tool_iac_scan_group_by_resource():
140+
"""
141+
Tests the CliScannerTool's IaC scan with grouping by resource.
142+
"""
143+
def assert_iac_violations(output: JsonObject):
144+
# The exit code might be 1 (fail) or 0.
145+
# The important part is that the resource name is present in the output.
146+
output_str = output.get("output", "")
147+
assert isinstance(output_str, str)
148+
assert "RESOURCE" in output_str # Check for the table header
149+
150+
await run_test(
151+
"run_sysdig_cli_scanner",
152+
{
153+
"path_to_scan": "tests/e2e/iac_violations/",
154+
"mode": "iac",
155+
"iac_group_by": "resource",
156+
},
157+
assert_iac_violations,
158+
)
159+
160+
161+
@pytest.mark.e2e
162+
async def test_events_feed_tools_list_runtime_events():
163+
"""
164+
Tests the EventsFeedTools' list_runtime_events.
165+
"""
166+
def assert_events(output: JsonObject):
167+
assert output["status_code"] == 200
168+
results = output.get("results")
169+
assert isinstance(results, dict)
170+
assert isinstance(results.get("data"), list)
171+
assert isinstance(results.get("page"), dict)
172+
173+
await run_test("list_runtime_events", {"scope_hours": 1}, assert_events)
174+
175+
176+
@pytest.mark.e2e
177+
async def test_events_feed_tools_list_runtime_events_with_filter():
178+
"""
179+
Tests the EventsFeedTools' list_runtime_events with a severity filter.
180+
"""
181+
def assert_events(output: JsonObject):
182+
assert output["status_code"] == 200
183+
results = output.get("results")
184+
assert isinstance(results, dict)
185+
data = results.get("data")
186+
assert isinstance(data, list)
187+
# Check that all returned events have the correct severity
188+
for event in data:
189+
assert isinstance(event, dict)
190+
severity = event.get("severity")
191+
assert severity in [4, 5]
192+
193+
await run_test(
194+
"list_runtime_events",
195+
{"scope_hours": 24, "filter_expr": 'severity in ("4", "5")'},
196+
assert_events,
197+
)
198+
199+
200+
@pytest.mark.e2e
201+
async def test_events_feed_tools_get_event_info():
202+
"""
203+
Tests the EventsFeedTools' get_event_info by first getting a valid event ID.
204+
"""
205+
event_id = None
206+
207+
def get_event_id(output: JsonObject):
208+
nonlocal event_id
209+
if output.get("results", {}).get("data"):
210+
event_id = output["results"]["data"][0].get("id")
211+
212+
await run_test("list_runtime_events", {"scope_hours": 24, "limit": 1}, get_event_id)
213+
214+
if not event_id:
215+
pytest.skip("No runtime events in the last 24 hours to test get_event_info.")
216+
217+
def assert_event_info(output: JsonObject):
218+
assert output["status_code"] == 200
219+
assert isinstance(output.get("results"), dict)
220+
assert output["results"].get("id") == event_id
221+
222+
await run_test("get_event_info", {"event_id": event_id}, assert_event_info)
223+
224+
225+
@pytest.mark.e2e
226+
async def test_events_feed_tools_get_event_process_tree():
227+
"""
228+
Tests the EventsFeedTools' get_event_process_tree by first getting a valid event ID.
229+
"""
230+
event_id = None
231+
232+
def get_event_id(output: JsonObject):
233+
nonlocal event_id
234+
if output.get("results", {}).get("data"):
235+
event_id = output["results"]["data"][0].get("id")
236+
237+
await run_test("list_runtime_events", {"scope_hours": 24, "limit": 1}, get_event_id)
238+
239+
if not event_id:
240+
pytest.skip("No runtime events in the last 24 hours to test get_event_process_tree.")
241+
242+
def assert_process_tree(output: JsonObject):
243+
assert isinstance(output.get("branches"), dict)
244+
assert isinstance(output.get("tree"), dict)
245+
assert isinstance(output.get("metadata"), dict)
246+
247+
await run_test("get_event_process_tree", {"event_id": event_id}, assert_process_tree)
248+
249+
250+
@pytest.mark.skip(reason="Sysdig Sage API endpoint is currently returning a 500 error")
251+
@pytest.mark.e2e
252+
async def test_sysql_tools_generate_and_run_sysql_query():
253+
"""
254+
Tests the SysQLTools' generate_and_run_sysql.
255+
"""
256+
def assert_sysql(output: JsonObject):
257+
assert output["status_code"] == 200
258+
results = output.get("results")
259+
assert isinstance(results, dict)
260+
assert isinstance(results.get("entities"), dict)
261+
assert isinstance(results.get("items"), list)
262+
263+
metadata = output.get("metadata")
264+
assert isinstance(metadata, dict)
265+
266+
metadata_kwargs = metadata.get("metadata_kwargs")
267+
assert isinstance(metadata_kwargs, dict)
268+
269+
sysql = metadata_kwargs.get("sysql")
270+
assert isinstance(sysql, str)
271+
assert "MATCH CloudResource AFFECTED_BY Vulnerability" in sysql
272+
273+
await run_test(
274+
"generate_and_run_sysql",
275+
{"question": "Match Cloud Resource affected by Critical Vulnerability"},
276+
assert_sysql,
277+
)
278+
279+
280+
@pytest.mark.e2e
281+
async def test_sysql_tools_run_sysql_query():
282+
"""
283+
Tests the SysQLTools' run_sysql.
284+
"""
285+
def assert_sysql(output: JsonObject):
286+
assert output["status_code"] == 200
287+
results = output.get("results")
288+
assert isinstance(results, dict)
289+
assert isinstance(results.get("entities"), dict)
290+
assert isinstance(results.get("items"), list)
291+
292+
metadata = output.get("metadata")
293+
assert isinstance(metadata, dict)
294+
295+
await run_test(
296+
"run_sysql",
297+
{"sysql_query": "MATCH CloudResource AFFECTED_BY Vulnerability"},
298+
assert_sysql,
299+
)

0 commit comments

Comments
 (0)