diff --git a/.taskmaster/config.json b/.taskmaster/config.json new file mode 100644 index 0000000..173e20b --- /dev/null +++ b/.taskmaster/config.json @@ -0,0 +1,35 @@ +{ + "models": { + "main": { + "provider": "anthropic", + "modelId": "claude-3-7-sonnet-20250219", + "maxTokens": 64000, + "temperature": 0.2 + }, + "research": { + "provider": "perplexity", + "modelId": "sonar-pro", + "maxTokens": 8700, + "temperature": 0.1 + }, + "fallback": { + "provider": "anthropic", + "modelId": "claude-3-5-sonnet", + "maxTokens": 8192, + "temperature": 0.2 + } + }, + "global": { + "logLevel": "info", + "debug": false, + "defaultNumTasks": 10, + "defaultSubtasks": 5, + "defaultPriority": "medium", + "projectName": "Task Master", + "ollamaBaseURL": "http://localhost:11434/api", + "bedrockBaseURL": "https://bedrock.us-east-1.amazonaws.com", + "responseLanguage": "English", + "userId": "1234567890" + }, + "claudeCode": {} +} \ No newline at end of file diff --git a/.taskmaster/state.json b/.taskmaster/state.json new file mode 100644 index 0000000..83236fc --- /dev/null +++ b/.taskmaster/state.json @@ -0,0 +1,3 @@ +{ + "migrationNoticeShown": true +} \ No newline at end of file diff --git a/.taskmaster/tasks/tasks.json b/.taskmaster/tasks/tasks.json new file mode 100644 index 0000000..4ab32d0 --- /dev/null +++ b/.taskmaster/tasks/tasks.json @@ -0,0 +1,142 @@ +{ + "master": { + "tasks": [ + { + "id": 11, + "title": "Analyze Current Heartbeat Function Implementation", + "description": "Analyze the existing PRReviewPersona.heartbeat function to understand its current structure, message flow, and timing logic.", + "details": "1. Locate the PRReviewPersona class in the codebase\n2. Identify the heartbeat function implementation\n3. Document the current message sequence and timing\n4. Identify areas of code duplication and excessive nesting\n5. Create a flow diagram of the current implementation\n6. Document any edge cases or special handling in the current code\n7. Measure current performance characteristics as a baseline", + "testStrategy": "No direct testing needed for this task, but document findings for validation in subsequent tasks. Create a reference document with timing measurements that can be used to verify the refactored implementation maintains the same behavior.", + "priority": "high", + "dependencies": [], + "status": "pending", + "subtasks": [] + }, + { + "id": 12, + "title": "Design Message Configuration Data Structure", + "description": "Create a structured data format to hold message configurations including delays and text content.", + "details": "1. Define a tuple structure with (delay_in_seconds, message_text) format\n2. Create a list/tuple of these configurations representing the complete message sequence\n3. Extract all existing messages and their timing from the current implementation\n4. Organize messages in sequential order\n5. Consider using a TypedDict or NamedTuple for better type safety:\n```python\nfrom typing import List, Tuple, TypedDict\n\nclass MessageConfig(TypedDict):\n delay: float\n text: str\n\n# Or as a simple tuple list\nmessage_configs: List[Tuple[float, str]] = [\n (1.5, \"Starting PR review...\"),\n (2.0, \"Analyzing code changes...\"),\n # Additional messages\n]\n```", + "testStrategy": "Create unit tests to verify:\n1. All messages from original implementation are included\n2. Timing values match original implementation\n3. 
Data structure can be iterated over correctly", + "priority": "high", + "dependencies": [ + 11 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 13, + "title": "Implement Message Iterator Mechanism", + "description": "Create an iterator-based mechanism to process message configurations sequentially with appropriate timing.", + "details": "1. Implement an async generator function to yield messages with delays\n2. Handle the timing between messages using asyncio.sleep\n3. Ensure the iterator can be interrupted when needed\n\n```python\nasync def message_iterator(message_configs):\n \"\"\"Generate status messages with appropriate delays.\"\"\"\n for delay, message in message_configs:\n yield message\n await asyncio.sleep(delay)\n```\n\n4. Consider adding a parameter to control whether to include initial delay\n5. Add proper error handling for asyncio operations", + "testStrategy": "1. Unit test the iterator with mock timing to verify message sequence\n2. Test interruption scenarios\n3. Verify timing accuracy using asyncio test utilities\n4. Test with various message configurations", + "priority": "high", + "dependencies": [ + 12 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 14, + "title": "Implement Processing Status Check Mechanism", + "description": "Create a mechanism to check if PR review processing is complete and break the message iteration when necessary.", + "details": "1. Analyze how the current implementation detects completion\n2. Implement a clean way to check processing status\n3. Create a mechanism to signal the iterator to stop\n\n```python\nasync def heartbeat(self, processing_complete_check):\n \"\"\"Provide status updates during processing.\"\"\"\n for message in message_iterator(MESSAGE_CONFIGS):\n if await processing_complete_check():\n break\n await self.send_message(message)\n```\n\n4. Consider using asyncio.Event or similar mechanism for signaling\n5. Ensure proper cleanup of resources when breaking out of the loop", + "testStrategy": "1. Test with mock processing_complete_check functions that return at different times\n2. Verify the iterator stops when processing is complete\n3. Test edge cases (immediate completion, never completes)\n4. Verify no resource leaks when breaking early", + "priority": "high", + "dependencies": [ + 13 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 15, + "title": "Refactor Heartbeat Function Core Implementation", + "description": "Rewrite the PRReviewPersona.heartbeat function using the new data-driven approach with reduced nesting and complexity.", + "details": "1. Replace the current implementation with the new iterator-based approach\n2. Ensure maximum nesting level is 2 or less\n3. Implement proper error handling\n4. Maintain all existing functionality\n\n```python\nasync def heartbeat(self):\n \"\"\"Provide status updates during PR review processing.\"\"\"\n message_configs = [\n (1.5, \"Starting PR review...\"),\n (2.0, \"Analyzing code changes...\"),\n # Additional messages with their delays\n ]\n \n async for message in message_iterator(message_configs):\n if self.review_complete:\n break\n await self.send_message(message)\n```\n\n5. Ensure compatibility with the rest of the class\n6. Preserve any class-specific state management", + "testStrategy": "1. Compare output with original implementation\n2. Verify all messages appear with correct timing\n3. Test interruption when review completes\n4. Measure performance compared to baseline\n5. 
Verify integration with the rest of the PRReviewPersona class", + "priority": "high", + "dependencies": [ + 14 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 16, + "title": "Add Type Hints and Update Documentation", + "description": "Add comprehensive type hints and update function documentation for the refactored implementation.", + "details": "1. Add type hints to all new functions and data structures\n2. Update docstrings with detailed descriptions\n3. Include parameter and return type documentation\n4. Add examples in docstrings where appropriate\n\n```python\nfrom typing import List, Tuple, Callable, AsyncIterator, TypeVar, Any\n\nMessageConfig = Tuple[float, str]\n\nasync def message_iterator(configs: List[MessageConfig]) -> AsyncIterator[str]:\n \"\"\"Generate status messages with appropriate delays.\n \n Args:\n configs: List of (delay, message) tuples\n \n Yields:\n Status messages in sequence with appropriate delays\n \"\"\"\n # Implementation\n\nasync def heartbeat(self) -> None:\n \"\"\"Provide status updates during PR review processing.\n \n Sends a series of status messages with appropriate timing\n until the review process is complete.\n \"\"\"\n # Implementation\n```\n\n5. Update any related class documentation", + "testStrategy": "1. Run mypy or similar type checker to verify type correctness\n2. Verify documentation builds correctly\n3. Review docstrings for completeness and accuracy\n4. Ensure examples in documentation are correct", + "priority": "medium", + "dependencies": [ + 15 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 17, + "title": "Implement Unit Tests for Refactored Code", + "description": "Create comprehensive unit tests for the refactored heartbeat function and supporting components.", + "details": "1. Create test cases for the message configuration structure\n2. Implement tests for the message iterator\n3. Create tests for the processing status check mechanism\n4. Implement tests for the heartbeat function itself\n\n```python\nimport pytest\nimport asyncio\nfrom unittest.mock import MagicMock, patch\n\n@pytest.mark.asyncio\nasync def test_message_iterator():\n configs = [(0.1, \"msg1\"), (0.1, \"msg2\")]\n messages = []\n async for msg in message_iterator(configs):\n messages.append(msg)\n assert messages == [\"msg1\", \"msg2\"]\n\n@pytest.mark.asyncio\nasync def test_heartbeat_stops_when_complete():\n # Test implementation\n pass\n```\n\n5. Include tests for edge cases and error conditions\n6. Test timing accuracy (with appropriate test utilities)\n7. Aim for 100% code coverage", + "testStrategy": "1. Run tests with pytest or similar framework\n2. Measure code coverage and ensure it meets requirements\n3. Verify tests pass consistently\n4. Include both unit tests and integration tests", + "priority": "medium", + "dependencies": [ + 15 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 18, + "title": "Perform Code Quality Checks", + "description": "Run code quality tools to ensure the refactored code meets quality standards and has no regressions.", + "details": "1. Run pylint or similar linter on the refactored code\n2. Check for any new warnings or errors\n3. Verify code complexity metrics\n - Maximum nesting level should be 2 or less\n - No duplicate code\n4. Run type checking with mypy\n5. Verify PEP 8 compliance\n6. 
Check for any performance regressions\n\n```bash\n# Example commands to run\npylint jupyter_ai_personas/pr_review_persona.py\nmypy jupyter_ai_personas/pr_review_persona.py\nblack --check jupyter_ai_personas/pr_review_persona.py\n```\n\n7. Address any issues found during quality checks", + "testStrategy": "1. Compare quality metrics before and after refactoring\n2. Document any improvements in code quality\n3. Ensure no new warnings are introduced\n4. Verify all quality checks pass in CI environment", + "priority": "medium", + "dependencies": [ + 16, + 17 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 19, + "title": "Perform Integration Testing", + "description": "Test the refactored heartbeat function in the context of the full PRReviewPersona class and its interactions.", + "details": "1. Create integration tests that use the PRReviewPersona class with real or mock PR data\n2. Verify the heartbeat function works correctly in the context of a full PR review\n3. Test interactions with other components\n4. Verify message delivery in end-to-end scenarios\n\n```python\n@pytest.mark.asyncio\nasync def test_pr_review_with_heartbeat():\n # Setup mock PR data\n pr_data = {...}\n \n # Create persona instance\n persona = PRReviewPersona(...)\n \n # Start review process\n review_task = asyncio.create_task(persona.review_pr(pr_data))\n \n # Verify heartbeat messages are sent\n # Complete test implementation\n```\n\n5. Test with various PR scenarios (simple PR, complex PR, etc.)\n6. Verify behavior matches original implementation", + "testStrategy": "1. Run integration tests in a controlled environment\n2. Monitor message output and timing\n3. Verify correct interaction with other components\n4. Test with real-world PR examples if possible", + "priority": "medium", + "dependencies": [ + 15, + 17 + ], + "status": "pending", + "subtasks": [] + }, + { + "id": 20, + "title": "Documentation and Final Review", + "description": "Update project documentation and perform a final review of the refactored implementation.", + "details": "1. Update any project documentation referencing the heartbeat function\n2. Document the new approach in code comments\n3. Create a summary of changes for the pull request\n4. Prepare before/after code samples to demonstrate improvements\n5. Document any performance improvements or other benefits\n6. Create a changelog entry if appropriate\n7. Prepare for code review by addressing potential questions\n\n```markdown\n## Refactoring Summary\n\nThe `PRReviewPersona.heartbeat` function has been refactored to use a data-driven approach with the following improvements:\n\n- Reduced code nesting from X levels to 2 levels\n- Eliminated code duplication\n- Improved maintainability through clear separation of configuration and logic\n- Added comprehensive type hints and documentation\n- Maintained all existing functionality and timing\n\n### Before/After Comparison\n\n[Include code snippets showing the improvement]\n```", + "testStrategy": "1. Review documentation for accuracy and completeness\n2. Verify all changes are properly documented\n3. Have another team member review the documentation\n4. 
Ensure documentation builds correctly in the project's documentation system", + "priority": "low", + "dependencies": [ + 18, + 19 + ], + "status": "pending", + "subtasks": [] + } + ], + "metadata": { + "created": "2025-07-29T00:20:35.150Z", + "updated": "2025-07-29T00:31:28.500Z", + "description": "Tasks for master context" + } + } +} \ No newline at end of file diff --git a/jupyter-ai-personas b/jupyter-ai-personas deleted file mode 160000 index 4af5de3..0000000 --- a/jupyter-ai-personas +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 4af5de32685badcea70cb30f8abfde93bf2c2ed6 diff --git a/jupyter_ai_personas/knowledge_graph/bulk_analyzer.py b/jupyter_ai_personas/knowledge_graph/bulk_analyzer.py new file mode 100644 index 0000000..22324a7 --- /dev/null +++ b/jupyter_ai_personas/knowledge_graph/bulk_analyzer.py @@ -0,0 +1,205 @@ +import os +import tree_sitter_python as tspython +from tree_sitter import Language, Parser +from neo4j import GraphDatabase +import hashlib +import boto3 +import json + + +class BulkCodeAnalyzer: + def __init__(self, uri, auth, embd_name=None, embd_id=None): + self.driver = GraphDatabase.driver(uri, auth=auth) + self.PY_LANGUAGE = Language(tspython.language()) + self.parser = Parser(self.PY_LANGUAGE) + self.embd_name = embd_name # Bedrock + self.embd_id = embd_id # amazon.titan-embed-text-v1 + self.bedrock_client = boto3.client("bedrock-runtime") if embd_name else None + + def analyze_folder(self, folder_path, clear_existing=False): + """Analyze all supported files in a folder and add to knowledge graph""" + if clear_existing: + with self.driver.session() as session: + session.run("MATCH (n) DETACH DELETE n") + print("Cleared existing graph") + + # Supported file extensions + supported_extensions = {".py"} # for 1st phase just py + + all_files = [] + for root, dirs, files in os.walk(folder_path): + for file in files: + file_ext = os.path.splitext(file)[1] + if file_ext in supported_extensions: + all_files.append(os.path.join(root, file)) + + print(f"Found {len(all_files)} supported files") + + with self.driver.session() as session: + for file_path in all_files: + print(f"Analyzing: {file_path}") + try: + if file_path.endswith(".py"): + self._analyze_file(file_path, session) + else: + self._analyze_non_python_file(file_path, session) + except Exception as e: + print(f"Error analyzing {file_path}: {e}") + + def _analyze_file(self, file_path, session): + with open(file_path, "r", encoding="utf-8") as f: + code = f.read() + + tree = self.parser.parse(bytes(code, "utf8")) + self._extract_code_elements(tree.root_node, session, file_path) + + def _extract_code_elements(self, node, session, file_path, current_class=None): + if node.type == "class_definition": + class_name = node.child_by_field_name("name").text.decode("utf8") + class_code = node.text.decode("utf8", errors="ignore") + embedding = self._get_embedding(class_code) if self.bedrock_client else None + + session.run( + "MERGE (c:Class {name: $name}) SET c.file = $file, c.embedding = $embedding", + name=class_name, + file=file_path, + embedding=embedding, + ) + + superclasses = node.child_by_field_name("superclasses") + if superclasses: + for child in superclasses.children: + if child.type == "identifier": + parent = child.text.decode("utf8") + session.run( + "MERGE (parent:Class {name: $parent})", parent=parent + ) + session.run( + "MATCH (parent:Class {name: $parent}), (child:Class {name: $child}) " + "MERGE (child)-[:INHERITS_FROM]->(parent)", + parent=parent, + child=class_name, + ) + + for child in 
node.children: + self._extract_code_elements(child, session, file_path, class_name) + + elif node.type == "function_definition": + func_name = node.child_by_field_name("name").text.decode("utf8") + func_code = node.text.decode("utf8", errors="ignore") + + params_node = node.child_by_field_name("parameters") + params = [] + if params_node: + for child in params_node.children: + if child.type == "identifier": + params.append(child.text.decode("utf8")) + + code_hash = hashlib.md5(func_code.encode()).hexdigest() + + # Generate embedding for function code + embedding = self._get_embedding(func_code) if self.bedrock_client else None + + session.run( + "MERGE (f:Function {name: $name, file: $file}) " + "SET f.code = $code, f.code_hash = $hash, f.parameters = $params, f.line_start = $start, f.line_end = $end, f.embedding = $embedding", + name=func_name, + file=file_path, + code=func_code, + hash=code_hash, + params=params, + start=node.start_point[0], + end=node.end_point[0], + embedding=embedding, + ) + + if current_class: + session.run( + "MATCH (c:Class {name: $class_name}), (f:Function {name: $func_name, file: $file}) " + "MERGE (c)-[:CONTAINS]->(f)", + class_name=current_class, + func_name=func_name, + file=file_path, + ) + + # Extract function calls + self._extract_function_calls(node, session, func_name, file_path) + + else: + for child in node.children: + self._extract_code_elements(child, session, file_path, current_class) + + def _analyze_non_python_file(self, file_path, session): + """Analyze non-Python files (basic content indexing)""" + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + # Create a File node for non-Python files + embedding = ( + self._get_embedding(content[:5000]) if self.bedrock_client else None + ) + + session.run( + "MERGE (f:File {path: $path}) SET f.content = $content, f.size = $size, f.type = $type, f.embedding = $embedding", + path=file_path, + content=content[:5000], + size=len(content), + type=os.path.splitext(file_path)[1], + embedding=embedding, + ) + + except Exception as e: + print(f"Error reading {file_path}: {e}") + # Create File node without content + session.run( + "MERGE (f:File {path: $path}) SET f.error = $error, f.type = $type", + path=file_path, + error=str(e), + type=os.path.splitext(file_path)[1], + ) + + def _extract_function_calls(self, func_node, session, caller_name, file_path): + """Extract function calls from a function body""" + + def find_calls(node): + calls = [] + if node.type == "call": + func_expr = node.child_by_field_name("function") + if func_expr and func_expr.type == "identifier": + called_func = func_expr.text.decode("utf8") + calls.append(called_func) + elif func_expr and func_expr.type == "attribute": + # Handle method calls like obj.method() + attr = func_expr.child_by_field_name("attribute") + if attr: + called_func = attr.text.decode("utf8") + calls.append(called_func) + + for child in node.children: + calls.extend(find_calls(child)) + return calls + + called_functions = find_calls(func_node) + + for called_func in called_functions: + # Create CALLS relationship + session.run( + "MATCH (caller:Function {name: $caller, file: $file}) " + "MERGE (called:Function {name: $called}) " + "MERGE (caller)-[:CALLS]->(called)", + caller=caller_name, + called=called_func, + file=file_path, + ) + + def _get_embedding(self, text): + """Generate embedding using AWS Bedrock Titan model""" + try: + response = self.bedrock_client.invoke_model( + modelId=self.embd_id, body=json.dumps({"inputText": text}) + ) + 
return json.loads(response["body"].read())["embedding"] + except Exception as e: + print(f"Error generating embedding: {e}") + return None diff --git a/jupyter_ai_personas/knowledge_graph/code_analysis_tool.py b/jupyter_ai_personas/knowledge_graph/code_analysis_tool.py new file mode 100644 index 0000000..11e93d4 --- /dev/null +++ b/jupyter_ai_personas/knowledge_graph/code_analysis_tool.py @@ -0,0 +1,123 @@ +from agno.tools import Toolkit +from .bulk_analyzer import BulkCodeAnalyzer +from neo4j import GraphDatabase +import ast +import os + + +class CodeAnalysisTool(Toolkit): + def __init__(self): + super().__init__(name="code_analysis") + # Use environment variables for Neo4j credentials with defaults + neo4j_uri = os.getenv("NEO4J_URI", "neo4j://localhost:7687") + neo4j_user = os.getenv("NEO4J_USER", "neo4j") + neo4j_password = os.getenv("NEO4J_PASSWORD") + + if not neo4j_password: + raise ValueError("NEO4J_PASSWORD environment variable must be set") + + self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password)) + self.register(self.get_class_info) + self.register(self.find_related_classes) + self.register(self.query_code) + self.register(self.get_function_code) + + def get_class_info(self, class_name: str) -> str: + """Get detailed information about a class from the knowledge graph""" + try: + with self.driver.session() as session: + # Get class info + class_result = session.run( + "MATCH (c:Class {name: $class_name}) RETURN c.file as file", + class_name=class_name, + ) + class_record = class_result.single() + if not class_record: + return f"Class {class_name} not found in knowledge graph" + + inherit_result = session.run( + "MATCH (c:Class {name: $class_name})-[:INHERITS_FROM]->(parent:Class) " + "RETURN parent.name as parent_name", + class_name=class_name, + ) + parents = [record["parent_name"] for record in inherit_result] + + method_result = session.run( + "MATCH (c:Class {name: $class_name})-[:CONTAINS]->(f:Function) " + "RETURN f.name as method_name, f.parameters as params", + class_name=class_name, + ) + methods = [ + (record["method_name"], record["params"]) + for record in method_result + ] + + info = f"Class {class_name}:\n" + info += f" File: {class_record['file']}\n" + if parents: + info += f" Inherits from: {', '.join(parents)}\n" + info += f" Methods:\n" + for method_name, params in methods: + param_str = ", ".join(params) if params else "" + info += f" {method_name}({param_str})\n" + + return info + except Exception as e: + return f"Error getting class info: {str(e)}" + + def get_function_code(self, function_name: str, class_name: str = None) -> str: + """Get the source code of a function from the knowledge graph""" + try: + with self.driver.session() as session: + # Query function with code directly + if class_name: + result = session.run( + "MATCH (c:Class {name: $class_name})-[:CONTAINS]->(f:Function {name: $function_name}) " + "RETURN f.code as code, f.file as file, f.line_start as line_start, f.line_end as line_end", + class_name=class_name, + function_name=function_name, + ) + else: + result = session.run( + "MATCH (f:Function {name: $function_name}) " + "RETURN f.code as code, f.file as file, f.line_start as line_start, f.line_end as line_end", + function_name=function_name, + ) + + record = result.single() + if not record: + return f"Function {function_name} not found" + + # If code is stored directly on the function node + if record["code"]: + return f"Function {function_name} code:\n{record['code']}" + + except Exception as e: + return 
f"Error getting function code: {str(e)}" + + def find_related_classes(self, class_name: str) -> str: + """Find all classes that inherit from the given class""" + try: + with self.driver.session() as session: + result = session.run( + "MATCH (related:Class)-[:INHERITS_FROM*]->(c:Class {name: $class_name}) " + "RETURN related.name as related_class", + class_name=class_name, + ) + related = [record["related_class"] for record in result] + if related: + return f"Classes that inherit from {class_name}: {', '.join(related)}" + else: + return f"No classes inherit from {class_name}" + except Exception as e: + return f"Error finding related classes: {str(e)}" + + def query_code(self, query: str) -> str: + """Execute custom Cypher queries on the code knowledge graph""" + try: + with self.driver.session() as session: + result = session.run(query) + records = [dict(record) for record in result] + return str(records) if records else "No results found" + except Exception as e: + return f"Query error: {str(e)}" \ No newline at end of file diff --git a/jupyter_ai_personas/knowledge_graph/schema_validator.py b/jupyter_ai_personas/knowledge_graph/schema_validator.py new file mode 100644 index 0000000..121800a --- /dev/null +++ b/jupyter_ai_personas/knowledge_graph/schema_validator.py @@ -0,0 +1,116 @@ +from neo4j import GraphDatabase + + +class SchemaValidator: + def __init__(self, uri, auth): + self.driver = GraphDatabase.driver(uri, auth=auth) + + def get_actual_schema(self): + """Get the actual schema from Neo4j database""" + with self.driver.session() as session: + # node labels and their properties + node_result = session.run(""" + CALL db.schema.nodeTypeProperties() + YIELD nodeType, propertyName, propertyTypes + RETURN nodeType, collect(propertyName) as properties + """) + + # relationship types + rel_result = session.run(""" + CALL db.schema.relTypeProperties() + YIELD relType + RETURN DISTINCT relType + """) + + # relationship patterns + pattern_result = session.run(""" + MATCH (a)-[r]->(b) + RETURN DISTINCT labels(a)[0] as from_label, type(r) as rel_type, labels(b)[0] as to_label + LIMIT 20 + """) + + nodes = {record["nodeType"]: record["properties"] for record in node_result} + relationships = [record["relType"] for record in rel_result] + patterns = [ + (record["from_label"], record["rel_type"], record["to_label"]) + for record in pattern_result + ] + + return { + "nodes": nodes, + "relationships": relationships, + "patterns": patterns, + } + + def generate_schema_info(self): + """Generate schema information string for agents""" + schema = self.get_actual_schema() + + info = "ACTUAL DATABASE SCHEMA:\n\n" + + # Node types and properties + info += "NODES:\n" + for node_type, properties in schema["nodes"].items(): + info += f"- {node_type}: properties {{{', '.join(properties)}}}\n" + + # Relationships + info += f"\nRELATIONSHIPS:\n" + for rel in schema["relationships"]: + info += f"- {rel}\n" + + # Relationship patterns + info += f"\nVALID PATTERNS:\n" + for from_label, rel_type, to_label in schema["patterns"]: + info += f"- ({from_label})-[:{rel_type}]->({to_label})\n" + + info += self._get_sample_files() + + # examples + info += f"\nEXAMPLE QUERIES:\n" + if schema["patterns"]: + pattern = schema["patterns"][0] + info += f"- MATCH ({pattern[0].lower()}:{pattern[0]})-[:{pattern[1]}]->({pattern[2].lower()}:{pattern[2]}) RETURN {pattern[0].lower()}.name\n" + + return info + + def _get_sample_files(self): + """Get sample files in the database""" + with self.driver.session() as session: + # Python 
files (Class/Function nodes with 'file' property) + py_result = session.run(""" + MATCH (n) + WHERE n.file IS NOT NULL + RETURN DISTINCT n.file as file, labels(n) as labels + LIMIT 5 + """) + + # Check for other files (File nodes with 'path' property) + file_result = session.run(""" + MATCH (f:File) + RETURN DISTINCT f.path as file, f.type as type + LIMIT 5 + """) + + info = "\nFILES IN DATABASE:\n" + + py_files = list(py_result) + other_files = list(file_result) + + if py_files: + info += "Python files (Class/Function nodes):\n" + for record in py_files: + info += f"- {record['file']} ({', '.join(record['labels'])})\n" + + if other_files: + info += "Other files (File nodes):\n" + for record in other_files: + info += f"- {record['file']} ({record['type']})\n" + + if not py_files and not other_files: + info += "- No files found in database\n" + + info += "\nQUERY PATTERNS:\n" + info += "- For Python: MATCH (n:Class) WHERE n.file CONTAINS 'filename' RETURN n.name\n" + info += "- For Other files: MATCH (f:File) WHERE f.path CONTAINS 'filename' RETURN f.path\n" + + return info diff --git a/jupyter_ai_personas/pr_creation_persona/README.md b/jupyter_ai_personas/pr_creation_persona/README.md new file mode 100644 index 0000000..6918c78 --- /dev/null +++ b/jupyter_ai_personas/pr_creation_persona/README.md @@ -0,0 +1,130 @@ +# PR Creation Persona + +A specialized AI assistant for analyzing GitHub issues and implementing code fixes with automated git operations. + +## Overview + +The PR Creation Persona coordinates a team of specialized agents to: +1. **Analyze Issues** - Parse requirements and understand scope +2. **Design Architecture** - Plan minimal solution architecture +3. **Implement Code** - Write focused code that addresses the issue +4. **Manage Git Operations** - Handle cloning, branching, committing, and pushing + +## Key Features + +### Clear Task Separation +- **Issue Analysis Agent**: Parses requirements and analyzes repository context +- **Architecture Designer**: Plans implementation strategy and file changes +- **Code Implementer**: Writes minimal, focused code following existing patterns +- **Git Manager**: Handles all git operations including branch creation and pushing + +### Repository Context Awareness +- Uses knowledge graph analysis to understand codebase structure +- Identifies existing patterns and conventions +- Analyzes dependencies and relationships +- Finds similar implementations for reference + +### Minimal Code Implementation +- Writes only the absolute minimum code needed +- Follows existing code patterns and style +- Focuses specifically on issue requirements +- Avoids verbose or unnecessary implementations + +### Complete Git Workflow +- Uses Agno's ShellTools for git operations +- Clones main branch automatically +- Creates descriptive feature branches +- Commits changes with clear messages +- Pushes to remote branch (does NOT create PR) + +## Usage + +Provide an issue description along with a GitHub repository URL: + +``` +Analyze this issue and implement a fix: + +Repository: https://github.com/user/repo +Issue: Add validation to user input in the login form to prevent SQL injection + +The login form currently accepts any input without validation... 
+``` + +### Using with Local Repository + +If you're already working on a repository locally, you can set the `LOCAL_REPO_PATH` environment variable to use your existing clone: + +```bash +# Set the environment variable to your local repository path +export LOCAL_REPO_PATH=/path/to/your/local/repo + +# Then use the persona as normal - it will use your local repository +# instead of cloning a new one +``` + +If the specified `LOCAL_REPO_PATH` doesn't exist but the environment variable is set, the persona will automatically clone the repository to that location when processing an issue. This allows you to specify where you want the repository to be cloned without having to create the directory structure yourself. + +## Workflow + +### Phase 1: Issue Analysis +- Extracts issue requirements and acceptance criteria +- Uses KG queries to understand repository structure +- Identifies affected components and files +- Assesses scope and complexity + +### Phase 2: Architecture Design +- Designs minimal solution architecture +- Plans file structure and organization +- Defines implementation strategy +- Creates detailed file-by-file changes plan + +### Phase 3: Code Implementation +- Uses Agno's ShellTools and FileTools +- Sets up repository and creates feature branch +- Implements code following the architecture plan +- Writes minimal, focused code addressing the issue +- Maintains consistency with existing patterns + +### Phase 4: Git Operations +- Uses standard git commands via ShellTools +- Commits changes with descriptive messages +- Pushes feature branch to remote repository +- Provides branch information for manual PR creation +- Does NOT automatically create pull requests + +## Requirements + +- GitHub personal access token in `GITHUB_ACCESS_TOKEN` environment variable +- Neo4j database running on `neo4j://127.0.0.1:7687` +- AWS credentials configured for Bedrock access + +## Environment Variables + +- `GITHUB_ACCESS_TOKEN`: Required. GitHub personal access token for repository access. +- `LOCAL_REPO_PATH`: Optional. Path to an existing local repository clone. When provided, the persona will use this local repository instead of cloning a new one, which is ideal for developers already working on the repository. 
+ +## Output + +The persona provides: +- Detailed issue analysis and requirements +- Solution architecture and implementation plan +- Code implementation with explanations +- Git operations summary with branch information +- Instructions for manual PR creation + +## Integration + +The persona integrates with: +- **Knowledge Graph**: For codebase analysis and context +- **GitHub API**: For repository access and metadata +- **Agno ShellTools**: For git operations and command execution +- **Agno FileTools**: For code creation and modification + +## Best Practices + +- Always analyzes repository context before implementation +- Maintains clear separation between analysis and implementation +- Writes minimal, focused code addressing specific issues +- Follows existing code patterns and conventions +- Provides complete git workflow without creating PRs +- Ensures proper error handling and validation \ No newline at end of file diff --git a/jupyter_ai_personas/pr_creation_persona/__init__.py b/jupyter_ai_personas/pr_creation_persona/__init__.py new file mode 100644 index 0000000..7bea227 --- /dev/null +++ b/jupyter_ai_personas/pr_creation_persona/__init__.py @@ -0,0 +1,5 @@ +"""PR Creation Persona for analyzing issues and implementing fixes.""" + +from .persona import PRCreationPersona + +__all__ = ["PRCreationPersona"] \ No newline at end of file diff --git a/jupyter_ai_personas/pr_creation_persona/enhanced_file_tools.py b/jupyter_ai_personas/pr_creation_persona/enhanced_file_tools.py new file mode 100644 index 0000000..d57858b --- /dev/null +++ b/jupyter_ai_personas/pr_creation_persona/enhanced_file_tools.py @@ -0,0 +1,123 @@ +"""Enhanced file tools with structure awareness.""" + +import os +from agno.tools.file import FileTools as AgnoFileTools +from agno.agent import Agent + +class EnhancedFileTools(AgnoFileTools): + """Enhanced file tools with structure awareness.""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.repo_structure_tools = None + self.repo_path = None + + def set_repo_path(self, repo_path): + """Set the repository path for structure analysis.""" + self.repo_path = repo_path + + def set_structure_tools(self, structure_tools): + """Set the repository structure tools instance.""" + self.repo_structure_tools = structure_tools + + def write_file(self, agent: Agent, path: str, content: str) -> str: + """ + Write content to a file with structure validation. + + Args: + agent: The agent instance + path: Path to the file + content: Content to write + + Returns: + str: Result of the operation + """ + # Validate file path if repo_structure_tools and repo_path are available + if self.repo_structure_tools and self.repo_path: + try: + # Validate the path + validation_result = self.repo_structure_tools.validate_file_path(agent, path, self.repo_path) + + # Check if path is directly in root + if validation_result.startswith("WARNING:"): + # Extract suggested path from validation result + suggested_path = validation_result.split("Consider using: ")[1].strip() + print(f"WARNING: File would be created in repository root. 
Using suggested path: {suggested_path}") + path = suggested_path + except Exception as e: + print(f"Warning: Error validating file path: {str(e)}") + + # Ensure parent directories exist + try: + parent_dir = os.path.dirname(path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + print(f"Created parent directories for: {path}") + except Exception as e: + print(f"Warning: Error creating parent directories: {str(e)}") + + # Call the original write_file method + return super().write_file(agent, path, content) + + def append_file(self, agent: Agent, path: str, content: str) -> str: + """ + Append content to a file with structure validation. + + Args: + agent: The agent instance + path: Path to the file + content: Content to append + + Returns: + str: Result of the operation + """ + # Validate file path if repo_structure_tools and repo_path are available + if self.repo_structure_tools and self.repo_path: + try: + # Validate the path + validation_result = self.repo_structure_tools.validate_file_path(agent, path, self.repo_path) + + # Check if path is directly in root + if validation_result.startswith("WARNING:"): + # Extract suggested path from validation result + suggested_path = validation_result.split("Consider using: ")[1].strip() + print(f"WARNING: File would be created in repository root. Using suggested path: {suggested_path}") + path = suggested_path + except Exception as e: + print(f"Warning: Error validating file path: {str(e)}") + + # Ensure parent directories exist + try: + parent_dir = os.path.dirname(path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + print(f"Created parent directories for: {path}") + except Exception as e: + print(f"Warning: Error creating parent directories: {str(e)}") + + # Call the original append_file method + return super().append_file(agent, path, content) + + def suggest_file_path(self, agent: Agent, file_name: str, component_type: str) -> str: + """ + Suggest appropriate file path based on repository patterns. + + Args: + agent: The agent instance + file_name: Name of the file to create + component_type: Type of component (model, view, controller, etc.) 
+ + Returns: + str: Suggested file path + """ + if self.repo_structure_tools and self.repo_path: + try: + return self.repo_structure_tools.suggest_file_path(agent, file_name, component_type, self.repo_path) + except Exception as e: + print(f"Warning: Error suggesting file path: {str(e)}") + + # Default path if suggestion fails + if self.base_path: + return os.path.join(self.base_path, file_name) + else: + return file_name \ No newline at end of file diff --git a/jupyter_ai_personas/pr_creation_persona/persona.py b/jupyter_ai_personas/pr_creation_persona/persona.py new file mode 100644 index 0000000..4131ec9 --- /dev/null +++ b/jupyter_ai_personas/pr_creation_persona/persona.py @@ -0,0 +1,1181 @@ +import os +import re +import tempfile +import subprocess +from jupyter_ai.personas.base_persona import BasePersona, PersonaDefaults +from jupyterlab_chat.models import Message +from jupyter_ai.history import YChatHistory +from agno.agent import Agent +from agno.models.aws import AwsBedrock +import boto3 +from agno.tools.github import GithubTools +from agno.tools.reasoning import ReasoningTools +from agno.tools.file import FileTools +from .enhanced_file_tools import EnhancedFileTools +from agno.tools.shell import ShellTools +from langchain_core.messages import HumanMessage +from agno.tools.python import PythonTools +from agno.team.team import Team +from .template import PRCreationPersonaVariables, PR_CREATION_PROMPT_TEMPLATE +from .repo_structure_tools import RepoStructureTools +import sys +sys.path.append('../knowledge_graph') +from jupyter_ai_personas.knowledge_graph.bulk_analyzer import BulkCodeAnalyzer +from jupyter_ai_personas.pr_review_persona.repo_analysis_tools import RepoAnalysisTools +from jupyter_ai_personas.task_master import TaskMasterClient, PRDAgent, Task +from jupyter_ai_personas.task_master.task_agent import TaskExecutionAgent + +session = boto3.Session() + +class PRCreationPersona(BasePersona): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.shared_analyzer = None + self.taskmaster = None + self.prd_agent = None + self.current_prd = None + self.current_tasks = [] + self.current_repo_url = None + self.current_issue_url = None + self.local_repo_path = os.getenv("LOCAL_REPO_PATH", None) + self.feature_branch = None + self.folder_map = {} # Maps component types to appropriate folders + + @property + def defaults(self): + return PersonaDefaults( + name="PRCreationPersona", + avatar_path="/api/ai/static/jupyternaut.svg", + description="A specialized assistant for analyzing issues and implementing code fixes with automated git operations.", + system_prompt="You are a PR creation assistant that analyzes issues, designs solutions, and implements fixes with proper git workflow.", + ) + + def analyze_repository_structure(self, repo_path): + """Analyze repository structure and create folder map for component placement.""" + if not repo_path or not os.path.exists(repo_path): + print(f"Repository path does not exist: {repo_path}") + return + + try: + # Create RepoStructureTools instance + structure_tools = RepoStructureTools() + + # Analyze folder structure + print("Analyzing repository folder structure...") + structure_analysis = structure_tools.analyze_folder_structure(None, repo_path) + print(structure_analysis) + + # Get component placement map + print("\nGenerating component placement map...") + placement_map = structure_tools.get_component_placement_map(None, repo_path) + print(placement_map) + + # Analyze project templates + print("\nAnalyzing project 
templates...") + template_analysis = structure_tools.analyze_project_templates(None, repo_path) + print(template_analysis) + + # Parse the placement map to update the folder_map attribute + for line in placement_map.split('\n'): + if ': ' in line and line.startswith('- '): + comp_type, folder = line.replace('- ', '').split(': ', 1) + self.folder_map[comp_type.strip()] = folder.strip() + + print(f"\nFolder map updated: {self.folder_map}") + + return { + "structure_analysis": structure_analysis, + "placement_map": placement_map, + "template_analysis": template_analysis, + "folder_map": self.folder_map + } + + except Exception as e: + print(f"Error analyzing repository structure: {str(e)}") + return None + + def initialize_team(self, system_prompt): + model_id = self.config_manager.lm_provider_params["model_id"] + github_token = os.getenv("GITHUB_ACCESS_TOKEN") + if not github_token: + raise ValueError("GITHUB_ACCESS_TOKEN environment variable is not set. Please set it with a plain GitHub personal access token.") + + # Issue Analysis Agent + issue_analyzer = Agent( + name="issue_analyzer", + role="Issue Analysis Specialist", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "MANDATORY ISSUE ANALYSIS WORKFLOW - Follow these steps:", + + "STEP 1 - Parse Issue Requirements:", + " - Extract issue description and requirements", + " - Identify problem statement and expected behavior", + " - Determine scope and complexity", + " - List acceptance criteria", + + "STEP 2 - Repository Context Analysis:", + " - Use KG queries to understand codebase structure", + " - Identify relevant files and components", + " - Find similar patterns or existing implementations", + " - Analyze dependencies and relationships", + + "STEP 3 - Impact Assessment:", + " - Determine which files need modification", + " - Identify potential breaking changes", + " - Consider testing requirements", + " - Plan integration points", + + "OUTPUT: Structured analysis with clear requirements and affected components" + ], + tools=[RepoAnalysisTools(), ReasoningTools(add_instructions=True, think=True, analyze=True)] + ) + + # Architecture Designer Agent + architect = Agent( + name="architect", + role="Solution Architecture Designer", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "MANDATORY ARCHITECTURE DESIGN WORKFLOW:", + + "STEP 1 - Solution Design:", + " - Design minimal solution architecture", + " - Plan file structure and organization", + " - Define interfaces and contracts", + " - Consider existing patterns and conventions", + + "STEP 2 - Repository Structure Analysis:", + " - Analyze existing folder structure patterns", + " - Identify where similar components are typically placed", + " - Create a folder map to guide file placement decisions", + " - Determine appropriate locations for new files", + + "STEP 3 - Implementation Strategy:", + " - Break down into implementable components", + " - Define clear separation of concerns", + " - Plan error handling and edge cases", + " - Consider performance implications", + " - SPECIFY EXACT FILE PATHS for all new/modified files", + + "STEP 4 - Integration Planning:", + " - Plan how new code integrates with existing", + " - Identify required imports and dependencies", + " - Consider backward compatibility", + " - Plan testing approach", + + "OUTPUT: Detailed implementation plan with file-by-file changes and EXACT file paths" + ], + tools=[ + RepoAnalysisTools(), + RepoStructureTools(), + 
ReasoningTools(add_instructions=True, think=True, analyze=True) + ] + ) + + # Code Implementation Agent + code_implementer = Agent( + name="code_implementer", + role="Code Implementation Specialist", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "MANDATORY CODE IMPLEMENTATION WORKFLOW:", + + "STEP 1 - Repository Setup:", + f" - {'Use existing repository at ' + self.local_repo_path if self.local_repo_path else 'Clone repository using shell commands'}", + " - If using existing repo: fetch latest changes", + f" - Use the feature branch: {self.feature_branch if self.feature_branch else 'feature/issue-description'}", + " - Check if branch exists: git branch --list ", + " - If exists: git checkout ", + " - If not exists: git checkout -b from main/master", + " - Verify current codebase state", + + "STEP 2 - Repository Structure Analysis:", + " - Analyze existing folder structure patterns", + " - Identify where similar components are typically placed", + " - Create a folder map to guide file placement decisions", + " - Determine appropriate locations for new files", + + "STEP 3 - Code Implementation:", + " - Write MINIMAL code that addresses the issue", + " - Follow existing code patterns and style", + " - Implement proper error handling", + " - Focus ONLY on the specific issue requirements", + + "STEP 4 - File Operations:", + " - Create/modify files using FileTools", + " - NEVER create files in the repository root", + " - Always follow project structure conventions", + " - Ensure parent directories exist before file creation", + " - Validate file paths against project patterns", + + "CRITICAL REQUIREMENTS:", + "- Write ONLY the minimal code needed", + "- Follow existing patterns exactly", + "- Place files in appropriate directories", + "- NO verbose implementations", + "- Focus on the specific issue only" + ], + tools=[ + ShellTools(), + EnhancedFileTools(), + PythonTools(), + RepoAnalysisTools(), + RepoStructureTools(), + ReasoningTools(add_instructions=True, think=True, analyze=True) + ] + ) + + # Git Operations Agent + git_manager = Agent( + name="git_manager", + role="Git Operations Manager", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "MANDATORY GIT WORKFLOW:", + + "STEP 1 - Repository Operations:", + " - Use shell commands for git operations", + f" - {'Use existing repository at ' + self.local_repo_path if self.local_repo_path else 'Clone main branch from repository'}", + " - If using existing repo: fetch latest changes and checkout main branch", + " - Verify repository state and structure", + + "STEP 2 - Branch Management:", + f" - Use the feature branch: {self.feature_branch if self.feature_branch else 'feature/issue-description'}", + " - Check if branch exists: git branch --list ", + " - If exists: git checkout ", + " - If not exists: git checkout -b ", + + "STEP 3 - Commit Operations:", + " - Stage files: git add .", + " - Create clear commit messages: git commit -m 'description'", + " - Follow conventional commit format if used in repo", + + "STEP 4 - Push Operations:", + " - Push feature branch: git push -u origin branch-name", + " - DO NOT create pull request (user will do manually)", + + "CRITICAL REQUIREMENTS:", + "- NEVER create pull requests automatically", + "- Always push to feature branch, never main", + "- Use clear, descriptive commit messages" + ], + tools=[ + ShellTools(), + GithubTools(get_pull_requests=True, get_pull_request_changes=True), + ReasoningTools(add_instructions=True, think=True, 
analyze=True) + ] + ) + + # Configure EnhancedFileTools with repository structure information + if self.local_repo_path and os.path.exists(self.local_repo_path): + # Get the EnhancedFileTools instance from code_implementer + for tool in code_implementer.tools: + if isinstance(tool, EnhancedFileTools): + # Set repository path + tool.set_repo_path(self.local_repo_path) + # Set structure tools + tool.set_structure_tools(RepoStructureTools()) + print(f"Configured EnhancedFileTools with repository path: {self.local_repo_path}") + break + + # Create the coordinating team + pr_creation_team = Team( + name="pr-creation-team", + mode="coordinate", + members=[issue_analyzer, architect, code_implementer, git_manager], + model=AwsBedrock(id=model_id, session=session), + instructions=[ + "Coordinate PR creation process with clear separation of tasks:", + + "PHASE 1 - ANALYSIS (Issue Analyzer):", + " - Parse and understand the issue requirements", + " - Analyze repository context using KG queries", + " - Identify affected components and scope", + " - Provide structured analysis to team", + + "PHASE 2 - ARCHITECTURE (Architect):", + " - Wait for issue analysis completion", + " - Design minimal solution architecture", + " - Plan implementation strategy", + " - Create detailed file-by-file implementation plan", + + "PHASE 3 - IMPLEMENTATION (Code Implementer):", + " - Wait for architecture design completion", + " - Set up repository and create branch", + " - Implement code changes following the plan", + " - Write ONLY minimal code addressing the issue", + " - Ensure code follows existing patterns", + + "PHASE 4 - GIT OPERATIONS (Git Manager):", + " - Wait for code implementation completion", + " - Commit changes with clear messages", + " - Push feature branch to remote", + " - Provide branch information for manual PR creation", + + "COORDINATION RULES:", + "- Each phase must complete before next begins", + "- Share context and findings between agents", + "- Maintain focus on minimal, targeted solutions", + "- Ensure proper git workflow throughout", + + "Chat history: " + system_prompt + ], + markdown=True, + show_members_responses=True, + enable_agentic_context=True, + add_datetime_to_instructions=True, + tools=[ + ShellTools(), + FileTools(), + GithubTools(get_pull_requests=True, get_pull_request_changes=True), + RepoAnalysisTools(), + ReasoningTools(add_instructions=True, think=True, analyze=True) + ] + ) + + return pr_creation_team + + def _initialize_taskmaster(self): + """Initialize the TaskMaster client and PRD agent if not already done.""" + if self.taskmaster is None: + self.taskmaster = TaskMasterClient() + + if self.prd_agent is None: + model_id = self.config_manager.lm_provider_params["model_id"] + session = boto3.Session() + self.prd_agent = PRDAgent(model_id=model_id, session=session) + + def _parse_command(self, message: str): + """Parse user command to determine if it's a task-related command.""" + # Check for issue URL + issue_match = re.search(r'(https://github\.com/[^/\s]+/[^/\s]+/issues/\d+)', message) + if issue_match: + return { + "action": "process_issue", + "issue_url": issue_match.group(1) + } + + # Check for task details command + task_details_match = re.search(r'(?:show|get|display)\s+task\s+(?:details|info)?\s+(?:for|of)?\s*[#]?(\d+)', message, re.IGNORECASE) + if task_details_match: + return { + "action": "show_task_details", + "task_id": task_details_match.group(1) + } + + # Check for implement task command + impl_task_match = re.search(r'implement\s+task\s*[#]?(\d+)', message, 
re.IGNORECASE) + if impl_task_match: + return { + "action": "implement_task", + "task_id": impl_task_match.group(1) + } + + # Check for list tasks command + if re.search(r'(?:list|show|get)\s+(?:all\s+)?tasks', message, re.IGNORECASE): + return { + "action": "list_tasks" + } + + # Check for create PR command + if re.search(r'(?:create|make)\s+(?:a\s+)?(?:PR|pull\s*request)(?:\s+for\s+(?:completed|done)\s+tasks)?', message, re.IGNORECASE): + return { + "action": "create_pr" + } + + # Default to standard PR creation + return { + "action": "standard_pr_creation" + } + + async def _process_issue(self, issue_url: str): + """Process a GitHub issue to create PRD and tasks.""" + try: + print(f"Processing issue: {issue_url}") + self._initialize_taskmaster() + + # Extract repo URL from issue URL + repo_match = re.search(r'(https://github\.com/[^/]+/[^/]+)/', issue_url) + if repo_match: + self.current_repo_url = repo_match.group(1) + ".git" + print(f"Extracted repo URL: {self.current_repo_url}") + + self.current_issue_url = issue_url + + # Create a consistent feature branch name based on the issue number + issue_number = issue_url.split('/')[-1] + self.feature_branch = f"feature/issue-{issue_number}" + print(f"Using feature branch: {self.feature_branch} for all tasks") + + # If LOCAL_REPO_PATH is set but doesn't exist, clone the repository there + if self.local_repo_path and self.current_repo_url and not os.path.exists(self.local_repo_path): + print(f"LOCAL_REPO_PATH is set to {self.local_repo_path} but doesn't exist. Will clone repository there.") + # The actual cloning will happen in _validate_local_repo when called + + # Create PRD from issue + print("Creating PRD from issue...") + raw_prd = await self.prd_agent.create_prd_from_issue(issue_url) + + # Check for and remove any repetition in the PRD + # This can happen if the model generates multiple PRDs for the same issue + if "# Product Requirements Document" in raw_prd: + # Find all occurrences of PRD headers + prd_headers = [m.start() for m in re.finditer(r'# Product Requirements Document', raw_prd)] + if len(prd_headers) > 1: + # Keep only the first PRD (up to the second header) + self.current_prd = raw_prd[:prd_headers[1]].strip() + print("Detected and removed duplicate PRD content") + else: + self.current_prd = raw_prd + else: + self.current_prd = raw_prd + + print(f"PRD created successfully! 
Length: {len(self.current_prd)} chars") + + # Save PRD to file for debugging + with open("generated_prd.md", "w") as f: + f.write(self.current_prd) + print("PRD saved to generated_prd.md") + + # Create tasks from PRD + print("Creating tasks from PRD...") + self.current_tasks = await self.taskmaster.create_tasks_from_prd(self.current_prd) + print(f"Created {len(self.current_tasks)} tasks successfully!") + + # Auto-analyze repository + print("Auto-analyzing repository...") + self._auto_analyze_repo(issue_url) + print("Repository analysis complete") + + # Get available tasks (no dependencies) + available_tasks = self.taskmaster.get_available_tasks() + print(f"Found {len(available_tasks)} available tasks with no dependencies") + + # Format response - SIMPLIFIED to show only PRD and available tasks + response = f"## Issue Processed Successfully\n\n" + response += f"Issue URL: {issue_url}\n\n" + response += f"### PRD\n\n" + response += f"{self.current_prd}\n\n" # Show full PRD + + # Only show available tasks with no dependencies + response += f"### Available Tasks\n" + if available_tasks: + response += f"These tasks have no dependencies and can be implemented immediately:\n\n" + # Only show title and description for each task + for task in available_tasks: + response += f"**Task #{task.id}: {task.title}**\n" + response += f"Description: {task.description}\n\n" + + # Add quick links to details and implementation + response += f"\n\nCommands:\n" + response += f"- 'show task details for #ID' to see implementation details of a specific task\n" + response += f"- 'implement task #ID' to implement a specific task\n" + response += f"- 'list tasks' to see all tasks\n" + else: + response += f"No tasks are currently ready for implementation.\n" + + return response + + except Exception as e: + import traceback + error_trace = traceback.format_exc() + print(f"Error in _process_issue: {e}\n{error_trace}") + return f"## Error Processing Issue\n\nAn error occurred while processing the issue: {str(e)}\n\nPlease try again or contact support." + + async def _show_task_details(self, task_id: str): + """Show details for a specific task, including implementation details.""" + try: + self._initialize_taskmaster() + + if not self.current_tasks: + return "No tasks available. Please process an issue first." + + task = self.taskmaster.get_task_by_id(task_id) + if not task: + return f"Task with ID {task_id} not found." 
+ + # Format the task with full details + response = f"## Task #{task_id} Details\n\n" + response += f"**{task.title}**\n\n" + response += f"**Description:** {task.description}\n\n" + response += f"**Priority:** {task.priority}\n" + response += f"**Status:** {task.status}\n\n" + + # Show dependencies + if task.dependencies: + response += f"**Dependencies:**\n" + for dep_id in task.dependencies: + dep_task = self.taskmaster.get_task_by_id(dep_id) + status = "✅ Completed" if dep_task and dep_task.status == "done" else "⏳ Pending" + response += f"- Task #{dep_id}: {status}\n" + response += "\n" + + # Show implementation details + if task.details: + response += f"**Implementation Details:**\n```\n{task.details}\n```\n\n" + + # Show test strategy + if task.test_strategy: + response += f"**Test Strategy:**\n```\n{task.test_strategy}\n```\n\n" + + # Add implementation option if task is available + available_tasks = self.taskmaster.get_available_tasks() + if task in available_tasks: + response += f"**This task has no unmet dependencies and can be implemented immediately.**\n" + response += f"\nTo implement this task, type: 'implement task #{task_id}'\n" + else: + # Show dependencies that need to be completed first + if task.dependencies: + response += f"\n**This task has dependencies that must be completed first:**\n" + for dep_id in task.dependencies: + dep_task = self.taskmaster.get_task_by_id(dep_id) + if dep_task and dep_task.status != "done": + response += f"- Task #{dep_id}: {dep_task.title} (Status: {dep_task.status})\n" + + return response + + except Exception as e: + import traceback + error_trace = traceback.format_exc() + print(f"Error in _show_task_details: {e}\n{error_trace}") + return f"## Error Showing Task Details\n\nAn error occurred while showing task details: {str(e)}\n\nPlease try again or contact support." + + async def _implement_task(self, task_id: str): + """Implement a specific task using TaskExecutionAgent.""" + try: + print(f"Implementing task #{task_id}...") + self._initialize_taskmaster() + + if not self.current_tasks: + return "No tasks available. Please process an issue first." + + task = self.taskmaster.get_task_by_id(task_id) + if not task: + return f"Task with ID {task_id} not found." 
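Reviewer note: the gating that follows in `_implement_task` leans on `TaskMasterClient.get_available_tasks()`, which is not part of this diff. A rough sketch, under that assumption, of the rule it appears to implement ("available" means every dependency is already done), using a simplified stand-in `Task` rather than the real dataclass, which also carries details, test strategy, and so on:

```python
from dataclasses import dataclass, field

@dataclass
class Task:
    id: str
    title: str
    status: str = "pending"
    dependencies: list = field(default_factory=list)

def available_tasks(tasks: list[Task]) -> list[Task]:
    """A task is available when it is not done and all of its dependencies are done."""
    done = {t.id for t in tasks if t.status == "done"}
    return [t for t in tasks
            if t.status != "done" and all(d in done for d in t.dependencies)]

tasks = [Task("11", "Analyze heartbeat", status="done"),
         Task("12", "Design message configs", dependencies=["11"]),
         Task("13", "Implement iterator", dependencies=["12"])]
print([t.id for t in available_tasks(tasks)])  # ['12'] — task 13 still waits on 12
```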
+ + # Check if task dependencies are met + available_tasks = self.taskmaster.get_available_tasks() + if task not in available_tasks: + # Show which dependencies need to be completed + unmet_deps = [] + for dep_id in task.dependencies: + dep_task = self.taskmaster.get_task_by_id(dep_id) + if dep_task and dep_task.status != "done": + unmet_deps.append(dep_id) + + response = f"## Task {task_id} Has Unmet Dependencies\n\n" + response += f"The following dependencies must be completed first:\n\n" + + for dep_id in unmet_deps: + dep_task = self.taskmaster.get_task_by_id(dep_id) + if dep_task: + response += f"- Task #{dep_id}: {dep_task.title}\n" + response += f" To implement: 'implement task #{dep_id}'\n" + + return response + + print(f"Task #{task_id} is available for implementation") + + # Mark task as in-progress + self.taskmaster.update_task_status(task_id, "in-progress") + + # Create repository context information + repo_context = f"Repository URL: {self.current_repo_url}\n" + if self.feature_branch: + repo_context += f"Feature Branch: {self.feature_branch}\n" + repo_context += "Use this feature branch for all tasks.\n" + if self.local_repo_path: + # Make sure the path is absolute + abs_path = os.path.abspath(self.local_repo_path) + repo_context += f"Local Repository Path: {abs_path}\n" + repo_context += f"IMPORTANT: Save all files to {abs_path}\n" + repo_context += "Use the existing local repository instead of cloning a new one.\n" + print(f"Using local repository path: {abs_path}") + + # Ensure the directory exists + if not os.path.exists(abs_path): + os.makedirs(abs_path, exist_ok=True) + print(f"Created directory: {abs_path}") + + # Clone the repository if it's not already a git repository + if not os.path.exists(os.path.join(abs_path, '.git')): + if self.current_repo_url: + print(f"Cloning repository {self.current_repo_url} to {abs_path}") + try: + # Remove any existing content + if os.path.exists(abs_path) and os.listdir(abs_path): + print("Removing existing content before cloning") + for item in os.listdir(abs_path): + item_path = os.path.join(abs_path, item) + if os.path.isdir(item_path): + import shutil + shutil.rmtree(item_path) + elif os.path.isfile(item_path): + os.remove(item_path) + + # Clone the repository + subprocess.run(["git", "clone", self.current_repo_url, abs_path], check=True, capture_output=True) + print(f"Successfully cloned repository to {abs_path}") + except Exception as e: + print(f"Warning: Failed to clone repository: {e}") + else: + print(f"Warning: No repository URL available to clone. 
Initializing empty git repository.") + try: + subprocess.run(["git", "init"], cwd=abs_path, check=True, capture_output=True) + except Exception as e: + print(f"Warning: Failed to initialize git repository: {e}") + + # Add PRD context + repo_context += f"\nPRD Context:\n{self.current_prd[:1000]}...\n" + + # Initialize TaskExecutionAgent + model_id = self.config_manager.lm_provider_params["model_id"] + task_agent = TaskExecutionAgent(model_id=model_id, session=session) + + # Execute the task + print(f"Running TaskExecutionAgent for task #{task_id}") + result = await task_agent.execute_task(task, repo_context) + + # Update task status to done + print(f"Updating task #{task_id} status to done") + # First try using the TaskMaster command directly + try: + work_dir = os.getcwd() + print(f"Running: npx task-master set-status --status=done --id={task_id}") + cmd_result = subprocess.run([ + 'npx', 'task-master', 'set-status', + f'--status=done', + f'--id={task_id}' + ], cwd=work_dir, capture_output=True, text=True) + + if cmd_result.returncode == 0: + print(f"Successfully updated task {task_id} status to done via direct command") + else: + print(f"Direct command failed: {cmd_result.stderr}") + # Fall back to using the TaskMasterClient + success = self.taskmaster.update_task_status(task_id, "done") + if not success: + print(f"Warning: Failed to update task status in TaskMaster. Updating in memory only.") + # Update the task status in memory + for t in self.current_tasks: + if t.id == task_id: + t.status = "done" + break + except Exception as e: + print(f"Error updating task status: {e}") + # Fall back to using the TaskMasterClient + self.taskmaster.update_task_status(task_id, "done") + + # Check if new tasks are now available + new_available_tasks = self.taskmaster.get_available_tasks() + newly_available = [t for t in new_available_tasks if t not in available_tasks] + + # Format the response + response = f"## Task #{task_id} Implementation\n\n" + response += f"**{task.title}**\n\n" + response += result + + # Add information about newly available tasks + if newly_available: + response += f"\n\n## New Tasks Available\n\n" + response += f"The following tasks are now available for implementation:\n\n" + response += self.taskmaster.format_tasks_for_agents(newly_available, show_details=False) + response += f"\n\nUse 'show task details for #ID' to see implementation details of a specific task.\n" + + return response + + except Exception as e: + import traceback + error_trace = traceback.format_exc() + print(f"Error in _implement_task: {e}\n{error_trace}") + return f"## Error Implementing Task\n\nAn error occurred while implementing the task: {str(e)}\n\nPlease try again or contact support." + + async def _list_tasks(self): + """List all tasks.""" + try: + self._initialize_taskmaster() + + if not self.current_tasks: + return "No tasks available. Please process an issue first." 
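Reviewer note: the status update in `_implement_task` above falls back through three layers — the `npx task-master set-status` CLI, then `TaskMasterClient.update_task_status`, then an in-memory edit of `current_tasks`. A condensed restatement of that chain as a standalone helper; `set_task_done` is a hypothetical name used only for this sketch, and `client`/`tasks` stand for the TaskMasterClient instance and the in-memory task list:

```python
import subprocess

def set_task_done(task_id: str, client, tasks) -> None:
    """Mirror the fallback chain used above: CLI first, then the client, then in-memory only."""
    cmd = ["npx", "task-master", "set-status", "--status=done", f"--id={task_id}"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        return  # CLI succeeded; nothing else to do
    if client.update_task_status(task_id, "done"):
        return  # client fallback succeeded
    for t in tasks:  # last resort: keep the in-process view consistent
        if t.id == task_id:
            t.status = "done"
            break
```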
+ + # Get available tasks (no dependencies) + available_tasks = self.taskmaster.get_available_tasks() + + # Get completed tasks + completed_tasks = [t for t in self.current_tasks if t.status == "done"] + + response = f"## All Tasks\n\n" + for task in self.current_tasks: + response += f"**Task #{task.id}: {task.title}**\n" + response += f"Description: {task.description}\n" + response += f"Status: {task.status}\n\n" + + # Add section for available tasks + response += f"\n\n## Ready to Implement Tasks\n" + if available_tasks: + response += f"These tasks have no unmet dependencies and can be implemented immediately:\n\n" + response += self.taskmaster.format_tasks_for_agents(available_tasks, show_details=False) + + # Add quick links to details and implementation + response += f"\n\nCommands:\n" + response += f"- 'show task details for #ID' to see implementation details of a specific task\n" + response += f"- 'implement task #ID' to implement a specific task\n" + else: + response += f"No tasks are currently ready for implementation.\n" + + # Add section for completed tasks if any + if completed_tasks: + response += f"\n\n## Completed Tasks\n" + response += f"These tasks have been completed:\n\n" + for task in completed_tasks: + response += f"- Task #{task.id}: {task.title}\n" + + response += f"\n\nYou can create a PR for these completed tasks by typing: 'create PR for completed tasks'\n" + + return response + + except Exception as e: + import traceback + error_trace = traceback.format_exc() + print(f"Error in _list_tasks: {e}\n{error_trace}") + return f"## Error Listing Tasks\n\nAn error occurred while listing tasks: {str(e)}\n\nPlease try again or contact support." + + async def _create_pr(self): + """Create a PR from completed tasks.""" + try: + self._initialize_taskmaster() + + if not self.current_tasks: + return "No tasks available. Please process an issue first." + + # Get completed tasks + completed_tasks = [t for t in self.current_tasks if t.status == "done"] + + if not completed_tasks: + return "No completed tasks found. Please implement at least one task before creating a PR." + + # Validate local repository + if not self._validate_local_repo(): + return "No valid local repository found. Please set the LOCAL_REPO_PATH environment variable to a valid git repository." + + # Create system prompt with PR details + system_prompt = f""" + Create a Pull Request for the following completed tasks: + + Repository: {self.current_repo_url} + Local Repository Path: {self.local_repo_path} + Feature Branch: {self.feature_branch} + Issue URL: {self.current_issue_url} + + Completed Tasks: + {', '.join([f'#{t.id}: {t.title}' for t in completed_tasks])} + + PRD Context: + {self.current_prd[:500]}... + + IMPORTANT: All tasks have been implemented in the same feature branch ({self.feature_branch}). 
+ """ + + # Initialize git manager agent + model_id = self.config_manager.lm_provider_params["model_id"] + git_manager = Agent( + name="git_pr_manager", + role="Git PR Manager", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "PR CREATION WORKFLOW:", + + "STEP 1 - Repository Verification:", + f" - Verify the local repository at {self.local_repo_path}", + f" - Check that you're on the feature branch {self.feature_branch}", + " - Ensure all changes are committed and pushed", + + "STEP 2 - PR Description Creation:", + " - Create a detailed PR description based on completed tasks", + " - Include task IDs and titles", + " - Summarize the changes made", + " - Reference the original issue", + + "STEP 3 - PR Creation Instructions:", + " - Provide clear instructions for the user to create the PR", + " - Include the branch name to use", + " - Include the PR description to copy-paste", + + "CRITICAL REQUIREMENTS:", + "- DO NOT create the PR automatically", + "- Provide instructions for the user to create it manually", + "- Ensure all task implementations are included" + ], + tools=[ + ShellTools(), + GithubTools(get_pull_requests=True), + ReasoningTools(add_instructions=True, think=True, analyze=True) + ] + ) + + # Run the git manager to prepare PR + response = git_manager.run( + f"Create PR instructions for completed tasks: {', '.join([f'#{t.id}' for t in completed_tasks])}", + stream=False + ) + + return response.content + + except Exception as e: + import traceback + error_trace = traceback.format_exc() + print(f"Error in _create_pr: {e}\n{error_trace}") + return f"## Error Creating PR\n\nAn error occurred while creating the PR: {str(e)}\n\nPlease try again or create the PR manually." + + async def _standard_pr_creation(self, message_body, system_prompt): + """Standard PR creation workflow.""" + # Auto-analyze repository if URL is provided + self._auto_analyze_repo(message_body) + + # Add local repository path to system prompt if available + if self.local_repo_path: + system_prompt += f"\n\nUse the existing local repository at: {self.local_repo_path}" + + team = self.initialize_team(system_prompt) + response = team.run( + message_body, + stream=False, + stream_intermediate_steps=True, + show_full_reasoning=True + ) + + return response.content + + async def process_message(self, message: Message): + provider_name = self.config_manager.lm_provider.name + model_id = self.config_manager.lm_provider_params["model_id"] + + history = YChatHistory(ychat=self.ychat, k=2) + messages = await history.aget_messages() + + history_text = "" + if messages: + history_text = "\nPrevious conversation:\n" + for msg in messages: + role = "User" if isinstance(msg, HumanMessage) else "Assistant" + history_text += f"{role}: {msg.content}\n" + + variables = PRCreationPersonaVariables( + input=message.body, + model_id=model_id, + provider_name=provider_name, + persona_name=self.name, + context=history_text + ) + + system_prompt = PR_CREATION_PROMPT_TEMPLATE.format_messages(**variables.model_dump())[0].content + + try: + # Parse command from message + command = self._parse_command(message.body) + action = command["action"] + + # Execute appropriate action + if action == "process_issue": + response = await self._process_issue(command["issue_url"]) + elif action == "show_task_details": + response = await self._show_task_details(command["task_id"]) + elif action == "implement_task": + response = await self._implement_task(command["task_id"]) + elif action == "list_tasks": + response = 
await self._list_tasks() + elif action == "create_pr": + response = await self._create_pr() + else: # standard_pr_creation + response = await self._standard_pr_creation(message.body, system_prompt) + + # Stream response + async def response_iterator(): + yield response + + await self.stream_message(response_iterator()) + + except ValueError as e: + error_message = f"Configuration Error: {str(e)}\nThis may be due to missing or invalid environment variables, model configuration, or input parameters." + async def error_iterator(): + yield error_message + await self.stream_message(error_iterator()) + + except boto3.exceptions.Boto3Error as e: + error_message = f"AWS Connection Error: {str(e)}\nThis may be due to invalid AWS credentials or network connectivity issues." + async def error_iterator(): + yield error_message + await self.stream_message(error_iterator()) + + except Exception as e: + error_message = f"PR Creation Error ({type(e).__name__}): {str(e)}\nAn unexpected error occurred while the PR creation team was processing your request." + async def error_iterator(): + yield error_message + await self.stream_message(error_iterator()) + + def _validate_local_repo(self): + """Validate that the local repository path exists and is a git repository. + If the path doesn't exist but is specified, clone the repository there.""" + if not self.local_repo_path: + return False + + try: + # Ensure the directory exists + if not os.path.isdir(self.local_repo_path): + print(f"Creating directory {self.local_repo_path}") + os.makedirs(self.local_repo_path, exist_ok=True) + + # Check if it's a git repository + is_git_repo = False + try: + result = subprocess.run( + ["git", "-C", self.local_repo_path, "rev-parse", "--is-inside-work-tree"], + capture_output=True, text=True + ) + is_git_repo = result.returncode == 0 and result.stdout.strip() == "true" + except Exception: + is_git_repo = False + + # If not a git repo and we have a repo URL, clone it + if not is_git_repo and self.current_repo_url: + print(f"Cloning repository {self.current_repo_url} to {self.local_repo_path}") + + # Remove any existing content + if os.path.exists(self.local_repo_path) and os.listdir(self.local_repo_path): + print("Removing existing content before cloning") + for item in os.listdir(self.local_repo_path): + item_path = os.path.join(self.local_repo_path, item) + if os.path.isdir(item_path): + import shutil + shutil.rmtree(item_path) + elif os.path.isfile(item_path): + os.remove(item_path) + + # Hardcode the GitHub username + username = "bhavana-nair" + print(f"Using hardcoded GitHub username: {username}") + + # If we have a username, use the fork URL instead of the original repo URL + if username and 'github.com' in self.current_repo_url: + # Extract original repo owner and name + repo_parts = self.current_repo_url.replace('https://github.com/', '').replace('.git', '').split('/') + if len(repo_parts) >= 2: + original_owner, repo_name = repo_parts[0], repo_parts[1] + # Create fork URL + fork_url = f"https://github.com/{username}/{repo_name}.git" + print(f"Using fork URL: {fork_url} instead of original: {self.current_repo_url}") + + # Clone from the fork + result = subprocess.run( + ["git", "clone", fork_url, self.local_repo_path], + capture_output=True, text=True + ) + + # Add original repo as upstream remote + if result.returncode == 0: + print("Adding original repository as upstream remote") + subprocess.run( + ["git", "-C", self.local_repo_path, "remote", "add", "upstream", self.current_repo_url], + capture_output=True, 
text=True + ) + else: + print(f"Could not parse repository URL: {self.current_repo_url}, using original URL") + result = subprocess.run( + ["git", "clone", self.current_repo_url, self.local_repo_path], + capture_output=True, text=True + ) + else: + # Fall back to original URL if we can't determine the fork + print(f"Using original repository URL: {self.current_repo_url}") + result = subprocess.run( + ["git", "clone", self.current_repo_url, self.local_repo_path], + capture_output=True, text=True + ) + + if result.returncode != 0: + print(f"Failed to clone repository: {result.stderr}") + return False + + print(f"Successfully cloned repository to {self.local_repo_path}") + is_git_repo = True + elif not is_git_repo: + print(f"Local repository path {self.local_repo_path} is not a git repository and no repo URL is available") + return False + + # If we have a feature branch, check it out or create it + if is_git_repo and self.feature_branch: + print(f"Setting up feature branch: {self.feature_branch}") + + # Check if the branch exists + branch_result = subprocess.run( + ["git", "-C", self.local_repo_path, "branch", "--list", self.feature_branch], + capture_output=True, text=True + ) + branch_exists = self.feature_branch in branch_result.stdout if branch_result.stdout else False + + if branch_exists: + print(f"Checking out existing branch: {self.feature_branch}") + subprocess.run( + ["git", "-C", self.local_repo_path, "checkout", self.feature_branch], + capture_output=True, text=True + ) + else: + print(f"Creating new branch: {self.feature_branch}") + # Try to checkout main or master first + try: + subprocess.run( + ["git", "-C", self.local_repo_path, "checkout", "main"], + capture_output=True, text=True + ) + except: + try: + subprocess.run( + ["git", "-C", self.local_repo_path, "checkout", "master"], + capture_output=True, text=True + ) + except: + print("Could not find main or master branch") + + # Create and checkout the feature branch + try: + subprocess.run( + ["git", "-C", self.local_repo_path, "checkout", "-b", self.feature_branch], + capture_output=True, text=True + ) + print(f"Created and checked out branch: {self.feature_branch}") + + # Push the new branch to the fork + print(f"Pushing new branch to fork: {self.feature_branch}") + try: + # Check if we're using a fork by looking for upstream remote + remotes = subprocess.run( + ["git", "-C", self.local_repo_path, "remote", "-v"], + capture_output=True, text=True + ) + using_fork = "upstream" in remotes.stdout + + if using_fork: + print(f"Pushing branch to fork (origin/{self.feature_branch})") + else: + print(f"Pushing branch to origin/{self.feature_branch}") + + subprocess.run( + ["git", "-C", self.local_repo_path, "push", "-u", "origin", self.feature_branch], + capture_output=True, text=True + ) + print(f"Successfully pushed branch {self.feature_branch} to fork") + except Exception as push_error: + print(f"Warning: Could not push branch to fork: {push_error}") + except Exception as e: + print(f"Warning: Could not create feature branch: {e}") + + print(f"Validated local repository at {self.local_repo_path}") + return True + except Exception as e: + print(f"Error validating local repository: {e}") + return False + + def _auto_analyze_repo(self, issue_text: str): + """Automatically extract repo URL and create knowledge graph""" + # If we don't have a repo URL yet, try to extract it from the issue text + if not self.current_repo_url: + patterns = [ + r'https://github\.com/([^/\s]+/[^/\s]+)', + r'github\.com/([^/\s]+/[^/\s]+)' + ] + + for 
pattern in patterns: + match = re.search(pattern, issue_text) + if match: + repo_path = match.group(1).rstrip('/') + self.current_repo_url = f"https://github.com/{repo_path}.git" + print(f"Extracted repo URL: {self.current_repo_url}") + break + + # If we have a valid local repository, use that instead of cloning + if self._validate_local_repo(): + print(f"Using local repository at {self.local_repo_path} for analysis") + analyzer = BulkCodeAnalyzer("neo4j://127.0.0.1:7687", (os.getenv("NEO4J_USER", "neo4j"), os.getenv("NEO4J_PASSWORD", ""))) + analyzer.analyze_folder(self.local_repo_path, clear_existing=True) + return self.local_repo_path + + # If we have a repo URL but no valid local repo, clone it to a temporary location + if self.current_repo_url: + return self._clone_and_analyze(self.current_repo_url) + + return None + + def _clone_and_analyze(self, repo_url: str): + """Clone repository and create knowledge graph""" + import time + start_time = time.time() + + try: + # Use current directory for cloning to avoid path issues + current_dir = os.getcwd() + target_folder = os.path.join(current_dir, "repo_analysis") + + # Remove existing folder if it exists + if os.path.exists(target_folder): + subprocess.run(["rm", "-rf", target_folder], check=True, capture_output=True) + + clone_start = time.time() + + # Hardcode the GitHub username + username = "bhavana-nair" + print(f"Using hardcoded GitHub username: {username}") + + # If we have a username, use the fork URL instead of the original repo URL + if username and 'github.com' in repo_url: + # Extract original repo owner and name + repo_parts = repo_url.replace('https://github.com/', '').replace('.git', '').split('/') + if len(repo_parts) >= 2: + original_owner, repo_name = repo_parts[0], repo_parts[1] + # Create fork URL + fork_url = f"https://github.com/{username}/{repo_name}.git" + print(f"Using fork URL: {fork_url} instead of original: {repo_url}") + + # Clone from the fork + subprocess.run(["git", "clone", fork_url, target_folder], check=True, capture_output=True) + + # Add original repo as upstream remote + print("Adding original repository as upstream remote") + subprocess.run( + ["git", "-C", target_folder, "remote", "add", "upstream", repo_url], + capture_output=True, text=True + ) + else: + print(f"Could not parse repository URL: {repo_url}, using original URL") + subprocess.run(["git", "clone", repo_url, target_folder], check=True, capture_output=True) + else: + # Fall back to original URL if we can't determine the fork + print(f"Using original repository URL: {repo_url}") + subprocess.run(["git", "clone", repo_url, target_folder], check=True, capture_output=True) + + clone_time = time.time() - clone_start + + kg_start = time.time() + # Get Neo4j credentials from environment variables + neo4j_user = os.getenv("NEO4J_USER", "neo4j") + neo4j_password = os.getenv("NEO4J_PASSWORD", "") + if not neo4j_password: + print("Warning: NEO4J_PASSWORD environment variable not set. 
Knowledge graph analysis may fail.") + analyzer = BulkCodeAnalyzer("neo4j://127.0.0.1:7687", (neo4j_user, neo4j_password)) + analyzer.analyze_folder(target_folder, clear_existing=True) + kg_time = time.time() - kg_start + + total_time = time.time() - start_time + print(f"KG Creation Times - Clone: {clone_time:.2f}s, Analysis: {kg_time:.2f}s, Total: {total_time:.2f}s") + + return target_folder + + except Exception as e: + print(f"Error analyzing repository {repo_url}: {e}") + return None \ No newline at end of file diff --git a/jupyter_ai_personas/pr_creation_persona/repo_structure_tools.py b/jupyter_ai_personas/pr_creation_persona/repo_structure_tools.py new file mode 100644 index 0000000..721aadb --- /dev/null +++ b/jupyter_ai_personas/pr_creation_persona/repo_structure_tools.py @@ -0,0 +1,430 @@ +"""Repository structure analysis tools for PR creation persona.""" + +import os +import re +from typing import Dict, List, Tuple, Optional +from agno.tools import Toolkit +from agno.agent import Agent +import sys +sys.path.append('../knowledge_graph') +from jupyter_ai_personas.knowledge_graph.code_analysis_tool import CodeAnalysisTool + +class RepoStructureTools(Toolkit): + """Tools for analyzing repository structure and determining proper file placement.""" + + def __init__(self, **kwargs): + self.code_tool = CodeAnalysisTool() + + super().__init__(name="repo_structure", tools=[ + self.analyze_folder_structure, + self.get_component_placement_map, + self.suggest_file_path, + self.validate_file_path, + self.create_parent_directories, + self.analyze_project_templates + ], **kwargs) + + def analyze_folder_structure(self, agent: Agent, repo_path: str) -> str: + """ + Analyze the repository folder structure to identify patterns. + + Args: + agent: The agent instance + repo_path: Path to the repository root + + Returns: + str: Analysis of folder structure patterns + """ + try: + # Get folder structure + folder_map = {} + component_types = { + "models": [], + "views": [], + "controllers": [], + "utils": [], + "tests": [], + "services": [], + "components": [], + "personas": [] + } + + # Walk through the repository + for root, dirs, files in os.walk(repo_path): + # Skip common non-code directories + if any(skip_dir in root for skip_dir in ['.git', 'node_modules', '__pycache__', '.venv']): + continue + + # Analyze Python files + for file in files: + if file.endswith('.py') and not file.startswith('__'): + rel_path = os.path.relpath(os.path.join(root, file), repo_path) + folder = os.path.dirname(rel_path) + + # Track folder usage + folder_map[folder] = folder_map.get(folder, 0) + 1 + + # Categorize components based on patterns + if 'model' in file.lower() or 'schema' in file.lower(): + component_types["models"].append(rel_path) + elif 'view' in file.lower() or 'template' in file.lower(): + component_types["views"].append(rel_path) + elif 'controller' in file.lower() or 'handler' in file.lower(): + component_types["controllers"].append(rel_path) + elif 'util' in file.lower() or 'helper' in file.lower(): + component_types["utils"].append(rel_path) + elif 'test' in file.lower(): + component_types["tests"].append(rel_path) + elif 'service' in file.lower(): + component_types["services"].append(rel_path) + elif 'component' in file.lower(): + component_types["components"].append(rel_path) + elif 'persona' in file.lower(): + component_types["personas"].append(rel_path) + + # Generate analysis + analysis = "Repository Structure Analysis:\n\n" + + # Most common folders + sorted_folders = sorted(folder_map.items(), 
key=lambda x: x[1], reverse=True) + analysis += "Common Code Folders:\n" + for folder, count in sorted_folders[:10]: + analysis += f"- {folder}: {count} files\n" + + # Component type patterns + analysis += "\nComponent Type Patterns:\n" + for comp_type, paths in component_types.items(): + if paths: + common_folders = {} + for path in paths: + folder = os.path.dirname(path) + common_folders[folder] = common_folders.get(folder, 0) + 1 + + most_common = sorted(common_folders.items(), key=lambda x: x[1], reverse=True) + if most_common: + analysis += f"- {comp_type}: typically in {most_common[0][0]}\n" + + return analysis + + except Exception as e: + return f"Error analyzing folder structure: {str(e)}" + + def get_component_placement_map(self, agent: Agent, repo_path: str) -> str: + """ + Create a mapping of component types to appropriate folder locations. + + Args: + agent: The agent instance + repo_path: Path to the repository root + + Returns: + str: JSON mapping of component types to folder locations + """ + try: + # Initialize component type mapping + component_map = {} + + # Walk through the repository + for root, dirs, files in os.walk(repo_path): + # Skip common non-code directories + if any(skip_dir in root for skip_dir in ['.git', 'node_modules', '__pycache__', '.venv']): + continue + + # Check for specific component folders + rel_path = os.path.relpath(root, repo_path) + + # Map component types based on folder names + folder_name = os.path.basename(root).lower() + if 'test' in folder_name: + component_map['tests'] = rel_path + elif 'model' in folder_name: + component_map['models'] = rel_path + elif 'view' in folder_name: + component_map['views'] = rel_path + elif 'controller' in folder_name: + component_map['controllers'] = rel_path + elif 'util' in folder_name or 'helper' in folder_name: + component_map['utils'] = rel_path + elif 'service' in folder_name: + component_map['services'] = rel_path + elif 'component' in folder_name: + component_map['components'] = rel_path + elif 'persona' in folder_name: + component_map['personas'] = rel_path + + # Look for patterns in Python files + for file in files: + if file.endswith('.py') and not file.startswith('__'): + file_path = os.path.join(root, file) + try: + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + + # Check for class patterns + if re.search(r'class\s+\w+Model|\w+Schema', content): + component_map.setdefault('models', rel_path) + if re.search(r'class\s+\w+View|\w+Template', content): + component_map.setdefault('views', rel_path) + if re.search(r'class\s+\w+Controller|\w+Handler', content): + component_map.setdefault('controllers', rel_path) + if re.search(r'class\s+\w+Service', content): + component_map.setdefault('services', rel_path) + if re.search(r'class\s+\w+Persona', content): + component_map.setdefault('personas', rel_path) + except: + # Skip files that can't be read + pass + + # Format the result + result = "Component Placement Map:\n" + for comp_type, folder in component_map.items(): + result += f"- {comp_type}: {folder}\n" + + return result + + except Exception as e: + return f"Error creating component placement map: {str(e)}" + + def suggest_file_path(self, agent: Agent, file_name: str, component_type: str, repo_path: str) -> str: + """ + Suggest appropriate file path based on repository patterns. + + Args: + agent: The agent instance + file_name: Name of the file to create + component_type: Type of component (model, view, controller, etc.) 
+ repo_path: Path to the repository root + + Returns: + str: Suggested file path + """ + try: + # Get component placement map + placement_map_str = self.get_component_placement_map(agent, repo_path) + + # Extract folder for component type + component_folder = None + for line in placement_map_str.split('\n'): + if line.startswith(f"- {component_type}:"): + component_folder = line.split(': ')[1].strip() + break + + # If no specific folder found, look for similar components + if not component_folder: + # Use knowledge graph to find similar components + query = f""" + MATCH (n) + WHERE n.name CONTAINS '{file_name.replace('.py', '')}' OR + n.name CONTAINS '{component_type}' + RETURN n.file as file_path + LIMIT 5 + """ + + try: + result = self.code_tool.query_code(query) + if result and isinstance(result, list) and len(result) > 0: + # Extract common folder pattern + folders = [] + for item in result: + if 'file_path' in item and item['file_path']: + folder = os.path.dirname(item['file_path']) + folders.append(folder) + + if folders: + # Find most common folder + folder_counts = {} + for folder in folders: + folder_counts[folder] = folder_counts.get(folder, 0) + 1 + + most_common = sorted(folder_counts.items(), key=lambda x: x[1], reverse=True) + if most_common: + component_folder = most_common[0][0] + except: + # If knowledge graph query fails, continue with other methods + pass + + # If still no folder found, use common patterns + if not component_folder: + # Default patterns based on component type + if component_type == 'tests': + component_folder = 'tests' + elif component_type == 'models': + component_folder = 'models' + elif component_type == 'views': + component_folder = 'views' + elif component_type == 'controllers': + component_folder = 'controllers' + elif component_type == 'utils': + component_folder = 'utils' + elif component_type == 'services': + component_folder = 'services' + elif component_type == 'personas': + # Look for persona pattern + for root, dirs, files in os.walk(repo_path): + if 'persona.py' in files: + rel_path = os.path.relpath(root, repo_path) + component_folder = rel_path + break + + # If still not found, use default pattern + if not component_folder: + component_folder = 'jupyter_ai_personas' + + # Construct suggested path + if component_folder: + suggested_path = os.path.join(repo_path, component_folder, file_name) + else: + # Default to a reasonable location if no pattern found + suggested_path = os.path.join(repo_path, 'jupyter_ai_personas', file_name) + + return suggested_path + + except Exception as e: + return f"Error suggesting file path: {str(e)}" + + def validate_file_path(self, agent: Agent, file_path: str, repo_path: str) -> str: + """ + Validate if a file path follows project conventions. 
+ + Args: + agent: The agent instance + file_path: Path to validate + repo_path: Path to the repository root + + Returns: + str: Validation result with suggestions if needed + """ + try: + # Make path relative to repo root + if file_path.startswith(repo_path): + rel_path = os.path.relpath(file_path, repo_path) + else: + rel_path = file_path + + # Check if path is directly in root (which we want to avoid) + if '/' not in rel_path and '\\' not in rel_path: + # File is in root, suggest better location + file_name = os.path.basename(rel_path) + + # Determine component type from filename + component_type = 'utils' # Default + if 'test' in file_name.lower(): + component_type = 'tests' + elif 'model' in file_name.lower(): + component_type = 'models' + elif 'view' in file_name.lower(): + component_type = 'views' + elif 'controller' in file_name.lower(): + component_type = 'controllers' + elif 'service' in file_name.lower(): + component_type = 'services' + elif 'persona' in file_name.lower(): + component_type = 'personas' + + # Get better suggestion + better_path = self.suggest_file_path(agent, file_name, component_type, repo_path) + + return f"WARNING: File would be created in repository root. Consider using: {better_path}" + + # Check if path follows existing patterns + placement_map_str = self.get_component_placement_map(agent, repo_path) + + # Determine if path matches any known patterns + matches_pattern = False + for line in placement_map_str.split('\n'): + if ': ' in line: + _, folder = line.split(': ', 1) + if folder.strip() in rel_path: + matches_pattern = True + break + + if matches_pattern: + return f"VALID: Path follows project conventions: {file_path}" + else: + # Path doesn't match known patterns, but might still be valid + return f"CAUTION: Path doesn't match common project patterns, but may still be valid: {file_path}" + + except Exception as e: + return f"Error validating file path: {str(e)}" + + def create_parent_directories(self, agent: Agent, file_path: str) -> str: + """ + Ensure parent directories exist before file creation. + + Args: + agent: The agent instance + file_path: Path to the file to be created + + Returns: + str: Result of directory creation + """ + try: + # Get parent directory + parent_dir = os.path.dirname(file_path) + + # Check if parent directory exists + if not os.path.exists(parent_dir): + # Create parent directories + os.makedirs(parent_dir, exist_ok=True) + return f"Created parent directories for: {file_path}" + else: + return f"Parent directories already exist for: {file_path}" + + except Exception as e: + return f"Error creating parent directories: {str(e)}" + + def analyze_project_templates(self, agent: Agent, repo_path: str) -> str: + """ + Analyze existing files to identify project templates and patterns. 
+ + Args: + agent: The agent instance + repo_path: Path to the repository root + + Returns: + str: Analysis of project templates and patterns + """ + try: + # Find persona patterns + persona_patterns = [] + + # Walk through the repository + for root, dirs, files in os.walk(repo_path): + # Skip common non-code directories + if any(skip_dir in root for skip_dir in ['.git', 'node_modules', '__pycache__', '.venv']): + continue + + # Look for persona.py files + if 'persona.py' in files: + rel_path = os.path.relpath(os.path.join(root, 'persona.py'), repo_path) + persona_patterns.append(rel_path) + + # Analyze persona structure + persona_analysis = "Persona Structure Analysis:\n" + + for persona_path in persona_patterns: + try: + full_path = os.path.join(repo_path, persona_path) + with open(full_path, 'r', encoding='utf-8') as f: + content = f.read() + + # Extract class name + class_match = re.search(r'class\s+(\w+)\(BasePersona\)', content) + if class_match: + class_name = class_match.group(1) + + # Check for common files in the same directory + persona_dir = os.path.dirname(full_path) + dir_files = os.listdir(persona_dir) + + persona_analysis += f"\n- {class_name} ({persona_path}):\n" + persona_analysis += f" Directory: {os.path.dirname(persona_path)}\n" + persona_analysis += f" Files: {', '.join(dir_files)}\n" + except: + # Skip files that can't be read + pass + + return persona_analysis + + except Exception as e: + return f"Error analyzing project templates: {str(e)}" \ No newline at end of file diff --git a/jupyter_ai_personas/pr_creation_persona/template.py b/jupyter_ai_personas/pr_creation_persona/template.py new file mode 100644 index 0000000..90abed3 --- /dev/null +++ b/jupyter_ai_personas/pr_creation_persona/template.py @@ -0,0 +1,51 @@ +from langchain.prompts import ChatPromptTemplate +from pydantic import BaseModel + +class PRCreationPersonaVariables(BaseModel): + input: str + model_id: str + provider_name: str + persona_name: str + context: str + +PR_CREATION_PROMPT_TEMPLATE = ChatPromptTemplate.from_messages([ + ("system", """You are a PR creation assistant coordinating a team of specialized agents to analyze issues and implement fixes. Your role is to oversee the development process from issue analysis to code implementation. 
+ +Development Guidelines: + +Issue Analysis: +- Parse and understand the issue requirements +- Identify affected components and files +- Determine scope and complexity +- Plan implementation approach + +Architecture Design: +- Design solution architecture +- Identify required changes and new components +- Plan file structure and organization +- Consider integration points and dependencies + +Code Implementation: +- Write minimal, focused code that addresses the issue +- Follow existing code patterns and conventions +- Place files in appropriate directories following project structure +- Never create files directly in the repository root +- Implement proper error handling +- Ensure code quality and maintainability + +Git Operations: +- Clone main branch +- Create feature branch with descriptive name +- Commit changes with clear messages +- Push to remote branch (DO NOT create PR) + +Repository Management: +- Use knowledge graph for codebase understanding +- Maintain consistency with existing patterns +- Consider impact on existing functionality +- Ensure proper testing integration + +Current context: +{context}"""), + ("human", "{input}") +]) \ No newline at end of file diff --git a/jupyter_ai_personas/pr_review_persona/repo_analysis_tools.py b/jupyter_ai_personas/pr_review_persona/repo_analysis_tools.py new file mode 100644 index 0000000..54323f3 --- /dev/null +++ b/jupyter_ai_personas/pr_review_persona/repo_analysis_tools.py @@ -0,0 +1,92 @@ +import os +import subprocess +import tempfile +import re +from agno.tools import Toolkit +from agno.utils.log import logger +from agno.agent import Agent +import sys + +sys.path.append("../knowledge_graph") +from jupyter_ai_personas.knowledge_graph.code_analysis_tool import CodeAnalysisTool +from jupyter_ai_personas.knowledge_graph.schema_validator import SchemaValidator + + +class RepoAnalysisTools(Toolkit): + def __init__(self, **kwargs): + # Use environment variables for Neo4j credentials + neo4j_uri = os.getenv("NEO4J_URI", "neo4j://127.0.0.1:7687") + neo4j_user = os.getenv("NEO4J_USER", "neo4j") + neo4j_password = os.getenv("NEO4J_PASSWORD") + + if not neo4j_password: + raise ValueError("NEO4J_PASSWORD environment variable must be set") + + self.code_tool = CodeAnalysisTool() + self.schema_validator = SchemaValidator(neo4j_uri, (neo4j_user, neo4j_password)) + + super().__init__( + name="repo_analysis", + tools=[ + self.get_schema_info, + self.query_codebase, + # self.get_function_source + # self.find_class_relationships, + # self.find_impact_analysis, + # self.check_dependents_handled, + # self.debug_database_contents, + # self.get_nodes_by_file, + # self.find_related_context, + # self.get_nodes_by_lines, + # self.analyze_signature_changes, + # self.detect_semantic_patterns, + # self.comprehensive_pr_analysis + ], + **kwargs, + ) + + def get_schema_info(self, agent: Agent) -> str: + """ + Get the knowledge graph schema information. + + Returns: + str: Schema information for query writing + """ + try: + return self.schema_validator.generate_schema_info() + except Exception as e: + return f"Error getting schema: {str(e)}" + + def query_codebase(self, agent: Agent, query: str) -> str: + """ + Execute a custom query on the analyzed codebase knowledge graph. 
+ + Args: + agent (Agent): The agent instance + query (str): Cypher query to execute on the knowledge graph + + Returns: + str: Query results + """ + import time + + start_time = time.time() + + try: + print(f"\n=== KG QUERY DEBUG ===") + print(f"Full Cypher Query:") + print(f"{query}") + print(f"--- Executing Query ---") + + result = self.code_tool.query_code(query) + query_time = time.time() - start_time + + print(f"Query Time: {query_time:.3f}s") + print(f"Result Preview: {str(result)[:200]}...") + print(f"=== END KG QUERY DEBUG ===\n") + + return result + except Exception as e: + print(f"KG Query Error: {str(e)}") + print(f"=== END KG QUERY DEBUG ===\n") + return f"Error executing query: {str(e)}" diff --git a/jupyter_ai_personas/task_master/__init__.py b/jupyter_ai_personas/task_master/__init__.py new file mode 100644 index 0000000..2360801 --- /dev/null +++ b/jupyter_ai_personas/task_master/__init__.py @@ -0,0 +1,4 @@ +"""TaskMaster integration for Jupyter AI personas.""" + +from .taskmaster_client import TaskMasterClient, Task +from .prd_agent import PRDAgent \ No newline at end of file diff --git a/jupyter_ai_personas/task_master/prd_agent.py b/jupyter_ai_personas/task_master/prd_agent.py new file mode 100644 index 0000000..3b06c56 --- /dev/null +++ b/jupyter_ai_personas/task_master/prd_agent.py @@ -0,0 +1,136 @@ +"""PRD Creation Agent for analyzing issues and creating Product Requirements Documents.""" + +import os +import re +import boto3 +from typing import Optional +from agno.agent import Agent +from agno.models.aws import AwsBedrock +from agno.tools.shell import ShellTools +from agno.tools.file import FileTools +from agno.tools.github import GithubTools +from agno.tools.reasoning import ReasoningTools + +class PRDAgent: + """Agent responsible for analyzing issues and creating PRDs.""" + + def __init__(self, model_id: str, session): + self.agent = Agent( + name="prd_creator", + role="Product Requirements Document Creator", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "CORE RESPONSIBILITY: Analyze issues and create comprehensive PRDs", + + "PRD STRUCTURE REQUIREMENTS:", + "1. PROBLEM STATEMENT:", + " - Clear definition of the issue", + " - Impact and urgency assessment", + " - Affected stakeholders", + + "2. SOLUTION OVERVIEW:", + " - High-level approach", + " - Key components involved", + " - Technical considerations", + + "3. FUNCTIONAL REQUIREMENTS:", + " - Specific features needed", + " - User interactions", + " - System behaviors", + + "4. TECHNICAL REQUIREMENTS:", + " - Architecture considerations", + " - Performance requirements", + " - Security considerations", + + "5. ACCEPTANCE CRITERIA:", + " - Measurable success criteria", + " - Testing requirements", + " - Quality standards", + + "6. 
IMPLEMENTATION TASKS:", + " - Break down implementation into specific tasks", + " - Identify dependencies between tasks", + " - Prioritize tasks by importance", + + "ANALYSIS GUIDELINES:", + "- Focus on minimal viable solution", + "- Consider existing codebase patterns", + "- Identify reusable components", + "- Prioritize maintainability", + "- Create actionable tasks that can be implemented" + ], + tools=[ + ShellTools(), + FileTools(), + GithubTools(get_issue=True, list_issue_comments=True), + ReasoningTools(add_instructions=True, think=True, analyze=True) + ] + ) + + async def create_prd_from_issue(self, issue_url: str, repo_context: str = "") -> str: + """Create a PRD from a GitHub issue URL using Agno agent.""" + # Extract issue details for context + issue_match = re.search(r'github\.com/([^/]+/[^/]+)/issues/(\d+)', issue_url) + if not issue_match: + raise ValueError(f"Invalid GitHub issue URL: {issue_url}") + + repo_name, issue_number = issue_match.groups() + + # Use the agent with GithubTools to fetch issue content + fetch_prompt = f"""Fetch GitHub issue {repo_name}#{issue_number} using get_issue with repo_path={repo_name} and issue_number={issue_number}. + Provide the full issue title and description.""" + + print(f"Fetching issue content for {repo_name}#{issue_number}...") + + try: + # Run the agent to fetch issue details + fetch_response = self.agent.run(fetch_prompt, stream=False) + issue_content = fetch_response.content if hasattr(fetch_response, 'content') else str(fetch_response) + print("Successfully fetched issue content") + except Exception as e: + print(f"Error fetching issue: {e}") + issue_content = f"Issue URL: {issue_url}\nUnable to fetch details automatically." + + # Use the Agno agent to generate the PRD + prompt = f""" + Analyze the following GitHub issue and create a comprehensive Product Requirements Document (PRD): + + ISSUE CONTENT: + {issue_content} + + REPOSITORY CONTEXT: + {repo_context} + + Create a detailed PRD with the following sections: + 1. Issue Reference (Repository: {repo_name}, Issue: {issue_number}) + 2. Problem Statement - Clear definition of the issue and its impact + 3. Solution Overview - High-level approach and key components + 4. Functional Requirements - Specific features and behaviors needed + 5. Technical Requirements - Architecture, performance, and security considerations + 6. Implementation Tasks - Break down implementation into specific tasks with dependencies + 7. Acceptance Criteria - Measurable success criteria + + For the Implementation Tasks section, create a list of specific tasks that can be directly used by TaskMaster. + Each task should have: + - A clear title + - A detailed description + - Priority (high/medium/low) + - Dependencies on other tasks (if any) + + Focus on creating actionable requirements that can be broken down into specific tasks. + Be specific and detailed about what needs to be implemented. 
+ """ + + try: + print("Generating PRD...") + response = self.agent.run(prompt, stream=False) + print("PRD generation completed successfully") + return response.content if hasattr(response, 'content') else str(response) + except Exception as e: + print(f"Error generating PRD: {e}") + raise ValueError(f"Failed to generate PRD: {e}") + + # Alias for backward compatibility + create_prd = create_prd_from_issue \ No newline at end of file diff --git a/jupyter_ai_personas/task_master/task_agent.py b/jupyter_ai_personas/task_master/task_agent.py new file mode 100644 index 0000000..745b899 --- /dev/null +++ b/jupyter_ai_personas/task_master/task_agent.py @@ -0,0 +1,506 @@ +"""Task execution agent that picks up and executes tasks from TaskMaster.""" + +import os +import subprocess +import sys +from agno.agent import Agent +from agno.models.aws import AwsBedrock +from agno.tools.shell import ShellTools +from agno.tools.file import FileTools +from agno.tools.python import PythonTools +from .taskmaster_client import TaskMasterClient, Task + +# Add path for repo structure tools +sys.path.append('../pr_creation_persona') +try: + from jupyter_ai_personas.pr_creation_persona.repo_structure_tools import RepoStructureTools +except ImportError: + # Fallback path for direct imports + try: + from ..pr_creation_persona.repo_structure_tools import RepoStructureTools + except ImportError: + print("Warning: Could not import RepoStructureTools") + RepoStructureTools = None + + +class TaskExecutionAgent: + """Agent that can pick up and execute specific tasks.""" + + def __init__(self, model_id: str, session, agent_name: str = "task_executor"): + self.taskmaster_client = TaskMasterClient() + self.agent = Agent( + name=agent_name, + role="Task Execution Specialist", + model=AwsBedrock(id=model_id, session=session), + markdown=True, + instructions=[ + "TASK EXECUTION WORKFLOW:", + + "STEP 1 - Task Selection:", + " - Review available tasks from TaskMaster", + " - Select tasks with no unmet dependencies", + " - Prioritize high-priority tasks", + + "STEP 2 - Task Analysis:", + " - Understand task requirements and acceptance criteria", + " - Identify required files and components", + " - Plan implementation approach", + + "STEP 3 - Implementation:", + " - Write minimal code to meet acceptance criteria", + " - Follow existing code patterns", + " - Implement proper error handling", + + "STEP 4 - Validation:", + " - Verify implementation meets acceptance criteria", + " - Test functionality where possible", + " - Document any assumptions or limitations", + + "EXECUTION PRINCIPLES:", + "- Focus only on assigned task scope", + "- Write minimal, clean code", + "- Follow existing patterns and conventions", + "- Complete acceptance criteria fully" + ], + tools=[ + ShellTools(), + FileTools(), + PythonTools(), + RepoStructureTools() if RepoStructureTools else None + ] + ) + + async def execute_task(self, task: Task, repo_context: str = "") -> str: + """Execute a specific task.""" + # Extract information from repo_context + local_repo_path = None + repo_url = None + feature_branch = None + + for line in repo_context.split('\n'): + if line.startswith('Local Repository Path:'): + local_repo_path = line.replace('Local Repository Path:', '').strip() + elif line.startswith('Repository URL:'): + repo_url = line.replace('Repository URL:', '').strip() + elif line.startswith('Feature Branch:'): + feature_branch = line.replace('Feature Branch:', '').strip() + + # Set up git repository if needed + if local_repo_path and repo_url: + 
self._setup_git_repository(local_repo_path, repo_url, feature_branch) + + prompt = f""" + Execute the following task: + + TASK: {task.title} + ID: {task.id} + PRIORITY: {task.priority} + + DESCRIPTION: + {task.description} + + DETAILS: + {task.details or 'No additional details provided'} + + TEST STRATEGY: + {task.test_strategy or 'No test strategy specified'} + + DEPENDENCIES: {', '.join(task.dependencies) if task.dependencies else 'None'} + + REPOSITORY CONTEXT: + {repo_context} + + Implement this task following the execution workflow. + Focus only on meeting the acceptance criteria with minimal code. + + IMPORTANT INSTRUCTIONS: + - Save all files to: {local_repo_path} + - NEVER create files in the repository root + - Follow project structure conventions and place files in appropriate directories + - Create parent directories if they don't exist before creating files + - Use the feature branch: {feature_branch or 'main'} + - After implementing the code, commit your changes with a descriptive message + - Include the task ID in your commit message + """ + + # If local_repo_path is available, set the working directory for all tools + if local_repo_path: + print(f"Setting working directory to: {local_repo_path}") + # Set ShellTools working directory + self.agent.tools[0].cwd = local_repo_path + # Set FileTools base path + self.agent.tools[1].base_path = local_repo_path + + # Analyze repository structure if RepoStructureTools is available + structure_info = "" + if RepoStructureTools and os.path.exists(local_repo_path): + try: + print("\nAnalyzing repository structure before task execution...") + structure_tools = RepoStructureTools() + + # Analyze folder structure + structure_analysis = structure_tools.analyze_folder_structure(None, local_repo_path) + + # Get component placement map + placement_map = structure_tools.get_component_placement_map(None, local_repo_path) + + # Analyze project templates + template_analysis = structure_tools.analyze_project_templates(None, local_repo_path) + + # Add structure information to prompt + structure_info = "\n\nREPOSITORY STRUCTURE ANALYSIS:\n" + structure_info += "Component Placement Guidelines:\n" + + # Extract component placement guidelines from placement map + for line in placement_map.split('\n'): + if line.startswith('- ') and ': ' in line: + structure_info += f"{line}\n" + + # Add template information + structure_info += "\nProject Templates:\n" + template_lines = template_analysis.split('\n') + for line in template_lines[:10]: # Limit to first 10 lines + structure_info += f"{line}\n" + + print("Repository structure analysis complete") + except Exception as e: + print(f"Warning: Error analyzing repository structure: {e}") + + # Add explicit instructions about the path + prompt += f""" + + CRITICAL PATH INSTRUCTIONS: + - You MUST save all files to: {local_repo_path} + - Use absolute paths when creating files + - NEVER create files directly in the repository root + - Follow the project structure patterns identified below + - Create parent directories if they don't exist before creating files + - Validate file paths against project conventions + {structure_info} + """ + else: + print("Warning: No local repository path specified. 
Files will be saved to the current directory.") + + # Execute the task + response = self.agent.run(prompt, stream=False) + + # If we have a local repo and feature branch, commit the changes + if local_repo_path and feature_branch: + try: + # Check if there are changes to commit + status_result = subprocess.run( + ["git", "-C", local_repo_path, "status", "--porcelain"], + capture_output=True, text=True + ) + + if status_result.stdout.strip(): + print("Committing changes...") + # Add all changes + subprocess.run( + ["git", "-C", local_repo_path, "add", "."], + check=True, capture_output=True + ) + + # Commit changes + commit_message = f"Implement task #{task.id}: {task.title}" + subprocess.run( + ["git", "-C", local_repo_path, "commit", "-m", commit_message], + check=True, capture_output=True + ) + + print(f"Changes committed to branch {feature_branch}") + + # Try to push changes to the fork (origin) + try: + # Check if we're using a fork by looking for upstream remote + remotes = subprocess.run( + ["git", "-C", local_repo_path, "remote", "-v"], + capture_output=True, text=True + ) + using_fork = "upstream" in remotes.stdout + + if using_fork: + print(f"Pushing changes to fork (origin/{feature_branch})") + else: + print(f"Pushing changes to origin/{feature_branch}") + + # First try to get more detailed error information + print("Checking git status before push...") + subprocess.run( + ["git", "-C", local_repo_path, "status"], + capture_output=False + ) + + # Try pushing with verbose output + push_result = subprocess.run( + ["git", "-C", local_repo_path, "push", "-v", "-u", "origin", feature_branch], + capture_output=True, text=True + ) + + if push_result.returncode == 0: + print(f"Changes pushed to remote branch {feature_branch}") + else: + print(f"Push failed with error code {push_result.returncode}") + print(f"Error output: {push_result.stderr}") + print(f"Standard output: {push_result.stdout}") + + # Try to diagnose the issue + print("\nAttempting to diagnose push failure...") + print("\nChecking remote URLs:") + subprocess.run( + ["git", "-C", local_repo_path, "remote", "-v"], + capture_output=False + ) + + # Continue with task completion even if push fails + print("\nContinuing with task completion despite push failure") + except Exception as e: + print(f"Warning: Could not push changes: {e}") + print("\nContinuing with task completion despite push failure") + else: + print("No changes to commit") + except Exception as e: + print(f"Warning: Error during git operations: {e}") + + return response.content if hasattr(response, 'content') else str(response) + + def _setup_git_repository(self, local_repo_path, repo_url, feature_branch): + """Set up git repository for task implementation.""" + try: + print(f"Setting up git repository at {local_repo_path}") + print(f"Repository URL: {repo_url}") + print(f"Feature branch: {feature_branch}") + + # Step 1: Ensure the directory exists + os.makedirs(local_repo_path, exist_ok=True) + + # Step 2: Check if it's already a git repository + is_git_repo = os.path.exists(os.path.join(local_repo_path, '.git')) + + # Step 3: Clone the repository if needed + if not is_git_repo: + print(f"Cloning repository {repo_url} to {local_repo_path}") + # Remove any existing content + if os.path.exists(local_repo_path) and os.listdir(local_repo_path): + print("Removing existing content before cloning") + for item in os.listdir(local_repo_path): + item_path = os.path.join(local_repo_path, item) + if os.path.isdir(item_path) and item != '.git': + import shutil + 
shutil.rmtree(item_path) + elif os.path.isfile(item_path): + os.remove(item_path) + + # Clone the repository from the fork instead of the original repo + # Hardcode the GitHub username + username = "bhavana-nair" + print(f"Using hardcoded GitHub username: {username}") + + # If we have a username, use the fork URL instead of the original repo URL + if username and 'github.com' in repo_url: + # Extract original repo owner and name + repo_parts = repo_url.replace('https://github.com/', '').replace('.git', '').split('/') + if len(repo_parts) >= 2: + original_owner, repo_name = repo_parts[0], repo_parts[1] + # Create fork URL + fork_url = f"https://github.com/{username}/{repo_name}.git" + print(f"Using fork URL: {fork_url} instead of original: {repo_url}") + + # Clone from the fork + result = subprocess.run( + ["git", "clone", fork_url, local_repo_path], + capture_output=True, + text=True + ) + + # Add original repo as upstream remote + if result.returncode == 0: + print("Adding original repository as upstream remote") + subprocess.run( + ["git", "-C", local_repo_path, "remote", "add", "upstream", repo_url], + capture_output=True, + text=True + ) + else: + print(f"Could not parse repository URL: {repo_url}, using original URL") + result = subprocess.run( + ["git", "clone", repo_url, local_repo_path], + capture_output=True, + text=True + ) + else: + # Fall back to original URL if we can't determine the fork + print(f"Using original repository URL: {repo_url}") + result = subprocess.run( + ["git", "clone", repo_url, local_repo_path], + capture_output=True, + text=True + ) + if result.returncode == 0: + print("Repository cloned successfully") + else: + print(f"Error cloning repository: {result.stderr}") + return False + else: + print(f"Using existing git repository at {local_repo_path}") + # Fetch latest changes + result = subprocess.run( + ["git", "-C", local_repo_path, "fetch"], + capture_output=True, + text=True + ) + if result.returncode == 0: + print("Fetched latest changes") + else: + print(f"Warning: Could not fetch latest changes: {result.stderr}") + # Continue anyway + + # Step 4: Create and checkout the feature branch + if feature_branch: + # First checkout main branch + print("Checking out main branch") + result = subprocess.run( + ["git", "-C", local_repo_path, "checkout", "main"], + capture_output=True, + text=True + ) + + if result.returncode != 0: + # Try master if main doesn't exist + print("Main branch not found, trying master") + result = subprocess.run( + ["git", "-C", local_repo_path, "checkout", "master"], + capture_output=True, + text=True + ) + + if result.returncode != 0: + print("Could not find main or master branch, using current branch") + + # Create and checkout the feature branch + print(f"Creating and checking out feature branch: {feature_branch}") + try: + # Check if branch exists + branch_exists = subprocess.run( + ["git", "-C", local_repo_path, "rev-parse", "--verify", feature_branch], + capture_output=True + ).returncode == 0 + + if branch_exists: + print(f"Branch {feature_branch} already exists, checking it out") + subprocess.run( + ["git", "-C", local_repo_path, "checkout", feature_branch], + check=True + ) + else: + print(f"Creating new branch: {feature_branch}") + subprocess.run( + ["git", "-C", local_repo_path, "checkout", "-b", feature_branch], + check=True + ) + + # Push the new branch to the fork + print(f"Pushing new branch to fork: {feature_branch}") + try: + # Check remote configuration + print("Checking remote configuration:") + subprocess.run( + 
["git", "-C", local_repo_path, "remote", "-v"], + capture_output=False + ) + + # Try pushing with verbose output + push_result = subprocess.run( + ["git", "-C", local_repo_path, "push", "-v", "-u", "origin", feature_branch], + capture_output=True, text=True + ) + + if push_result.returncode == 0: + print(f"Successfully pushed branch {feature_branch} to fork") + else: + print(f"Push failed with error code {push_result.returncode}") + print(f"Error output: {push_result.stderr}") + print(f"Standard output: {push_result.stdout}") + print("\nContinuing with branch setup despite push failure") + except Exception as push_error: + print(f"Warning: Could not push branch to fork: {push_error}") + print("Continuing with branch setup despite push failure") + except subprocess.CalledProcessError as e: + print(f"Error with branch operations: {e}") + + print("Git repository setup complete") + return True + except Exception as e: + print(f"Error setting up git repository: {e}") + return False + + def get_available_tasks(self) -> list[Task]: + """Get tasks that this agent can execute.""" + return self.taskmaster_client.get_available_tasks() + + def mark_task_complete(self, task_id: str) -> bool: + """Mark a task as completed in TaskMaster.""" + return self.taskmaster_client.update_task_status(task_id, 'done') + + def mark_task_in_progress(self, task_id: str) -> bool: + """Mark a task as in progress in TaskMaster.""" + return self.taskmaster_client.update_task_status(task_id, 'in-progress') + + def validate_file_path(self, file_path: str, repo_path: str) -> tuple[bool, str]: + """Validate if a file path follows project conventions. + + Args: + file_path: Path to validate + repo_path: Path to the repository root + + Returns: + tuple: (is_valid, message) + """ + if not RepoStructureTools: + # If RepoStructureTools is not available, consider all paths valid + return True, "Path validation skipped: RepoStructureTools not available" + + try: + # Create RepoStructureTools instance + structure_tools = RepoStructureTools() + + # Validate the path + validation_result = structure_tools.validate_file_path(None, file_path, repo_path) + + # Check if path is directly in root + if validation_result.startswith("WARNING:"): + return False, validation_result + elif validation_result.startswith("VALID:"): + return True, validation_result + else: + # Path doesn't match known patterns, but might still be valid + return True, validation_result + + except Exception as e: + # If validation fails, consider the path valid to avoid blocking + return True, f"Path validation error: {str(e)}" + + def ensure_parent_directories(self, file_path: str) -> bool: + """Ensure parent directories exist before file creation. 
+
+        Args:
+            file_path: Path to the file to be created
+
+        Returns:
+            bool: True if successful, False otherwise
+        """
+        try:
+            # Get parent directory
+            parent_dir = os.path.dirname(file_path)
+
+            # Check if parent directory exists (skip when the path has no directory component)
+            if parent_dir and not os.path.exists(parent_dir):
+                # Create parent directories
+                os.makedirs(parent_dir, exist_ok=True)
+                print(f"Created parent directories for: {file_path}")
+
+            return True
+
+        except Exception as e:
+            print(f"Error creating parent directories: {str(e)}")
+            return False
\ No newline at end of file
diff --git a/jupyter_ai_personas/task_master/taskmaster_client.py b/jupyter_ai_personas/task_master/taskmaster_client.py
new file mode 100644
index 0000000..41b4c13
--- /dev/null
+++ b/jupyter_ai_personas/task_master/taskmaster_client.py
@@ -0,0 +1,483 @@
+"""TaskMaster AI client integration using the actual TaskMaster library."""
+
+import os
+import json
+import tempfile
+import subprocess
+import sys
+from typing import List, Dict, Any, Optional
+from dataclasses import dataclass
+
+# Add path for repo structure tools
+sys.path.append('../pr_creation_persona')
+try:
+    from jupyter_ai_personas.pr_creation_persona.repo_structure_tools import RepoStructureTools
+except ImportError:
+    # Fallback path for direct imports
+    try:
+        from ..pr_creation_persona.repo_structure_tools import RepoStructureTools
+    except ImportError:
+        print("Warning: Could not import RepoStructureTools")
+        RepoStructureTools = None
+
+
+@dataclass
+class Task:
+    """Represents a task from TaskMaster."""
+    id: str
+    title: str
+    description: str
+    priority: str
+    status: str
+    dependencies: List[str]
+    details: Optional[str] = None
+    test_strategy: Optional[str] = None
+
+
+class TaskMasterClient:
+    """Client for integrating with actual TaskMaster AI."""
+
+    def __init__(self, project_root: str = None):
+        # Use a persistent directory for TaskMaster
+        if project_root:
+            self.project_root = project_root
+        else:
+            home_dir = os.path.expanduser("~")
+            self.project_root = os.path.join(home_dir, ".jupyter-ai-taskmaster")
+        os.makedirs(self.project_root, exist_ok=True)
+
+        self.tasks: List[Task] = []
+        self._taskmaster_available = True
+        self._ensure_taskmaster_setup()
+
+    def _ensure_taskmaster_setup(self):
+        """Check if TaskMaster is available and set up config."""
+        try:
+            # Check if npx is available
+            print("Checking npx availability...")
+            subprocess.run(['npx', '--version'], capture_output=True, check=True)
+            print("npx is available")
+
+            # Check if task-master is available
+            print("Checking task-master availability...")
+            version_result = subprocess.run(['npx', 'task-master', '--version'],
+                                            capture_output=True, text=True)
+            print(f"task-master version: {version_result.stdout.strip() if version_result.returncode == 0 else 'not available'}")
+            if version_result.returncode != 0:
+                print(f"task-master check failed: {version_result.stderr}")
+                self._taskmaster_available = False
+                return
+
+            # Create config directory with API key
+            work_dir = os.getcwd()
+            config_dir = os.path.join(work_dir, ".taskmaster")
+            os.makedirs(config_dir, exist_ok=True)
+
+            # Get API key from environment variable (never log the key itself)
+            api_key = os.getenv('ANTHROPIC_API_KEY')
+            print(f"ANTHROPIC_API_KEY is {'set' if api_key else 'not set'}")
+            if not api_key:
+                raise ValueError("ANTHROPIC_API_KEY environment variable is not set")
+
+            # Create config file with Anthropic API
+            config_path = os.path.join(config_dir, "config.json")
+            with open(config_path, "w") as f:
+                json.dump({
+                    "provider": "anthropic",
+                    "model": "claude-3-5-sonnet-20241022",
+                    "apiKey": api_key
+                }, f, indent=2)
+            print("Using Anthropic API for TaskMaster")
+
+            print(f"Created TaskMaster config at {config_path}")
+            self._taskmaster_available = True
+        except (subprocess.CalledProcessError, FileNotFoundError) as e:
+            print(f"Warning: TaskMaster not available or timed out: {e}")
+            self._taskmaster_available = False
+        except ValueError as e:
+            print(f"Configuration error: {e}")
+            self._taskmaster_available = False
+
+    async def create_tasks_from_prd(self, prd_content: str) -> List[Task]:
+        """Create tasks from PRD using TaskMaster with Anthropic API."""
+
+        # Check for API key in environment
+        api_key = os.getenv('ANTHROPIC_API_KEY')
+        if not api_key:
+            print("ANTHROPIC_API_KEY not set, cannot create tasks from PRD")
+            return []
+
+        try:
+            # Use current directory where TaskMaster is already installed
+            work_dir = os.getcwd()
+            print(f"Using current directory for TaskMaster: {work_dir}")
+
+            # Create PRD file
+            prd_path = os.path.join(work_dir, 'prd.txt')
+            with open(prd_path, 'w') as f:
+                f.write(prd_content)
+
+            # Use TaskMaster to parse PRD and generate tasks
+            print(f"Running TaskMaster parse-prd on {prd_path}...")
+            print(f"Command: npx task-master parse-prd --input {prd_path} --force")
+            print(f"Working directory: {work_dir}")
+            try:
+                # Run parse-prd in the current directory
+                result = subprocess.run([
+                    'npx', 'task-master', 'parse-prd',
+                    '--input', prd_path,
+                    '--force'
+                ], cwd=work_dir, capture_output=True, text=True)
+                print(f"Command completed with return code: {result.returncode}")
+            except Exception as cmd_error:
+                print(f"Command execution error: {cmd_error}")
+                return []
+
+            print(f"TaskMaster parse-prd result: {result.returncode}")
+            if result.returncode != 0:
+                print(f"TaskMaster stderr: {result.stderr}")
+                print(f"TaskMaster stdout: {result.stdout}")
+                # Try running with --debug flag to get more information
+                debug_result = subprocess.run([
+                    'npx', 'task-master', 'parse-prd',
+                    '--input', prd_path,
+                    '--force',
+                    '--debug'
+                ], cwd=work_dir, capture_output=True, text=True)
+                print(f"Debug output: {debug_result.stdout}")
+                print(f"Debug errors: {debug_result.stderr}")
+
+            try:
+                return self._load_tasks()
+            except Exception as load_error:
+                print(f"Error loading tasks: {load_error}")
+
+        except Exception as e:
+            print(f"Error creating tasks: {e}")
+            return []
+
+    def _load_tasks(self) -> List[Task]:
+        """Load tasks from TaskMaster tasks.json file."""
+        # Look for tasks.json in various possible locations
+        work_dir = os.getcwd()
+        possible_paths = [
+            os.path.join(work_dir, "tasks.json"),
+            os.path.join(work_dir, ".taskmaster", "tasks", "tasks.json")
+        ]
+
+        for path in possible_paths:
+            if os.path.exists(path):
+                tasks_path = path
+                print(f"Found tasks at: {tasks_path}")
+                break
+        else:
+            print("Tasks file not found")
+            return []
+
+        if not os.path.exists(tasks_path):
+            return []
+
+        try:
+            with open(tasks_path, 'r') as f:
+                data = json.load(f)
+
+            tasks = []
+            # Check if tasks are nested under 'master' key
+            if 'master' in data and 'tasks' in data['master']:
+                task_list = data['master']['tasks']
+            else:
+                task_list = data.get('tasks', [])
+
+            for task_data in task_list:
+                task = Task(
+                    id=str(task_data.get('id', '')),  # Convert ID to string
+                    title=task_data.get('title', ''),
+                    description=task_data.get('description', ''),
+                    priority=task_data.get('priority', 'medium'),
+                    status=task_data.get('status', 'pending'),
+                    dependencies=[str(dep) for dep in task_data.get('dependencies', [])],  # Convert dependencies to strings
+                    details=task_data.get('details'),
test_strategy=task_data.get('testStrategy') + ) + tasks.append(task) + + self.tasks = tasks + return tasks + + except Exception as e: + print(f"Error loading tasks: {e}") + return [] + + def get_available_tasks(self) -> List[Task]: + """Get tasks that can be worked on (no unmet dependencies).""" + completed_tasks = {t.id for t in self.tasks if t.status == 'done'} + + available = [] + for task in self.tasks: + if task.status == 'pending': + if not task.dependencies or all(dep in completed_tasks for dep in task.dependencies): + available.append(task) + + return available + + def format_tasks_for_agents(self, tasks: List[Task], show_details: bool = False, repo_path: str = None) -> str: + """Format tasks for agent consumption. + + Args: + tasks: List of tasks to format + show_details: Whether to show implementation details and test strategy + repo_path: Optional repository path for file path suggestions + """ + if not tasks: + return "No tasks available." + + formatted = "" # Remove the header to make it cleaner + for task in tasks: + formatted += f"**Task #{task.id}: {task.title}**\n" + formatted += f"Description: {task.description}\n" + + # Only show these details if explicitly requested + if show_details: + formatted += f"Priority: {task.priority}\n" + formatted += f"Status: {task.status}\n" + + if task.dependencies: + formatted += f"Dependencies: {', '.join(task.dependencies)}\n" + + # Add file path suggestions if repo_path is provided + if repo_path and RepoStructureTools and show_details: + # Check if task details already include file path suggestions + if task.details and not "SUGGESTED FILE PATHS:" in task.details: + file_path_suggestions = self._generate_file_path_suggestions(task, repo_path) + if file_path_suggestions: + # Update task details with file path suggestions + if task.details: + task.details += "\n\n" + file_path_suggestions + else: + task.details = file_path_suggestions + + if task.details: + formatted += f"Implementation Details:\n{task.details}\n" + + if task.test_strategy: + formatted += f"Test Strategy:\n{task.test_strategy}\n" + + formatted += "\n" + + return formatted + + def _generate_file_path_suggestions(self, task: Task, repo_path: str) -> str: + """Generate file path suggestions for a task based on repository structure. 
+ + Args: + task: The task to generate suggestions for + repo_path: Path to the repository root + + Returns: + str: File path suggestions formatted as markdown + """ + import re + + if not RepoStructureTools or not os.path.exists(repo_path): + return "" + + try: + # Extract potential file names from task description and details + potential_files = [] + + # Look for Python class or file mentions in description + if task.description: + class_matches = re.findall(r'class\s+([A-Za-z0-9_]+)', task.description) + file_matches = re.findall(r'([A-Za-z0-9_]+\.py)', task.description) + potential_files.extend(class_matches) + potential_files.extend(file_matches) + + # Look for Python class or file mentions in details + if task.details: + class_matches = re.findall(r'class\s+([A-Za-z0-9_]+)', task.details) + file_matches = re.findall(r'([A-Za-z0-9_]+\.py)', task.details) + potential_files.extend(class_matches) + potential_files.extend(file_matches) + + # Convert class names to potential file names if needed + for i, name in enumerate(potential_files): + if not name.endswith('.py'): + potential_files[i] = f"{name.lower()}.py" + + # Remove duplicates + potential_files = list(set(potential_files)) + + if not potential_files: + return "" + + # Create RepoStructureTools instance + structure_tools = RepoStructureTools() + + # Generate path suggestions + file_path_suggestions = "SUGGESTED FILE PATHS:\n" + + for file_name in potential_files[:3]: # Limit to first 3 files + # Determine component type from filename + component_type = 'utils' # Default + if 'test' in file_name.lower(): + component_type = 'tests' + elif 'model' in file_name.lower(): + component_type = 'models' + elif 'view' in file_name.lower(): + component_type = 'views' + elif 'controller' in file_name.lower(): + component_type = 'controllers' + elif 'service' in file_name.lower(): + component_type = 'services' + elif 'persona' in file_name.lower(): + component_type = 'personas' + + # Get suggested path + suggested_path = structure_tools.suggest_file_path(None, file_name, component_type, repo_path) + file_path_suggestions += f"- {file_name}: {suggested_path}\n" + + return file_path_suggestions + + except Exception as e: + print(f"Error generating file path suggestions: {str(e)}") + return "" + + def update_task_status(self, task_id: str, status: str) -> bool: + """Update task status using TaskMaster.""" + try: + # Run TaskMaster command to update status with the correct format + work_dir = os.getcwd() + result = subprocess.run([ + 'npx', 'task-master', 'set-status', + f'--status={status}', # Correct format: --status=done + f'--id={task_id}' # Correct format: --id=129 + ], cwd=work_dir, capture_output=True, text=True) + + print(f"TaskMaster set-status result: {result.returncode}") + if result.returncode != 0: + print(f"Error output: {result.stderr}") + print(f"Standard output: {result.stdout}") + else: + print(f"Successfully updated task {task_id} status to {status}") + + # Also update the status in our local tasks list + for task in self.tasks: + if task.id == task_id: + task.status = status + break + + # Directly update the tasks.json file as a backup method + self._update_tasks_json_directly(task_id, status) + + # Reload tasks to ensure we have the latest data + self._load_tasks() + + return result.returncode == 0 + except Exception as e: + print(f"Error updating task status: {e}") + # Try direct update as fallback + return self._update_tasks_json_directly(task_id, status) + + def _update_tasks_json_directly(self, task_id: str, status: str) -> 
bool: + """Directly update the task status in the tasks.json file.""" + try: + # Find the tasks.json file + work_dir = os.getcwd() + tasks_path = os.path.join(work_dir, ".taskmaster", "tasks", "tasks.json") + + if not os.path.exists(tasks_path): + print(f"Tasks file not found at {tasks_path}") + return False + + # Read the current content + with open(tasks_path, 'r') as f: + data = json.load(f) + + # Update the task status + updated = False + if 'master' in data and 'tasks' in data['master']: + for task in data['master']['tasks']: + if str(task.get('id', '')) == str(task_id): + task['status'] = status + updated = True + break + + # Write the updated content back + if updated: + with open(tasks_path, 'w') as f: + json.dump(data, f, indent=2) + print(f"Directly updated task {task_id} status to {status} in {tasks_path}") + return True + else: + print(f"Task {task_id} not found in {tasks_path}") + return False + + except Exception as e: + print(f"Error directly updating tasks.json: {e}") + return False + + def get_task_by_id(self, task_id: str) -> Optional[Task]: + """Get a task by its ID.""" + for task in self.tasks: + if task.id == task_id: + return task + return None + + def get_task_details(self, task_id: str) -> str: + """Get formatted details for a specific task.""" + task = self.get_task_by_id(task_id) + if not task: + return f"Task with ID {task_id} not found." + + # Format with full details + return self.format_tasks_for_agents([task], show_details=True) + + def analyze_repository_structure(self, repo_path: str) -> Dict[str, Any]: + """Analyze repository structure and create folder map for component placement. + + Args: + repo_path: Path to the repository root + + Returns: + Dict with structure analysis results + """ + if not RepoStructureTools or not repo_path or not os.path.exists(repo_path): + print(f"Cannot analyze repository structure: RepoStructureTools not available or invalid path") + return {} + + try: + # Create RepoStructureTools instance + structure_tools = RepoStructureTools() + + # Analyze folder structure + print("Analyzing repository folder structure...") + structure_analysis = structure_tools.analyze_folder_structure(None, repo_path) + + # Get component placement map + print("\nGenerating component placement map...") + placement_map = structure_tools.get_component_placement_map(None, repo_path) + + # Analyze project templates + print("\nAnalyzing project templates...") + template_analysis = structure_tools.analyze_project_templates(None, repo_path) + + # Parse the placement map to create folder_map + folder_map = {} + for line in placement_map.split('\n'): + if ': ' in line and line.startswith('- '): + comp_type, folder = line.replace('- ', '').split(': ', 1) + folder_map[comp_type.strip()] = folder.strip() + + print(f"\nFolder map created: {folder_map}") + + return { + "structure_analysis": structure_analysis, + "placement_map": placement_map, + "template_analysis": template_analysis, + "folder_map": folder_map + } + + except Exception as e: + print(f"Error analyzing repository structure: {str(e)}") + return {} + \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0c403d7..e7d8de9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,10 @@ pr_review = [ "agno", "boto3", "pygithub", + "langchain-core", + "neo4j", + "tree-sitter", + "tree-sitter-python", ] data_analytics = [ @@ -62,7 +66,14 @@ data_analytics = [ "seaborn" ] -all = ["jupyter-ai-personas[finance,emoji,software_team,data_analytics,pr_review]"] +task_master = [ + "agno", 
+    "boto3",
+    "requests",
+    "pygithub",
+]
+
+all = ["jupyter-ai-personas[finance,emoji,software_team,data_analytics,pr_review,task_master]"]
 
 [build-system]
 requires = ["hatchling"]
@@ -74,3 +85,4 @@ emoji_persona = "jupyter_ai_personas.emoji_persona.persona:EmojiPersona"
 software_team_persona = "jupyter_ai_personas.software_team_persona.persona:SoftwareTeamPersona"
 data_analytics_persona = "jupyter_ai_personas.data_analytics_persona.persona:DataAnalyticsTeam"
 pr_review_persona = "jupyter_ai_personas.pr_review_persona.persona:PRReviewPersona"
+pr_creation_persona = "jupyter_ai_personas.pr_creation_persona.persona:PRCreationPersona"
\ No newline at end of file
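
For reviewers who want to exercise the new TaskMaster integration outside of Jupyter AI, a minimal driver sketch follows. It is illustrative only and not part of the diff: it assumes the `task_master` extra from `pyproject.toml` is installed (`pip install "jupyter-ai-personas[task_master]"`), that `ANTHROPIC_API_KEY` is exported, and that the TaskMaster CLI is reachable via `npx`; the PRD text is a placeholder. It uses only methods defined in `taskmaster_client.py` above.

```python
# Illustrative driver for TaskMasterClient -- not part of this diff.
# Assumes the task_master extra is installed and ANTHROPIC_API_KEY is exported.
import asyncio

from jupyter_ai_personas.task_master.taskmaster_client import TaskMasterClient


async def main() -> None:
    # Writes .taskmaster/config.json in the working directory via _ensure_taskmaster_setup
    client = TaskMasterClient()

    # Parse a placeholder PRD into tasks via `npx task-master parse-prd`
    tasks = await client.create_tasks_from_prd("Example PRD describing the feature to build.")
    print(f"Parsed {len(tasks)} task(s) from the PRD")

    # Work through tasks whose dependencies are already satisfied
    for task in client.get_available_tasks():
        print(client.get_task_details(task.id))
        client.update_task_status(task.id, "in-progress")
        # ... hand the task to an executing agent here ...
        client.update_task_status(task.id, "done")


if __name__ == "__main__":
    asyncio.run(main())
```

The status strings passed to `update_task_status` mirror the ones the client itself uses (`pending`, `in-progress`, `done`), and `get_available_tasks` only surfaces pending tasks whose dependencies are already marked `done`.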