diff --git a/examples/github_coding_agent/.env.example b/examples/github_coding_agent/.env.example new file mode 100644 index 00000000..34108dde --- /dev/null +++ b/examples/github_coding_agent/.env.example @@ -0,0 +1,2 @@ +GITHUB_API_KEY=__github_pat_your_github_token_here__ +LLAMA_STACK_URL=http://localhost:5000 \ No newline at end of file diff --git a/examples/github_coding_agent/.gitignore b/examples/github_coding_agent/.gitignore new file mode 100644 index 00000000..8ec79880 --- /dev/null +++ b/examples/github_coding_agent/.gitignore @@ -0,0 +1,5 @@ +env/ +.env +sandbox/ +__pycache__/ +.pytest_cache/ \ No newline at end of file diff --git a/examples/github_coding_agent/DEVELOPMENT.md b/examples/github_coding_agent/DEVELOPMENT.md new file mode 100644 index 00000000..dad05ccf --- /dev/null +++ b/examples/github_coding_agent/DEVELOPMENT.md @@ -0,0 +1,6 @@ + +### Running the tests + +```bash +pytest tests/ -v +``` diff --git a/examples/github_coding_agent/README.md b/examples/github_coding_agent/README.md new file mode 100644 index 00000000..19e672cf --- /dev/null +++ b/examples/github_coding_agent/README.md @@ -0,0 +1,88 @@ +# GitHub Coding Agent + +This is a coding agent that can read a GitHub issue, write code and submits a PR with a fix. It currently performs 5% on [SWE-Bench lite](https://www.swebench.com/). Demo: + +https://github.com/user-attachments/assets/1c579e25-7630-404c-8ce5-1b72c2a22c1c + +## What You Need +- A GitHub account +- git installed on your computer +- Python 3.10 + +## Setup Steps + +1. Start Llama Stack: + +This uses the fireworks distribution of Llama Stack, but will work with any other distribution that supports 3.3 70B model (405b support coming soon). 
+```bash +export LLAMA_STACK_PORT=5000 +export FIREWORKS_API_KEY=your_key_here +docker run -it \ + -p $LLAMA_STACK_PORT:$LLAMA_STACK_PORT \ + -v ~/.llama:/root/.llama \ + llamastack/distribution-fireworks \ + --port $LLAMA_STACK_PORT \ + --env FIREWORKS_API_KEY=$FIREWORKS_API_KEY +``` + +2. Get a GitHub token: + - Go to https://github.com/settings/personal-access-tokens/new + - Enter a name + - Pick which repositories it has access to +![Giving access to a repo](docs/images/repo-permissions.png) + - Give it these permissions: +![GitHub repo permissions](docs/images/github-repo-permissions.png) + - Create the token and copy it +3. Setup your .env file: +```bash +cp .env.example .env +``` +Then open `.env` and add your GitHub token: +``` +GITHUB_API_KEY=github_pat_11SDF... +``` + +4. Create a virtual environment: +```bash +# python -m venv .venv should also work here as well but this is only tested on python 3.10 +conda create -n llama-stack-coding-agent python=3.10 +conda activate llama-stack-coding-agent +``` + +5. Install the dependencies: + +```bash +pip install -r requirements.txt +``` + +6. Start the agent: +```bash +python -m llama_agent.main --issue-url your_github_issue_url + +# For example: +# python -m llama_agent.main --issue-url https://github.com/example-user/example-repo/issues/34 +``` + +## What It Does +- Reads GitHub issues +- Clones the repository under `sandbox/` +- Creates a fix locally +- Makes a new branch +- Submits a Pull Request with the fixes +- If it can't fix something, it leaves a comment explaining why +- Only supports Llama 3.3 70B at the moment + +## Is It Safe? 
+ +Yes - the LLM: + +- Doesn't have access to git tools or GitHub API (regular logic makes git commands) +- Doesn't execute any commands or arbitrary code +- Only works in a sandbox folder +- Won't push to your main branch +- Only creates new branches +- Won't close or merge Pull Requests +- Can't close/edit issues + +## Evaluation results +This currently performs 5% on SWE-Bench lite using Llama 3.3 70B; There are a lot of opportunities for improvement. See the evaluation results here: https://huggingface.co/datasets/aidando73/llama-codes-swe-bench-evals/tree/main. diff --git a/examples/github_coding_agent/docs/images/github-repo-permissions.png b/examples/github_coding_agent/docs/images/github-repo-permissions.png new file mode 100644 index 00000000..707faa1f Binary files /dev/null and b/examples/github_coding_agent/docs/images/github-repo-permissions.png differ diff --git a/examples/github_coding_agent/docs/images/repo-permissions.png b/examples/github_coding_agent/docs/images/repo-permissions.png new file mode 100644 index 00000000..78d2b970 Binary files /dev/null and b/examples/github_coding_agent/docs/images/repo-permissions.png differ diff --git a/examples/github_coding_agent/llama_agent/__init__.py b/examples/github_coding_agent/llama_agent/__init__.py new file mode 100644 index 00000000..f862b7ba --- /dev/null +++ b/examples/github_coding_agent/llama_agent/__init__.py @@ -0,0 +1,4 @@ +import os + +REPO_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +SANDBOX_DIR = os.path.join(REPO_DIR, "sandbox") \ No newline at end of file diff --git a/examples/github_coding_agent/llama_agent/agent.py b/examples/github_coding_agent/llama_agent/agent.py new file mode 100644 index 00000000..19dd58c5 --- /dev/null +++ b/examples/github_coding_agent/llama_agent/agent.py @@ -0,0 +1,472 @@ +import os +from typing import Literal, Optional, Tuple, Union +from llama_stack_client import LlamaStackClient +from llama_models.llama3.api.chat_format import ChatFormat 
+from llama_models.llama3.api.tokenizer import Tokenizer +from llama_models.llama3.api.datatypes import StopReason +from llama_models.llama3.api.tool_utils import ( + is_valid_python_list, + parse_python_list_for_function_calls, +) +import re +from llama_agent.utils.file_tree import list_files_in_repo +from llama_agent import REPO_DIR +from llama_agent.utils.ansi import red, yellow, magenta, blue +from subprocess import run + +# Currently only supports 3.3-70B-Instruct at the moment since it depends on the 3.3/3.2 tool prompt format +MODEL_ID = "meta-llama/Llama-3.3-70B-Instruct" +ITERATIONS = 15 + +SANDBOX_DIR = os.path.join(REPO_DIR, "sandbox") +# We give the agent a virtual working directory so it doesn't have to worry about long absolute paths +AGENT_WORKING_DIR = "/workspace/" + +formatter = ChatFormat(Tokenizer.get_instance()) + + +def run_agent( + client: LlamaStackClient, repo: str, issue_title: str, issue_body: str +) -> Tuple[Literal["changes_made", "no_changes_made"], str, Optional[str]]: + """ + Returns: + Tuple[Literal["changes_made", "no_changes_made"], str, Optional[str]]: + ("changes_made", pr_title, pr_body): "changes_made", the PR title, and the PR body + or ("no_changes_made", reasoning, None): "no_changes_made", the reason why no changes were made, and None + """ + + # System prompt + message = "<|begin_of_text|>" + message += header("system") + message += """ + You are an expert software engineer. + You will be given a problem statement in + + Based on the , you will need to make one or more function/tool calls to achieve the purpose. + If none of the function can be used, point it out. If the given question lacks the parameters required by the function, + also point it out. You should only return the function call in tools call sections. 
+ + If you decide to invoke any of the function(s), you MUST put it in the format of [func_name1(params_name1=params_value1, params_name2=params_value2...), func_name2(params)] + If you decide to invoke multiple functions, you MUST put commas between the function calls. E.g., [func_name1(params), func_name2(params), func_name3(params)] + + Here is a list of functions in JSON format that you can invoke. + + [ + { + "name": "list_files", + "description": "List all files in a directory.", + "parameters": { + "type": "dict", + "required": ["path"], + "properties": { + "path": { + "type": "string", + "description": "Absolute path to a directory, e.g. `/workspace/django`. If referencing a file, will return the name of the file." + } + }, + } + }, + { + "name": "edit_file", + "description": "Edit a file. Specify the path to the file and the new_str to write to it. If old_str is specified, only the old_str will be replaced with new_str, otherwise the entire file will be replaced by new_str.", + "parameters": { + "type": "dict", + "required": ["path", "new_str"], + "properties": { + "path": { + "type": "string", + "description": "Absolute path to file or directory, e.g. `/workspace/django/file.py` or `/workspace/django`." + }, + "old_str": { + "type": "string", + "description": "The string in the file at `path` to replace. If not specified, the entire file will be replaced by new_str" + }, + "new_str": { + "type": "string", + "description": "The new string to write to the file. If the old_str is specified, only the old_str will be replaced with new_str, otherwise the entire file will be replaced by new_str." + } + } + } + }, + { + "name": "view_file", + "description": "View a file", + "parameters": { + "type": "dict", + "required": ["path"], + "properties": { + "path": { + "type": "string", + "description": "The absolute path to the file to view, e.g. `/workspace/django/file.py` or `/workspace/django`." 
+ } + } + } + }, + { + "name": "finish", + "description": "If you have solved the problem, you can call this function to finish the task.", + "parameters": {} + } + ] + + Please explain your reasoning before you make any edits in a tag. + + <|eot_id|> + """.strip() + + # User prompt + message += header("user") + files_in_repo = "\n".join( + list_files_in_repo(os.path.join(SANDBOX_DIR, repo), depth=2) + ) + message += f""" + + {os.path.join(AGENT_WORKING_DIR, repo)} + + + + {files_in_repo} + + + + Issue title: {issue_title} + Issue body: {issue_body} + + + You are in the working directory as specified in . Please specify paths in absolute paths only. + I have included the top level files and directories in the repository in . + Please start by listing out and viewing files in the repository to understand the problem.<|eot_id|> + """.strip() + + finished = False + for i in range(ITERATIONS): + print("\n") + print(f"Iteration {i+1} of {ITERATIONS}") + print("-" * 80) + + if finished: + break + + message += header("assistant") + response = client.inference.completion( + model_id=MODEL_ID, + content=message, + ) + + # Display thinking alongside with tool calls + thinking_match = re.search( + r"(.*?)", response.content, re.DOTALL + ) + if thinking_match: + print(f"Thinking: {magenta(thinking_match.group(1).strip())}") + else: + # Check for any text outside of tool tags + non_tool_content = re.sub( + r".*?", "", response.content, flags=re.DOTALL + ).strip() + if non_tool_content: + print(f"Thinking: {magenta(non_tool_content)}") + + message += response.content + message += f"<|eot_id|>" + + # Evaluate tool calls + tool_calls = parse_tool_calls(response.content) + for tool_call in tool_calls: + + if tool_call[0] == "error": + _, error_message = tool_call + msg = f"ERROR - Could not parse tool call: {error_message}" + print(red(msg)) + message += chat_message("tool", msg) + continue + + tool_name, tool_params = tool_call + msg = ( + f"Executing tool call: " + + 
blue(f"[{tool_name}{display_tool_params(tool_params)}]") + ) + message += header("tool") + message += msg + "\n" + print(msg) + + try: + result, result_msg = execute_tool_call(tool_name, tool_params) + except Exception as e: + result, result_msg = ("error", f"ERROR - Calling tool: {tool_name} {e}") + + message += f"Result: {result_msg}\n" + + if result == "success": + # Truncate the result message to 200 characters since it can be long + print("Result: " + result_msg[:200] + "...") + else: + print("Result: " + result_msg) + + message += f"<|eot_id|>" + + if result == "success" and tool_name == "finish": + finished = True + + if finished: + print(blue("Agent marked as finished")) + else: + print(yellow("Max iterations reached")) + + # Create a PR title + message += chat_message( + "user", + "Please create a PR title that summarizes the changes you've made. Do not include any leading or trailing punctuation.", + ) + message += header("assistant") + response = client.inference.completion( + model_id=MODEL_ID, + content=message, + ) + pr_title = response.content + + # Check if there are any changes + # If there are no changes, ask the agent to explain why + diff_cmd = run(f"cd {os.path.join(SANDBOX_DIR, repo)} && git diff", shell=True, capture_output=True) + if not diff_cmd.stdout: + print(f"No changes were made - agent explaining why...") + message += chat_message( + "user", + ( + "No changes were made." + "Could you explain your reasoning for not making any changes?" + "Please write it in GitHub Flavored Markdown." + "Also provide some next steps to fix the issue." + ), + ) + message += header("assistant") + response = client.inference.completion( + model_id=MODEL_ID, + content=message, + ) + reasoning = response.content + return ("no_changes_made", reasoning, None) + + # Create a PR body + message += chat_message( + "user", + ( + "Summarizing all of the changes and thinking you've done," + "please write a PR body that explains the changes you've made." 
def execute_tool_call(
    tool_name: str, tool_params: dict[str, str]
) -> Union[Tuple[Literal["success"], str], Tuple[Literal["error"], str]]:
    """
    Execute a tool call and return a message indicating the result of the tool call.

    Every path-taking tool validates its arguments first; the first failing
    validator's message is returned as the error.

    Args:
        tool_name (str): The name of the tool to execute
            ("list_files", "edit_file", "view_file" or "finish").
        tool_params (dict[str, str]): The parameters to pass to the tool.

    Returns:
        Union[Tuple[Literal["success"], str], Tuple[Literal["error"], str]]:
            ("success", result): The result of the tool call.
            ("error", error_message): The error message if the tool call failed.
    """
    if tool_name == "list_files":
        if (
            error := validate_param_exists("path", tool_params)
            or validate_not_symlink(tool_params["path"])
            or validate_path_in_sandbox(tool_params["path"])
            or validate_directory_exists(tool_params["path"])
        ):
            return ("error", error)

        path = translate_path(tool_params["path"])
        files = list_files_in_repo(path, depth=1)
        return ("success", "\n".join(files))

    elif tool_name == "edit_file":
        if (
            error := validate_param_exists("path", tool_params)
            or validate_path_in_sandbox(tool_params["path"])
            or validate_param_exists("new_str", tool_params)
            or validate_not_symlink(tool_params["path"])
            or validate_file_exists(tool_params["path"])
            or validate_not_a_directory(tool_params["path"])
        ):
            return ("error", error)

        path = translate_path(tool_params["path"])
        new_str = tool_params["new_str"]
        if "old_str" in tool_params:
            old_str = tool_params["old_str"]
            with open(path, "r") as f:
                file_content = f.read()
            # Reporting success for a replacement that matched nothing would
            # make the agent believe the edit happened, so surface it instead.
            if old_str not in file_content:
                return (
                    "error",
                    f"ERROR - old_str not found in file {tool_params['path']}. "
                    "Please ensure old_str matches the file content exactly.",
                )
            new_content = file_content.replace(old_str, new_str)
        else:
            # Without old_str the whole file is replaced by new_str.
            new_content = new_str

        # Only open for writing once the full new content is known, so a
        # failure above can never leave the file truncated.
        with open(path, "w") as f:
            f.write(new_content)
        return ("success", "File successfully updated")

    elif tool_name == "view_file":
        if (
            error := validate_param_exists("path", tool_params)
            or validate_not_symlink(tool_params["path"])
            or validate_path_in_sandbox(tool_params["path"])
            or validate_file_exists(tool_params["path"])
            or validate_not_a_directory(tool_params["path"])
        ):
            return ("error", error)

        path = translate_path(tool_params["path"])
        with open(path, "r") as f:
            file_content = f.read()
        return ("success", file_content)

    elif tool_name == "finish":
        return ("success", "Task marked as finished")

    else:
        return ("error", f"ERROR - Unknown tool: {tool_name}")
def validate_path_in_sandbox(path: str) -> Optional[str]:
    """
    Validate that a path stays within the sandbox directory.

    The path is first translated from the agent's virtual working directory
    and then normalised, so `../` components cannot be used to climb out.

    Args:
        path (str): The path to validate

    Returns:
        Optional[str]: Error message if path is invalid, None if valid
    """
    # Resolve the absolute path after translation to catch any ../ tricks
    path = translate_path(path)
    resolved_path = os.path.abspath(path)
    sandbox_path = os.path.abspath(SANDBOX_DIR)

    # Compare whole path components rather than string prefixes: a plain
    # startswith() check would accept a sibling such as "<sandbox>_evil".
    try:
        in_sandbox = os.path.commonpath([resolved_path, sandbox_path]) == sandbox_path
    except ValueError:
        # commonpath raises when the paths cannot share a root (e.g. different
        # drives on Windows) - that is by definition outside the sandbox.
        in_sandbox = False

    if not in_sandbox:
        # From the agent's perspective, any paths not in the sandbox don't exist
        return f"ERROR - File {path} does not exist"
    # NOTE(review): this check is lexical only; symlinked parent directories are
    # not resolved here - confirm whether realpath() should be used instead.
    return None
class Issue:
    """A GitHub issue reference parsed from an issue URL.

    Attributes are filled in by the constructor from a URL of the form
    ``https://github.com/<owner>/<repo>/issues/<issue_number>`` (the
    ``https://`` prefix is optional).
    """

    repo: str
    owner: str
    issue_number: int

    def __init__(self, url: str):
        """
        Parse the issue url into the owner, repo and issue number

        Args:
            url (str): The GitHub issue URL to parse.

        Raises:
            ValueError: If the domain is not github.com, the URL has too few
                segments, the fourth segment is not ``issues``, or the issue
                number is missing or not an integer.
        """
        # Truncate the https:// prefix if it exists
        base_url = url
        if url.startswith("https://"):
            url = url[len("https://") :]

        parts = url.split("/")

        # Check that the domain is exactly github.com. A prefix check would
        # also accept look-alike hosts such as "github.com.evil.example".
        if parts[0] != "github.com":
            raise ValueError(f"Expected github.com as the domain: {base_url}")

        # We expect 5 parts: github.com/owner/repo/issues/issue_number
        if len(parts) < 5:
            raise ValueError("Invalid GitHub issue URL format")

        _, owner, repo, kind, number = parts[:5]

        if kind != "issues":
            raise ValueError(f"Expected /issues/ in the URL: {base_url}")

        if number == "":
            raise ValueError(f"Expected an issue number in the URL: {base_url}")

        try:
            issue_number = int(number)
        except ValueError as err:
            raise ValueError(f"Expected an integer issue number: {number}") from err

        self.owner = owner
        self.repo = repo
        self.issue_number = issue_number
def _git(repo_path: str, *args: str) -> str:
    """Run ``git <args>`` inside ``repo_path`` and return its stripped stdout.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    # An argument list (no shell) keeps the token, branch names and paths from
    # ever being interpreted by a shell.
    cmd = run(["git", *args], cwd=repo_path, check=True, capture_output=True)
    return cmd.stdout.decode().strip()


def main(issue_url: str) -> None:
    """Run the coding agent against one GitHub issue and open a PR with the outcome.

    Reads GITHUB_API_KEY and LLAMA_STACK_URL from the environment, fetches the
    issue, prepares a clean checkout of the repository under SANDBOX_DIR, runs
    the agent, pushes a new branch and opens a pull request. When the agent
    made no changes, the PR carries the agent's explanation instead.

    Args:
        issue_url (str): URL of the GitHub issue to solve.

    Raises:
        ValueError: If a required environment variable is missing, the model is
            not served by Llama Stack, the issue cannot be fetched, or the
            branch/PR cannot be created.
        subprocess.CalledProcessError: If a git command used to set up the
            repository (or to push the no-changes branch) fails.
    """
    from subprocess import CalledProcessError

    github_api_key = os.getenv("GITHUB_API_KEY")
    if not github_api_key:
        raise ValueError("GITHUB_API_KEY is not set in the environment variables")

    llama_stack_url = os.getenv("LLAMA_STACK_URL")
    if not llama_stack_url:
        raise ValueError("LLAMA_STACK_URL is not set in the environment variables")

    client = LlamaStackClient(base_url=llama_stack_url)

    models = client.models.list()
    if MODEL_ID not in [model.identifier for model in models]:
        raise ValueError(
            f"Model {MODEL_ID} not found in LlamaStack. Llama Stack Coding Agent only supports {MODEL_ID} at the moment."
        )

    issue = Issue(issue_url)
    print(f"Issue #{issue.issue_number} in {issue.owner}/{issue.repo}")
    print()

    api_base = f"https://api.github.com/repos/{issue.owner}/{issue.repo}"
    auth_headers = {"Authorization": f"Bearer {github_api_key}"}

    response = requests.get(
        f"{api_base}/issues/{issue.issue_number}", headers=auth_headers
    )
    # Fail with the API's own message rather than a KeyError on 'title' below.
    if response.status_code != 200:
        raise ValueError(f"Failed to fetch issue: {response.text}")
    issue_data = response.json()
    issue_title = issue_data["title"]
    # GitHub returns null for an issue without a description.
    issue_body = issue_data["body"] or ""
    print(f"Title: {cyan(issue_title)}")
    print(f"Body: {magenta(issue_body)}")
    print()

    # Make sure the sandbox directory exists
    os.makedirs(SANDBOX_DIR, exist_ok=True)

    repo_path = os.path.join(SANDBOX_DIR, issue.repo)
    remote_url = (
        f"https://{github_api_key}@github.com/{issue.owner}/{issue.repo}.git"
    )

    # Clone the repo only if it isn't in the sandbox yet. The destination is
    # given explicitly so this works regardless of the current directory.
    if not os.path.exists(repo_path):
        print("Cloning repo...")
        run(["git", "clone", remote_url, repo_path], check=True)

    ref = _git(repo_path, "symbolic-ref", "refs/remotes/origin/HEAD")
    default_branch = ref.removeprefix("refs/remotes/origin/")

    # If we have a different token, we need to update the remote url
    _git(repo_path, "remote", "set-url", "origin", remote_url)

    # Reset to a pristine checkout of the default branch
    print("Setting up repo...")
    _git(repo_path, "checkout", "-f", default_branch)
    _git(repo_path, "clean", "-fdx")

    # Run the agent
    status, summary, details = run_agent(client, issue.repo, issue_title, issue_body)

    branch_name = f"llama-agent-{issue.issue_number}-{int(time.time())}"

    if status == "no_changes_made":
        reasoning = summary

        # A PR needs at least one commit, so push a branch with a placeholder file
        open(os.path.join(repo_path, ".keep"), "a").close()
        _git(repo_path, "checkout", "-b", branch_name)
        _git(repo_path, "add", ".")
        _git(repo_path, "commit", "-m", "Initial commit")
        _git(repo_path, "push", "origin", branch_name)

        # Open a PR whose body carries the agent's reasoning
        response = requests.post(
            f"{api_base}/pulls",
            headers=auth_headers,
            json={
                "title": f"Agent attempted to solve: #{issue.issue_number} - {issue_title}",
                "body": f"Agent attempted to resolve #{issue.issue_number}, but no changes were made. Here's it's explanation:\n\n{reasoning}",
                "head": branch_name,
                "base": default_branch,
            },
        )

        if response.status_code != 201:
            raise ValueError(f"Failed to create PR: {response.json()}")

        print()
        print(
            f"Agent attempted to solve the issue, but no changes were made. It's explanation is on the PR:\n\n"
            f"\t{yellow(response.json()['html_url'])}"
        )
    else:
        pr_title = summary
        pr_body = details

        # Commit changes and create a new branch
        try:
            _git(repo_path, "checkout", "-b", branch_name)
            _git(repo_path, "add", ".")
            _git(repo_path, "commit", "-m", "Testing new PR")
            _git(repo_path, "push", "origin", branch_name)
        except CalledProcessError as err:
            raise ValueError(
                f"Failed to create new branch: {err.stderr.decode()}"
            ) from err

        # Create a new PR
        response = requests.post(
            f"{api_base}/pulls",
            headers=auth_headers,
            json={
                "title": f"#{issue.issue_number} - {pr_title}",
                "body": f"Resolves #{issue.issue_number}\n{pr_body}",
                "head": branch_name,
                "base": default_branch,
            },
        )
        if response.status_code != 201:
            raise ValueError(f"Failed to create new PR: {response.json()}")

        print()
        print(f"Created new PR: {green(response.json()['html_url'])}")
# ANSI color codes for terminal output
ANSI_RED = "\033[91m"
ANSI_GREEN = "\033[92m"
ANSI_YELLOW = "\033[93m"
ANSI_BLUE = "\033[94m"
ANSI_MAGENTA = "\033[95m"
ANSI_CYAN = "\033[96m"
ANSI_RESET = "\033[0m"
ANSI_BOLD = "\033[1m"


def _wrap(code: str, text: object) -> str:
    """Prefix text with an ANSI code and append the reset sequence"""
    # `text` is formatted with str(), so any object can be passed.
    return f"{code}{text}{ANSI_RESET}"


def red(text: object) -> str:
    """Wrap text in red ANSI color"""
    return _wrap(ANSI_RED, text)


def green(text: object) -> str:
    """Wrap text in green ANSI color"""
    return _wrap(ANSI_GREEN, text)


def yellow(text: object) -> str:
    """Wrap text in yellow ANSI color"""
    return _wrap(ANSI_YELLOW, text)


def blue(text: object) -> str:
    """Wrap text in blue ANSI color"""
    return _wrap(ANSI_BLUE, text)


def magenta(text: object) -> str:
    """Wrap text in magenta ANSI color"""
    return _wrap(ANSI_MAGENTA, text)


def cyan(text: object) -> str:
    """Wrap text in cyan ANSI color"""
    return _wrap(ANSI_CYAN, text)


def bold(text: object) -> str:
    """Wrap text in bold ANSI style"""
    return _wrap(ANSI_BOLD, text)
def list_files_in_repo(path: str, depth: int = 1) -> List[str]:
    """
    List all files in the given path, up to the given depth
    Returns a list of file paths, including directories
    Directories are represented by a trailing slash
    Directories are displayed first, then files

    Assumes the directory is in a git repo. Will exclude any files that are not in the git repo

    Args:
        path (str): Directory to list. If it is a file, it is returned as-is.
        depth (int): How many path levels below `path` to include.

    Returns:
        List[str]: Paths relative to `path`, or `[path]` when `path` is a file.

    Raises:
        FileNotFoundError: If `path` does not exist.
        AssertionError: If git cannot list the files (e.g. not a git repository).
    """

    if not os.path.exists(path):
        raise FileNotFoundError(f"File {path} does not exist")

    if os.path.isfile(path):
        return [path]

    # We use git ls-tree to ignore any files like .git/
    # An argument list with cwd= avoids the shell, so paths containing spaces
    # or shell metacharacters work. -z gives NUL-separated, unquoted names.
    try:
        cmd = run(
            ["git", "ls-tree", "-r", "--name-only", "-z", "HEAD"],
            cwd=path,
            text=True,
            capture_output=True,
        )
    except FileNotFoundError as err:
        # Raised when the git executable itself is missing.
        raise AssertionError(f"Failed to list files in repo: {err}") from err
    if cmd.returncode != 0:
        raise AssertionError(f"Failed to list files in repo: {cmd.stderr}")

    root = Directory(path)
    for tracked_file in filter(None, cmd.stdout.split("\0")):
        parts = tracked_file.split("/")
        last_index = len(parts) - 1
        cur = root
        for level, part in enumerate(parts):
            # Stop descending once the requested depth is reached
            if level + 1 > depth:
                break
            if level == last_index:
                cur.add_file(part)
                break
            child = next((d for d in cur.directories if d.name == part), None)
            if child is None:
                child = Directory(part)
                cur.add_directory(child)
            cur = child

    res = []

    def dfs(directory: Directory, path=""):
        # Recursively process subdirectories
        for subdir in sorted(directory.directories, key=lambda x: x.name):
            subdir_path = os.path.join(path, subdir.name)
            res.append(subdir_path + "/")  # Add trailing slash for directories
            dfs(subdir, subdir_path)

        # Add all files in current directory
        for file in sorted(directory.files):
            res.append(os.path.join(path, file))

    dfs(root)
    return res
'(a="b")' + + def test_three_params(self): + assert ( + display_tool_params({"a": "b", "c": "d", "e": "f"}) + == '(a="b", c="d", e="f")' + ) + + +class TestParseToolCallFromContent: + def test_basic_tool_call(self): + content = '[func1(a="1", b="2")]' + assert parse_tool_calls(content) == [("func1", {"a": "1", "b": "2"})] + + def test_empty_arg(self): + content = '[func1(a="1", b=)]' + + res = parse_tool_calls(content) + + assert len(res) == 1 + error, error_message = res[0] + assert error == "error" + assert "Tool call invalid syntax" in error_message + + def test_handles_missing_left_matching_bracket(self): + content = "func1()]" + + res = parse_tool_calls(content) + + assert len(res) == 1 + tool_name, tool_params = res[0] + assert tool_name == "func1" + assert tool_params == {} + + def test_handles_missing_right_matching_bracket(self): + content = '[func1(a="1", b="2")]' + + res = parse_tool_calls(content) + + assert len(res) == 1 + tool_name, tool_params = res[0] + assert tool_name == "func1" + assert tool_params == {"a": "1", "b": "2"} + + def test_handles_missing_left_matching_bracket_and_right_matching_bracket(self): + content = 'func1(a="1", b="2")' + + res = parse_tool_calls(content) + + assert len(res) == 1 + tool_name, tool_params = res[0] + assert tool_name == "func1" + assert tool_params == {"a": "1", "b": "2"} + + def test_handles_multiple_tool_calls(self): + content = '[func1(a="1", b="2"), func2(c="3", d="4")]' + + res = parse_tool_calls(content) + + assert len(res) == 2 + assert res[0] == ("func1", {"a": "1", "b": "2"}) + assert res[1] == ("func2", {"c": "3", "d": "4"}) + + def test_handles_multiple_tool_tags_and_text(self): + content = """ + I should use func1 to do something. + [func1(a="1", b="2")] + I should use func2 to do something else. 
+ [func2(c="3", d="4")] + """ + + res = parse_tool_calls(content) + + assert len(res) == 2 + assert res[0] == ("func1", {"a": "1", "b": "2"}) + assert res[1] == ("func2", {"c": "3", "d": "4"}) + + +class TestFileTree: + @pytest.fixture(autouse=True) + def setup_method(self): + """Set up test environment before each test method""" + # Create a temporary directory for tests + # Create a new temp directory for each test + self.test_dir = tempfile.mkdtemp() + + yield + + # Clean up the temp directory after each test + shutil.rmtree(self.test_dir) + + def test_file_not_found(self): + with pytest.raises( + FileNotFoundError, match="File /workspace/does_not_exist does not exist" + ): + list_files_in_repo("/workspace/does_not_exist") + + def test_handles_if_file_is_not_in_git(self): + open(os.path.join(self.test_dir, "file1.txt"), "w").close() + + with pytest.raises(AssertionError, match="not a git repository"): + list_files_in_repo(self.test_dir) + + def test_default_depth_1(self): + os.makedirs(os.path.join(self.test_dir, "dir1")) + os.makedirs(os.path.join(self.test_dir, "dir2")) + open(os.path.join(self.test_dir, "file1.txt"), "w").close() + open(os.path.join(self.test_dir, "dir1", "file2.txt"), "w").close() + add_to_git(self.test_dir) + + res = list_files_in_repo(self.test_dir) + + assert res == ["dir1/", "file1.txt"] + + def test_depth_2(self): + os.makedirs(os.path.join(self.test_dir, "dir1")) + os.makedirs(os.path.join(self.test_dir, "dir2")) + open(os.path.join(self.test_dir, "file1.txt"), "w").close() + open(os.path.join(self.test_dir, "dir1", "file2.txt"), "w").close() + open(os.path.join(self.test_dir, "dir2", "file3.txt"), "w").close() + add_to_git(self.test_dir) + + res = list_files_in_repo(self.test_dir, depth=2) + + assert res == [ + "dir1/", + "dir1/file2.txt", + "dir2/", + "dir2/file3.txt", + "file1.txt", + ] + + +class TestTranslatePath: + + def test_workspace_path(self): + assert translate_path("/workspace/repo") == os.path.join(SANDBOX_DIR, "repo") + + 
def test_relative_path(self): + assert translate_path("repo") == os.path.join(SANDBOX_DIR, "repo") + + +class TestExecuteToolCall: + @pytest.fixture(autouse=True) + def setup_method(self): + """Set up test environment before each test method""" + self.test_dir = os.path.join(SANDBOX_DIR, "test_repo") + os.makedirs(self.test_dir) + with open(os.path.join(self.test_dir, "file.txt"), "w") as f: + f.write("old content\n\nHello World") + + yield + + shutil.rmtree(self.test_dir) + + def test_list_files_no_path(self): + res = execute_tool_call("list_files", {}) + + assert res == ("error", "ERROR - path not found in tool params: ()") + + def test_list_files_success(self): + os.makedirs(os.path.join(self.test_dir, "dir1")) + os.makedirs(os.path.join(self.test_dir, "dir2")) + open(os.path.join(self.test_dir, "dir1", "file.txt"), "w").close() + open(os.path.join(self.test_dir, "dir2", "file.txt"), "w").close() + add_to_git(self.test_dir) + + res = execute_tool_call("list_files", {"path": "/workspace/test_repo"}) + + assert res == ("success", "dir1/\ndir2/\nfile.txt") + + def test_list_files_throws_if_path_not_in_sandbox(self): + path = "/workspace/../llama_agent" + res = execute_tool_call("list_files", {"path": path}) + + assert res == ( + "error", + # From the agent's perspective, any paths not in the sandbox don't exist + f"ERROR - File {SANDBOX_DIR}/../llama_agent does not exist", + ) + + def test_list_files_path_not_exists(self): + res = execute_tool_call( + "list_files", {"path": "/workspace/test_repo/does_not_exist"} + ) + + assert res == ( + "error", + f"ERROR - Directory /workspace/test_repo/does_not_exist does not exist. 
Please ensure the path is an absolute path and that the directory exists.", + ) + + def test_list_files_relative_path(self): + os.makedirs(os.path.join(self.test_dir, "dir1")) + open(os.path.join(self.test_dir, "dir1", "file.txt"), "w").close() + add_to_git(self.test_dir) + + res = execute_tool_call("list_files", {"path": "test_repo/dir1"}) + + assert res == ("success", "file.txt") + + def test_list_files_symlink(self): + os.symlink(REPO_DIR, os.path.join(self.test_dir, "bad_dir")) + + res = execute_tool_call("list_files", {"path": "/workspace/test_repo/bad_dir"}) + + assert res == ( + "error", + "ERROR - File /workspace/test_repo/bad_dir is a symlink. Simlinks not allowed", + ) + + def test_edit_file_success(self): + res = execute_tool_call( + "edit_file", + {"path": "/workspace/test_repo/file.txt", "new_str": "new content"}, + ) + + assert res == ("success", "File successfully updated") + self.assert_file_content("file.txt", "new content") + + def test_edit_file_error(self): + res = execute_tool_call( + "edit_file", {"path": "repo/file.txt", "new_str": "new content"} + ) + + error, error_message = res + assert error == "error" + assert ( + "ERROR - File repo/file.txt does not exist. Please ensure the path is an absolute path and that the file exists." 
+ == error_message + ) + + def test_edit_file_no_path(self): + res = execute_tool_call("edit_file", {"new_str": "new content"}) + + assert res == ( + "error", + 'ERROR - path not found in tool params: (new_str="new content")', + ) + + def test_edit_file_no_new_str(self): + res = execute_tool_call("edit_file", {"path": "/workspace/test_repo/file.txt"}) + + assert res == ( + "error", + 'ERROR - new_str not found in tool params: (path="/workspace/test_repo/file.txt")', + ) + + def test_edit_file_str_replace(self): + res = execute_tool_call( + "edit_file", + { + "path": "/workspace/test_repo/file.txt", + "old_str": "\nHello World", + "new_str": "Goodbye", + }, + ) + + assert res == ("success", "File successfully updated") + self.assert_file_content("file.txt", "old content\nGoodbye") + + def test_edit_file_path_not_in_sandbox(self): + res = execute_tool_call( + "edit_file", + {"path": "/workspace/../llama_agent/main.py", "new_str": "new content"}, + ) + + assert res == ( + "error", + f"ERROR - File {SANDBOX_DIR}/../llama_agent/main.py does not exist", + ) + + def test_edit_file_path_not_symlink(self): + # Create a symlink to a file outside of the sandbox. E.g., simulate to trying to steal credentials + os.symlink( + os.path.join(REPO_DIR, ".env.example"), os.path.join(self.test_dir, ".env") + ) + + res = execute_tool_call( + "edit_file", {"path": "/workspace/test_repo/.env", "new_str": "new content"} + ) + + assert res == ( + "error", + "ERROR - File /workspace/test_repo/.env is a symlink. 
Simlinks not allowed", + ) + + def test_view_file_path_not_in_sandbox(self): + res = execute_tool_call( + "view_file", {"path": "/workspace/../llama_agent/main.py"} + ) + + assert res == ( + "error", + f"ERROR - File {SANDBOX_DIR}/../llama_agent/main.py does not exist", + ) + + def test_view_file_symlink(self): + os.symlink( + os.path.join(REPO_DIR, ".env.example"), os.path.join(self.test_dir, "pwned") + ) + + res = execute_tool_call("view_file", {"path": "/workspace/test_repo/pwned"}) + + assert res == ( + "error", + "ERROR - File /workspace/test_repo/pwned is a symlink. Simlinks not allowed", + ) + + def test_view_file_path_not_exists(self): + res = execute_tool_call( + "view_file", {"path": "/workspace/test_repo/does_not_exist"} + ) + + assert res == ( + "error", + "ERROR - File /workspace/test_repo/does_not_exist does not exist. Please ensure the path is an absolute path and that the file exists.", + ) + + def test_view_file_success(self): + res = execute_tool_call("view_file", {"path": "/workspace/test_repo/file.txt"}) + + assert res == ("success", "old content\n\nHello World") + + def assert_file_content(self, path: str, expected_content: str) -> None: + with open(os.path.join(self.test_dir, path), "r") as f: + assert f.read() == expected_content + + +def add_to_git(dir: str) -> None: + run( + f"cd {dir} && git init && git add . 
&& git commit -m 'Initial commit'", + shell=True, + check=True, + capture_output=True, + ) diff --git a/examples/github_coding_agent/tests/test_github.py b/examples/github_coding_agent/tests/test_github.py new file mode 100644 index 00000000..515de1f7 --- /dev/null +++ b/examples/github_coding_agent/tests/test_github.py @@ -0,0 +1,33 @@ +import pytest +from llama_agent.github import Issue + +class TestIssue: + def test_basic_url(self): + url = "https://github.com/aidando73/bitbucket-syntax-highlighting/issues/67" + issue = Issue(url) + assert issue.owner == "aidando73" + assert issue.repo == "bitbucket-syntax-highlighting" + assert issue.issue_number == 67 + + def test_basic_url_without_https(self): + url = "github.com/aidando73/bitbucket-syntax-highlighting/issues/67" + issue = Issue(url) + assert issue.owner == "aidando73" + assert issue.repo == "bitbucket-syntax-highlighting" + assert issue.issue_number == 67 + + def test_invalid_url(self): + with pytest.raises(ValueError, match="Expected github.com as the domain"): + Issue("https://not-github.com/owner/repo/issues/1") + + def test_invalid_url_no_issue(self): + with pytest.raises(ValueError, match="Invalid GitHub issue URL format"): + Issue("https://github.com/owner/repo") + + def test_invalid_url_no_issue_number(self): + with pytest.raises(ValueError, match="Expected an issue number in the URL"): + Issue("https://github.com/owner/repo/issues/") + + def test_issue_number_is_not_integer(self): + with pytest.raises(ValueError, match="Expected an integer issue number"): + Issue("https://github.com/owner/repo/issues/not_an_integer") diff --git a/examples/github_coding_agent/tests/test_main.py b/examples/github_coding_agent/tests/test_main.py new file mode 100644 index 00000000..a40eab8c --- /dev/null +++ b/examples/github_coding_agent/tests/test_main.py @@ -0,0 +1,30 @@ +import pytest +from llama_agent.main import main + +class TestApp: + @pytest.fixture(autouse=True) + def setup_method(self, monkeypatch): + """Set 
up test environment before each test method""" + # Ensure environment variables are set for tests + monkeypatch.setenv("GITHUB_API_KEY", "test_key") + monkeypatch.setenv("LLAMA_STACK_URL", "http://localhost:5000") + + def test_no_github_api_key(self, monkeypatch): + monkeypatch.delenv("GITHUB_API_KEY", raising=False) + + with pytest.raises( + ValueError, match="GITHUB_API_KEY is not set in the environment variables" + ): + main( + issue_url="https://github.com/aidando73/bitbucket-syntax-highlighting/issues/67" + ) + + def test_no_llama_stack_url(self, monkeypatch): + monkeypatch.delenv("LLAMA_STACK_URL", raising=False) + + with pytest.raises( + ValueError, match="LLAMA_STACK_URL is not set in the environment variables" + ): + main( + issue_url="https://github.com/aidando73/bitbucket-syntax-highlighting/issues/67" + ) \ No newline at end of file