From 015030887573c5f26a755ec4cbd7785e4b830c9a Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 28 Jul 2025 21:36:12 -0700 Subject: [PATCH 1/5] Use normalize_filepath across all tools that work with filepaths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change ensures consistent filepath handling across all toolkit functions by using the normalize_filepath utility function. This handles URL-encoded paths, relative paths, and special characters consistently. - Updated file_system toolkit: read, write, edit, search_and_replace, glob, grep, ls - Updated notebook toolkit: all functions that take file_path parameter - Updated git toolkit: all functions that take path parameter - Added comprehensive tests for normalize_filepath functionality All tools now properly decode URL-encoded paths (e.g., my%20notebook.ipynb → my notebook.ipynb) and resolve relative paths against the Jupyter server's root directory. --- jupyter_ai_tools/toolkits/file_system.py | 11 +- jupyter_ai_tools/toolkits/git.py | 10 ++ jupyter_ai_tools/toolkits/notebook.py | 17 +-- jupyter_ai_tools/utils.py | 67 ++++++++++-- tests/test_normalize_filepath.py | 130 +++++++++++++++++++++++ 5 files changed, 220 insertions(+), 15 deletions(-) create mode 100644 tests/test_normalize_filepath.py diff --git a/jupyter_ai_tools/toolkits/file_system.py b/jupyter_ai_tools/toolkits/file_system.py index 1d0a431..a296d18 100644 --- a/jupyter_ai_tools/toolkits/file_system.py +++ b/jupyter_ai_tools/toolkits/file_system.py @@ -8,6 +8,8 @@ from jupyter_ai.tools.models import Tool, Toolkit +from ..utils import normalize_filepath + def read(file_path: str, offset: Optional[int] = None, limit: Optional[int] = None) -> str: """Reads a file from the local filesystem @@ -21,6 +23,7 @@ def read(file_path: str, offset: Optional[int] = None, limit: Optional[int] = No The contents of the file, potentially with line numbers """ try: + file_path = normalize_filepath(file_path) if not os.path.exists(file_path): return f"Error: File not found: {file_path}" @@ -73,6 +76,7 @@ def write(file_path: str, content: str) -> str: A success message or error message """ try: + file_path = normalize_filepath(file_path) # Ensure the directory exists directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): @@ -107,6 +111,7 @@ def edit(file_path: str, old_string: str, new_string: str, replace_all: bool = F A success message or error message """ try: + file_path = normalize_filepath(file_path) if not os.path.exists(file_path): return f"Error: File not found: {file_path}" @@ -159,6 +164,7 @@ async def search_and_replace( A success message or error message """ try: + file_path = normalize_filepath(file_path) if not os.path.exists(file_path): return f"Error: File not found: {file_path}" @@ -212,7 +218,7 @@ async def glob(pattern: str, path: Optional[str] = None) -> str: A list of matching file paths sorted by modification time """ try: - search_path = path or os.getcwd() + search_path = normalize_filepath(path) if path else os.getcwd() if not os.path.exists(search_path): return f"Error: Path not found: {search_path}" @@ -260,7 +266,7 @@ async def grep( A list of file paths with at least one match """ try: - search_path = path or os.getcwd() + search_path = normalize_filepath(path) if path else os.getcwd() if not os.path.exists(search_path): return [f"Error: Path not found: {search_path}"] @@ -312,6 +318,7 @@ async def ls(path: str, ignore: Optional[List[str]] = None) -> str: A list of files and 
directories in the given path """ try: + path = normalize_filepath(path) if not os.path.exists(path): return f"Error: Path not found: {path}" diff --git a/jupyter_ai_tools/toolkits/git.py b/jupyter_ai_tools/toolkits/git.py index 6698203..9f93384 100644 --- a/jupyter_ai_tools/toolkits/git.py +++ b/jupyter_ai_tools/toolkits/git.py @@ -4,6 +4,8 @@ from jupyter_ai.tools.models import Tool, Toolkit from jupyterlab_git.git import Git +from ..utils import normalize_filepath + git = Git() @@ -19,6 +21,7 @@ async def git_clone(path: str, url: str) -> str: Returns: str: Success or error message. """ + path = normalize_filepath(path) res = await git.clone(path, repo_url=url) if res["code"] == 0: return f"✅ Cloned repo into {res['path']}" @@ -36,6 +39,7 @@ async def git_status(path: str) -> str: Returns: str: A JSON-formatted string of status or an error message. """ + path = normalize_filepath(path) res = await git.status(path) if res["code"] == 0: return f"📋 Status:\n{json.dumps(res, indent=2)}" @@ -54,6 +58,7 @@ async def git_log(path: str, history_count: int = 10) -> str: Returns: str: A JSON-formatted commit log or error message. """ + path = normalize_filepath(path) res = await git.log(path, history_count=history_count) if res["code"] == 0: return f"🕓 Recent commits:\n{json.dumps(res, indent=2)}" @@ -71,6 +76,7 @@ async def git_pull(path: str) -> str: Returns: str: Success or error message. """ + path = normalize_filepath(path) res = await git.pull(path) return ( "✅ Pulled latest changes." @@ -91,6 +97,7 @@ async def git_push(path: str, branch: str) -> str: Returns: str: Success or error message. """ + path = normalize_filepath(path) res = await git.push(remote="origin", branch=branch, path=path) return ( "✅ Pushed changes." @@ -111,6 +118,7 @@ async def git_commit(path: str, message: str) -> str: Returns: str: Success or error message. """ + path = normalize_filepath(path) res = await git.commit(commit_msg=message, amend=False, path=path) return ( "✅ Commit successful." @@ -132,6 +140,7 @@ async def git_add(path: str, add_all: bool = True, filename: str = "") -> str: Returns: str: Success or error message. """ + path = normalize_filepath(path) if add_all: res = await git.add_all(path) elif filename: @@ -158,6 +167,7 @@ async def git_get_repo_root(path: str) -> str: Returns: str: The path to the Git repository root or an error message. """ + path = normalize_filepath(path) dir_path = os.path.dirname(path) res = await git.show_top_level(dir_path) if res["code"] == 0 and res.get("path"): diff --git a/jupyter_ai_tools/toolkits/notebook.py b/jupyter_ai_tools/toolkits/notebook.py index b2e815f..f60282d 100644 --- a/jupyter_ai_tools/toolkits/notebook.py +++ b/jupyter_ai_tools/toolkits/notebook.py @@ -13,6 +13,7 @@ cell_to_md, get_file_id, get_jupyter_ydoc, + normalize_filepath, notebook_json_to_md, ) @@ -70,6 +71,7 @@ async def read_notebook(file_path: str, include_outputs=False) -> str: The notebook content as a markdown string. """ try: + file_path = normalize_filepath(file_path) notebook_dict = await read_notebook_json(file_path) notebook_md = notebook_json_to_md(notebook_dict, include_outputs=include_outputs) return notebook_md @@ -91,6 +93,7 @@ async def read_notebook_json(file_path: str) -> Dict[str, Any]: A dictionary containing the complete notebook structure. 
""" try: + file_path = normalize_filepath(file_path) with open(file_path, "r", encoding="utf-8") as f: notebook_dict = json.load(f) return notebook_dict @@ -120,6 +123,7 @@ async def read_cell(file_path: str, cell_id: str, include_outputs: bool = True) LookupError: If no cell with the given ID is found. """ try: + file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) cell, cell_index = await read_cell_json(file_path, resolved_cell_id) @@ -150,6 +154,7 @@ async def read_cell_json(file_path: str, cell_id: str) -> Tuple[Dict[str, Any], LookupError: If no cell with the given ID is found. """ try: + file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) notebook_json = await read_notebook_json(file_path) @@ -182,7 +187,7 @@ async def get_cell_id_from_index(file_path: str, cell_index: int) -> str: or if the cell does not have an ID. """ try: - + file_path = normalize_filepath(file_path) cell_id = None notebook_json = await read_notebook_json(file_path) cells = notebook_json["cells"] @@ -233,7 +238,7 @@ async def add_cell( None """ try: - + file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) if cell_id else None @@ -304,7 +309,7 @@ async def insert_cell( None """ try: - + file_path = normalize_filepath(file_path) file_id = await get_file_id(file_path) ydoc = await get_jupyter_ydoc(file_id) @@ -357,7 +362,7 @@ async def delete_cell(file_path: str, cell_id: str): None """ try: - + file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) @@ -762,7 +767,7 @@ async def edit_cell(file_path: str, cell_id: str, content: str) -> None: ValueError: If the cell_id is not found in the notebook. """ try: - + file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) @@ -814,7 +819,7 @@ def read_cell_nbformat(file_path: str, cell_id: str) -> Dict[str, Any]: Raises: ValueError: If no cell with the given ID is found. """ - + file_path = normalize_filepath(file_path) with open(file_path, "r", encoding="utf-8") as f: notebook = nbformat.read(f, as_version=nbformat.NO_CONVERT) diff --git a/jupyter_ai_tools/utils.py b/jupyter_ai_tools/utils.py index d19e7a2..d54799c 100644 --- a/jupyter_ai_tools/utils.py +++ b/jupyter_ai_tools/utils.py @@ -1,20 +1,72 @@ import functools import inspect +import os import typing +from pathlib import Path from typing import Optional +from urllib.parse import unquote from jupyter_server.serverapp import ServerApp from jupyter_server.auth.identity import User from pycrdt import Awareness -async def get_serverapp(): +def get_serverapp(): """Returns the server app from the request context""" server = ServerApp.instance() return server +def normalize_filepath(file_path: str) -> str: + """ + Normalizes a file path for Jupyter applications to return an absolute path. 
+ + Handles various input formats: + - Relative paths from current working directory + - URL-encoded relative paths (common in Jupyter contexts) + - Absolute paths (returned as-is after normalization) + + Args: + file_path: Path in any of the supported formats + + Returns: + Absolute path to the file + + Example: + >>> normalize_filepath("notebooks/my%20notebook.ipynb") + "/current/working/dir/notebooks/my notebook.ipynb" + >>> normalize_filepath("/absolute/path/file.ipynb") + "/absolute/path/file.ipynb" + >>> normalize_filepath("relative/file.ipynb") + "/current/working/dir/relative/file.ipynb" + """ + if not file_path or not file_path.strip(): + raise ValueError("file_path cannot be empty") + + # URL decode the path in case it contains encoded characters + decoded_path = unquote(file_path) + + # Convert to Path object for easier manipulation + path = Path(decoded_path) + + # If already absolute, just normalize and return + if path.is_absolute(): + return str(path.resolve()) + + # For relative paths, get the Jupyter server's root directory + try: + serverapp = get_serverapp() + root_dir = serverapp.root_dir + except Exception: + # Fallback to current working directory if server app is not available + root_dir = os.getcwd() + + # Resolve relative path against the root directory + resolved_path = Path(root_dir) / path + return str(resolved_path.resolve()) + + async def get_jupyter_ydoc(file_id: str): """Returns the notebook ydoc @@ -24,7 +76,7 @@ async def get_jupyter_ydoc(file_id: str): Returns: `YNotebook` ydoc for the notebook """ - serverapp = await get_serverapp() + serverapp = get_serverapp() yroom_manager = serverapp.web_app.settings["yroom_manager"] room_id = f"json:notebook:{file_id}" @@ -35,7 +87,7 @@ async def get_jupyter_ydoc(file_id: str): async def get_global_awareness() -> Optional[Awareness]: - serverapp = await get_serverapp() + serverapp = get_serverapp() yroom_manager = serverapp.web_app.settings["yroom_manager"] room_id = "JupyterLab:globalAwareness" @@ -57,10 +109,11 @@ async def get_file_id(file_path: str) -> str: Returns: The file ID of the document """ - - serverapp = await get_serverapp() + normalized_file_path = normalize_filepath(file_path) + + serverapp = get_serverapp() file_id_manager = serverapp.web_app.settings["file_id_manager"] - file_id = file_id_manager.get_id(file_path) + file_id = file_id_manager.get_id(normalized_file_path) return file_id @@ -100,7 +153,7 @@ async def wrapper(*args, **kwargs): # Get serverapp for logging try: - serverapp = await get_serverapp() + serverapp = get_serverapp() logger = serverapp.log except Exception: logger = None diff --git a/tests/test_normalize_filepath.py b/tests/test_normalize_filepath.py new file mode 100644 index 0000000..429dbfd --- /dev/null +++ b/tests/test_normalize_filepath.py @@ -0,0 +1,130 @@ +import os +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from jupyter_ai_tools.utils import normalize_filepath + + +class TestNormalizeFilepath: + """Test suite for the normalize_filepath function.""" + + @pytest.fixture + def mock_serverapp(self): + """Fixture that provides a mocked serverapp with test root directory.""" + with patch('jupyter_ai_tools.utils.get_serverapp') as mock_serverapp: + mock_app = MagicMock() + mock_app.root_dir = "/test/root" + mock_serverapp.return_value = mock_app + yield mock_serverapp + + @pytest.mark.parametrize("test_path,expected_decoded", [ + ("notebooks/my%20notebook.ipynb", "notebooks/my notebook.ipynb"), + 
("relative/file.ipynb", "relative/file.ipynb"), + ("folder%20with%20spaces/file%20name.ipynb", "folder with spaces/file name.ipynb"), + ("./current/file.ipynb", "current/file.ipynb"), + ("../parent/file.ipynb", "../parent/file.ipynb"), + ("path%2Fwith%2Fslashes/file%2Bwith%2Bplus.ipynb", "path/with/slashes/file+with+plus.ipynb"), + ("path/with/special%21chars%40symbols.ipynb", "path/with/special!chars@symbols.ipynb"), + ("path//with//double//slashes.ipynb", "path/with/double/slashes.ipynb"), + ("path/with/unicode%C3%A9chars.ipynb", "path/with/unicodeéchars.ipynb"), + ("very/deeply/nested/path/structure/with/many/levels/file.ipynb", "very/deeply/nested/path/structure/with/many/levels/file.ipynb"), + ]) + def test_relative_path_resolution(self, mock_serverapp, test_path, expected_decoded): + """Test that relative paths are properly decoded and resolved against server root.""" + result = normalize_filepath(test_path) + expected = str(Path(f"/test/root/{expected_decoded}").resolve()) + assert result == expected + + @pytest.mark.parametrize("test_path,expected_decoded", [ + ("/absolute/path/file.ipynb", "/absolute/path/file.ipynb"), + ("/absolute/path%20with%20spaces/file.ipynb", "/absolute/path with spaces/file.ipynb"), + ]) + def test_absolute_path_resolution(self, test_path, expected_decoded): + """Test that absolute paths are normalized but not changed.""" + result = normalize_filepath(test_path) + expected = str(Path(expected_decoded).resolve()) + assert result == expected + + def test_fallback_to_cwd_when_serverapp_fails(self): + """Test that function falls back to current working directory when serverapp fails.""" + test_path = "relative/file.ipynb" + + with patch('jupyter_ai_tools.utils.get_serverapp', side_effect=Exception("ServerApp error")): + result = normalize_filepath(test_path) + expected = str(Path(os.getcwd(), "relative/file.ipynb").resolve()) + assert result == expected + + @pytest.mark.parametrize("invalid_path", [ + "", + None, + " ", + "\t\n", + ]) + def test_invalid_path_raises_error(self, invalid_path): + """Test that invalid paths raise ValueError.""" + with pytest.raises(ValueError, match="file_path cannot be empty"): + normalize_filepath(invalid_path) + + def test_path_resolution_with_real_filesystem(self): + """Test path resolution with real filesystem using temporary directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file structure + test_subdir = Path(temp_dir) / "test_subdir" + test_subdir.mkdir() + test_file = test_subdir / "test_file.ipynb" + test_file.write_text('{"cells": []}') + + # Test relative path resolution + relative_path = "test_subdir/test_file.ipynb" + + with patch('jupyter_ai_tools.utils.get_serverapp') as mock_serverapp: + mock_app = MagicMock() + mock_app.root_dir = temp_dir + mock_serverapp.return_value = mock_app + + result = normalize_filepath(relative_path) + + # Should resolve to the actual file path + expected = str(test_file.resolve()) + assert result == expected + + # Verify the resolved path actually exists + assert Path(result).exists() + + @pytest.mark.parametrize("test_path", [ + "notebook.ipynb", + "script.py", + "data.csv", + "image.png", + "document.txt", + "config.json", + "style.css", + "page.html" + ]) + def test_various_file_extensions(self, mock_serverapp, test_path): + """Test that function works with various file extensions.""" + result = normalize_filepath(test_path) + expected = str(Path(f"/test/root/{test_path}").resolve()) + assert result == expected + + @pytest.mark.parametrize("root_dir", [ 
+ "/home/user/notebooks", + "/var/jupyter/work", + "/tmp/jupyter_root", + "/Users/username/Documents" + ]) + def test_serverapp_with_different_root_dirs(self, root_dir): + """Test that different server root directories are handled correctly.""" + test_path = "file.ipynb" + + with patch('jupyter_ai_tools.utils.get_serverapp') as mock_serverapp: + mock_app = MagicMock() + mock_app.root_dir = root_dir + mock_serverapp.return_value = mock_app + + result = normalize_filepath(test_path) + expected = str(Path(root_dir, test_path).resolve()) + assert result == expected \ No newline at end of file From 2e79291c975d2d3005331659bc62c6696ef11eb0 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Tue, 29 Jul 2025 08:07:24 -0700 Subject: [PATCH 2/5] Add create_notebook function to notebook toolkit This function creates a new empty Jupyter notebook at the specified file path with proper nbformat structure, including directory creation if needed. --- jupyter_ai_tools/toolkits/notebook.py | 40 +++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/jupyter_ai_tools/toolkits/notebook.py b/jupyter_ai_tools/toolkits/notebook.py index f60282d..8fb620a 100644 --- a/jupyter_ai_tools/toolkits/notebook.py +++ b/jupyter_ai_tools/toolkits/notebook.py @@ -1,6 +1,7 @@ import asyncio import difflib import json +import os import re from typing import Any, Dict, Literal, Optional, Tuple @@ -911,6 +912,44 @@ def _determine_insert_index(cells_count: int, cell_index: Optional[int], add_abo return insert_index +async def create_notebook(file_path: str) -> str: + """Creates a new empty Jupyter notebook at the specified file path. + + This function creates a new empty notebook with proper nbformat structure. + If the file already exists, it will return an error message. + + Args: + file_path: + The path where the new notebook should be created. + + Returns: + A success message or error message. 
+ """ + try: + file_path = normalize_filepath(file_path) + + # Check if file already exists + if os.path.exists(file_path): + return f"Error: File already exists at {file_path}" + + # Ensure the directory exists + directory = os.path.dirname(file_path) + if directory and not os.path.exists(directory): + os.makedirs(directory, exist_ok=True) + + # Create a new empty notebook + notebook = nbformat.v4.new_notebook() + + # Write the notebook to the file + with open(file_path, "w", encoding="utf-8") as f: + nbformat.write(notebook, f) + + return f"Successfully created new notebook at {file_path}" + + except Exception as e: + return f"Error: Failed to create notebook: {str(e)}" + + toolkit = Toolkit( name="notebook_toolkit", @@ -923,3 +962,4 @@ def _determine_insert_index(cells_count: int, cell_index: Optional[int], add_abo toolkit.add_tool(Tool(callable=delete_cell, delete=True)) toolkit.add_tool(Tool(callable=edit_cell, read=True, write=True)) toolkit.add_tool(Tool(callable=get_cell_id_from_index, read=True)) +toolkit.add_tool(Tool(callable=create_notebook, write=True)) From 0793ce2fe5bb82041b10d4d5faaf4320045a9784 Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Tue, 19 Aug 2025 13:47:30 -0700 Subject: [PATCH 3/5] Fix lint error --- tests/test_collaborative_tool.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/tests/test_collaborative_tool.py b/tests/test_collaborative_tool.py index a3e064d..e0754e7 100644 --- a/tests/test_collaborative_tool.py +++ b/tests/test_collaborative_tool.py @@ -34,30 +34,6 @@ def mock_ydoc(self): ydoc.awareness.set_local_state_field = MagicMock() return ydoc - @pytest.mark.asyncio - async def test_no_user_skips_awareness(self, mock_user_dict): - """Test that decorator skips awareness when user is None.""" - - # Create a test function - @collaborative_tool(user=None) - async def test_func(file_path: str, content: str): - return f"processed {file_path} with {content}" - - # Mock the awareness functions to ensure they're not called - with patch('jupyter_ai_tools.utils.get_global_awareness') as mock_global, \ - patch('jupyter_ai_tools.utils.get_file_id') as mock_file_id, \ - patch('jupyter_ai_tools.utils.get_jupyter_ydoc') as mock_ydoc: - - result = await test_func("test.ipynb", "test content") - - # Verify function executed normally - assert result == "processed test.ipynb with test content" - - # Verify awareness functions were never called - mock_global.assert_not_called() - mock_file_id.assert_not_called() - mock_ydoc.assert_not_called() - @pytest.mark.asyncio async def test_user_with_notebook_file_sets_awareness(self, mock_user_dict, mock_global_awareness, mock_ydoc): """Test that decorator sets both global and notebook awareness for .ipynb files.""" From 9a2e399c80df4b09b16500bbb6688ffb29340b16 Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Tue, 19 Aug 2025 14:38:46 -0700 Subject: [PATCH 4/5] lint --- jupyter_ai_tools/toolkits/notebook.py | 257 +++++++++++-------- jupyter_ai_tools/utils.py | 83 +++--- tests/test_collaborative_tool.py | 264 ++++++++++---------- tests/test_normalize_filepath.py | 127 ++++++---- tests/test_write_to_cell_collaboratively.py | 184 +++++++------- 5 files changed, 506 insertions(+), 409 deletions(-) diff --git a/jupyter_ai_tools/toolkits/notebook.py b/jupyter_ai_tools/toolkits/notebook.py index 8fb620a..fba849b 100644 --- a/jupyter_ai_tools/toolkits/notebook.py +++ b/jupyter_ai_tools/toolkits/notebook.py @@ -24,9 +24,10 @@ def _is_uuid_like(value: str) -> bool: if not isinstance(value, str): return False # 
UUID v4 pattern: 8-4-4-4-12 hexadecimal characters - uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' + uuid_pattern = r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" return bool(re.match(uuid_pattern, value, re.IGNORECASE)) + def _is_index_like(value: str) -> bool: """Check if a string looks like a numeric index""" if not isinstance(value, str): @@ -37,6 +38,7 @@ def _is_index_like(value: str) -> bool: except ValueError: return False + async def _resolve_cell_id(file_path: str, cell_id_or_index: str) -> str: """ Resolve a cell_id parameter that might be either a UUID or an index. @@ -55,6 +57,7 @@ async def _resolve_cell_id(file_path: str, cell_id_or_index: str) -> str: # Assume it's a cell_id and let the downstream function handle validation return cell_id_or_index + async def read_notebook(file_path: str, include_outputs=False) -> str: """Returns the complete notebook content as markdown string. @@ -160,13 +163,13 @@ async def read_cell_json(file_path: str, cell_id: str) -> Tuple[Dict[str, Any], resolved_cell_id = await _resolve_cell_id(file_path, cell_id) notebook_json = await read_notebook_json(file_path) cell_index = _get_cell_index_from_id_json(notebook_json, resolved_cell_id) - + if cell_index is not None and 0 <= cell_index < len(notebook_json["cells"]): cell = notebook_json["cells"][cell_index] return cell, cell_index - + raise LookupError(f"No cell found with {cell_id=}") - + except Exception: raise @@ -192,7 +195,7 @@ async def get_cell_id_from_index(file_path: str, cell_index: int) -> str: cell_id = None notebook_json = await read_notebook_json(file_path) cells = notebook_json["cells"] - + if 0 <= cell_index < len(cells): cell_id = cells[cell_index].get("id") else: @@ -202,7 +205,7 @@ async def get_cell_id_from_index(file_path: str, cell_index: int) -> str: raise ValueError("No cell_id found, use `insert_cell` based on cell index") return cell_id - + except Exception: raise @@ -242,15 +245,17 @@ async def add_cell( file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) if cell_id else None - + file_id = await get_file_id(file_path) ydoc: YNotebook = await get_jupyter_ydoc(file_id) if ydoc: cells_count = ydoc.cell_number - cell_index = _get_cell_index_from_id_ydoc(ydoc, resolved_cell_id) if resolved_cell_id else None + cell_index = ( + _get_cell_index_from_id_ydoc(ydoc, resolved_cell_id) if resolved_cell_id else None + ) insert_index = _determine_insert_index(cells_count, cell_index, add_above) - + cell = { "cell_type": cell_type, "source": "", @@ -266,19 +271,25 @@ async def add_cell( notebook = nbformat.read(f, as_version=nbformat.NO_CONVERT) cells_count = len(notebook.cells) - cell_index = _get_cell_index_from_id_nbformat(notebook, resolved_cell_id) if resolved_cell_id else None + cell_index = ( + _get_cell_index_from_id_nbformat(notebook, resolved_cell_id) + if resolved_cell_id + else None + ) insert_index = _determine_insert_index(cells_count, cell_index, add_above) if cell_type == "code": notebook.cells.insert(insert_index, nbformat.v4.new_code_cell(source=content or "")) elif cell_type == "markdown": - notebook.cells.insert(insert_index, nbformat.v4.new_markdown_cell(source=content or "")) + notebook.cells.insert( + insert_index, nbformat.v4.new_markdown_cell(source=content or "") + ) else: notebook.cells.insert(insert_index, nbformat.v4.new_raw_cell(source=content or "")) with open(file_path, "w", encoding="utf-8") as f: 
nbformat.write(notebook, f) - + except Exception: raise @@ -316,7 +327,7 @@ async def insert_cell( if ydoc: cells_count = ydoc.cell_number - + cell = { "cell_type": cell_type, "source": "", @@ -336,13 +347,15 @@ async def insert_cell( if cell_type == "code": notebook.cells.insert(insert_index, nbformat.v4.new_code_cell(source=content or "")) elif cell_type == "markdown": - notebook.cells.insert(insert_index, nbformat.v4.new_markdown_cell(source=content or "")) + notebook.cells.insert( + insert_index, nbformat.v4.new_markdown_cell(source=content or "") + ) else: notebook.cells.insert(insert_index, nbformat.v4.new_raw_cell(source=content or "")) with open(file_path, "w", encoding="utf-8") as f: nbformat.write(notebook, f) - + except Exception: raise @@ -366,10 +379,10 @@ async def delete_cell(file_path: str, cell_id: str): file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) - + file_id = await get_file_id(file_path) ydoc = await get_jupyter_ydoc(file_id) - + if ydoc: cell_index = _get_cell_index_from_id_ydoc(ydoc, resolved_cell_id) if cell_index is not None and 0 <= cell_index < len(ydoc.ycells): @@ -390,27 +403,29 @@ async def delete_cell(file_path: str, cell_id: str): if cell_index is None: raise ValueError(f"Could not find cell index for {cell_id=}") - + except Exception: raise -def get_cursor_details(cell_source: Text, start_index: int, stop_index: Optional[int] = None) -> Dict[str, Any]: +def get_cursor_details( + cell_source: Text, start_index: int, stop_index: Optional[int] = None +) -> Dict[str, Any]: """ Creates cursor details for collaborative notebook cursor positioning. - + This function constructs the cursor details object required by the YNotebook awareness system to show cursor positions in collaborative editing environments. It handles both single cursor positions and text selections. 
- + Args: cell_source: The YText source object representing the cell content start_index: The starting position of the cursor (0-based index) stop_index: The ending position for selections (optional) - + Returns: dict: Cursor details object with head, anchor, and selection state - + Example: >>> details = get_cursor_details(cell_source, 10) # Single cursor at position 10 >>> details = get_cursor_details(cell_source, 5, 15) # Selection from 5 to 15 @@ -418,57 +433,59 @@ def get_cursor_details(cell_source: Text, start_index: int, stop_index: Optional # Create sticky index for the head position (where cursor starts) head_sticky_index = cell_source.sticky_index(start_index, Assoc.BEFORE) head_sticky_index_data = head_sticky_index.to_json() - + # Initialize cursor details with default values cursor_details: Dict[str, Any] = {"primary": True, "empty": True} - + # Set the head position (where cursor starts) cursor_details["head"] = { "type": head_sticky_index_data["item"], - "tname": None, + "tname": None, "item": head_sticky_index_data["item"], - "assoc": 0 + "assoc": 0, } - + # By default, anchor is same as head (no selection) cursor_details["anchor"] = cursor_details["head"] - + # If stop_index is provided, create a selection if stop_index is not None: anchor_sticky_index = cell_source.sticky_index(stop_index, Assoc.BEFORE) anchor_sticky_index_data = anchor_sticky_index.to_json() cursor_details["anchor"] = { "type": anchor_sticky_index_data["item"], - "tname": None, + "tname": None, "item": anchor_sticky_index_data["item"], - "assoc": 0 + "assoc": 0, } cursor_details["empty"] = False # Not empty when there's a selection - + return cursor_details -def set_cursor_in_ynotebook(ynotebook: YNotebook, cell_source: Text, start_index: int, stop_index: Optional[int] = None) -> None: +def set_cursor_in_ynotebook( + ynotebook: YNotebook, cell_source: Text, start_index: int, stop_index: Optional[int] = None +) -> None: """ Sets the cursor position in a collaborative notebook environment. - + This function updates the cursor position in the YNotebook awareness system, which allows other collaborators to see where the cursor is positioned. It handles both single cursor positions and text selections. - + Args: ynotebook: The YNotebook instance representing the collaborative notebook cell_source: The YText source object representing the cell content start_index: The starting position of the cursor (0-based index) stop_index: The ending position for selections (optional) - + Returns: None: This function does not return a value - + Note: This function silently ignores any errors that occur during cursor setting to avoid breaking the main collaborative editing operations. 
- + Example: >>> set_cursor_in_ynotebook(ynotebook, cell_source, 10) # Set cursor at position 10 >>> set_cursor_in_ynotebook(ynotebook, cell_source, 5, 15) # Select text from 5 to 15 @@ -476,7 +493,7 @@ def set_cursor_in_ynotebook(ynotebook: YNotebook, cell_source: Text, start_index try: # Get cursor details for the specified position/selection details = get_cursor_details(cell_source, start_index, stop_index=stop_index) - + # Update the awareness system with the cursor position if ynotebook.awareness: ynotebook.awareness.set_local_state_field("cursors", [details]) @@ -486,38 +503,40 @@ def set_cursor_in_ynotebook(ynotebook: YNotebook, cell_source: Text, start_index pass -async def write_to_cell_collaboratively(ynotebook, ycell, content: str, typing_speed: float = 0.1) -> bool: +async def write_to_cell_collaboratively( + ynotebook, ycell, content: str, typing_speed: float = 0.1 +) -> bool: """ Writes content to a Jupyter notebook cell with collaborative typing simulation. - + This function provides a collaborative writing experience by applying text changes incrementally with visual feedback. It uses a diff-based approach to compute the minimal set of changes needed and applies them with cursor positioning and timing delays to simulate natural typing behavior. - + The function handles three types of operations: - Delete: Removes text with visual highlighting - Insert: Adds text word-by-word with typing delays - Replace: Combines delete and insert operations - + Args: ynotebook: The YNotebook instance representing the collaborative notebook ycell: The YCell instance representing the specific cell to modify content: The new content to write to the cell typing_speed: Delay in seconds between typing operations (default: 0.1) - + Returns: bool: True if the operation completed successfully - + Raises: ValueError: If ynotebook/ycell is None or typing_speed is negative TypeError: If content is not a string RuntimeError: If cell content extraction or writing fails - + Example: >>> # Write with default typing speed >>> success = await write_to_cell_collaboratively(ynotebook, ycell, "print('Hello')") - >>> + >>> >>> # Write with custom typing speed (faster) >>> success = await write_to_cell_collaboratively( ... 
ynotebook, ycell, "print('World')", typing_speed=0.05 @@ -532,26 +551,26 @@ async def write_to_cell_collaboratively(ynotebook, ycell, content: str, typing_s raise TypeError("content must be a string") if typing_speed < 0: raise ValueError("typing_speed must be non-negative") - + try: # Extract current cell content cell = ycell.to_py() old_content = cell.get("source", "") cell_source = ycell["source"] # YText object for collaborative editing new_content = content - + # Early return if content is unchanged if old_content == new_content: return True - + except Exception as e: raise RuntimeError(f"Failed to extract cell content: {e}") - + try: # Compute the minimal set of changes needed using difflib sequence_matcher = difflib.SequenceMatcher(None, old_content, new_content) cursor_position = 0 - + # Set initial cursor position _safe_set_cursor(ynotebook, cell_source, cursor_position) @@ -560,7 +579,7 @@ async def write_to_cell_collaboratively(ynotebook, ycell, content: str, typing_s if operation == "equal": # No changes needed for this segment, just advance cursor cursor_position += old_end - old_start - + elif operation == "delete": # Remove text with visual feedback delete_length = old_end - old_start @@ -568,65 +587,87 @@ async def write_to_cell_collaboratively(ynotebook, ycell, content: str, typing_s ynotebook, cell_source, cursor_position, delete_length, typing_speed ) # Cursor stays at same position after deletion - + elif operation == "insert": # Add text with typing simulation cursor_position = await _handle_insert_operation( - ynotebook, cell_source, cursor_position, new_content, new_start, new_end, typing_speed + ynotebook, + cell_source, + cursor_position, + new_content, + new_start, + new_end, + typing_speed, ) - + elif operation == "replace": # Combine delete and insert operations delete_length = old_end - old_start cursor_position = await _handle_replace_operation( - ynotebook, cell_source, cursor_position, new_content, - delete_length, new_start, new_end, typing_speed + ynotebook, + cell_source, + cursor_position, + new_content, + delete_length, + new_start, + new_end, + typing_speed, ) - + # Set final cursor position at the end of the content _safe_set_cursor(ynotebook, cell_source, cursor_position) - + return True - + except Exception as e: raise RuntimeError(f"Failed to write cell content collaboratively: {e}") -async def _handle_delete_operation(ynotebook, cell_source, cursor_position: int, delete_length: int, typing_speed: float) -> None: +async def _handle_delete_operation( + ynotebook, cell_source, cursor_position: int, delete_length: int, typing_speed: float +) -> None: """ Handle deletion of text chunks with visual feedback. - + This function provides visual feedback during deletion by first highlighting the text to be deleted, then removing it after a delay to simulate natural deletion behavior in collaborative environments. 
- + Args: ynotebook: The YNotebook instance for cursor positioning cell_source: The YText source object representing the cell content cursor_position: Current cursor position in the text delete_length: Number of characters to delete from cursor position typing_speed: Base delay between operations in seconds - + Returns: None """ # Highlight the text chunk that will be deleted (visual feedback) _safe_set_cursor(ynotebook, cell_source, cursor_position, cursor_position + delete_length) await asyncio.sleep(min(0.3, typing_speed * 3)) # Cap highlight duration at 0.3s - + # Perform the actual deletion - del cell_source[cursor_position:cursor_position + delete_length] + del cell_source[cursor_position : cursor_position + delete_length] await asyncio.sleep(typing_speed) -async def _handle_insert_operation(ynotebook, cell_source, cursor_position: int, new_content: str, new_start: int, new_end: int, typing_speed: float) -> int: +async def _handle_insert_operation( + ynotebook, + cell_source, + cursor_position: int, + new_content: str, + new_start: int, + new_end: int, + typing_speed: float, +) -> int: """ Handle insertion of text with word-by-word typing simulation. - + This function simulates natural typing behavior by inserting text word-by-word with appropriate delays and cursor positioning. It handles both regular text and whitespace-only content appropriately. - + Args: ynotebook: The YNotebook instance for cursor positioning cell_source: The YText source object representing the cell content @@ -635,13 +676,13 @@ async def _handle_insert_operation(ynotebook, cell_source, cursor_position: int, new_start: Start index of text to insert in the new content new_end: End index of text to insert in the new content typing_speed: Base delay between typing operations in seconds - + Returns: int: The new cursor position after insertion """ text_to_insert = new_content[new_start:new_end] words = text_to_insert.split() - + # Handle whitespace-only or empty insertions if not words or text_to_insert.strip() == "": cell_source.insert(cursor_position, text_to_insert) @@ -649,47 +690,56 @@ async def _handle_insert_operation(ynotebook, cell_source, cursor_position: int, _safe_set_cursor(ynotebook, cell_source, cursor_position) await asyncio.sleep(typing_speed) return cursor_position - + # Insert text word-by-word with proper spacing and punctuation current_pos = 0 for word in words: # Find the position of this word in the text word_start = text_to_insert.find(word, current_pos) - + # Insert any whitespace or punctuation before the word if word_start > current_pos: prefix = text_to_insert[current_pos:word_start] cell_source.insert(cursor_position, prefix) cursor_position += len(prefix) - + # Insert the word itself cell_source.insert(cursor_position, word) cursor_position += len(word) current_pos = word_start + len(word) - + # Update cursor position and pause for typing effect _safe_set_cursor(ynotebook, cell_source, cursor_position) await asyncio.sleep(typing_speed) - + # Insert any remaining text after the last word (punctuation, etc.) 
if current_pos < len(text_to_insert): suffix = text_to_insert[current_pos:] cell_source.insert(cursor_position, suffix) cursor_position += len(suffix) _safe_set_cursor(ynotebook, cell_source, cursor_position) - + return cursor_position -async def _handle_replace_operation(ynotebook, cell_source, cursor_position: int, new_content: str, delete_length: int, new_start: int, new_end: int, typing_speed: float) -> int: +async def _handle_replace_operation( + ynotebook, + cell_source, + cursor_position: int, + new_content: str, + delete_length: int, + new_start: int, + new_end: int, + typing_speed: float, +) -> int: """ Handle replacement operations by deleting then inserting. - + This function simulates natural text replacement behavior by first deleting the old text (with visual feedback) and then inserting the new text with typing simulation. A pause is added between operations to make the replacement feel more natural. - + Args: ynotebook: The YNotebook instance for cursor positioning cell_source: The YText source object representing the cell content @@ -699,40 +749,46 @@ async def _handle_replace_operation(ynotebook, cell_source, cursor_position: int new_start: Start index of replacement text in the new content new_end: End index of replacement text in the new content typing_speed: Base delay between typing operations in seconds - + Returns: int: The new cursor position after replacement """ # First, delete the old text with visual feedback - await _handle_delete_operation(ynotebook, cell_source, cursor_position, delete_length, typing_speed) - + await _handle_delete_operation( + ynotebook, cell_source, cursor_position, delete_length, typing_speed + ) + # Brief pause between deletion and insertion for natural feel await asyncio.sleep(typing_speed * 2) - + # Then, insert the new text with typing simulation - cursor_position = await _handle_insert_operation(ynotebook, cell_source, cursor_position, new_content, new_start, new_end, typing_speed) - + cursor_position = await _handle_insert_operation( + ynotebook, cell_source, cursor_position, new_content, new_start, new_end, typing_speed + ) + return cursor_position -def _safe_set_cursor(ynotebook: YNotebook, cell_source: Text, cursor_position: int, stop_cursor: Optional[int] = None) -> None: +def _safe_set_cursor( + ynotebook: YNotebook, cell_source: Text, cursor_position: int, stop_cursor: Optional[int] = None +) -> None: """ Safely set cursor position with error handling. - + This function wraps the cursor positioning logic to prevent errors from breaking the main collaborative writing operations. Since cursor positioning is a visual enhancement rather than a core functionality, errors are silently ignored to maintain robustness. - + Args: ynotebook: The YNotebook instance for cursor positioning cell_source: The YText source object representing the cell content cursor_position: The cursor position to set stop_cursor: Optional end position for text selections - + Returns: None - + Note: This function silently ignores all exceptions to prevent cursor positioning errors from interfering with the main editing operations. 
@@ -771,7 +827,7 @@ async def edit_cell(file_path: str, cell_id: str, content: str) -> None: file_path = normalize_filepath(file_path) # Resolve cell_id in case it's an index resolved_cell_id = await _resolve_cell_id(file_path, cell_id) - + file_id = await get_file_id(file_path) ydoc = await get_jupyter_ydoc(file_id) @@ -793,7 +849,7 @@ async def edit_cell(file_path: str, cell_id: str, content: str) -> None: nbformat.write(notebook, f) else: raise ValueError(f"Cell with {cell_id=} not found in notebook at {file_path=}") - + except Exception: raise @@ -927,30 +983,29 @@ async def create_notebook(file_path: str) -> str: """ try: file_path = normalize_filepath(file_path) - + # Check if file already exists if os.path.exists(file_path): return f"Error: File already exists at {file_path}" - + # Ensure the directory exists directory = os.path.dirname(file_path) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) - + # Create a new empty notebook notebook = nbformat.v4.new_notebook() - + # Write the notebook to the file with open(file_path, "w", encoding="utf-8") as f: nbformat.write(notebook, f) - + return f"Successfully created new notebook at {file_path}" - + except Exception as e: return f"Error: Failed to create notebook: {str(e)}" - toolkit = Toolkit( name="notebook_toolkit", description="Tools for reading and manipulating Jupyter notebooks.", diff --git a/jupyter_ai_tools/utils.py b/jupyter_ai_tools/utils.py index d54799c..33f1ce7 100644 --- a/jupyter_ai_tools/utils.py +++ b/jupyter_ai_tools/utils.py @@ -1,13 +1,12 @@ import functools import inspect import os -import typing from pathlib import Path from typing import Optional from urllib.parse import unquote -from jupyter_server.serverapp import ServerApp from jupyter_server.auth.identity import User +from jupyter_server.serverapp import ServerApp from pycrdt import Awareness @@ -21,18 +20,18 @@ def get_serverapp(): def normalize_filepath(file_path: str) -> str: """ Normalizes a file path for Jupyter applications to return an absolute path. 
- + Handles various input formats: - Relative paths from current working directory - URL-encoded relative paths (common in Jupyter contexts) - Absolute paths (returned as-is after normalization) - + Args: file_path: Path in any of the supported formats - + Returns: Absolute path to the file - + Example: >>> normalize_filepath("notebooks/my%20notebook.ipynb") "/current/working/dir/notebooks/my notebook.ipynb" @@ -43,17 +42,17 @@ def normalize_filepath(file_path: str) -> str: """ if not file_path or not file_path.strip(): raise ValueError("file_path cannot be empty") - + # URL decode the path in case it contains encoded characters decoded_path = unquote(file_path) - + # Convert to Path object for easier manipulation path = Path(decoded_path) - + # If already absolute, just normalize and return if path.is_absolute(): return str(path.resolve()) - + # For relative paths, get the Jupyter server's root directory try: serverapp = get_serverapp() @@ -61,7 +60,7 @@ def normalize_filepath(file_path: str) -> str: except Exception: # Fallback to current working directory if server app is not available root_dir = os.getcwd() - + # Resolve relative path against the root directory resolved_path = Path(root_dir) / path return str(resolved_path.resolve()) @@ -89,12 +88,12 @@ async def get_jupyter_ydoc(file_id: str): async def get_global_awareness() -> Optional[Awareness]: serverapp = get_serverapp() yroom_manager = serverapp.web_app.settings["yroom_manager"] - - room_id = "JupyterLab:globalAwareness" + + room_id = "JupyterLab:globalAwareness" if yroom_manager.has_room(room_id): yroom = yroom_manager.get_room(room_id) return yroom.get_awareness() - + # Return None if room doesn't exist return None @@ -110,7 +109,7 @@ async def get_file_id(file_path: str) -> str: The file ID of the document """ normalized_file_path = normalize_filepath(file_path) - + serverapp = get_serverapp() file_id_manager = serverapp.web_app.settings["file_id_manager"] file_id = file_id_manager.get_id(normalized_file_path) @@ -121,94 +120,100 @@ async def get_file_id(file_path: str) -> str: def collaborative_tool(user: User): """ Decorator factory to enable collaborative awareness for toolkit functions. - + This decorator automatically sets up user awareness in the global and notebook-specific awareness systems when functions are called. It enables real-time collaborative features by making the user's presence visible to other users in the same Jupyter environment. - + Args: user: Optional user dictionary with user information. If None, no awareness is set. Should contain keys like 'name', 'color', 'display_name', etc. - + Returns: Decorator function that wraps the target function with collaborative awareness. - + Example: >>> user_info = { ... "name": "Alice", ... "color": "var(--jp-collaborator-color1)", ... "display_name": "Alice Smith" ... } - >>> + >>> >>> @collaborative_tool(user=user_info) ... async def my_notebook_tool(file_path: str, content: str): ... # Your tool implementation here ... 
return f"Processed {file_path}" """ + def decorator(tool_func): @functools.wraps(tool_func) async def wrapper(*args, **kwargs): # Skip awareness if no user provided - + # Get serverapp for logging try: serverapp = get_serverapp() logger = serverapp.log except Exception: logger = None - + # Extract file_path from tool function arguments for notebook-specific awareness file_path = None try: # Try to find file_path in kwargs first - if 'file_path' in kwargs: - file_path = kwargs['file_path'] + if "file_path" in kwargs: + file_path = kwargs["file_path"] else: # Try to find file_path in positional args by inspecting the function signature sig = inspect.signature(tool_func) param_names = list(sig.parameters.keys()) - + # Look for file_path parameter - if 'file_path' in param_names and len(args) > param_names.index('file_path'): - file_path = args[param_names.index('file_path')] + if "file_path" in param_names and len(args) > param_names.index("file_path"): + file_path = args[param_names.index("file_path")] except Exception as e: # Log error in file_path detection if logger: logger.warning(f"Error detecting file_path in collaborative_tool: {e}") - + # Set notebook-specific collaborative awareness if we have a file_path - if file_path and file_path.endswith('.ipynb'): + if file_path and file_path.endswith(".ipynb"): try: file_id = await get_file_id(file_path) ydoc = await get_jupyter_ydoc(file_id) - + if ydoc: # Set the local user field in the notebook's awareness ydoc.awareness.set_local_state_field("user", user) except Exception as e: # Log error but don't block tool execution if logger: - logger.warning(f"Error setting notebook awareness in collaborative_tool: {e}") - + logger.warning( + f"Error setting notebook awareness in collaborative_tool: {e}" + ) + # Set global awareness try: g_awareness = await get_global_awareness() if g_awareness: - g_awareness.set_local_state({ - "user": user, - "current": file_path or "", - "documents": [file_path] if file_path else [] - }) + g_awareness.set_local_state( + { + "user": user, + "current": file_path or "", + "documents": [file_path] if file_path else [], + } + ) except Exception as e: # Log error but don't block tool execution if logger: logger.warning(f"Error setting global awareness in collaborative_tool: {e}") - + # Execute the original tool function return await tool_func(*args, **kwargs) - + return wrapper + return decorator diff --git a/tests/test_collaborative_tool.py b/tests/test_collaborative_tool.py index e0754e7..0fa9f56 100644 --- a/tests/test_collaborative_tool.py +++ b/tests/test_collaborative_tool.py @@ -16,7 +16,7 @@ def mock_user_dict(self): "name": "TestUser", "color": "var(--jp-collaborator-color1)", "display_name": "Test User", - "avatar": "/test/avatar.svg" + "avatar": "/test/avatar.svg", } @pytest.fixture @@ -35,56 +35,64 @@ def mock_ydoc(self): return ydoc @pytest.mark.asyncio - async def test_user_with_notebook_file_sets_awareness(self, mock_user_dict, mock_global_awareness, mock_ydoc): + async def test_user_with_notebook_file_sets_awareness( + self, mock_user_dict, mock_global_awareness, mock_ydoc + ): """Test that decorator sets both global and notebook awareness for .ipynb files.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str, content: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness), \ - patch('jupyter_ai_tools.utils.get_file_id', return_value="test-file-id"), \ - 
patch('jupyter_ai_tools.utils.get_jupyter_ydoc', return_value=mock_ydoc): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ), patch("jupyter_ai_tools.utils.get_file_id", return_value="test-file-id"), patch( + "jupyter_ai_tools.utils.get_jupyter_ydoc", return_value=mock_ydoc + ): result = await test_func("test_notebook.ipynb", "test content") - + # Verify function executed assert result == "processed test_notebook.ipynb" - + # Verify global awareness was set - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test_notebook.ipynb", - "documents": ["test_notebook.ipynb"] - }) - + mock_global_awareness.set_local_state.assert_called_once_with( + { + "user": mock_user_dict, + "current": "test_notebook.ipynb", + "documents": ["test_notebook.ipynb"], + } + ) + # Verify notebook awareness was set - mock_ydoc.awareness.set_local_state_field.assert_called_once_with("user", mock_user_dict) + mock_ydoc.awareness.set_local_state_field.assert_called_once_with( + "user", mock_user_dict + ) @pytest.mark.asyncio - async def test_user_with_non_notebook_file_only_sets_global_awareness(self, mock_user_dict, mock_global_awareness): + async def test_user_with_non_notebook_file_only_sets_global_awareness( + self, mock_user_dict, mock_global_awareness + ): """Test that decorator only sets global awareness for non-.ipynb files.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str, content: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness), \ - patch('jupyter_ai_tools.utils.get_file_id') as mock_file_id, \ - patch('jupyter_ai_tools.utils.get_jupyter_ydoc') as mock_ydoc: - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ), patch("jupyter_ai_tools.utils.get_file_id") as mock_file_id, patch( + "jupyter_ai_tools.utils.get_jupyter_ydoc" + ) as mock_ydoc: result = await test_func("test_file.py", "test content") - + # Verify function executed assert result == "processed test_file.py" - + # Verify global awareness was set - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test_file.py", - "documents": ["test_file.py"] - }) - + mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "test_file.py", "documents": ["test_file.py"]} + ) + # Verify notebook-specific functions were not called mock_file_id.assert_not_called() mock_ydoc.assert_not_called() @@ -92,180 +100,179 @@ async def test_func(file_path: str, content: str): @pytest.mark.asyncio async def test_file_path_detection_from_kwargs(self, mock_user_dict, mock_global_awareness): """Test that file_path is correctly detected from kwargs.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(content: str, file_path: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ): result = await test_func(content="test", file_path="test.py") - + # Verify function executed assert result == "processed test.py" - + # Verify global awareness was set with correct file_path - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test.py", - "documents": ["test.py"] - }) + 
mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "test.py", "documents": ["test.py"]} + ) @pytest.mark.asyncio - async def test_file_path_detection_from_positional_args(self, mock_user_dict, mock_global_awareness): + async def test_file_path_detection_from_positional_args( + self, mock_user_dict, mock_global_awareness + ): """Test that file_path is correctly detected from positional arguments.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str, content: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ): result = await test_func("test.py", "test content") - + # Verify function executed assert result == "processed test.py" - + # Verify global awareness was set with correct file_path - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test.py", - "documents": ["test.py"] - }) + mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "test.py", "documents": ["test.py"]} + ) @pytest.mark.asyncio - async def test_function_without_file_path_parameter(self, mock_user_dict, mock_global_awareness): + async def test_function_without_file_path_parameter( + self, mock_user_dict, mock_global_awareness + ): """Test that decorator works with functions that don't have file_path parameter.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(message: str): return f"processed {message}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ): result = await test_func("hello world") - + # Verify function executed assert result == "processed hello world" - + # Verify global awareness was set with empty file_path - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "", - "documents": [] - }) + mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "", "documents": []} + ) @pytest.mark.asyncio async def test_notebook_awareness_error_handling(self, mock_user_dict, mock_global_awareness): """Test that notebook awareness errors don't break function execution.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness), \ - patch('jupyter_ai_tools.utils.get_file_id', side_effect=Exception("File ID error")), \ - patch('jupyter_ai_tools.utils.get_jupyter_ydoc') as mock_ydoc: - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ), patch( + "jupyter_ai_tools.utils.get_file_id", side_effect=Exception("File ID error") + ), patch("jupyter_ai_tools.utils.get_jupyter_ydoc") as mock_ydoc: # Function should still execute despite notebook awareness error result = await test_func("test.ipynb") - + # Verify function executed normally assert result == "processed test.ipynb" - + # Verify global awareness was still set - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test.ipynb", - "documents": ["test.ipynb"] - }) - + 
mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "test.ipynb", "documents": ["test.ipynb"]} + ) + # Verify ydoc was not called due to error mock_ydoc.assert_not_called() @pytest.mark.asyncio async def test_global_awareness_error_handling(self, mock_user_dict): """Test that global awareness errors don't break function execution.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', side_effect=Exception("Global awareness error")): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", + side_effect=Exception("Global awareness error"), + ): # Function should still execute despite global awareness error result = await test_func("test.py") - + # Verify function executed normally assert result == "processed test.py" @pytest.mark.asyncio async def test_file_path_detection_error_handling(self, mock_user_dict, mock_global_awareness): """Test that file_path detection errors don't break function execution.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(weird_param: str): return f"processed {weird_param}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ): # Function should execute even with file_path detection issues result = await test_func("test data") - + # Verify function executed normally assert result == "processed test data" - + # Verify global awareness was set with empty file_path - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "", - "documents": [] - }) + mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "", "documents": []} + ) @pytest.mark.asyncio async def test_ydoc_none_handling(self, mock_user_dict, mock_global_awareness): """Test that None ydoc is handled gracefully.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness), \ - patch('jupyter_ai_tools.utils.get_file_id', return_value="test-file-id"), \ - patch('jupyter_ai_tools.utils.get_jupyter_ydoc', return_value=None): - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ), patch("jupyter_ai_tools.utils.get_file_id", return_value="test-file-id"), patch( + "jupyter_ai_tools.utils.get_jupyter_ydoc", return_value=None + ): result = await test_func("test.ipynb") - + # Verify function executed normally assert result == "processed test.ipynb" - + # Verify global awareness was still set - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test.ipynb", - "documents": ["test.ipynb"] - }) + mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "test.ipynb", "documents": ["test.ipynb"]} + ) @pytest.mark.asyncio async def test_global_awareness_none_handling(self, mock_user_dict): """Test that None global awareness is handled gracefully.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str): return f"processed {file_path}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=None): - + with 
patch("jupyter_ai_tools.utils.get_global_awareness", return_value=None): # Function should execute even with None global awareness result = await test_func("test.py") - + # Verify function executed normally assert result == "processed test.py" @pytest.mark.asyncio async def test_function_signature_preservation(self, mock_user_dict): """Test that decorator preserves function signature and metadata.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str, content: str, optional_param: str = "default"): """Test function docstring.""" @@ -274,25 +281,24 @@ async def test_func(file_path: str, content: str, optional_param: str = "default # Check that function metadata is preserved assert test_func.__name__ == "test_func" assert test_func.__doc__ == "Test function docstring." - + # Check that function signature is preserved sig = inspect.signature(test_func) params = list(sig.parameters.keys()) assert params == ["file_path", "content", "optional_param"] - + # Check that default values are preserved assert sig.parameters["optional_param"].default == "default" @pytest.mark.asyncio async def test_function_exception_propagation(self, mock_user_dict): """Test that exceptions from the wrapped function are properly propagated.""" - + @collaborative_tool(user=mock_user_dict) async def test_func(file_path: str): raise ValueError("Test error") - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=MagicMock()): - + with patch("jupyter_ai_tools.utils.get_global_awareness", return_value=MagicMock()): # Verify that the original function's exception is propagated with pytest.raises(ValueError, match="Test error"): await test_func("test.py") @@ -300,36 +306,40 @@ async def test_func(file_path: str): @pytest.mark.asyncio async def test_complex_function_signature(self, mock_user_dict, mock_global_awareness): """Test decorator with complex function signatures.""" - + @collaborative_tool(user=mock_user_dict) async def complex_func(arg1: str, file_path: str, *args, **kwargs): return f"processed {arg1}, {file_path}, {args}, {kwargs}" - with patch('jupyter_ai_tools.utils.get_global_awareness', return_value=mock_global_awareness): - - result = await complex_func("first", "test.py", "extra1", "extra2", key1="value1", key2="value2") - + with patch( + "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness + ): + result = await complex_func( + "first", "test.py", "extra1", "extra2", key1="value1", key2="value2" + ) + # Verify function executed with all parameters - assert "processed first, test.py, ('extra1', 'extra2'), {'key1': 'value1', 'key2': 'value2'}" in result - + assert ( + "processed first, test.py, ('extra1', 'extra2'), {'key1': 'value1', 'key2': 'value2'}" # noqa: E501 + in result + ) + # Verify global awareness was set with correct file_path - mock_global_awareness.set_local_state.assert_called_once_with({ - "user": mock_user_dict, - "current": "test.py", - "documents": ["test.py"] - }) + mock_global_awareness.set_local_state.assert_called_once_with( + {"user": mock_user_dict, "current": "test.py", "documents": ["test.py"]} + ) def test_decorator_factory_pattern(self, mock_user_dict): """Test that collaborative_tool works as a decorator factory.""" - + # Test that it returns a decorator function decorator = collaborative_tool(user=mock_user_dict) assert callable(decorator) - + # Test that the decorator returns a wrapper function async def test_func(): return "test" - + wrapped_func = decorator(test_func) assert callable(wrapped_func) - 
assert wrapped_func.__name__ == "test_func" \ No newline at end of file + assert wrapped_func.__name__ == "test_func" diff --git a/tests/test_normalize_filepath.py b/tests/test_normalize_filepath.py index 429dbfd..048378c 100644 --- a/tests/test_normalize_filepath.py +++ b/tests/test_normalize_filepath.py @@ -10,38 +10,50 @@ class TestNormalizeFilepath: """Test suite for the normalize_filepath function.""" - + @pytest.fixture def mock_serverapp(self): """Fixture that provides a mocked serverapp with test root directory.""" - with patch('jupyter_ai_tools.utils.get_serverapp') as mock_serverapp: + with patch("jupyter_ai_tools.utils.get_serverapp") as mock_serverapp: mock_app = MagicMock() mock_app.root_dir = "/test/root" mock_serverapp.return_value = mock_app yield mock_serverapp - @pytest.mark.parametrize("test_path,expected_decoded", [ - ("notebooks/my%20notebook.ipynb", "notebooks/my notebook.ipynb"), - ("relative/file.ipynb", "relative/file.ipynb"), - ("folder%20with%20spaces/file%20name.ipynb", "folder with spaces/file name.ipynb"), - ("./current/file.ipynb", "current/file.ipynb"), - ("../parent/file.ipynb", "../parent/file.ipynb"), - ("path%2Fwith%2Fslashes/file%2Bwith%2Bplus.ipynb", "path/with/slashes/file+with+plus.ipynb"), - ("path/with/special%21chars%40symbols.ipynb", "path/with/special!chars@symbols.ipynb"), - ("path//with//double//slashes.ipynb", "path/with/double/slashes.ipynb"), - ("path/with/unicode%C3%A9chars.ipynb", "path/with/unicodeéchars.ipynb"), - ("very/deeply/nested/path/structure/with/many/levels/file.ipynb", "very/deeply/nested/path/structure/with/many/levels/file.ipynb"), - ]) + @pytest.mark.parametrize( + "test_path,expected_decoded", + [ + ("notebooks/my%20notebook.ipynb", "notebooks/my notebook.ipynb"), + ("relative/file.ipynb", "relative/file.ipynb"), + ("folder%20with%20spaces/file%20name.ipynb", "folder with spaces/file name.ipynb"), + ("./current/file.ipynb", "current/file.ipynb"), + ("../parent/file.ipynb", "../parent/file.ipynb"), + ( + "path%2Fwith%2Fslashes/file%2Bwith%2Bplus.ipynb", + "path/with/slashes/file+with+plus.ipynb", + ), + ("path/with/special%21chars%40symbols.ipynb", "path/with/special!chars@symbols.ipynb"), + ("path//with//double//slashes.ipynb", "path/with/double/slashes.ipynb"), + ("path/with/unicode%C3%A9chars.ipynb", "path/with/unicodeéchars.ipynb"), + ( + "very/deeply/nested/path/structure/with/many/levels/file.ipynb", + "very/deeply/nested/path/structure/with/many/levels/file.ipynb", + ), + ], + ) def test_relative_path_resolution(self, mock_serverapp, test_path, expected_decoded): """Test that relative paths are properly decoded and resolved against server root.""" result = normalize_filepath(test_path) expected = str(Path(f"/test/root/{expected_decoded}").resolve()) assert result == expected - @pytest.mark.parametrize("test_path,expected_decoded", [ - ("/absolute/path/file.ipynb", "/absolute/path/file.ipynb"), - ("/absolute/path%20with%20spaces/file.ipynb", "/absolute/path with spaces/file.ipynb"), - ]) + @pytest.mark.parametrize( + "test_path,expected_decoded", + [ + ("/absolute/path/file.ipynb", "/absolute/path/file.ipynb"), + ("/absolute/path%20with%20spaces/file.ipynb", "/absolute/path with spaces/file.ipynb"), + ], + ) def test_absolute_path_resolution(self, test_path, expected_decoded): """Test that absolute paths are normalized but not changed.""" result = normalize_filepath(test_path) @@ -51,18 +63,23 @@ def test_absolute_path_resolution(self, test_path, expected_decoded): def test_fallback_to_cwd_when_serverapp_fails(self): 
"""Test that function falls back to current working directory when serverapp fails.""" test_path = "relative/file.ipynb" - - with patch('jupyter_ai_tools.utils.get_serverapp', side_effect=Exception("ServerApp error")): + + with patch( + "jupyter_ai_tools.utils.get_serverapp", side_effect=Exception("ServerApp error") + ): result = normalize_filepath(test_path) expected = str(Path(os.getcwd(), "relative/file.ipynb").resolve()) assert result == expected - @pytest.mark.parametrize("invalid_path", [ - "", - None, - " ", - "\t\n", - ]) + @pytest.mark.parametrize( + "invalid_path", + [ + "", + None, + " ", + "\t\n", + ], + ) def test_invalid_path_raises_error(self, invalid_path): """Test that invalid paths raise ValueError.""" with pytest.raises(ValueError, match="file_path cannot be empty"): @@ -76,55 +93,61 @@ def test_path_resolution_with_real_filesystem(self): test_subdir.mkdir() test_file = test_subdir / "test_file.ipynb" test_file.write_text('{"cells": []}') - + # Test relative path resolution relative_path = "test_subdir/test_file.ipynb" - - with patch('jupyter_ai_tools.utils.get_serverapp') as mock_serverapp: + + with patch("jupyter_ai_tools.utils.get_serverapp") as mock_serverapp: mock_app = MagicMock() mock_app.root_dir = temp_dir mock_serverapp.return_value = mock_app - + result = normalize_filepath(relative_path) - + # Should resolve to the actual file path expected = str(test_file.resolve()) assert result == expected - + # Verify the resolved path actually exists assert Path(result).exists() - @pytest.mark.parametrize("test_path", [ - "notebook.ipynb", - "script.py", - "data.csv", - "image.png", - "document.txt", - "config.json", - "style.css", - "page.html" - ]) + @pytest.mark.parametrize( + "test_path", + [ + "notebook.ipynb", + "script.py", + "data.csv", + "image.png", + "document.txt", + "config.json", + "style.css", + "page.html", + ], + ) def test_various_file_extensions(self, mock_serverapp, test_path): """Test that function works with various file extensions.""" result = normalize_filepath(test_path) expected = str(Path(f"/test/root/{test_path}").resolve()) assert result == expected - @pytest.mark.parametrize("root_dir", [ - "/home/user/notebooks", - "/var/jupyter/work", - "/tmp/jupyter_root", - "/Users/username/Documents" - ]) + @pytest.mark.parametrize( + "root_dir", + [ + "/home/user/notebooks", + "/var/jupyter/work", + "/tmp/jupyter_root", + "/Users/username/Documents", + ], + ) def test_serverapp_with_different_root_dirs(self, root_dir): """Test that different server root directories are handled correctly.""" test_path = "file.ipynb" - - with patch('jupyter_ai_tools.utils.get_serverapp') as mock_serverapp: + + with patch("jupyter_ai_tools.utils.get_serverapp") as mock_serverapp: mock_app = MagicMock() mock_app.root_dir = root_dir mock_serverapp.return_value = mock_app - + result = normalize_filepath(test_path) expected = str(Path(root_dir, test_path).resolve()) - assert result == expected \ No newline at end of file + assert result == expected diff --git a/tests/test_write_to_cell_collaboratively.py b/tests/test_write_to_cell_collaboratively.py index bab5dac..7ed97fc 100644 --- a/tests/test_write_to_cell_collaboratively.py +++ b/tests/test_write_to_cell_collaboratively.py @@ -23,11 +23,11 @@ def setup_method(self): self.mock_ynotebook = Mock() self.mock_ycell = MagicMock() self.mock_source = MagicMock() - + # Mock ycell behavior self.mock_ycell.to_py.return_value = {"source": "old content"} self.mock_ycell.__getitem__.return_value = self.mock_source - + # Mock source 
behavior self.mock_source.insert = Mock() self.mock_source.__delitem__ = Mock() @@ -63,103 +63,105 @@ async def test_input_validation_negative_typing_speed(self): async def test_same_content_returns_true(self): """Test that same content returns True immediately.""" self.mock_ycell.to_py.return_value = {"source": "same content"} - + result = await write_to_cell_collaboratively( self.mock_ynotebook, self.mock_ycell, "same content" ) - + assert result is True @pytest.mark.asyncio async def test_cell_extraction_error(self): """Test handling of cell content extraction errors.""" self.mock_ycell.to_py.side_effect = Exception("Cell extraction failed") - + with pytest.raises(RuntimeError, match="Failed to extract cell content"): - await write_to_cell_collaboratively( - self.mock_ynotebook, self.mock_ycell, "new content" - ) + await write_to_cell_collaboratively(self.mock_ynotebook, self.mock_ycell, "new content") @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher") async def test_successful_write_operation(self, mock_sequence_matcher, mock_safe_set_cursor): """Test successful write operation.""" # Mock SequenceMatcher to return simple equal operation mock_sm = Mock() - mock_sm.get_opcodes.return_value = [('equal', 0, 5, 0, 5)] + mock_sm.get_opcodes.return_value = [("equal", 0, 5, 0, 5)] mock_sequence_matcher.return_value = mock_sm - + result = await write_to_cell_collaboratively( self.mock_ynotebook, self.mock_ycell, "new content" ) - + assert result is True mock_safe_set_cursor.assert_called() @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher") async def test_difflib_error_handling(self, mock_sequence_matcher, mock_safe_set_cursor): """Test handling of difflib errors.""" mock_sequence_matcher.side_effect = Exception("Difflib error") - + with pytest.raises(RuntimeError, match="Failed to write cell content collaboratively"): - await write_to_cell_collaboratively( - self.mock_ynotebook, self.mock_ycell, "new content" - ) + await write_to_cell_collaboratively(self.mock_ynotebook, self.mock_ycell, "new content") @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook._handle_delete_operation') - @patch('jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher') - async def test_delete_operation_called(self, mock_sequence_matcher, mock_delete_op, mock_safe_set_cursor): + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook._handle_delete_operation") + @patch("jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher") + async def test_delete_operation_called( + self, mock_sequence_matcher, mock_delete_op, mock_safe_set_cursor + ): """Test that delete operation is called for delete opcodes.""" mock_sm = Mock() - mock_sm.get_opcodes.return_value = [('delete', 0, 5, 0, 0)] + mock_sm.get_opcodes.return_value = [("delete", 0, 5, 0, 0)] mock_sequence_matcher.return_value = mock_sm mock_delete_op.return_value = None - + result = await write_to_cell_collaboratively( self.mock_ynotebook, self.mock_ycell, "new 
content" ) - + assert result is True mock_delete_op.assert_called_once() @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook._handle_insert_operation') - @patch('jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher') - async def test_insert_operation_called(self, mock_sequence_matcher, mock_insert_op, mock_safe_set_cursor): + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook._handle_insert_operation") + @patch("jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher") + async def test_insert_operation_called( + self, mock_sequence_matcher, mock_insert_op, mock_safe_set_cursor + ): """Test that insert operation is called for insert opcodes.""" mock_sm = Mock() - mock_sm.get_opcodes.return_value = [('insert', 0, 0, 0, 5)] + mock_sm.get_opcodes.return_value = [("insert", 0, 0, 0, 5)] mock_sequence_matcher.return_value = mock_sm mock_insert_op.return_value = 5 - + result = await write_to_cell_collaboratively( self.mock_ynotebook, self.mock_ycell, "new content" ) - + assert result is True mock_insert_op.assert_called_once() @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook._handle_replace_operation') - @patch('jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher') - async def test_replace_operation_called(self, mock_sequence_matcher, mock_replace_op, mock_safe_set_cursor): + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook._handle_replace_operation") + @patch("jupyter_ai_tools.toolkits.notebook.difflib.SequenceMatcher") + async def test_replace_operation_called( + self, mock_sequence_matcher, mock_replace_op, mock_safe_set_cursor + ): """Test that replace operation is called for replace opcodes.""" mock_sm = Mock() - mock_sm.get_opcodes.return_value = [('replace', 0, 5, 0, 7)] + mock_sm.get_opcodes.return_value = [("replace", 0, 5, 0, 7)] mock_sequence_matcher.return_value = mock_sm mock_replace_op.return_value = 7 - + result = await write_to_cell_collaboratively( self.mock_ynotebook, self.mock_ycell, "new content" ) - + assert result is True mock_replace_op.assert_called_once() @@ -168,32 +170,32 @@ class TestHandleDeleteOperation: """Test cases for _handle_delete_operation function.""" @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_delete_operation(self, mock_sleep, mock_safe_set_cursor): """Test delete operation with proper timing.""" mock_ynotebook = Mock() mock_old_ = MagicMock() mock_old_.__delitem__ = Mock() - + await _handle_delete_operation(mock_ynotebook, mock_old_, 0, 5, 0.1) - + # Check that cursor was set and sleep was called mock_safe_set_cursor.assert_called_with(mock_ynotebook, mock_old_, 0, 5) assert mock_sleep.call_count == 2 # Two sleep calls mock_old_.__delitem__.assert_called_with(slice(0, 5)) @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_delete_operation_with_fast_typing(self, mock_sleep, mock_safe_set_cursor): """Test 
delete operation respects maximum sleep time.""" mock_ynotebook = Mock() mock_old_ = MagicMock() mock_old_.__delitem__ = Mock() - + await _handle_delete_operation(mock_ynotebook, mock_old_, 0, 5, 1.0) - + # Should use min(0.3, 1.0 * 3) = 0.3 for first sleep mock_sleep.assert_any_call(0.3) @@ -202,16 +204,16 @@ class TestHandleInsertOperation: """Test cases for _handle_insert_operation function.""" @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_insert_whitespace_only(self, mock_sleep, mock_safe_set_cursor): """Test insertion of whitespace-only content.""" mock_ynotebook = Mock() mock_old_ = Mock() mock_old_.insert = Mock() - + result = await _handle_insert_operation(mock_ynotebook, mock_old_, 0, " \n ", 0, 4, 0.1) - + assert result == 4 # Check that the entire whitespace string was inserted mock_old_.insert.assert_called_once() @@ -223,32 +225,34 @@ async def test_insert_whitespace_only(self, mock_sleep, mock_safe_set_cursor): mock_safe_set_cursor.assert_called_with(mock_ynotebook, mock_old_, 4) @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_insert_words(self, mock_sleep, mock_safe_set_cursor): """Test insertion of words with proper spacing.""" mock_ynotebook = Mock() mock_old_ = Mock() mock_old_.insert = Mock() - - result = await _handle_insert_operation(mock_ynotebook, mock_old_, 0, "hello world", 0, 11, 0.1) - + + result = await _handle_insert_operation( + mock_ynotebook, mock_old_, 0, "hello world", 0, 11, 0.1 + ) + assert result == 11 # Should insert "hello" and "world" separately assert mock_old_.insert.call_count >= 2 mock_safe_set_cursor.assert_called() @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._safe_set_cursor') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook._safe_set_cursor") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_insert_with_suffix(self, mock_sleep, mock_safe_set_cursor): """Test insertion handles remaining text after last word.""" mock_ynotebook = Mock() mock_old_ = Mock() mock_old_.insert = Mock() - + result = await _handle_insert_operation(mock_ynotebook, mock_old_, 0, "hello!", 0, 6, 0.1) - + assert result == 6 mock_old_.insert.assert_called() mock_safe_set_cursor.assert_called() @@ -258,59 +262,61 @@ class TestHandleReplaceOperation: """Test cases for _handle_replace_operation function.""" @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook._handle_delete_operation') - @patch('jupyter_ai_tools.toolkits.notebook._handle_insert_operation') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook._handle_delete_operation") + @patch("jupyter_ai_tools.toolkits.notebook._handle_insert_operation") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_replace_operation(self, mock_sleep, mock_insert_op, mock_delete_op): """Test replace operation calls delete then insert.""" mock_ynotebook = Mock() mock_old_ = Mock() mock_delete_op.return_value = None mock_insert_op.return_value = 10 - + result = await 
_handle_replace_operation( mock_ynotebook, mock_old_, 0, "new content", 5, 0, 10, 0.1 ) - + assert result == 10 mock_delete_op.assert_called_once_with(mock_ynotebook, mock_old_, 0, 5, 0.1) - mock_insert_op.assert_called_once_with(mock_ynotebook, mock_old_, 0, "new content", 0, 10, 0.1) + mock_insert_op.assert_called_once_with( + mock_ynotebook, mock_old_, 0, "new content", 0, 10, 0.1 + ) mock_sleep.assert_called_with(0.2) # typing_speed * 2 class TestSafeSetCursor: """Test cases for _safe_set_cursor function.""" - @patch('jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook') + @patch("jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook") def test_safe_set_cursor_success(self, mock_set_cursor): """Test successful cursor setting.""" mock_ynotebook = Mock() mock_old_ = Mock() - + _safe_set_cursor(mock_ynotebook, mock_old_, 5) - + mock_set_cursor.assert_called_once_with(mock_ynotebook, mock_old_, 5, None) - @patch('jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook') + @patch("jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook") def test_safe_set_cursor_with_stop(self, mock_set_cursor): """Test cursor setting with stop position.""" mock_ynotebook = Mock() mock_old_ = Mock() - + _safe_set_cursor(mock_ynotebook, mock_old_, 5, 10) - + mock_set_cursor.assert_called_once_with(mock_ynotebook, mock_old_, 5, 10) - @patch('jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook') + @patch("jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook") def test_safe_set_cursor_handles_exception(self, mock_set_cursor): """Test that cursor setting exceptions are handled gracefully.""" mock_set_cursor.side_effect = Exception("Cursor error") mock_ynotebook = Mock() mock_old_ = Mock() - + # Should not raise exception _safe_set_cursor(mock_ynotebook, mock_old_, 5) - + mock_set_cursor.assert_called_once() @@ -318,46 +324,44 @@ class TestIntegration: """Integration tests for the complete functionality.""" @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_full_workflow_simple_change(self, mock_sleep, mock_set_cursor): """Test a complete workflow with simple text change.""" mock_ynotebook = Mock() mock_ycell = MagicMock() mock_source = MagicMock() - + # Set up mocks mock_ycell.to_py.return_value = {"source": "hello"} mock_ycell.__getitem__.return_value = mock_source mock_source.insert = Mock() mock_source.__delitem__ = Mock() - + # This should result in a replace operation: "hello" -> "world" result = await write_to_cell_collaboratively( mock_ynotebook, mock_ycell, "world", typing_speed=0.0 ) - + assert result is True # Should have called insert operations for typing simulation mock_source.insert.assert_called() @pytest.mark.asyncio - @patch('jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook') - @patch('jupyter_ai_tools.toolkits.notebook.asyncio.sleep') + @patch("jupyter_ai_tools.toolkits.notebook.set_cursor_in_ynotebook") + @patch("jupyter_ai_tools.toolkits.notebook.asyncio.sleep") async def test_custom_typing_speed(self, mock_sleep, mock_set_cursor): """Test that custom typing speed is respected.""" mock_ynotebook = Mock() mock_ycell = MagicMock() mock_source = MagicMock() - + mock_ycell.to_py.return_value = {"source": ""} mock_ycell.__getitem__.return_value = mock_source mock_source.insert = Mock() - - await 
write_to_cell_collaboratively( - mock_ynotebook, mock_ycell, "test", typing_speed=0.5 - ) - + + await write_to_cell_collaboratively(mock_ynotebook, mock_ycell, "test", typing_speed=0.5) + # Should have called sleep with custom timing mock_sleep.assert_called() # At least one call should use our custom typing speed @@ -366,4 +370,4 @@ async def test_custom_typing_speed(self, mock_sleep, mock_set_cursor): if __name__ == "__main__": - pytest.main([__file__]) \ No newline at end of file + pytest.main([__file__]) From 07e9f548db58c0fe9ed78c88579ea61b6ce6ce40 Mon Sep 17 00:00:00 2001 From: Piyush Jain Date: Tue, 19 Aug 2025 14:47:31 -0700 Subject: [PATCH 5/5] lint --- tests/test_collaborative_tool.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_collaborative_tool.py b/tests/test_collaborative_tool.py index 0fa9f56..bde4ea9 100644 --- a/tests/test_collaborative_tool.py +++ b/tests/test_collaborative_tool.py @@ -176,7 +176,9 @@ async def test_func(file_path: str): "jupyter_ai_tools.utils.get_global_awareness", return_value=mock_global_awareness ), patch( "jupyter_ai_tools.utils.get_file_id", side_effect=Exception("File ID error") - ), patch("jupyter_ai_tools.utils.get_jupyter_ydoc") as mock_ydoc: + ), patch( + "jupyter_ai_tools.utils.get_jupyter_ydoc" + ) as mock_ydoc: # Function should still execute despite notebook awareness error result = await test_func("test.ipynb") @@ -320,7 +322,7 @@ async def complex_func(arg1: str, file_path: str, *args, **kwargs): # Verify function executed with all parameters assert ( - "processed first, test.py, ('extra1', 'extra2'), {'key1': 'value1', 'key2': 'value2'}" # noqa: E501 + "processed first, test.py, ('extra1', 'extra2'), {'key1': 'value1', 'key2': 'value2'}" # noqa: E501 in result )
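For readers who don't have the patched jupyter_ai_tools/utils.py in front of them, below is a minimal, illustrative sketch of a path normalizer that satisfies what tests/test_normalize_filepath.py above asserts: URL decoding, resolution of relative paths against the Jupyter server root via get_serverapp(), a fallback to the current working directory when the server app is unavailable, and a ValueError for empty input. This is a sketch under those assumptions only; the "_sketch" suffix marks it as a stand-in, not the actual implementation shipped in the patch, which may differ in detail.

import os
from pathlib import Path
from urllib.parse import unquote


def normalize_filepath_sketch(file_path: str) -> str:
    """Illustrative stand-in for normalize_filepath, mirroring the test expectations."""
    # Empty, whitespace-only, or None input is rejected, as the invalid-path cases expect.
    if not file_path or not file_path.strip():
        raise ValueError("file_path cannot be empty")

    # URL-encoded characters are decoded first (my%20notebook.ipynb -> my notebook.ipynb).
    decoded = unquote(file_path)

    # Absolute paths are only normalized, not re-rooted against the server root.
    if Path(decoded).is_absolute():
        return str(Path(decoded).resolve())

    # Relative paths resolve against the server root; if the server app cannot be
    # reached (or the package is not importable), fall back to the current working
    # directory, matching test_fallback_to_cwd_when_serverapp_fails.
    try:
        from jupyter_ai_tools.utils import get_serverapp  # the helper the tests patch
        root = get_serverapp().root_dir
    except Exception:
        root = os.getcwd()

    return str(Path(root, decoded).resolve())

As a usage illustration: with a server root of /srv/notebooks (a hypothetical value), normalize_filepath_sketch("notebooks/my%20notebook.ipynb") would return "/srv/notebooks/notebooks/my notebook.ipynb", the same resolution the parametrized relative-path cases above check against the mocked "/test/root".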