From ce75f2f4931af831c565024446266c1a7e3421e4 Mon Sep 17 00:00:00 2001 From: Mengqin Shen Date: Tue, 6 Jan 2026 21:58:16 -0800 Subject: [PATCH 1/2] feat(py): add mcp sample test --- py/samples/mcp/README.md | 60 ++++ py/samples/mcp/pyproject.toml | 40 +++ py/samples/mcp/run.sh | 31 ++ py/samples/mcp/src/http_server.py | 96 ++++++ py/samples/mcp/src/main.py | 291 ++++++++++++++++++ py/samples/mcp/src/server.py | 93 ++++++ py/samples/mcp/test-workspace/hello-world.txt | 4 + 7 files changed, 615 insertions(+) create mode 100644 py/samples/mcp/README.md create mode 100644 py/samples/mcp/pyproject.toml create mode 100755 py/samples/mcp/run.sh create mode 100644 py/samples/mcp/src/http_server.py create mode 100644 py/samples/mcp/src/main.py create mode 100644 py/samples/mcp/src/server.py create mode 100644 py/samples/mcp/test-workspace/hello-world.txt diff --git a/py/samples/mcp/README.md b/py/samples/mcp/README.md new file mode 100644 index 0000000000..650846a6fa --- /dev/null +++ b/py/samples/mcp/README.md @@ -0,0 +1,60 @@ +# MCP Sample + +This sample demonstrates using the MCP (Model Context Protocol) plugin with Genkit Python SDK. + +## Setup environment + +Obtain an API key from [ai.dev](https://ai.dev). + +Export the API key as env variable `GEMINI\_API\_KEY` in your shell +configuration. + +### Run the MCP Client/Host +```bash +cd py/samples/mcp +genkit start -- uv run src/main.py +``` + +This will: +1. Connect to the configured MCP servers +2. Execute sample flows demonstrating tool usage +3. Clean up connections on exit + +### Run the MCP Client/Host +```bash +cd py/samples/mcp +genkit start -- uv run src/http_server.py +``` + +This will: +1. Connect to the configured MCP servers +2. Execute sample flows demonstrating tool usage +3. Clean up connections on exit + +### Run the MCP Server +```bash +cd py/samples/mcp +genkit start -- uv run src/server.py +``` + +This starts an MCP server on stdio that other MCP clients can connect to. + +## Requirements + +- Python 3.10+ +- `mcp` - Model Context Protocol Python SDK +- `genkit` - Genkit Python SDK +- `genkit-plugins-google-genai` - Google AI plugin for Genkit + +## MCP Servers Used + +The sample connects to these MCP servers (must be available): +- **mcp-server-git** - Install via `uvx mcp-server-git` +- **@modelcontextprotocol/server-filesystem** - Install via npm +- **@modelcontextprotocol/server-everything** - Install via npm + +## Learn More + +- [MCP Documentation](https://modelcontextprotocol.io/) +- [Genkit Python Documentation](https://firebase.google.com/docs/genkit) +- [MCP Plugin Source](../../plugins/mcp/) diff --git a/py/samples/mcp/pyproject.toml b/py/samples/mcp/pyproject.toml new file mode 100644 index 0000000000..e458555677 --- /dev/null +++ b/py/samples/mcp/pyproject.toml @@ -0,0 +1,40 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +[project] +dependencies = [ + "genkit", + "genkit-plugin-google-genai", + "genkit-plugins-mcp", + "mcp", +] +description = "MCP sample application for Genkit Python SDK" +name = "mcp-sample" +readme = "README.md" +requires-python = ">=3.10" +version = "0.1.0" + +[tool.uv.sources] +genkit = { workspace = true } +genkit-plugin-google-genai = { workspace = true } +genkit-plugins-mcp = { workspace = true } + +[build-system] +build-backend = "hatchling.build" +requires = ["hatchling"] + +[tool.hatch.build.targets.wheel] +packages = ["src"] diff --git a/py/samples/mcp/run.sh b/py/samples/mcp/run.sh new file mode 100755 index 0000000000..5ac7553c7e --- /dev/null +++ b/py/samples/mcp/run.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +case "$1" in + server) + shift + exec genkit start -- uv run src/server.py "$@" + ;; + http) + shift + exec genkit start -- uv run src/http_server.py "$@" + ;; + *) + # Default to main.py + exec genkit start -- uv run src/main.py "$@" + ;; +esac diff --git a/py/samples/mcp/src/http_server.py b/py/samples/mcp/src/http_server.py new file mode 100644 index 0000000000..d0040b5000 --- /dev/null +++ b/py/samples/mcp/src/http_server.py @@ -0,0 +1,96 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +HTTP MCP Server Example + +This demonstrates creating an HTTP-based MCP server using SSE transport +with Starlette and the official MCP Python SDK. +""" + +import asyncio +import logging + +import mcp.types as types +import uvicorn +from mcp.server import Server +from mcp.server.sse import SseServerTransport +from starlette.applications import Starlette +from starlette.responses import Response +from starlette.routing import Mount, Route + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + """Start the HTTP MCP server.""" + + # Create SSE transport logic + # The endpoint '/mcp/' is where clients will POST messages + sse = SseServerTransport('/mcp/') + + async def handle_sse(request): + """Handle incoming SSE connections.""" + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: + read_stream, write_stream = streams + + # Create a new server instance for this session + # This mirrors the JS logic of creating a new McpServer per connection + server = Server('example-server', version='1.0.0') + + @server.list_tools() + async def list_tools() -> list[types.Tool]: + return [ + types.Tool( + name='test_http', + description='Test HTTP transport', + inputSchema={'type': 'object', 'properties': {}}, + ) + ] + + @server.call_tool() + async def call_tool(name: str, arguments: dict) -> list[types.TextContent]: + if name == 'test_http': + # In this SSE implementation, valid session ID is internal + # but we can return a confirmation. + return [types.TextContent(type='text', text='Session Active')] + raise ValueError(f'Unknown tool: {name}') + + # Run the server with the streams + await server.run(read_stream, write_stream, server.create_initialization_options()) + + # Return empty response after connection closes + return Response() + + # Define routes + # GET /mcp -> Starts SSE stream + # POST /mcp/ -> Handles messages (via SseServerTransport) + routes = [ + Route('/mcp', endpoint=handle_sse, methods=['GET']), + Mount('/mcp/', app=sse.handle_post_message), + ] + + app = Starlette(routes=routes) + + config = uvicorn.Config(app, host='0.0.0.0', port=3334, log_level='info') + server = uvicorn.Server(config) + + print('HTTP MCP server running on http://localhost:3334/mcp') + await server.serve() + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/py/samples/mcp/src/main.py b/py/samples/mcp/src/main.py new file mode 100644 index 0000000000..2e32301a95 --- /dev/null +++ b/py/samples/mcp/src/main.py @@ -0,0 +1,291 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + + +import asyncio +import os +from pathlib import Path + +import structlog +from pydantic import BaseModel + +from genkit.ai import Genkit +from genkit.plugins.google_genai import GoogleAI +from genkit.plugins.mcp import McpServerConfig, create_mcp_host + +logger = structlog.get_logger(__name__) + +# Get the current directory +current_dir = Path(__file__).parent +workspace_dir = current_dir.parent / 'test-workspace' +repo_root = current_dir.parent.parent.parent.parent + +# Initialize Genkit with GoogleAI +ai = Genkit(plugins=[GoogleAI()], model='googleai/gemini-2.5-flash') + +# Create MCP host with multiple servers +mcp_host = create_mcp_host({ + 'git-client': McpServerConfig(command='uvx', args=['mcp-server-git']), + 'fs': McpServerConfig(command='npx', args=['-y', '@modelcontextprotocol/server-filesystem', str(workspace_dir)]), + 'everything': McpServerConfig(command='npx', args=['-y', '@modelcontextprotocol/server-everything']), +}) + + +@ai.flow(name='git_commits') +async def git_commits(query: str = ''): + """Summarize recent git commits using MCP git client.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + result = await ai.generate(prompt=f"summarize last 5 commits in '{repo_root}'", tools=tools) + + await mcp_host.close() + return result.text + + +@ai.flow(name='dynamic_git_commits') +async def dynamic_git_commits(query: str = ''): + """Summarize recent git commits using 'all' tools matching pattern.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + # Simulate wildcard matching "git-client:tool/*" by passing all tools + # (since registration prefixes with server name) + # JS: tools: ['test-mcp-manager:tool/*'] + + result = await ai.generate(prompt=f"summarize last 5 commits in '{repo_root}'", tools=tools) + + await mcp_host.close() + return result.text + + +@ai.flow(name='get_file') +async def get_file(query: str = ''): + """Read and summarize a file using MCP filesystem client.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + result = await ai.generate(prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=tools) + + await mcp_host.close() + return result.text + + +@ai.flow(name='dynamic_get_file') +async def dynamic_get_file(query: str = ''): + """Read file using specific tool selection.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + # Filter for specific tool: 'fs/read_file' + # JS: tools: ['test-mcp-manager:tool/fs/read_file'] + import fnmatch + + filtered_tools = [t for t in tools if fnmatch.fnmatch(t, '*/fs/read_file') or t.endswith('fs/read_file')] + + result = await ai.generate( + prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=filtered_tools + ) + + await mcp_host.close() + return result.text + + +@ai.flow(name='dynamic_prefix_tool') +async def dynamic_prefix_tool(query: str = ''): + """Read file using prefix tool selection.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + # Filter for prefix: 'fs/read_*' + # JS: tools: ['test-mcp-manager:tool/fs/read_*'] + import fnmatch + + filtered_tools = [t for t in tools if fnmatch.fnmatch(t, '*/fs/read_*')] + + result = await ai.generate( + prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=filtered_tools + ) + + await mcp_host.close() + return result.text + + +@ai.flow(name='dynamic_disable_enable') +async def dynamic_disable_enable(query: str = ''): + """Test disabling and re-enabling an MCP client.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + import fnmatch + + filtered_tools = [t for t in tools if fnmatch.fnmatch(t, '*/fs/read_file') or t.endswith('fs/read_file')] + + # 1. Run successfully + result1 = await ai.generate( + prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=filtered_tools + ) + text1 = result1.text + + # 2. Disable 'fs' and try to run (should fail) + await mcp_host.disable('fs') + text2 = '' + try: + # Note: In Python, we might need to verify if tools list is updated + # or if the tool call fails. disable() closes connection. + # register_tools should ideally be called again or the tool invocation fails. + # Since we passed 'filtered_tools' (names), the model will try to call. + # The tool wrapper checks connection. + result = await ai.generate( + prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=filtered_tools + ) + text2 = f'ERROR! This should have failed but succeeded: {result.text}' + except Exception as e: + text2 = str(e) + + # 3. Re-enable 'fs' and run + await mcp_host.enable('fs') + # Re-registering might be needed if registry was cleaned, but here we just re-connnect + # Implementation detail: Does register_tools need to be called again? + # Code shows wrappers capture client, client.session is updated on connect. + await mcp_host.clients['fs'].connect() + + result3 = await ai.generate( + prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=filtered_tools + ) + text3 = result3.text + + await mcp_host.close() + + return f'Original:
{text1}
After Disable:
{text2}
After Enable:
{text3}' + + +@ai.flow(name='test_resource') +async def test_resource(query: str = ''): + """Test reading a resource (simulated).""" + await mcp_host.start() + + # Python SDK doesn't support 'resources' param in generate yet. + # We manually fetch the resource and add to prompt. + # JS: resources: await mcpHost.getActiveResources(ai) + + resource_content = 'Resource not found' + uri = 'test://static/resource/1' + + # In a real implementation we would look up the resource provider. + # Here we search 'everything' client or similar. + found = False + for client in mcp_host.clients.values(): + if client.session and not client.config.disabled: + try: + # Try reading directly + res = await client.read_resource(uri) + if res and res.contents: + resource_content = res.contents[0].text + found = True + break + except Exception: + continue + + result = await ai.generate( + prompt=f'analyze this: {resource_content}', + ) + + await mcp_host.close() + return result.text + + +@ai.flow(name='dynamic_test_resources') +async def dynamic_test_resources(query: str = ''): + """Test reading resources with wildcard (simulated).""" + # Same simulation as test_resource + return await test_resource(query) + + +@ai.flow(name='dynamic_test_one_resource') +async def dynamic_test_one_resource(query: str = ''): + """Test reading one specific resource (simulated).""" + # Same simulation as test_resource + return await test_resource(query) + + +@ai.flow(name='update_file') +async def update_file(query: str = ''): + """Update a file using MCP filesystem client.""" + await mcp_host.start() + tools = await mcp_host.register_tools(ai) + + result = await ai.generate( + prompt=f"Improve hello-world.txt (in '{workspace_dir}') by rewriting the text, making it longer, use your imagination.", + tools=tools, + ) + + await mcp_host.close() + return result.text + + +class ControlMcpInput(BaseModel): + action: str # 'RECONNECT', 'ENABLE', 'DISABLE', 'DISCONNECT' + client_id: str = 'git-client' + + +@ai.flow(name='control_mcp') +async def control_mcp(input: ControlMcpInput): + """Control MCP client connections (enable/disable/reconnect).""" + client_id = input.client_id + action = input.action.upper() + + if action == 'DISABLE': + if client_id in mcp_host.clients: + mcp_host.clients[client_id].config.disabled = True + await mcp_host.clients[client_id].close() + elif action == 'DISCONNECT': + if client_id in mcp_host.clients: + await mcp_host.clients[client_id].close() + elif action == 'RECONNECT': + if client_id in mcp_host.clients: + await mcp_host.clients[client_id].connect() + elif action == 'ENABLE': + if client_id in mcp_host.clients: + mcp_host.clients[client_id].config.disabled = False + await mcp_host.clients[client_id].connect() + + return f'Action {action} completed for {client_id}' + + +async def main(): + """Run sample flows.""" + logger.info('Starting MCP sample application') + + # Test git commits flow + logger.info('Testing git_commits flow...') + try: + result = await git_commits() + logger.info('git_commits result', result=result[:200]) + except Exception as e: + logger.error('git_commits failed', error=str(e)) + + # Test get_file flow + logger.info('Testing get_file flow...') + try: + result = await get_file() + logger.info('get_file result', result=result[:200]) + except Exception as e: + logger.error('get_file failed', error=str(e)) + + +if __name__ == '__main__': + ai.run_main(main()) diff --git a/py/samples/mcp/src/server.py b/py/samples/mcp/src/server.py new file mode 100644 index 0000000000..3638dbfad5 --- /dev/null +++ b/py/samples/mcp/src/server.py @@ -0,0 +1,93 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +MCP Server Example + +This demonstrates creating an MCP server that exposes Genkit tools, prompts, +and resources through the Model Context Protocol. +""" + +import asyncio + +from pydantic import BaseModel, Field + +from genkit.ai import Genkit +from genkit.plugins.google_genai import GoogleAI +from genkit.plugins.mcp import McpServerOptions, create_mcp_server + +# Initialize Genkit +ai = Genkit(plugins=[]) + + +# Define a tool +class AddInput(BaseModel): + a: int = Field(..., description='First number') + b: int = Field(..., description='Second number') + + +@ai.tool(name='add', description='add two numbers together') +def add(input: AddInput) -> int: + return input.a + input.b + + +# Define a prompt +happy_prompt = ai.define_prompt( + input_schema={'action': str}, + prompt="If you're happy and you know it, {{action}}.", +) + + +from genkit.core.action.types import ActionKind + + +# Define resources (manually registering since define_resource is not yet in Genkit API) +def define_resource(name: str, uri: str, fn): + ai.registry.register_action(kind=ActionKind.RESOURCE, name=name, fn=fn, metadata={'resource': {'uri': uri}}) + + +def define_resource_template(name: str, template: str, fn): + ai.registry.register_action( + kind=ActionKind.RESOURCE, name=name, fn=fn, metadata={'resource': {'template': template}} + ) + + +def my_resource_handler(inp): + return {'content': [{'text': 'my resource'}]} + + +define_resource('my resources', 'test://static/resource/1', my_resource_handler) + + +def file_resource_handler(inp): + uri = inp.get('uri') + return {'content': [{'text': f'file contents for {uri}'}]} + + +define_resource_template('file', 'file://{path}', file_resource_handler) + + +async def main(): + """Start the MCP server.""" + # Create MCP server + server = create_mcp_server(ai, McpServerOptions(name='example_server', version='0.0.1')) + + print('Starting MCP server on stdio...') + await server.start() + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/py/samples/mcp/test-workspace/hello-world.txt b/py/samples/mcp/test-workspace/hello-world.txt new file mode 100644 index 0000000000..723e9faf6d --- /dev/null +++ b/py/samples/mcp/test-workspace/hello-world.txt @@ -0,0 +1,4 @@ +Hello, World! + +This is a test file for the MCP filesystem sample. +It demonstrates reading and writing files through the MCP protocol. From 89ece344abe54adafb5bc23fd0ddb7ed59e151d2 Mon Sep 17 00:00:00 2001 From: Mengqin Shen Date: Wed, 7 Jan 2026 08:31:34 -0800 Subject: [PATCH 2/2] feat(py): fixed with gemini comments --- py/samples/mcp/README.md | 7 +++---- py/samples/mcp/src/main.py | 16 ++-------------- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/py/samples/mcp/README.md b/py/samples/mcp/README.md index 650846a6fa..d7e444808a 100644 --- a/py/samples/mcp/README.md +++ b/py/samples/mcp/README.md @@ -20,16 +20,15 @@ This will: 2. Execute sample flows demonstrating tool usage 3. Clean up connections on exit -### Run the MCP Client/Host +### Run the HTTP MCP Server ```bash cd py/samples/mcp genkit start -- uv run src/http_server.py ``` This will: -1. Connect to the configured MCP servers -2. Execute sample flows demonstrating tool usage -3. Clean up connections on exit +1. Start an MCP server over HTTP/SSE on port 3334. +2. Expose a test tool named test_http. ### Run the MCP Server ```bash diff --git a/py/samples/mcp/src/main.py b/py/samples/mcp/src/main.py index 2e32301a95..194b6cdc53 100644 --- a/py/samples/mcp/src/main.py +++ b/py/samples/mcp/src/main.py @@ -15,8 +15,6 @@ # SPDX-License-Identifier: Apache-2.0 -import asyncio -import os from pathlib import Path import structlog @@ -64,7 +62,6 @@ async def dynamic_git_commits(query: str = ''): # Simulate wildcard matching "git-client:tool/*" by passing all tools # (since registration prefixes with server name) - # JS: tools: ['test-mcp-manager:tool/*'] result = await ai.generate(prompt=f"summarize last 5 commits in '{repo_root}'", tools=tools) @@ -91,7 +88,6 @@ async def dynamic_get_file(query: str = ''): tools = await mcp_host.register_tools(ai) # Filter for specific tool: 'fs/read_file' - # JS: tools: ['test-mcp-manager:tool/fs/read_file'] import fnmatch filtered_tools = [t for t in tools if fnmatch.fnmatch(t, '*/fs/read_file') or t.endswith('fs/read_file')] @@ -111,7 +107,6 @@ async def dynamic_prefix_tool(query: str = ''): tools = await mcp_host.register_tools(ai) # Filter for prefix: 'fs/read_*' - # JS: tools: ['test-mcp-manager:tool/fs/read_*'] import fnmatch filtered_tools = [t for t in tools if fnmatch.fnmatch(t, '*/fs/read_*')] @@ -144,11 +139,6 @@ async def dynamic_disable_enable(query: str = ''): await mcp_host.disable('fs') text2 = '' try: - # Note: In Python, we might need to verify if tools list is updated - # or if the tool call fails. disable() closes connection. - # register_tools should ideally be called again or the tool invocation fails. - # Since we passed 'filtered_tools' (names), the model will try to call. - # The tool wrapper checks connection. result = await ai.generate( prompt=f"summarize contents of hello-world.txt (in '{workspace_dir}')", tools=filtered_tools ) @@ -158,9 +148,6 @@ async def dynamic_disable_enable(query: str = ''): # 3. Re-enable 'fs' and run await mcp_host.enable('fs') - # Re-registering might be needed if registry was cleaned, but here we just re-connnect - # Implementation detail: Does register_tools need to be called again? - # Code shows wrappers capture client, client.session is updated on connect. await mcp_host.clients['fs'].connect() result3 = await ai.generate( @@ -197,7 +184,8 @@ async def test_resource(query: str = ''): resource_content = res.contents[0].text found = True break - except Exception: + except Exception as e: + logger.debug("Failed to read resource from client", error=e) continue result = await ai.generate(