Skip to content

Commit bad9ab4

Browse files
Add CodeQL-Python MCP tool
1 parent d6ea161 commit bad9ab4

File tree

8 files changed

+464
-0
lines changed

8 files changed

+464
-0
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
Queries in support of the CodeQL MCP Server are maintained as query packs.
2+
3+
If you add your own queries, please follow established conventions for normal CodeQL query pack development.
4+
5+
To run the CodeQL for Python server:
6+
- create a codespace, preferably with more cores
7+
- install CodeQL extension for VS Code
8+
- press `Ctrl/Cmd + Shift + P` and type "CodeQL: Install Pack Dependencies". Choose "sylwia-budzynska/mcp-python" and press "OK".
9+
- find the path to the codeql binary, which comes preinstalled with the VS Code CodeQL extension, with the command:
10+
```bash
11+
find ~ -type f -name codeql -executable 2>/dev/null
12+
```
13+
It will most likely look similar to this:
14+
```
15+
/home/codespace/.vscode-remote/data/User/globalStorage/github.vscode-codeql/distribution1/codeql/codeql
16+
```
17+
- create a folder named 'data'
18+
- create or update your `.env` file in the root of this project with values for:
19+
```
20+
COPILOT_TOKEN= # a fine-grained GitHub personal access token with permission for "copilot chat"
21+
CODEQL_DBS_BASE_PATH="/workspaces/seclab-taskflows/data/codeql_databases" #path to folder with your CodeQL databases
22+
23+
# Example values for a local setup, run with `python -m seclab_taskflow_agent -t seclab_taskflows.taskflows.audit.remote_sources_local`
24+
MEMCACHE_STATE_DIR="/workspaces/seclab-taskflows/data" # path to folder for storing the memcache database
25+
DATA_DIR="/workspaces/seclab-taskflows/data" # path to folder for storing the codeql_sqlite databases and all other data
26+
GITHUB_PERSONAL_ACCESS_TOKEN= # can be the same token as COPILOT_TOKEN. Or another one, with access e.g. to private repositories
27+
CODEQL_CLI= # output of command `find ~ -type f -name codeql -executable 2>/dev/null`
28+
29+
# Example docker env run with ./run_seclab_agent.sh [...]
30+
# CODEQL_CLI="codeql"
31+
# CODEQL_DBS_BASE_PATH="/app/data/codeql_databases"
32+
# MEMCACHE_STATE_DIR="/app/data"
33+
# DATA_DIR="/app/data"
34+
```
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# SPDX-FileCopyrightText: 2025 GitHub
2+
# SPDX-License-Identifier: MIT
3+
4+
from sqlalchemy import String, Text, Integer, ForeignKey, Column
5+
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship
6+
from typing import Optional
7+
8+
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this module inherit from."""
10+
11+
12+
class Source(Base):
    """ORM model for one row of the ``source`` table.

    A row records a source location inside a repository together with its
    type and optional free-form notes.
    """

    __tablename__ = 'source'

    id: Mapped[int] = mapped_column(primary_key=True)
    repo: Mapped[str]
    source_location: Mapped[str]
    type: Mapped[str]
    # Optional free-form notes (nullable).
    notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    def __repr__(self):
        """Return a debug representation listing every column value."""
        fields = (
            f"id={self.id}",
            f"repo={self.repo}",
            f"location={self.source_location}",
            f"type={self.type}",
            f"notes={self.notes}",
        )
        return "<Source(" + ", ".join(fields) + ")>"
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
# SPDX-FileCopyrightText: 2025 GitHub
2+
# SPDX-License-Identifier: MIT
3+
4+
5+
import logging
6+
logging.basicConfig(
7+
level=logging.DEBUG,
8+
format='%(asctime)s - %(levelname)s - %(message)s',
9+
filename='logs/mcp_codeql_python.log',
10+
filemode='a'
11+
)
12+
from seclab_taskflow_agent.mcp_servers.codeql.client import run_query, file_from_uri, list_src_files, _debug_log, search_in_src_archive
13+
14+
from pydantic import Field
15+
#from mcp.server.fastmcp import FastMCP, Context
16+
from fastmcp import FastMCP, Context # use FastMCP 2.0
17+
from pathlib import Path
18+
import os
19+
import csv
20+
import json
21+
from sqlalchemy import create_engine
22+
from sqlalchemy.orm import Session
23+
from pathlib import Path
24+
import zipfile
25+
import httpx
26+
import aiofiles
27+
from .codeql_sqlite_models import Base, Source
28+
29+
# Directory handed to CodeqlSqliteBackend for its sqlite file.
MEMORY = Path(os.environ.get('CODEQL_SQLITE_DIR', '/app/my_data'))
mcp = FastMCP("CodeQL-Python")

# Base directory that database paths passed to the tools are resolved against.
CODEQL_DBS_BASE_PATH = Path(
    os.environ.get('CODEQL_DBS_BASE_PATH', '/workspaces/seclab-taskflow-agent/my_data')
)

# tool name -> templated query lookup for supported languages
# to add a language, port the templated query pack and add its definition here
TEMPLATED_QUERY_PATHS = {
    'python': {'remote_sources': 'queries/mcp-python/remote_sources.ql'},
}
41+
42+
43+
def source_to_dict(result):
    """Convert a ``Source`` row into a plain dict.

    The row's ``id`` is exposed under the key ``source_id``; every other
    column keeps its own name.
    """
    return dict(
        source_id=result.id,
        repo=result.repo,
        source_location=result.source_location,
        type=result.type,
        notes=result.notes,
    )
51+
52+
def _resolve_query_path(language: str, query: str) -> Path:
    """Look up the templated query file registered for a language and tool.

    Args:
        language: Key into ``TEMPLATED_QUERY_PATHS``.
        query: Tool/query name registered for that language.

    Returns:
        The query path exactly as registered in ``TEMPLATED_QUERY_PATHS``.

    Raises:
        RuntimeError: If the language or the query is not registered.
    """
    if language not in TEMPLATED_QUERY_PATHS:
        raise RuntimeError(f"Error: Language `{language}` not supported!")
    registered = TEMPLATED_QUERY_PATHS[language].get(query)
    if registered:
        return Path(registered)
    raise RuntimeError(f"Error: query `{query}` not supported for `{language}`!")
60+
61+
62+
def _resolve_db_path(relative_db_path: str | Path):
    """Resolve a database path relative to ``CODEQL_DBS_BASE_PATH``.

    Args:
        relative_db_path: Database location, interpreted relative to the
            base directory. Surrounding whitespace and leading slashes are
            dropped so an absolute-looking value cannot replace the base.

    Returns:
        The absolute database directory as a string.

    Raises:
        RuntimeError: If the path contains a ``..`` component, or if the
            resolved location is not an existing directory.
    """
    # path joins will return "/B" if "/A" / "////B" etc. as well
    # not windows compatible and probably needs additional hardening
    relative = Path(str(relative_db_path).strip().lstrip('/'))
    # Parent-directory components would let a caller-supplied value such as
    # "../../etc" escape the configured base directory, so refuse them.
    if '..' in relative.parts:
        _debug_log(f"Database path escapes base directory: {relative_db_path}")
        raise RuntimeError(f"Error: Invalid database path `{relative_db_path}`!")
    absolute_path = CODEQL_DBS_BASE_PATH / relative
    if not absolute_path.is_dir():
        _debug_log(f"Database path not found: {absolute_path}")
        raise RuntimeError(f"Error: Database not found at {absolute_path}!")
    return str(absolute_path)
73+
74+
# This sqlite database is specifically made for CodeQL for Python MCP.
75+
class CodeqlSqliteBackend:
    """SQLite-backed store for the sources recorded by the CodeQL MCP tools."""

    def __init__(self, memcache_state_dir: str):
        """Create the engine and make sure the ``source`` table exists.

        Args:
            memcache_state_dir: Directory that holds ``codeql_sqlite.db``.
                When the directory does not exist, the bare ``sqlite://``
                URL (SQLAlchemy's in-memory database) is used instead.
        """
        self.memcache_state_dir = memcache_state_dir
        self.location_pattern = r'^([a-zA-Z]+)(:\d+){4}$'
        if not Path(self.memcache_state_dir).exists():
            db_dir = 'sqlite://'
        else:
            db_dir = f'sqlite:///{self.memcache_state_dir}/codeql_sqlite.db'
        self.engine = create_engine(db_dir, echo=False)
        Base.metadata.create_all(self.engine, tables=[Source.__table__])

    def store_new_source(self, repo, source_location, type, notes, update = False):
        """Insert a source, or append notes to the one already stored.

        A source is identified by ``(repo, source_location)``. If a row
        exists, ``notes`` is appended to it regardless of ``update``.

        Args:
            repo: Repository identifier the source belongs to.
            source_location: Location string identifying the source.
            type: Source type stored on newly created rows.
            notes: Text to store (new row) or append (existing row).
            update: When true, never create a row; only append to an
                existing one.

        Returns:
            A human-readable status message.
        """
        with Session(self.engine) as session:
            existing = session.query(Source).filter_by(repo = repo, source_location = source_location).first()
            if existing:
                # The notes column is nullable and callers may pass None;
                # a bare `+=` would raise TypeError in either case.
                existing.notes = (existing.notes or '') + (notes or '')
                session.commit()
                return f"Updated notes for source at {source_location} in {repo}."
            if update:
                return f"No source exists at repo {repo}, location {source_location}"
            new_source = Source(repo = repo, source_location = source_location, type = type, notes = notes)
            session.add(new_source)
            session.commit()
            return f"Added new source for {source_location} in {repo}."

    def get_sources(self, repo):
        """Return every stored source of ``repo`` as a list of plain dicts."""
        with Session(self.engine) as session:
            results = session.query(Source).filter_by(repo=repo).all()
            sources = [source_to_dict(source) for source in results]
        return sources
107+
108+
109+
# our query result format is: "human readable template {val0} {val1},'key0,key1',val0,val1"
110+
def _csv_parse(raw):
111+
results = []
112+
reader = csv.reader(raw.strip().splitlines())
113+
try:
114+
for i, row in enumerate(reader):
115+
if i == 0:
116+
continue
117+
# col1 has what we care about, but offer flexibility
118+
keys = row[1].split(',')
119+
this_obj = {'description': row[0].format(*row[2:])}
120+
for j, k in enumerate(keys):
121+
this_obj[k.strip()] = row[j + 2]
122+
results.append(this_obj)
123+
except csv.Error as e:
124+
return ["Error: CSV parsing error: " + str(e)]
125+
return results
126+
127+
128+
def _run_query(query_name: str, database_path: str, language: str, template_values: dict):
    """Run a templated CodeQL query against a database and parse its CSV output.

    Returns:
        The value produced by ``_csv_parse`` on success. Every failure
        (unknown database, unsupported query/language, or an exception
        while running or parsing) is reported as a plain error string
        instead of being raised.
    """
    try:
        resolved_db = _resolve_db_path(database_path)
    except RuntimeError:
        return f"The database path for {database_path} could not be resolved"

    try:
        relative_query = _resolve_query_path(language, query_name)
    except RuntimeError:
        return f"The query {query_name} is not supported for language: {language}"

    try:
        # Registered query paths are relative to this module's directory.
        query_file = Path(__file__).parent.resolve() / relative_query
        raw_csv = run_query(
            query_file,
            resolved_db,
            fmt='csv',
            template_values=template_values,
            log_stderr=True,
        )
        return _csv_parse(raw_csv)
    except Exception as e:
        return f"The query {query_name} encountered an error: {e}"
149+
150+
def _get_file_contents(db: str | Path, uri: str):
    """Return what ``file_from_uri`` yields for ``uri`` in the database at ``db``."""
    return file_from_uri(uri, Path(db))
154+
155+
backend = CodeqlSqliteBackend(MEMORY)
156+
157+
@mcp.tool()
def remote_sources(owner: str, repo: str,
                   database_path: str = Field(description="The CodeQL database path."),
                   language: str = Field(description="The language used for the CodeQL database.")):
    """List all remote sources and their locations in a CodeQL database, then store the results in a database."""
    # Sources are stored under the combined "owner/repo" identifier.
    repo = f"{owner}/{repo}"
    results = _run_query('remote_sources', database_path, language, {})

    # _run_query reports its own failures as a plain string. Indexing that
    # string would yield only its first character, so handle it first.
    if isinstance(results, str):
        return f"Error: {results}"

    # _csv_parse reports a parsing failure as a one-element list of strings.
    if results and isinstance(results[0], str):
        return f"Error: {results[0]}"

    # Store each result as a source
    stored_count = 0
    for result in results:
        backend.store_new_source(
            repo=repo,
            source_location=result.get('location', ''),
            type=result.get('source', ''),
            notes='',
            update=False,
        )
        stored_count += 1

    return f"Stored {stored_count} remote sources in {repo}."
183+
184+
@mcp.tool()
def fetch_sources(owner: str, repo: str):
    """
    Fetch all sources from the repo
    """
    # Sources are stored under the combined "owner/repo" identifier.
    stored = backend.get_sources(f"{owner}/{repo}")
    return json.dumps(stored)
191+
192+
@mcp.tool()
def add_source_notes(owner: str, repo: str,
                     database_path: str = Field(description="The CodeQL database path."),
                     source_location: str = Field(description="The path to the file and column info that contains the source"),
                     notes: str = Field(description="The notes to append to this source", default="")):
    """
    Add new notes to an existing source. The notes will be appended to any existing notes.
    """
    repo_slug = f"{owner}/{repo}"
    try:
        # Only checks that the database resolves; the resolved path is unused.
        _resolve_db_path(database_path)
    except RuntimeError:
        return f"The database path for {database_path} could not be resolved"
    # update=True: never create a row, only append to an existing source.
    return backend.store_new_source(repo_slug, source_location, "", notes, update=True)
206+
207+
@mcp.tool()
def clear_codeql_repo(owner: str, repo: str):
    """
    Clear all data for a given repo from the database
    """
    repo_slug = f"{owner}/{repo}"
    with Session(backend.engine) as session:
        # delete() returns the number of rows it removed.
        removed = session.query(Source).filter_by(repo=repo_slug).delete()
        session.commit()
    return f"Cleared {removed} sources from repo {repo_slug}."
218+
219+
@mcp.tool()
def get_file_contents(
        file_uri: str = Field(description="The file URI to get contents for. The URI scheme is defined as `file://path` and `file://path:region`. Examples of file URI: `file:///path/to/file:1:2:3:4`, `file:///path/to/file`. File URIs optionally contain a region definition that looks like `start_line:start_column:end_line:end_column` which will limit the contents returned to the specified region, for example `file:///path/to/file:1:2:3:4` indicates a file region of `1:2:3:4` which would return the content of the file starting at line 1, column 2 and ending at line 3 column 4. Line and column indices are 1-based, meaning line and column values start at 1. If the region is omitted the full contents of the file will be returned, for example `file:///path/to/file` returns the full contents of `/path/to/file`."),
        database_path: str = Field(description="The path to the CodeQL database.")):
    """Get the contents of a file URI from a CodeQL database path."""
    # The parameter description above is shown to the tool's caller, so its
    # worked example must agree with the region format it defines:
    # `1:2:3:4` starts at column 2.

    # Resolution errors are deliberately not caught here; they propagate.
    database_path = _resolve_db_path(database_path)
    try:
        # fix up any incorrectly formatted relative path uri
        if not file_uri.startswith('file:///'):
            if file_uri.startswith('file://'):
                file_uri = file_uri[len('file://'):]
            file_uri = 'file:///' + file_uri.lstrip('/')
        results = _get_file_contents(database_path, file_uri)
    except Exception as e:
        results = f"Error: could not retrieve {file_uri}: {e}"
    return results
236+
237+
@mcp.tool()
def list_source_files(database_path: str = Field(description="The path to the CodeQL database."),
                      regex_filter: str = Field(description="Optional Regex filter.", default = r'[\s\S]+')):
    """List the available source files in a CodeQL database using their file:// URI"""
    # `re` is not imported at module level, so import it here; without this
    # every call failed with NameError.
    import re

    database_path = _resolve_db_path(database_path)
    try:
        # Compile once, which also reports a malformed filter up front.
        pattern = re.compile(regex_filter)
    except re.error as e:
        return f"Error: invalid regex filter `{regex_filter}`: {e}"
    results = list_src_files(database_path, as_uri=True)
    return json.dumps([{'uri': item} for item in results if pattern.search(item)], indent=2)
244+
245+
@mcp.tool()
def search_in_source_code(database_path: str = Field(description="The path to the CodeQL database."),
                          search_term: str = Field(description="The term to search in the source code")):
    """
    Search for a string in the source code. Returns the line number and file.
    """
    resolved_database_path = _resolve_db_path(database_path)
    matches = search_in_src_archive(resolved_database_path, search_term)
    # Anything other than a dict is reported as "no hits".
    hits = []
    if isinstance(matches, dict):
        hits = [
            # The caller-supplied (unresolved) database path is echoed back.
            {"database": database_path, "path": path, "lines": lines}
            for path, lines in matches.items()
        ]
    return json.dumps(hits, indent=2)
258+
259+
if __name__ == "__main__":
    # Serve the tools over HTTP on 127.0.0.1:9998.
    mcp.run(
        transport="http",
        host="127.0.0.1",
        port=9998,
        show_banner=False,
    )
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
---
2+
lockVersion: 1.0.0
3+
dependencies:
4+
codeql/concepts:
5+
version: 0.0.8
6+
codeql/controlflow:
7+
version: 2.0.18
8+
codeql/dataflow:
9+
version: 2.0.18
10+
codeql/mad:
11+
version: 1.0.34
12+
codeql/python-all:
13+
version: 4.1.0
14+
codeql/regex:
15+
version: 1.0.34
16+
codeql/ssa:
17+
version: 2.0.10
18+
codeql/threat-models:
19+
version: 1.0.34
20+
codeql/tutorial:
21+
version: 1.0.34
22+
codeql/typetracking:
23+
version: 2.0.18
24+
codeql/util:
25+
version: 2.0.21
26+
codeql/xml:
27+
version: 1.0.34
28+
codeql/yaml:
29+
version: 1.0.34
30+
compiled: false
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
library: false
3+
warnOnImplicitThis: false
4+
name: sylwia-budzynska/mcp-python
5+
version: 0.0.1
6+
dependencies:
7+
codeql/python-all: ^4.1.0
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/**
2+
* This is an automatically generated file
3+
* @name Hello world
4+
* @kind problem
5+
* @problem.severity warning
6+
* @id python/example/hello-world
7+
*/
8+
9+
import python
10+
11+
from File f
12+
select f, "Hello, world!"
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/**
2+
* @id mcp-python/remote-sources
3+
* @name Python Remote Sources
4+
* @description Identifies nodes that act as remote sources in Python code, along with their locations.
5+
* @tags source, location
6+
*/
7+
import python
8+
import semmle.python.dataflow.new.RemoteFlowSources
9+
10+
/**
 * Gets a `file:///` URI for `l` in the form
 * `file:///<relative path>:<start line>:<start column>:<end line>:<end column>`.
 */
string normalizeLocation(Location l) {
  result =
    "file:///" + l.getFile().getRelativePath() + ":" + l.getStartLine().toString() + ":" +
      l.getStartColumn().toString() + ":" + l.getEndLine().toString() + ":" +
      l.getEndColumn().toString()
}
14+
15+
from RemoteFlowSource source
16+
select
17+
"Remote source {0} is defined at {1}", "source,location", source.getSourceType(), normalizeLocation(source.getLocation())

0 commit comments

Comments
 (0)