diff --git a/README.md b/README.md
index 78572bf..8b19a3d 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,13 @@
# Jupyter AI Claude Code
-Jupyter AI integration with Claude Code.
+Jupyter AI integration with Claude Code persona for enhanced development capabilities.
+
+## Features
+
+- **Claude Code Integration**: Full Claude Code persona for Jupyter AI
+- **Development Tools**: Access to Claude Code's built-in development tools
+- **Seamless Integration**: Works with existing Jupyter AI workflow
+- **Template Management**: Interactive task progress tracking and updates
## Setup
@@ -39,7 +46,14 @@ This will:
pixi run start
```
-This will start JupyterLab with the Jupyter AI extension and this package available.
+This will start JupyterLab with the Jupyter AI extension and Claude Code persona available.
+
+### Using Claude Code Persona
+
+1. Open JupyterLab
+2. Open the Jupyter AI chat panel
+3. Select "Claude" persona
+4. Interact with Claude Code's development tools
### Build the Package
@@ -57,6 +71,7 @@ The package source code is located in `src/jupyter_ai_claude_code/`.
- **JupyterLab**: Latest stable version from conda-forge
- **Jupyter AI**: Version 3.0.0b5 from PyPI
+- **Claude Code SDK**: For Claude Code integration
- **Python**: >=3.8
## License
diff --git a/src/jupyter_ai_claude_code/persona.py b/src/jupyter_ai_claude_code/persona.py
index 7305a18..1e2cdcf 100644
--- a/src/jupyter_ai_claude_code/persona.py
+++ b/src/jupyter_ai_claude_code/persona.py
@@ -1,92 +1,20 @@
from typing import Dict, Any, List, Optional, AsyncIterator
+import datetime
from jupyter_ai.personas.base_persona import BasePersona, PersonaDefaults
-from jupyterlab_chat.models import Message
+from jupyterlab_chat.models import Message, NewMessage
from claude_code_sdk import (
- query, ClaudeCodeOptions,
- Message, SystemMessage, AssistantMessage, ResultMessage,
- TextBlock, ToolUseBlock
+ query, ClaudeCodeOptions, AssistantMessage, TextBlock, ToolUseBlock,
+ UserMessage, SystemMessage
)
+from .templates import (
+ ClaudeCodeTemplateManager
+)
-OMIT_INPUT_ARGS = ['content']
-
-TOOL_PARAM_MAPPING = {
- 'Task': 'description',
- 'Bash': 'command',
- 'Glob': 'pattern',
- 'Grep': 'pattern',
- 'LS': 'path',
- 'Read': 'file_path',
- 'Edit': 'file_path',
- 'MultiEdit': 'file_path',
- 'Write': 'file_path',
- 'NotebookRead': 'notebook_path',
- 'NotebookWrite': 'notebook_path',
- 'WebFetch': 'url',
- 'WebSearch': 'query',
-}
-
-PROMPT_TEMPLATE = """
-{{body}}
-
-The user has selected the following files as attachements:
-
-
-"""
-
-def input_dict_to_str(d: Dict[str, Any]) -> str:
- """Convert input dictionary to string representation, omitting specified args."""
- args = []
- for k, v in d.items():
- if k not in OMIT_INPUT_ARGS:
- args.append(f"{k}={v}")
- return ', '.join(args)
-
-
-def tool_to_str(block: ToolUseBlock, persona_instance=None) -> str:
- """Convert a ToolUseBlock to its string representation."""
- results = []
-
- if block.name == 'TodoWrite':
- block_id = block.id if hasattr(block, 'id') else str(hash(str(block.input)))
-
- if persona_instance and block_id in persona_instance._printed_todowrite_blocks:
- return ""
-
- if persona_instance:
- persona_instance._printed_todowrite_blocks.add(block_id)
-
- todos = block.input.get('todos', [])
- results.append('TodoWrite()')
- for todo in todos:
- content = todo.get('content')
- if content:
- results.append(f"* {content}")
- elif block.name in TOOL_PARAM_MAPPING:
- param_key = TOOL_PARAM_MAPPING[block.name]
- param_value = block.input.get(param_key, '')
- results.append(f"🛠️ {block.name}({param_value})")
- else:
- results.append(f"🛠️ {block.name}({input_dict_to_str(block.input)})")
-
- return '\n'.join(results)
-def claude_message_to_str(message, persona_instance=None) -> Optional[str]:
- """Convert a Claude Message to a string by extracting text content."""
- text_parts = []
- for block in message.content:
- if isinstance(block, TextBlock):
- text_parts.append(block.text)
- elif isinstance(block, ToolUseBlock):
- tool_str = tool_to_str(block, persona_instance)
- if tool_str:
- text_parts.append(tool_str)
- else:
- text_parts.append(str(block))
- return '\n'.join(text_parts) if text_parts else None
class ClaudeCodePersona(BasePersona):
@@ -94,7 +22,7 @@ class ClaudeCodePersona(BasePersona):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self._printed_todowrite_blocks = set()
+ self.template_mgr = ClaudeCodeTemplateManager(self)
@property
def defaults(self) -> PersonaDefaults:
@@ -102,18 +30,37 @@ def defaults(self) -> PersonaDefaults:
return PersonaDefaults(
name="Claude",
avatar_path="/files/.jupyter/claude.svg",
- description="Claude Code",
+ description="Claude Code persona",
system_prompt="...",
)
async def _process_response_message(self, message_iterator) -> AsyncIterator[str]:
- """Process response messages from Claude Code SDK."""
- async for response_message in message_iterator:
- self.log.info(str(response_message))
- if isinstance(response_message, AssistantMessage):
- msg_str = claude_message_to_str(response_message, self)
- if msg_str is not None:
- yield msg_str + '\n\n'
+ """Process response messages with template updates."""
+ has_content = False
+ template_was_used = False
+
+ async for message in message_iterator:
+ self.log.info(str(message))
+ if isinstance(message, AssistantMessage):
+ result = await self.template_mgr.claude_message_to_str(message)
+ # Template now handles everything - never stream individual components
+ if self.template_mgr.active:
+ template_was_used = True
+ elif result is not None:
+ # Only for messages without any tool usage (rare)
+ has_content = True
+ yield result + '\n\n'
+
+ # Complete template if active
+ if self.template_mgr.active:
+ await self.template_mgr.complete()
+ template_was_used = True
+
+ # Always yield something to complete the stream
+ if template_was_used:
+ yield "" # Empty yield to signal completion when template handled everything
+ elif not has_content:
+ yield "" # Ensure stream completes for empty responses
def _generate_prompt(self, message: Message) -> str:
attachment_ids = message.attachments
@@ -133,24 +80,57 @@ def _generate_prompt(self, message: Message) -> str:
self.log.info(prompt)
return prompt
+ def _get_system_prompt(self):
+ """Get the system prompt for Claude Code options."""
+ return ("I am Claude Code, an AI assistant with access to development tools. "
+ "When formatting responses, I use **bold text** for emphasis and section headers instead of markdown headings (# ## ###). "
+ "I keep formatting clean and readable without large headers. "
+ "For complex tasks requiring multiple steps (3+ actions), I proactively create a todo list using TodoWrite to track progress and keep the user informed of my plan.")
+
async def process_message(self, message: Message) -> None:
"""Process incoming message and stream Claude Code response."""
- self._printed_todowrite_blocks.clear()
- async_gen = None
- prompt = self._generate_prompt(message)
+ # Always set writing state at the start
+ self.awareness.set_local_state_field("isWriting", True)
+
+ self.template_mgr.reset()
+
try:
- async_gen = query(
- prompt=prompt,
- options=ClaudeCodeOptions(
- max_turns=20,
- cwd=self.get_workspace_dir(),
- permission_mode='bypassPermissions'
- )
- )
+ # Configure Claude Code - use workspace dir for better working directory detection
+ chat_dir = self.get_chat_dir()
+ workspace_dir = self.get_workspace_dir()
+
+ # Prefer workspace dir if available, fallback to chat dir
+ working_dir = chat_dir if chat_dir else workspace_dir
+
+ self.log.info(f"Chat directory: {chat_dir}")
+ self.log.info(f"Workspace directory: {workspace_dir}")
+ self.log.info(f"Using working directory: {working_dir}")
+
+ options = {
+ 'max_turns': 20,
+ 'cwd': working_dir,
+ 'permission_mode': 'bypassPermissions',
+ 'system_prompt': self._get_system_prompt()
+ }
+
+ # Generate prompt from current message
+ user_prompt = self._generate_prompt(message)
+
+ # Stream response directly with prompt
+ async_gen = query(prompt=user_prompt, options=ClaudeCodeOptions(**options))
+
+ # Use stream_message to handle the streaming
await self.stream_message(self._process_response_message(async_gen))
+
except Exception as e:
- self.log.error(f"Error in process_message: {e}")
- await self.send_message(f"Sorry, I have had an internal error while working on that: {e}")
+ self.log.error(f"Error: {e}")
+ if self.template_mgr.active:
+ await self.template_mgr.complete()
+
+ try:
+ await self.send_message(f"Sorry, error: {e}")
+ except TypeError:
+ self.send_message(f"Sorry, error: {e}")
finally:
- if async_gen is not None:
- await async_gen.aclose()
+ # Always clear writing state when done
+ self.awareness.set_local_state_field("isWriting", False)
diff --git a/src/jupyter_ai_claude_code/templates.py b/src/jupyter_ai_claude_code/templates.py
new file mode 100644
index 0000000..312eae6
--- /dev/null
+++ b/src/jupyter_ai_claude_code/templates.py
@@ -0,0 +1,424 @@
+"""Claude Code message template management for Jupyter AI persona."""
+
+from typing import Dict, Any, List, Optional
+import datetime
+import os
+import re
+from dataclasses import dataclass, field
+from jinja2 import Template
+from jupyterlab_chat.models import Message, NewMessage
+from claude_code_sdk import TextBlock, ToolUseBlock, ToolResultBlock, AssistantMessage
+
+
+@dataclass
+class MessageData:
+ """Dataclass containing all data needed for message template rendering."""
+ todos: List[Dict[str, Any]] = field(default_factory=list)
+ current_action: Dict[str, str] = None
+ completed_actions: List[str] = field(default_factory=list)
+ current_result: str = None
+ initial_text: str = None
+ final_text: str = None
+
+
+# Template for rendering consolidated actions and final response
+TODO_TEMPLATE = Template("""
+{%- if data.initial_text %}
+{{ data.initial_text }}
+
+{%- endif %}
+
+{% if data.todos %}
+**Task Progress:**
+
+{%- for todo in data.todos %}
+{%- if todo.status == 'completed' %}
+- [x] ~~{{ todo.content }}~~
+{%- elif todo.status == 'in_progress' %}
+- [ ] **{{ todo.content }}** *(in progress)*
+{%- else %}
+- [ ] {{ todo.content }}
+{%- endif %}
+{%- endfor %}
+
+{%- endif %}
+
+{%- if data.current_action %}
+
+**Current Tool:**
+{{ data.current_action.tool_call }}
+⎿ Executing...
+
+{%- endif %}
+
+{% if data.completed_actions and data.completed_actions|length > 0 %}
+
+**Tools Called:**
+
+See details ({{ data.completed_actions|length }})
+
+{%- for action in data.completed_actions %}
+
+{{ action }}
+⎿ Completed
+{%- endfor %}
+
+
+
+{% endif %}
+
+{% if data.current_result or data.final_text %}
+**Response:**
+{% if data.current_result %}
+{{ data.current_result }}
+{% endif %}
+{% if data.final_text %}
+{{ data.final_text }}
+{% endif %}
+{% endif %}
+""".strip())
+
+
+class ClaudeCodeTemplateManager:
+ """Manages in-place template message updates for Claude Code SDK messages."""
+
+ # Constants
+ MAX_TOOL_VALUE_LENGTH = 60
+ MAX_ARG_LENGTH = 30
+
+ # Tool parameter mapping for display formatting
+ TOOL_PARAM_MAPPING = {
+ 'Task': 'description', 'Bash': 'command', 'Glob': 'pattern', 'Grep': 'pattern',
+ 'LS': 'path', 'Read': 'file_path', 'Edit': 'file_path', 'MultiEdit': 'file_path',
+ 'Write': 'file_path', 'NotebookRead': 'notebook_path', 'NotebookWrite': 'notebook_path',
+ 'WebFetch': 'url', 'WebSearch': 'query'
+ }
+
+ # Tools that should have clickable file links
+ FILE_LINK_TOOLS = {'Read', 'Edit', 'MultiEdit', 'Write'}
+
+ def __init__(self, persona):
+ self.persona = persona
+ self.message_id = None
+ self.message_data = MessageData() # Single dataclass for all template data
+ self.active = False
+ self.turn_active = False # Track if we're in an active Claude turn
+ self.has_actions = False # Track if we've seen any tool calls
+ self.in_final_phase = False # Track if we're in final summary phase
+
+ def _same_todo_list(self, new_todos):
+ """Check if this is the same todo list (just status updates)."""
+ if not self.message_data.todos:
+ return False
+ old_ids = {t['id'] for t in self.message_data.todos}
+ new_ids = {t['id'] for t in new_todos}
+ return old_ids == new_ids
+
+ async def _ensure_message_exists(self):
+ """Ensure a message exists, creating one if needed."""
+ if not self.message_id:
+ await self._create_message()
+ else:
+ await self._update_message()
+
+ async def update_todos(self, todos):
+ """Update todo list, creating or updating message as needed."""
+ self.message_data.todos = todos
+
+ # Always ensure template is active when we have todos
+ if not self.active:
+ self.active = True
+
+ await self._ensure_message_exists()
+ return ""
+
+ async def update_action(self, action):
+ """Start a new action - show it as current action."""
+ if self.active:
+ # Complete previous action if it exists (just add tool call to completed)
+ if self.message_data.current_action:
+ self.message_data.completed_actions.append(self.message_data.current_action['tool_call'])
+
+ # Mark that we've seen actions
+ self.has_actions = True
+
+ # Always start template if not active yet
+ if not self.turn_active:
+ self.turn_active = True
+ if not self.message_id:
+ await self._create_message()
+
+ # Set new current action
+ self.message_data.current_action = {
+ 'tool_call': action,
+ 'result': 'Executing...'
+ }
+
+ # Reset final phase when we start a new action
+ self.in_final_phase = False
+
+ await self._ensure_message_exists()
+ return ""
+ return action
+
+ async def update_action_result(self, result):
+ """Update the result of the current action."""
+ if self.active and self.message_data.current_action:
+ # Update the current action's result (just escape markdown, no aggressive path linking)
+ self.message_data.current_action['result'] = self._escape_markdown(result)
+
+ # Set this as the current result (just the result text, no tool call)
+ self.message_data.current_result = self.message_data.current_action['result']
+
+ # After receiving a result, the action is complete
+ # Move just the tool call to completed actions
+ self.message_data.completed_actions.append(self.message_data.current_action['tool_call'])
+ self.message_data.current_action = None
+ self.in_final_phase = True # Now any subsequent text should go to final_text
+
+ await self._ensure_message_exists()
+ return ""
+ return result
+
+ async def update_text(self, text):
+ """Add text to template response."""
+ if self.active:
+ # If we haven't seen actions yet, add to initial text
+ if not self.has_actions:
+ if self.message_data.initial_text:
+ self.message_data.initial_text += '\n' + text
+ else:
+ self.message_data.initial_text = text
+ elif self.message_data.current_action and not self.in_final_phase:
+ # If we have a current action and not in final phase, treat this text as its result
+ # This handles cases where tool results come as text blocks
+ if self.message_data.current_action['result'] == 'Executing...':
+ self.message_data.current_action['result'] = text
+ # Update current result display (just the result text)
+ self.message_data.current_result = text
+ else:
+ # Append to existing result if already has content
+ self.message_data.current_action['result'] += '\n' + text
+ self.message_data.current_result = self.message_data.current_action['result']
+ else:
+ # No current action or we're in final phase - this is final summary text
+ # This should appear after the horizontal rule
+ if self.message_data.final_text:
+ self.message_data.final_text += '\n' + text
+ else:
+ self.message_data.final_text = text
+ self.in_final_phase = True # Mark that we're now in final phase
+
+ # Create or update message
+ await self._ensure_message_exists()
+ return ""
+ return text
+
+ async def _create_message(self):
+ """Create new template message."""
+ # Don't override main persona's writing state - it's already set
+ content = self._render_template()
+ new_msg = NewMessage(body=content, sender=self.persona.id)
+ self.message_id = self.persona.ychat.add_message(new_msg)
+
+ async def _update_message(self):
+ """Update existing template message."""
+ if not self.message_id:
+ return
+
+ # Update awareness to show writing to specific message
+ self.persona.awareness.set_local_state_field("isWriting", self.message_id)
+ content = self._render_template()
+
+ msg = Message(
+ id=self.message_id,
+ time=datetime.datetime.now().timestamp(),
+ body=content,
+ sender=self.persona.id
+ )
+ self.persona.ychat.update_message(msg, append=False)
+
+ def _render_template(self):
+ """Render current template state."""
+ return TODO_TEMPLATE.render(data=self.message_data)
+
+ async def complete(self):
+ """Complete template - move current action to completed actions."""
+ if self.active and self.message_id:
+ # Move current action to completed actions if it exists
+ if self.message_data.current_action:
+ self.message_data.completed_actions.append(self.message_data.current_action['tool_call'])
+ self.message_data.current_action = None
+
+ # Mark that we're now in final phase - any subsequent text should go to final_text
+ self.in_final_phase = True
+
+ # Do final template update to show completed state
+ await self._ensure_message_exists()
+ elif self.active:
+ # Still mark final phase even without message_id
+ self.in_final_phase = True
+
+ # Keep template active but mark turn as inactive
+ # This allows final text to still be processed
+ self.turn_active = False
+
+ def _escape_markdown(self, text):
+ """Escape markdown characters in text to prevent formatting issues."""
+ # Escape common markdown characters that could cause formatting problems
+ escapes = {
+ '*': '\\*',
+ '_': '\\_',
+ '`': '\\`',
+ '#': '\\#',
+ '[': '\\[',
+ ']': '\\]',
+ '(': '\\(',
+ ')': '\\)',
+ '{': '\\{',
+ '}': '\\}',
+ '\\': '\\\\'
+ }
+ result = str(text)
+ for char, escape in escapes.items():
+ result = result.replace(char, escape)
+ return result
+
+ def _make_jupyter_file_link(self, file_path, tool_name=None):
+ """Convert file path to clickable JupyterLab file link if path exists or will be created."""
+ server_root_reference = self._get_server_root_reference()
+ relative_path = self._resolve_relative_path(file_path, server_root_reference)
+
+ # Always create links for Write tools (file will be created)
+ # For other tools, only create link if file exists
+ should_create_link = (
+ tool_name == 'Write' or
+ self._path_exists_on_server(relative_path, server_root_reference)
+ )
+
+ if should_create_link:
+ return f"[{file_path}](/files/{relative_path})"
+ else:
+ # Return plain text if path doesn't exist and it's not a Write operation
+ return str(file_path)
+
+ def _get_server_root_reference(self):
+ """Get server root directory reference from persona."""
+ try:
+ workspace_dir = getattr(self.persona, 'get_workspace_dir', lambda: None)()
+ chat_dir = getattr(self.persona, 'get_chat_dir', lambda: None)()
+ return workspace_dir or chat_dir
+ except Exception:
+ return None
+
+ def _resolve_relative_path(self, file_path, server_root_reference):
+ """Resolve file path to be relative to server root."""
+ if not file_path.startswith('/') or not server_root_reference:
+ return file_path.lstrip('/')
+
+ try:
+ relative_path = os.path.relpath(file_path, start=server_root_reference)
+ # If path goes outside server root, use basename only
+ return os.path.basename(file_path) if relative_path.startswith('..') else relative_path
+ except (ValueError, OSError):
+ return os.path.basename(file_path)
+
+ def _path_exists_on_server(self, relative_path, server_root_reference):
+ """Check if the relative path exists on the server."""
+ if not server_root_reference or not relative_path:
+ return False
+
+ try:
+ # Construct full path from server root
+ full_path = os.path.join(server_root_reference, relative_path)
+ # Normalize path to handle any .. or . components
+ normalized_path = os.path.normpath(full_path)
+
+ # Security check: ensure normalized path is still within server root
+ if not normalized_path.startswith(os.path.normpath(server_root_reference)):
+ return False
+
+ # Check if file exists
+ return os.path.exists(normalized_path)
+ except Exception:
+ return False
+
+ def format_tool_input(self, tool_name, tool_input):
+ """Format tool input for Claude Code CLI style display."""
+ if tool_name in self.TOOL_PARAM_MAPPING:
+ key = self.TOOL_PARAM_MAPPING[tool_name]
+ value = tool_input.get(key, '')
+
+ # Make file paths clickable for file-related tools
+ if tool_name in self.FILE_LINK_TOOLS and value:
+ if len(str(value)) > self.MAX_TOOL_VALUE_LENGTH:
+ truncated = str(value)[:self.MAX_TOOL_VALUE_LENGTH] + '…'
+ return self._make_jupyter_file_link(truncated, tool_name)
+ return self._make_jupyter_file_link(str(value), tool_name)
+ else:
+ # For other tools, just escape markdown
+ if len(str(value)) > self.MAX_TOOL_VALUE_LENGTH:
+ return self._escape_markdown(str(value)[:self.MAX_TOOL_VALUE_LENGTH] + '…')
+ return self._escape_markdown(str(value))
+
+ # Format remaining args (excluding content)
+ args = []
+ for k, v in tool_input.items():
+ if k != 'content':
+ val_str = str(v)
+ if len(val_str) > self.MAX_ARG_LENGTH:
+ val_str = val_str[:self.MAX_ARG_LENGTH] + '…'
+ args.append(f"{k}={self._escape_markdown(val_str)}")
+ return ', '.join(args)
+
+ async def process_message_block(self, block):
+ """Process a single Claude SDK message block (text or tool)."""
+ if isinstance(block, TextBlock):
+ # Always capture text in template during active turn
+ if not self.active:
+ # Start template on first content
+ self.active = True
+ await self.update_text(block.text)
+ return None
+ else:
+ await self.update_text(block.text)
+ return None # Template handles all display
+
+ elif isinstance(block, ToolUseBlock):
+ if block.name == 'TodoWrite':
+ todos = block.input.get('todos', [])
+ await self.update_todos(todos)
+ return None # Template handles display, don't stream
+
+ # Regular tool display - always capture in template
+ tool_display = f"{block.name}({self.format_tool_input(block.name, block.input)})"
+ await self.update_action(tool_display)
+ return None # Template handles all display
+
+ elif isinstance(block, ToolResultBlock):
+ # Handle tool result - always capture in template
+ result_text = str(block.content) if hasattr(block, 'content') else str(block)
+ await self.update_action_result(result_text)
+ return None # Template handles all display
+
+ return None # Don't stream anything - template handles all
+
+ async def claude_message_to_str(self, message) -> Optional[str]:
+ """Convert Claude SDK Message to string, handling template updates."""
+ text_parts = []
+ for block in message.content:
+ result = await self.process_message_block(block)
+ if result is not None: # Only add non-None results
+ text_parts.append(result)
+ return '\n'.join(text_parts) if text_parts else None
+
+ def reset(self):
+ """Reset for new conversation."""
+ self.message_id = None
+ self.message_data = MessageData() # Reset to new dataclass instance
+ self.active = False
+ self.turn_active = False
+ self.has_actions = False
+ self.in_final_phase = False
+
+