Skip to content

Commit b530efd

Browse files
author
User
committed
Create test_share.py
1 parent c24d241 commit b530efd

File tree

1 file changed

+136
-0
lines changed

1 file changed

+136
-0
lines changed

tests/test_share.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
"""
2+
Tests for the share command in MCPM
3+
"""
4+
5+
import sys
6+
from unittest.mock import Mock, patch
7+
8+
from click.testing import CliRunner
9+
10+
from src.mcpm.commands.share import (
11+
find_mcp_proxy,
12+
make_non_blocking,
13+
monitor_for_errors,
14+
share,
15+
terminate_process,
16+
)
17+
18+
19+
class TestShareCommand:
20+
"""Tests for the share command"""
21+
22+
def test_find_mcp_proxy_found(self, monkeypatch):
23+
"""Test finding mcp-proxy when it exists in PATH"""
24+
# Mock shutil.which to return a path
25+
monkeypatch.setattr("shutil.which", lambda _: "/usr/bin/mcp-proxy")
26+
27+
assert find_mcp_proxy() == "/usr/bin/mcp-proxy"
28+
29+
def test_find_mcp_proxy_not_found(self, monkeypatch):
30+
"""Test finding mcp-proxy when it does not exist in PATH"""
31+
# Mock shutil.which to return None
32+
monkeypatch.setattr("shutil.which", lambda _: None)
33+
34+
assert find_mcp_proxy() is None
35+
36+
@patch("fcntl.fcntl")
37+
def test_make_non_blocking(self, mock_fcntl):
38+
"""Test making a file object non-blocking"""
39+
# Create a mock file object
40+
mock_file = Mock()
41+
mock_file.fileno.return_value = 42
42+
43+
# Set up mock fcntl return values
44+
mock_fcntl.return_value = 0
45+
46+
# Call the function
47+
make_non_blocking(mock_file)
48+
49+
# Verify that functions were called correctly
50+
mock_file.fileno.assert_called_once()
51+
assert mock_fcntl.call_count == 2
52+
53+
def test_monitor_for_errors_with_known_error(self):
54+
"""Test error detection with a known error pattern"""
55+
error_line = "Error: RuntimeError: Received request before initialization was complete"
56+
57+
result = monitor_for_errors(error_line)
58+
59+
assert result is not None
60+
assert "Protocol initialization error" in result
61+
62+
def test_monitor_for_errors_connection_error(self):
63+
"""Test error detection with connection broken error"""
64+
error_line = "Exception: anyio.BrokenResourceError occurred during processing"
65+
66+
result = monitor_for_errors(error_line)
67+
68+
assert result is not None
69+
assert "Connection broken unexpectedly" in result
70+
71+
def test_monitor_for_errors_taskgroup_error(self):
72+
"""Test error detection with task group error"""
73+
error_line = "Error: ExceptionGroup: unhandled errors in a TaskGroup"
74+
75+
result = monitor_for_errors(error_line)
76+
77+
assert result is not None
78+
assert "Server task error detected" in result
79+
80+
def test_monitor_for_errors_no_error(self):
81+
"""Test error detection with no error patterns"""
82+
normal_line = "Server started successfully on port 8000"
83+
84+
result = monitor_for_errors(normal_line)
85+
86+
assert result is None
87+
88+
def test_terminate_process_already_terminated(self):
89+
"""Test terminating a process that's already terminated"""
90+
mock_process = Mock()
91+
mock_process.poll.return_value = 0 # Process already exited
92+
93+
result = terminate_process(mock_process)
94+
95+
assert result is True
96+
mock_process.terminate.assert_not_called()
97+
98+
def test_terminate_process_successful_termination(self):
99+
"""Test successful termination of a process"""
100+
mock_process = Mock()
101+
# Process is running, then terminates after SIGTERM
102+
mock_process.poll.side_effect = [None, 0]
103+
104+
result = terminate_process(mock_process, timeout=1)
105+
106+
assert result is True
107+
mock_process.terminate.assert_called_once()
108+
mock_process.kill.assert_not_called()
109+
110+
@patch("time.sleep") # Add sleep patch to avoid actual sleep
111+
def test_terminate_process_needs_sigkill(self, mock_sleep):
112+
"""Test termination of a process that needs SIGKILL"""
113+
mock_process = Mock()
114+
# First 20 poll calls return None (not terminated)
115+
# Then the 21st call returns 0 (terminated after SIGKILL)
116+
mock_process.poll.side_effect = [None] * 20 + [0]
117+
118+
result = terminate_process(mock_process, timeout=1)
119+
120+
assert result is True
121+
mock_process.terminate.assert_called_once()
122+
mock_process.kill.assert_called_once()
123+
124+
def test_share_command_no_mcp_proxy(self, monkeypatch):
125+
"""Test share command when mcp-proxy is not installed"""
126+
# Mock find_mcp_proxy to return None
127+
monkeypatch.setattr("src.mcpm.commands.share.find_mcp_proxy", lambda: None)
128+
129+
# Run the command
130+
runner = CliRunner()
131+
with patch.object(sys, "exit") as mock_exit:
132+
result = runner.invoke(share, ["test command"])
133+
134+
# Verify the command failed with the right error
135+
assert mock_exit.called
136+
assert "mcp-proxy not found" in result.output

0 commit comments

Comments
 (0)