Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 89 additions & 45 deletions src/seclab_taskflows/mcp_servers/repo_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,22 @@ def clear_repo_issues(self, repo):
backend = RepoContextBackend(MEMORY)

@mcp.tool()
def store_new_component(owner: str, repo: str, location: str = Field(description="The directory of the component"),
is_app: bool = Field(description="Is this an application", default=None),
is_library: bool = Field(description="Is this a library", default=None),
notes: str = Field(description="The notes taken for this component", default="")):
def store_new_component(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component"),
is_app: bool = Field(description="Is this an application", default=None),
is_library: bool = Field(description="Is this a library", default=None),
notes: str = Field(description="The notes taken for this component", default="")):
"""
Stores a new component in the database.
"""
return backend.store_new_application(process_repo(owner, repo), location, is_app, is_library, notes)

@mcp.tool()
def add_component_notes(owner: str, repo: str, location: str = Field(description="The directory of the component", default=None),
notes: str = Field(description="New notes taken for this component", default="")):
def add_component_notes(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component", default=None),
notes: str = Field(description="New notes taken for this component", default="")):
"""
Add new notes to a component
"""
Expand All @@ -360,7 +364,9 @@ def add_component_notes(owner: str, repo: str, location: str = Field(description
return backend.store_new_application(repo, location, None, None, notes)

@mcp.tool()
def store_new_entry_point(owner: str, repo: str, location: str = Field(description="The directory of the component where the entry point belonged to"),
def store_new_entry_point(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component where the entry point belongs to"),
file: str = Field(description="The file that contains the entry point"),
line: int = Field(description="The file line that contains the entry point"),
user_input: str = Field(description="The variables that are considered as user input"),
Expand All @@ -375,45 +381,54 @@ def store_new_entry_point(owner: str, repo: str, location: str = Field(descripti
return backend.store_new_entry_point(repo, app.id, file, user_input, line, notes)

@mcp.tool()
def store_new_component_issue(owner: str, repo: str, component_id: int,
issue_type: str, notes: str):
def store_new_component_issue(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component"),
issue_type: str = Field(description="The type of issue"),
notes: str = Field(description="Notes about the issue")):
"""
Stores a type of common issue for a component.
"""
repo = process_repo(owner, repo)
return backend.store_new_component_issue(repo, component_id, issue_type, notes)

@mcp.tool()
def store_new_audit_result(owner: str, repo: str, component_id: int, issue_type: str, issue_id: int,
has_non_security_error: bool = Field(description="Set to true if there are security issues or logic error but may not be exploitable"),
has_vulnerability: bool = Field(description="Set to true if a security vulnerability is identified"),
notes: str = Field(description="The notes for the audit of this issue")):
def store_new_audit_result(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component"),
issue_type: str = Field(description="The type of issue"),
issue_id: int = Field(description="The ID of the issue"),
has_non_security_error: bool = Field(description="Set to true if there are security issues or logic error but may not be exploitable"),
has_vulnerability: bool = Field(description="Set to true if a security vulnerability is identified"),
notes: str = Field(description="The notes for the audit of this issue")):
"""
Stores the audit result for issue with issue_id.
"""
repo = process_repo(owner, repo)
return backend.store_new_audit_result(repo, component_id, issue_type, issue_id, has_non_security_error, has_vulnerability, notes)

@mcp.tool()
def store_new_web_entry_point(owner: str, repo: str,
entry_point_id: int = Field(description="The ID of the entry point this web entry point refers to"),
location: str = Field(description="The directory of the component where the web entry point belongs to"),
method: str = Field(description="HTTP method (GET, POST, etc)", default=""),
path: str = Field(description="URL path (e.g., /info)", default=""),
component: int = Field(description="Component identifier", default=0),
auth: str = Field(description="Authentication information", default=""),
middleware: str = Field(description="Middleware information", default=""),
roles_scopes: str = Field(description="Roles and scopes information", default=""),
notes: str = Field(description="Notes for this web entry point", default="")):
def store_new_web_entry_point(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
entry_point_id: int = Field(description="The ID of the entry point this web entry point refers to"),
location: str = Field(description="The directory of the component where the web entry point belongs to"),
method: str = Field(description="HTTP method (GET, POST, etc)", default=""),
path: str = Field(description="URL path (e.g., /info)", default=""),
component: int = Field(description="Component identifier", default=0),
auth: str = Field(description="Authentication information", default=""),
middleware: str = Field(description="Middleware information", default=""),
roles_scopes: str = Field(description="Roles and scopes information", default=""),
notes: str = Field(description="Notes for this web entry point", default="")):
"""
Stores a new web entry point in a component to the database. A web entry point extends a regular entry point
with web-specific properties like HTTP method, path, authentication, middleware, and roles/scopes.
"""
return backend.store_new_web_entry_point(process_repo(owner, repo), entry_point_id, method, path, component, auth, middleware, roles_scopes, notes)

@mcp.tool()
def add_entry_point_notes(owner: str, repo: str,
location: str = Field(description="The directory of the component where the entry point belonged to"),
def add_entry_point_notes(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component where the entry point belongs to"),
file: str = Field(description="The file that contains the entry point"),
line: int = Field(description="The file line that contains the entry point"),
notes: str = Field(description="The notes for this entry point", default = "")):
Expand All @@ -428,7 +443,9 @@ def add_entry_point_notes(owner: str, repo: str,


@mcp.tool()
def store_new_user_action(owner: str, repo: str, location: str = Field(description="The directory of the component where the user action belonged to"),
def store_new_user_action(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component where the user action belongs to"),
file: str = Field(description="The file that contains the user action"),
line: int = Field(description="The file line that contains the user action"),
notes: str = Field(description="New notes for this user action", default = "")):
Expand All @@ -442,7 +459,9 @@ def store_new_user_action(owner: str, repo: str, location: str = Field(descripti
return backend.store_new_user_action(repo, app.id, file, line, notes)

@mcp.tool()
def add_user_action_notes(owner: str, repo: str, location: str = Field(description="The directory of the component where the user action belonged to"),
def add_user_action_notes(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component where the user action belongs to"),
file: str = Field(description="The file that contains the user action"),
line: str = Field(description="The file line that contains the user action"),
notes: str = Field(description="The notes for user action", default = "")):
Expand All @@ -453,9 +472,11 @@ def add_user_action_notes(owner: str, repo: str, location: str = Field(descripti
return backend.store_new_user_action(repo, app.id, file, line, notes, True)

@mcp.tool()
def get_component(owner: str, repo: str, location: str = Field(description="The directory of the component")):
def get_component(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component")):
"""
The a component from the database
Get a component from the database
"""
repo = process_repo(owner, repo)
app = backend.get_app(repo, location)
Expand All @@ -464,127 +485,150 @@ def get_component(owner: str, repo: str, location: str = Field(description="The
return json.dumps(app_to_dict(app))

@mcp.tool()
def get_components(owner: str, repo: str):
def get_components(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get components from the repo
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_apps(repo))

@mcp.tool()
def get_entry_points(owner: str, repo: str, location: str = Field(description="The directory of the component")):
def get_entry_points(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component")):
"""
Get all the entry points of a component.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_entries(repo, location))

@mcp.tool()
def get_entry_points_for_repo(owner: str, repo: str):
def get_entry_points_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get all entry points of an repo
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_entries_for_repo(repo))

@mcp.tool()
def get_web_entry_points_component(owner: str, repo: str, component_id: int):
def get_web_entry_points_component(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component")):
"""
Get all web entry points for a component
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_web_entries(repo, component_id))

@mcp.tool()
def get_web_entry_points_for_repo(owner: str, repo: str):
def get_web_entry_points_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get all web entry points of an repo
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_web_entries_for_repo(repo))

@mcp.tool()
def get_user_actions(owner: str, repo: str, location: str = Field(description="The directory of the component")):
def get_user_actions(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
location: str = Field(description="The directory of the component")):
"""
Get all the user actions in a component.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_user_actions(repo, location))

@mcp.tool()
def get_user_actions_for_repo(owner: str, repo: str):
def get_user_actions_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get all the user actions in a repo.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_user_actions_for_repo(repo))

@mcp.tool()
def get_component_issues(owner: str, repo: str, component_id: int):
def get_component_issues(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component")):
"""
Get issues for the component.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_issues(repo, component_id))

@mcp.tool()
def get_component_results(owner: str, repo: str, component_id: int):
def get_component_results(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component")):
"""
Get audit results for the component.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_audit_results(repo, component_id, None, None))

@mcp.tool()
def get_component_vulnerable_results(owner: str, repo: str, component_id: int):
def get_component_vulnerable_results(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component")):
"""
Get audit results for the component that are audited as vulnerable.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_audit_results(repo, component_id, has_non_security_error = None, has_vulnerability = True))

@mcp.tool()
def get_component_potential_results(owner: str, repo: str, component_id: int):
def get_component_potential_results(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
component_id: int = Field(description="The ID of the component")):
"""
Get audit results for the component that are audited as an issue but may not be exploitable.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_audit_results(repo, component_id, has_non_security_error = True, has_vulnerability = None))

@mcp.tool()
def get_audit_results_for_repo(owner: str, repo: str):
def get_audit_results_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get audit results for the repo.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_audit_results(repo, component_id = None, has_non_security_error = None, has_vulnerability = None))

@mcp.tool()
def get_vulnerable_audit_results_for_repo(owner: str, repo: str):
def get_vulnerable_audit_results_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get audit results for the repo that are audited as vulnerable.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_audit_results(repo, component_id = None, has_non_security_error = None, has_vulnerability = True))

@mcp.tool()
def get_potential_audit_results_for_repo(owner: str, repo: str):
def get_potential_audit_results_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Get audit results for the repo that are potential issues but may not be exploitable.
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_app_audit_results(repo, component_id = None, has_non_security_error = True, has_vulnerability = None))

@mcp.tool()
def clear_repo(owner: str, repo: str):
def clear_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
clear all results for repo.
"""
repo = process_repo(owner, repo)
return backend.clear_repo(repo)

@mcp.tool()
def clear_component_issues_for_repo(owner: str, repo: str):
def clear_component_issues_for_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
clear all results for repo.
"""
Expand Down