|
1 | 1 | import anyio |
2 | 2 | import click |
3 | | -from datetime import datetime |
4 | 3 | import mcp.types as types |
5 | 4 | from mcp.server.lowlevel import Server |
6 | 5 |
|
7 | 6 | # Import tools from modules |
8 | 7 | from .tools.cve_lookup import lookup_cve |
9 | | -from .tools.package_vulnerability import check_package_vulnerabilities |
10 | | -from .tools.epss_lookup import get_epss_score |
11 | 8 | from .tools.cvss_calculator import calculate_cvss_score |
12 | | -from .tools.vulnerability_search import search_vulnerabilities |
| 9 | +from .tools.epss_lookup import get_epss_score |
13 | 10 | from .tools.exploit_availability import get_exploit_availability |
14 | | -from .tools.vulnerability_timeline import get_vulnerability_timeline |
| 11 | +from .tools.package_vulnerability import check_package_vulnerabilities |
15 | 12 | from .tools.vex_status import get_vex_status |
| 13 | +from .tools.vulnerability_search import search_vulnerabilities |
| 14 | +from .tools.vulnerability_timeline import get_vulnerability_timeline |
16 | 15 |
|
17 | 16 |
|
18 | 17 | @click.command() |
@@ -92,74 +91,81 @@ def main(port: int, transport: str) -> int: |
92 | 91 | ) |
93 | 92 |
|
94 | 93 | @app.call_tool() |
95 | | - async def fetch_tool( # type: ignore[unused-function] |
| 94 | + async def fetch_tool( # type: ignore[unused-function] |
96 | 95 | name: str, arguments: dict |
97 | 96 | ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: |
98 | 97 | if name == "cve_lookup": |
99 | 98 | if "cve_id" not in arguments: |
100 | | - return [types.TextContent( |
101 | | - type="text", |
102 | | - text="Error: Missing required argument 'cve_id'" |
103 | | - )] |
| 99 | + return [ |
| 100 | + types.TextContent( |
| 101 | + type="text", text="Error: Missing required argument 'cve_id'" |
| 102 | + ) |
| 103 | + ] |
104 | 104 | return await lookup_cve(arguments["cve_id"]) |
105 | 105 | elif name == "package_vulnerability_check": |
106 | 106 | if "package_name" not in arguments: |
107 | | - return [types.TextContent( |
108 | | - type="text", |
109 | | - text="Error: Missing required argument 'package_name'" |
110 | | - )] |
| 107 | + return [ |
| 108 | + types.TextContent( |
| 109 | + type="text", |
| 110 | + text="Error: Missing required argument 'package_name'", |
| 111 | + ) |
| 112 | + ] |
111 | 113 | version = arguments.get("version") # Optional parameter |
112 | | - return await check_package_vulnerabilities(arguments["package_name"], version) |
| 114 | + return await check_package_vulnerabilities( |
| 115 | + arguments["package_name"], version |
| 116 | + ) |
113 | 117 | elif name == "get_epss_score": |
114 | 118 | if "cve_id" not in arguments: |
115 | | - return [types.TextContent( |
116 | | - type="text", |
117 | | - text="Error: Missing required argument 'cve_id'" |
118 | | - )] |
| 119 | + return [ |
| 120 | + types.TextContent( |
| 121 | + type="text", text="Error: Missing required argument 'cve_id'" |
| 122 | + ) |
| 123 | + ] |
119 | 124 | return await get_epss_score(arguments["cve_id"]) |
120 | 125 | elif name == "calculate_cvss_score": |
121 | 126 | if "vector" not in arguments: |
122 | | - return [types.TextContent( |
123 | | - type="text", |
124 | | - text="Error: Missing required argument 'vector'" |
125 | | - )] |
| 127 | + return [ |
| 128 | + types.TextContent( |
| 129 | + type="text", text="Error: Missing required argument 'vector'" |
| 130 | + ) |
| 131 | + ] |
126 | 132 | return await calculate_cvss_score(arguments["vector"]) |
127 | 133 | elif name == "search_vulnerabilities": |
128 | 134 | # All parameters are optional for search |
129 | 135 | keywords = arguments.get("keywords") |
130 | | - severity = arguments.get("severity") |
| 136 | + severity = arguments.get("severity") |
131 | 137 | date_range = arguments.get("date_range") |
132 | 138 | return await search_vulnerabilities(keywords, severity, date_range) |
133 | 139 | elif name == "get_exploit_availability": |
134 | 140 | if "cve_id" not in arguments: |
135 | | - return [types.TextContent( |
136 | | - type="text", |
137 | | - text="Error: Missing required argument 'cve_id'" |
138 | | - )] |
| 141 | + return [ |
| 142 | + types.TextContent( |
| 143 | + type="text", text="Error: Missing required argument 'cve_id'" |
| 144 | + ) |
| 145 | + ] |
139 | 146 | return await get_exploit_availability(arguments["cve_id"]) |
140 | 147 | elif name == "get_vulnerability_timeline": |
141 | 148 | if "cve_id" not in arguments: |
142 | | - return [types.TextContent( |
143 | | - type="text", |
144 | | - text="Error: Missing required argument 'cve_id'" |
145 | | - )] |
| 149 | + return [ |
| 150 | + types.TextContent( |
| 151 | + type="text", text="Error: Missing required argument 'cve_id'" |
| 152 | + ) |
| 153 | + ] |
146 | 154 | return await get_vulnerability_timeline(arguments["cve_id"]) |
147 | 155 | elif name == "get_vex_status": |
148 | 156 | if "cve_id" not in arguments: |
149 | | - return [types.TextContent( |
150 | | - type="text", |
151 | | - text="Error: Missing required argument 'cve_id'" |
152 | | - )] |
| 157 | + return [ |
| 158 | + types.TextContent( |
| 159 | + type="text", text="Error: Missing required argument 'cve_id'" |
| 160 | + ) |
| 161 | + ] |
153 | 162 | product = arguments.get("product") # Optional parameter |
154 | 163 | return await get_vex_status(arguments["cve_id"], product) |
155 | 164 | else: |
156 | | - return [types.TextContent( |
157 | | - type="text", |
158 | | - text=f"Error: Unknown tool: {name}" |
159 | | - )] |
| 165 | + return [types.TextContent(type="text", text=f"Error: Unknown tool: {name}")] |
160 | 166 |
|
161 | 167 | @app.list_tools() |
162 | | - async def list_tools() -> list[types.Tool]: # type: ignore[unused-function] |
| 168 | + async def list_tools() -> list[types.Tool]: # type: ignore[unused-function] |
163 | 169 | return [ |
164 | 170 | types.Tool( |
165 | 171 | name="cve_lookup", |
@@ -189,7 +195,7 @@ async def list_tools() -> list[types.Tool]: # type: ignore[unused-function] |
189 | 195 | "version": { |
190 | 196 | "type": "string", |
191 | 197 | "description": "Specific version to check (optional). If not provided, checks all known versions.", |
192 | | - } |
| 198 | + }, |
193 | 199 | }, |
194 | 200 | }, |
195 | 201 | ), |
@@ -238,7 +244,7 @@ async def list_tools() -> list[types.Tool]: # type: ignore[unused-function] |
238 | 244 | "date_range": { |
239 | 245 | "type": "string", |
240 | 246 | "description": "Date range filter. Use predefined ranges (30d, 90d, 1y, 2y) or custom format YYYY-MM-DD,YYYY-MM-DD", |
241 | | - } |
| 247 | + }, |
242 | 248 | }, |
243 | 249 | }, |
244 | 250 | ), |
@@ -284,10 +290,10 @@ async def list_tools() -> list[types.Tool]: # type: ignore[unused-function] |
284 | 290 | "product": { |
285 | 291 | "type": "string", |
286 | 292 | "description": "Product name or identifier to check VEX status for (optional). Examples: 'Windows 11', 'RHEL 8', 'Ubuntu 22.04', 'Apache HTTP Server'", |
287 | | - } |
| 293 | + }, |
288 | 294 | }, |
289 | 295 | }, |
290 | | - ) |
| 296 | + ), |
291 | 297 | ] |
292 | 298 |
|
293 | 299 | if transport == "sse": |
|
0 commit comments