Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions compose/production/mcp-server/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM python:3.10.9-alpine3.17

ENV PYTHONUNBUFFERED=1

ENV PYTHONPATH="$PYTHONPATH:/app/config"

RUN apk --no-cache add build-base curl \
&& addgroup -S mcp \
&& adduser -S -G mcp mcp \
&& pip install --no-cache-dir -U setuptools pip

COPY ./requirements /requirements

RUN pip install --no-cache-dir -r /requirements/mcp_requirements.txt \
&& rm -rf /requirements

USER mcp

WORKDIR /app

ENTRYPOINT [ "python", "server.py", "--host", "0.0.0.0", "--port", "3001" ]
24 changes: 24 additions & 0 deletions local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,27 @@ services:
timeout: ${HEALTHCHECK_TIMEOUT}
retries: ${HEALTHCHECK_RETRIES}
start_period: ${HEALTHCHECK_START}

mcp-server:
build:
context: .
dockerfile: ./compose/production/mcp-server/Dockerfile
image: ghostwriter_local_mcp_server
depends_on:
- graphql_engine
ports:
- "8081:3001"
volumes:
- ./mcp-server:/app
labels:
name: ghostwriter_mcp_server
environment:
- GHOSTWRITER_URL=http://${DJANGO_HOST}:${DJANGO_PORT}
- GRAPHQL_URL=http://${HASURA_GRAPHQL_SERVER_HOSTNAME}:${HASURA_GRAPHQL_SERVER_PORT}/v1/graphql
- JWT_SECRET_KEY=${DJANGO_JWT_SECRET_KEY}
healthcheck:
test: curl --insecure --fail http://mcp-server:8000/healthz || exit 1
interval: ${HEALTHCHECK_INTERVAL}
timeout: ${HEALTHCHECK_TIMEOUT}
retries: ${HEALTHCHECK_RETRIES}
start_period: ${HEALTHCHECK_START}
14 changes: 14 additions & 0 deletions mcp-server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# MCP Server

Edit any files python files inside `mcp-server/` and you must run `./ghostwriter-cli containers restart --dev`.
A tool will only load the `prompts.yaml` file when it is called so editing this file does not require a container restart.

# Authentication

The MCP server uses the JWT from ghostwriter for authentication and is required when calling any MCP methods. This token is used when querying the graphql endpoint over the local docker network.

# Tools

## GenerateExecutiveSummaryTool

This tool will query the findings on a given report ID and provide a prompt to the LLM to generate a natural language response. When the tool is called it loads the `executive_summary_prompt` from `prompts.yaml` and places your findings into the `{findings}` variable
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# 3rd Party Libraries
from fastmcp import FastMCP, Context

# Ghostwriter MCP Server Imports
from ghostwriter_mcp_server.utils.graphql import graphql_request
from ghostwriter_mcp_server.utils.load_config import load_config

class GenerateExecutiveSummaryTool:
"""Tool to generate executive summaries for reports."""

def __init__(self, mcp: FastMCP):
"""Initialize the GenerateExecutiveSummaryTool."""
mcp.tool(name='generate_executive_summary')(self.generate_executive_summary_tool)

async def generate_executive_summary_tool(
self,
ctx: Context,
report_id: int
) -> str:
"""
Generate an executive summary for a report.

Args:
report_id (int): The ID of the report to generate a summary for.

Returns:
str: A system prompt to generate an executive summary
"""
await ctx.info(f'Loading the most up to date prompt template...')
prompts = load_config("prompts.yaml")

await ctx.info(f'Querying the findings for report {report_id}')
graphql_query = '''query SearchReportFindings($reportId: bigint!) {
reportedFinding(
where: {
reportId: {_eq: $reportId},
severity: {severity: {_in: ["Critical", "High", "Medium", "Low", "Informational"]}}
},
order_by: {severity: {weight: asc}},
limit: 5
) {
title
severity {
severity
}
description
mitigation
}
}'''
# Execute the GraphQL query
response = await graphql_request(graphql_query, ctx, variables={"reportId": report_id})

await ctx.info(f'Formatting the findings into a markdown string')
findings_list = response.get("data", {}).get("reportedFinding", [])
findings_str = "\n".join(
f"# {f['title']} ({f['severity']['severity']})\n## Description\n{f['description']}\n## Recommendation\n{f['mitigation']}\n"
for f in findings_list
)

if not findings_list:
raise Exception("No findings found for the report.")

await ctx.info(f'Generating the executive summary prompt')
prompt_template = prompts['executive_summary_prompt']
if "{findings}" not in prompt_template:
raise Exception("The `executive_summary_prompt` prompt template must contain the {findings} variable")
prompt = prompt_template.format(findings=findings_str)

return prompt
Empty file.
27 changes: 27 additions & 0 deletions mcp-server/ghostwriter_mcp_server/utils/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Standard Libraries
import jwt

# 3rd Party Libraries
import environ
from fastmcp.server.auth import AccessToken, TokenVerifier

env = environ.Env()
JWT_SECRET_KEY = env("JWT_SECRET_KEY", default="secret")

class GhostwriterTokenVerifier(TokenVerifier):
"""Verify the token from Ghostwriter."""

async def verify_token(self, token: str) -> AccessToken | None:
"""Verify the JWT token and return the access token."""
print("Validating authentication token...")
try:
decoded = jwt.decode(token, JWT_SECRET_KEY, algorithms=["HS256"], audience="Ghostwriter")
return AccessToken(
token=token,
client_id=decoded.get("client_id", ""),
scopes=decoded.get("scopes", ["user"]),
expires_at=decoded.get("exp"),
)
except Exception as e:
print(e)
return None
29 changes: 29 additions & 0 deletions mcp-server/ghostwriter_mcp_server/utils/graphql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Standard Libraries
import httpx

# 3rd Party Libraries
import environ
from starlette.requests import Request
from fastmcp import Context

env = environ.Env()
GRAPHQL_URL = env("GRAPHQL_URL", default="http://localhost:8080")

async def graphql_request(query: str, context: Context, variables: dict = None) -> dict:
"""Helper function to make async GraphQL requests."""
request: Request = context.request_context.request
token = request.headers.get("Authorization")
if not token:
raise Exception("Unauthorized: No Authorization header found")
headers = {"Content-Type": "application/json", "Authorization": token}
async with httpx.AsyncClient() as client:
response = await client.post(
GRAPHQL_URL,
json={"query": query, "variables": variables},
headers=headers
)
response_json = response.json()
if "errors" in response_json:
raise Exception(f"GraphQL query failed with errors: {response_json['errors']}")
else:
return response_json
6 changes: 6 additions & 0 deletions mcp-server/ghostwriter_mcp_server/utils/load_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import yaml

def load_config(file_path: str) -> dict:
with open(file_path, 'r') as file:
config = yaml.safe_load(file)
return config
13 changes: 13 additions & 0 deletions mcp-server/prompts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Configure the prompt that the executive summary tool sends to the LLM to generate an executive summary from the report findings (Must include a {findings} variable)
executive_summary_prompt: |
You are a cybersecurity analyst tasked with generating an executive summary for a penetration test report. The summary should be concise, non-technical and it should be between 1 and 2 paragraphs, do not use bullet points.
Use the findings provided between the <Findings> tags to:
1. Summarize the overall security posture.
2. Highlight the number and severity of findings (e.g., how many critical, high, medium, low, and informational issues were found).
3. Mention the general nature of the vulnerabilities discovered (e.g., misconfigurations, outdated software, weak authentication).
4. Emphasize the importance of remediation and outline a general prioritization strategy.
5. Maintain a professional tone and avoid deep technical jargon.

<Findings>
{findings}
</Findings>
44 changes: 44 additions & 0 deletions mcp-server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Standard Libraries
import sys
import argparse

# 3rd Party Libraries
from fastmcp import FastMCP

# Ghostwriter MCP Server Imports
from ghostwriter_mcp_server.tools.generate_executive_summary import GenerateExecutiveSummaryTool
from ghostwriter_mcp_server.utils.auth import GhostwriterTokenVerifier

def main() -> int:
"""Entry point for the Ghostwriter MCP server.

Returns:
int: Exit code (0 for success, non-zero for failure)
"""

parser = argparse.ArgumentParser(description='Ghostwriter MCP Server')
parser.add_argument('--host', default='localhost', help='Host for the MCP server')
parser.add_argument('--port', type=int, default=8000, help='Port for the MCP server')

args = parser.parse_args()

mcp = FastMCP(
"Ghostwriter MCP Server",
host=args.host,
port=args.port,
auth=GhostwriterTokenVerifier(),
)

# Tools
GenerateExecutiveSummaryTool(mcp)

try:
print(f'Starting Ghostwriter MCP Server on {args.host}:{args.port}')
mcp.run(transport="streamable-http")
return 0
except Exception as e:
print(f'Error starting Ghostwriter MCP Server: {e}')
return 1

if __name__ == '__main__':
sys.exit(main())
6 changes: 6 additions & 0 deletions requirements/mcp_requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
django-environ==0.10.0 # https://github.com/joke2k/django-environ
httpx==0.28.1
starlette==0.47.2
pyyaml==6.0.1
pyjwt==2.10.1
fastmcp==2.11.3