|
| 1 | +#!/usr/bin/env python |
| 2 | +import asyncio |
| 3 | +from http.server import BaseHTTPRequestHandler, HTTPServer |
| 4 | +import os |
| 5 | +import threading |
| 6 | +import webbrowser |
| 7 | + |
| 8 | +from mistralai import Mistral |
| 9 | +from mistralai.extra.run.context import RunContext |
| 10 | + |
| 11 | +from mistralai.extra.mcp.sse import ( |
| 12 | + MCPClientSSE, |
| 13 | + SSEServerParams, |
| 14 | +) |
| 15 | +from mistralai.extra.mcp.auth import build_oauth_params |
| 16 | + |
| 17 | +MODEL = "mistral-medium-latest" |
| 18 | + |
| 19 | +CALLBACK_PORT = 16010 |
| 20 | + |
| 21 | + |
| 22 | +# Use an official remote mcp server |
| 23 | +# you can find some at: |
| 24 | +# - https://mcpservers.org/remote-mcp-servers |
| 25 | +# - https://support.anthropic.com/en/articles/11176164-pre-built-integrations-using-remote-mcp |
| 26 | +# this one has auth: https://mcp.linear.app/sse |
| 27 | + |
| 28 | + |
| 29 | +def run_callback_server(callback_func): |
| 30 | + auth_response: dict = {"url": ""} |
| 31 | + |
| 32 | + class OAuthCallbackHandler(BaseHTTPRequestHandler): |
| 33 | + server_version = "HTTP" |
| 34 | + code = None |
| 35 | + |
| 36 | + def do_GET(self): |
| 37 | + if "/callback" in self.path: |
| 38 | + try: |
| 39 | + auth_response["url"] = self.path |
| 40 | + self.send_response(200) |
| 41 | + self.send_header("Content-type", "text/html") |
| 42 | + self.end_headers() |
| 43 | + callback_func() |
| 44 | + response_html = "<html><body><p>You may now close this window.</p></body></html>" |
| 45 | + self.wfile.write(response_html.encode()) |
| 46 | + threading.Thread(target=httpd.shutdown).start() |
| 47 | + except Exception: |
| 48 | + self.send_response(500) |
| 49 | + self.end_headers() |
| 50 | + |
| 51 | + server_address = ("localhost", CALLBACK_PORT) |
| 52 | + httpd = HTTPServer(server_address, OAuthCallbackHandler) |
| 53 | + threading.Thread(target=httpd.serve_forever).start() |
| 54 | + redirect_url = f"http://localhost:{CALLBACK_PORT}/oauth/callback" |
| 55 | + return httpd, redirect_url, auth_response |
| 56 | + |
| 57 | + |
| 58 | +async def main(): |
| 59 | + api_key = os.environ["MISTRAL_API_KEY"] |
| 60 | + client = Mistral(api_key=api_key) |
| 61 | + |
| 62 | + server_url = "https://mcp.linear.app/sse" |
| 63 | + |
| 64 | + # set-up the client |
| 65 | + mcp_client = MCPClientSSE( |
| 66 | + sse_params=SSEServerParams( |
| 67 | + url=server_url, |
| 68 | + ) |
| 69 | + ) |
| 70 | + |
| 71 | + callback_event = asyncio.Event() |
| 72 | + event_loop = asyncio.get_event_loop() |
| 73 | + |
| 74 | + # check if auth is required |
| 75 | + if await mcp_client.requires_auth(): |
| 76 | + # let's login |
| 77 | + httpd, redirect_url, auth_response = run_callback_server( |
| 78 | + callback_func=lambda: event_loop.call_soon_threadsafe(callback_event.set) |
| 79 | + ) |
| 80 | + try: |
| 81 | + # First create the required oauth config, this means fetching the server metadata and registering a client |
| 82 | + oauth_params = await build_oauth_params( |
| 83 | + mcp_client.base_url, redirect_url=redirect_url |
| 84 | + ) |
| 85 | + mcp_client.set_oauth_params(oauth_params=oauth_params) |
| 86 | + login_url, state = await mcp_client.get_auth_url_and_state(redirect_url) |
| 87 | + |
| 88 | + # The oauth params like client_id, client_secret would generally be saved in some persistent storage. |
| 89 | + # The oauth state and token would be saved in a user session. |
| 90 | + |
| 91 | + # wait for the user to complete the authentication process |
| 92 | + print("Please go to this URL and authorize the application:", login_url) |
| 93 | + webbrowser.open(login_url, new=2) |
| 94 | + await callback_event.wait() |
| 95 | + |
| 96 | + # in a real app this would be your oauth2 callback route you would get the code from the query params, |
| 97 | + # verify the state, and then get the token |
| 98 | + # Here we recreate a new client with the saved params which and exchange the code for a token |
| 99 | + mcp_client = MCPClientSSE( |
| 100 | + sse_params=SSEServerParams( |
| 101 | + url=server_url, |
| 102 | + ), |
| 103 | + oauth_params=oauth_params, |
| 104 | + ) |
| 105 | + |
| 106 | + token = await mcp_client.get_token_from_auth_response( |
| 107 | + auth_response["url"], redirect_url=redirect_url, state=state |
| 108 | + ) |
| 109 | + mcp_client.set_auth_token(token) |
| 110 | + |
| 111 | + except Exception as e: |
| 112 | + print(f"Error during authentication: {e}") |
| 113 | + finally: |
| 114 | + httpd.shutdown() |
| 115 | + httpd.server_close() |
| 116 | + |
| 117 | + # Now it's possible to make a query to the mcp server as we would do without authentication |
| 118 | + async with RunContext( |
| 119 | + model=MODEL, |
| 120 | + ) as run_ctx: |
| 121 | + # Add mcp client to the run context |
| 122 | + await run_ctx.register_mcp_client(mcp_client=mcp_client) |
| 123 | + |
| 124 | + run_result = await client.beta.conversations.run_async( |
| 125 | + run_ctx=run_ctx, |
| 126 | + inputs="Tell me which projects do I have in my workspace?", |
| 127 | + ) |
| 128 | + |
| 129 | + print(f"Final Response: {run_result.output_as_text}") |
| 130 | + |
| 131 | + |
| 132 | +if __name__ == "__main__": |
| 133 | + asyncio.run(main()) |
0 commit comments