diff --git a/pyproject.toml b/pyproject.toml index 3a7586f..c258903 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "mcp>=1.1.2", "chardet>=5.2.0", ] -requires-python = ">=3.13" +requires-python = ">=3.12" readme = "README.md" license = { text = "MIT" } diff --git a/src/mcp_text_editor/__init__.py b/src/mcp_text_editor/__init__.py index de99306..7c7834e 100644 --- a/src/mcp_text_editor/__init__.py +++ b/src/mcp_text_editor/__init__.py @@ -3,10 +3,10 @@ import asyncio from .server import main -from .text_editor import TextEditor +from .text_editor import TextEditor as TextEditor # Re-export explicitly # Create a global text editor instance -_text_editor = TextEditor() +_text_editor = None def run() -> None: diff --git a/src/mcp_text_editor/server.py b/src/mcp_text_editor/server.py index ae49e25..086ea27 100644 --- a/src/mcp_text_editor/server.py +++ b/src/mcp_text_editor/server.py @@ -4,7 +4,7 @@ import logging import traceback from collections.abc import Sequence -from typing import Any, List +from typing import Any, List, Optional from mcp.server import Server from mcp.types import TextContent, Tool @@ -17,6 +17,7 @@ InsertTextFileContentsHandler, PatchTextFileContentsHandler, ) +from mcp_text_editor.text_editor import TextEditor from mcp_text_editor.version import __version__ # Configure logging @@ -25,26 +26,32 @@ app: Server = Server("mcp-text-editor") -# Initialize tool handlers -get_contents_handler = GetTextFileContentsHandler() -patch_file_handler = PatchTextFileContentsHandler() -create_file_handler = CreateTextFileHandler() -append_file_handler = AppendTextFileContentsHandler() -delete_contents_handler = DeleteTextFileContentsHandler() -insert_file_handler = InsertTextFileContentsHandler() +# Initialize tool handlers as global variables +get_contents_handler: Optional[GetTextFileContentsHandler] = None +patch_file_handler: Optional[PatchTextFileContentsHandler] = None +create_file_handler: Optional[CreateTextFileHandler] = None +append_file_handler: Optional[AppendTextFileContentsHandler] = None +delete_contents_handler: Optional[DeleteTextFileContentsHandler] = None +insert_file_handler: Optional[InsertTextFileContentsHandler] = None @app.list_tools() async def list_tools() -> List[Tool]: """List available tools.""" - return [ - get_contents_handler.get_tool_description(), - create_file_handler.get_tool_description(), - append_file_handler.get_tool_description(), - delete_contents_handler.get_tool_description(), - insert_file_handler.get_tool_description(), - patch_file_handler.get_tool_description(), - ] + tools = [] + if get_contents_handler: + tools.append(get_contents_handler.get_tool_description()) + if create_file_handler: + tools.append(create_file_handler.get_tool_description()) + if append_file_handler: + tools.append(append_file_handler.get_tool_description()) + if delete_contents_handler: + tools.append(delete_contents_handler.get_tool_description()) + if insert_file_handler: + tools.append(insert_file_handler.get_tool_description()) + if patch_file_handler: + tools.append(patch_file_handler.get_tool_description()) + return tools @app.call_tool() @@ -52,17 +59,17 @@ async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]: """Handle tool calls.""" logger.info(f"Calling tool: {name}") try: - if name == get_contents_handler.name: + if get_contents_handler and name == get_contents_handler.name: return await get_contents_handler.run_tool(arguments) - elif name == create_file_handler.name: + elif create_file_handler and name == create_file_handler.name: return await create_file_handler.run_tool(arguments) - elif name == append_file_handler.name: + elif append_file_handler and name == append_file_handler.name: return await append_file_handler.run_tool(arguments) - elif name == delete_contents_handler.name: + elif delete_contents_handler and name == delete_contents_handler.name: return await delete_contents_handler.run_tool(arguments) - elif name == insert_file_handler.name: + elif insert_file_handler and name == insert_file_handler.name: return await insert_file_handler.run_tool(arguments) - elif name == patch_file_handler.name: + elif patch_file_handler and name == patch_file_handler.name: return await patch_file_handler.run_tool(arguments) else: raise ValueError(f"Unknown tool: {name}") @@ -76,7 +83,30 @@ async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]: async def main() -> None: """Main entry point for the MCP text editor server.""" + import argparse + + parser = argparse.ArgumentParser(description="MCP Text Editor Server") + parser.add_argument( + "--validator", help="Validator command to run after file updates" + ) + args = parser.parse_args() + logger.info(f"Starting MCP text editor server v{__version__}") + + # Initialize the global text editor instance with validator command + text_editor_instance = TextEditor(validator_command=args.validator) + + # Initialize the global handlers with the text editor instance + global get_contents_handler, patch_file_handler, create_file_handler + global append_file_handler, delete_contents_handler, insert_file_handler + + get_contents_handler = GetTextFileContentsHandler(editor=text_editor_instance) + patch_file_handler = PatchTextFileContentsHandler(editor=text_editor_instance) + create_file_handler = CreateTextFileHandler(editor=text_editor_instance) + append_file_handler = AppendTextFileContentsHandler(editor=text_editor_instance) + delete_contents_handler = DeleteTextFileContentsHandler(editor=text_editor_instance) + insert_file_handler = InsertTextFileContentsHandler(editor=text_editor_instance) + try: from mcp.server.stdio import stdio_server diff --git a/src/mcp_text_editor/text_editor.py b/src/mcp_text_editor/text_editor.py index 5789e55..ac594a4 100644 --- a/src/mcp_text_editor/text_editor.py +++ b/src/mcp_text_editor/text_editor.py @@ -14,10 +14,11 @@ class TextEditor: """Handles text file operations with security checks and conflict detection.""" - def __init__(self): + def __init__(self, validator_command: Optional[str] = None): """Initialize TextEditor.""" self._validate_environment() self.service = TextEditorService() + self.validator_command = validator_command def create_error_response( self, @@ -467,12 +468,15 @@ async def edit_file_contents( # Calculate new hash new_hash = self.calculate_hash(final_content) + validator_result = self._run_validator(file_path) + return { "result": "ok", "file_hash": new_hash, "reason": None, "suggestion": suggestion_text, "hint": hint_text, + "validator_result": validator_result, } except FileNotFoundError: @@ -589,10 +593,13 @@ async def insert_text_file_contents( # Calculate new hash new_hash = self.calculate_hash(final_content) + validator_result = self._run_validator(file_path) + return { "result": "ok", "hash": new_hash, "reason": None, + "validator_result": validator_result, } except FileNotFoundError: @@ -740,11 +747,14 @@ async def delete_text_file_contents( # Calculate new hash new_hash = self.calculate_hash(final_content) + validator_result = self._run_validator(request.file_path) + return { request.file_path: { "result": "ok", "hash": new_hash, "reason": None, + "validator_result": validator_result, } } @@ -764,3 +774,40 @@ async def delete_text_file_contents( "hash": None, } } + + def _run_validator(self, file_path: str) -> Optional[Dict[str, Any]]: + """Run validator command on the updated file if configured. + + Args: + file_path (str): Path to the file to validate + + Returns: + Optional[Dict[str, Any]]: Validation result or None if validation was not run + """ + logger.debug( + f"Running validator with command: {self.validator_command} on file: {file_path}" + ) + if not self.validator_command: + logger.debug("No validator command configured, skipping validation") + return None + + try: + import subprocess + + logger.debug( + f"Executing subprocess.run with: {[self.validator_command, file_path]}" + ) + result = subprocess.run( + [self.validator_command, file_path], + capture_output=True, + text=True, + check=False, + ) + return { + "exit_code": result.returncode, + "stdout": result.stdout, + "stderr": result.stderr, + } + except Exception as e: + logger.error(f"Error running validator command: {str(e)}") + return {"error": str(e)} diff --git a/tests/test_validator.py b/tests/test_validator.py new file mode 100644 index 0000000..bae63b8 --- /dev/null +++ b/tests/test_validator.py @@ -0,0 +1,51 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from mcp_text_editor.text_editor import TextEditor + + +@pytest.fixture +def temp_file(tmp_path): + """Create a temporary file for testing.""" + file_path = tmp_path / "test.txt" + with open(file_path, "w") as f: + f.write("Original content\n") + return str(file_path) + + +@pytest.mark.asyncio +async def test_validator_command_runs_after_edit(temp_file): + """Test that validator command runs after editing a file.""" + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.returncode = 0 + mock_result.stdout = "Validation passed" + mock_result.stderr = "" + mock_run.return_value = mock_result + + editor = TextEditor(validator_command="test-validator") + + with open(temp_file, "r") as f: + content = f.read() + file_hash = editor.calculate_hash(content) + result = await editor.edit_file_contents( + file_path=temp_file, + expected_file_hash=file_hash, # Use actual file hash + patches=[ + { + "start": 1, + "end": 1, + "contents": "Updated content\n", + "range_hash": editor.calculate_hash("Original content\n"), + } + ], + ) + + mock_run.assert_called_once_with( + ["test-validator", temp_file], capture_output=True, text=True, check=False + ) + + assert "validator_result" in result + assert result["validator_result"]["exit_code"] == 0 + assert result["validator_result"]["stdout"] == "Validation passed"