| 
1 |  | -"""PIA Search tool for comprehensive database searches."""  | 
 | 1 | +"""PIA Search tools for database searches and facets discovery."""  | 
2 | 2 | 
 
  | 
3 | 3 | import httpx  | 
4 | 4 | import mcp.types as types  | 
 | 
10 | 10 | logger = logging.getLogger(__name__)  | 
11 | 11 | settings = Settings()  | 
12 | 12 | 
 
  | 
13 |  | -# Tool definition based on the API response  | 
 | 13 | +# Tool definitions based on the API response  | 
14 | 14 | pia_search_tool = types.Tool(  | 
15 | 15 |     name="pia_search",  | 
16 | 16 |     description="Search the Program Integrity Alliance (PIA) database for documents and recommendations. Returns comprehensive results with full citation information and clickable links for proper attribution. Each result includes corresponding citations with data source attribution (GAO, OIG, etc.). Supports structured filtering using index field names with operators like eq, ne, in, not_in, gte, lte, etc.",  | 
 | 
48 | 48 |     },  | 
49 | 49 | )  | 
50 | 50 | 
 
  | 
 | 51 | +pia_search_facets_tool = types.Tool(  | 
 | 52 | +    name="pia_search_facets",  | 
 | 53 | +    description="Get available facets (filter values) for the PIA database. This can help understand what filter values are available before performing searches.",  | 
 | 54 | +    inputSchema={  | 
 | 55 | +        "type": "object",  | 
 | 56 | +        "properties": {  | 
 | 57 | +            "query": {  | 
 | 58 | +                "type": "string",  | 
 | 59 | +                "description": "Optional query to get facets for",  | 
 | 60 | +                "default": "",  | 
 | 61 | +            }  | 
 | 62 | +        },  | 
 | 63 | +    },  | 
 | 64 | +)  | 
 | 65 | + | 
51 | 66 | 
 
  | 
52 | 67 | async def handle_pia_search(arguments: Dict[str, Any]) -> List[types.TextContent]:  | 
53 | 68 |     """Handle PIA search requests."""  | 
@@ -107,3 +122,63 @@ async def handle_pia_search(arguments: Dict[str, Any]) -> List[types.TextContent  | 
107 | 122 |     except Exception as e:  | 
108 | 123 |         logger.error(f"Error during PIA search: {e}")  | 
109 | 124 |         return [types.TextContent(type="text", text=f"Error: {str(e)}")]  | 
 | 125 | + | 
 | 126 | + | 
 | 127 | +async def handle_pia_search_facets(  | 
 | 128 | +    arguments: Dict[str, Any],  | 
 | 129 | +) -> List[types.TextContent]:  | 
 | 130 | +    """Handle PIA search facets requests."""  | 
 | 131 | +    try:  | 
 | 132 | +        # Prepare the request payload  | 
 | 133 | +        payload = {  | 
 | 134 | +            "jsonrpc": "2.0",  | 
 | 135 | +            "id": 1,  | 
 | 136 | +            "method": "tools/call",  | 
 | 137 | +            "params": {"name": "pia_search_facets", "arguments": arguments},  | 
 | 138 | +        }  | 
 | 139 | + | 
 | 140 | +        try:  | 
 | 141 | +            api_key = settings.API_KEY  | 
 | 142 | +        except ValueError as e:  | 
 | 143 | +            return [  | 
 | 144 | +                types.TextContent(  | 
 | 145 | +                    type="text",  | 
 | 146 | +                    text=f"Error: {str(e)} Configure API key in MCP server settings.",  | 
 | 147 | +                )  | 
 | 148 | +            ]  | 
 | 149 | + | 
 | 150 | +        headers = {"Content-Type": "application/json", "x-api-key": api_key}  | 
 | 151 | + | 
 | 152 | +        async with httpx.AsyncClient(timeout=settings.REQUEST_TIMEOUT) as client:  | 
 | 153 | +            response = await client.post(  | 
 | 154 | +                settings.PIA_API_URL, json=payload, headers=headers  | 
 | 155 | +            )  | 
 | 156 | +            response.raise_for_status()  | 
 | 157 | + | 
 | 158 | +            result = response.json()  | 
 | 159 | + | 
 | 160 | +            if "error" in result:  | 
 | 161 | +                error_msg = result["error"].get("message", "Unknown error")  | 
 | 162 | +                return [types.TextContent(type="text", text=f"API Error: {error_msg}")]  | 
 | 163 | + | 
 | 164 | +            if "result" in result:  | 
 | 165 | +                # Format the facets nicely  | 
 | 166 | +                facets = result["result"]  | 
 | 167 | +                formatted_result = json.dumps(facets, indent=2, ensure_ascii=False)  | 
 | 168 | +                return [types.TextContent(type="text", text=formatted_result)]  | 
 | 169 | +            else:  | 
 | 170 | +                return [  | 
 | 171 | +                    types.TextContent(type="text", text="No facets returned from API")  | 
 | 172 | +                ]  | 
 | 173 | + | 
 | 174 | +    except httpx.HTTPStatusError as e:  | 
 | 175 | +        logger.error(f"HTTP error during PIA search facets: {e}")  | 
 | 176 | +        return [  | 
 | 177 | +            types.TextContent(  | 
 | 178 | +                type="text",  | 
 | 179 | +                text=f"HTTP Error {e.response.status_code}: {e.response.text}",  | 
 | 180 | +            )  | 
 | 181 | +        ]  | 
 | 182 | +    except Exception as e:  | 
 | 183 | +        logger.error(f"Error during PIA search facets: {e}")  | 
 | 184 | +        return [types.TextContent(type="text", text=f"Error: {str(e)}")]  | 
0 commit comments