Skip to content

Commit 1d2ff96

Browse files
committed
MCP Implementation
1 parent 802e41f commit 1d2ff96

File tree

22 files changed

+954
-868
lines changed

22 files changed

+954
-868
lines changed

mcp_config.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"mcpServers": {
3+
"sql_generator": {
4+
"command": "python",
5+
"args": ["-m", "datu.mcp.tools.sql_generator"],
6+
"env": { "PYTHONPATH": "." }
7+
}
8+
}
9+
}

pyproject.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies = [
3838
"openai>=1.30.1",
3939
"fastmcp>=2.10.5",
4040
"mcp-use[search]>=1.3.7",
41+
"onnxruntime==1.19.2 ; sys_platform == 'darwin' and platform_machine == 'x86_64'",
4142
]
4243

4344
[project.urls]
@@ -307,6 +308,12 @@ ignore_missing_imports = true
307308
[[tool.mypy.overrides]]
308309
module = "langchain_mcp_adapters.tools"
309310
ignore_missing_imports = true
311+
[[tool.mypy.overrides]]
312+
module = "mcp_use"
313+
ignore_missing_imports = true
314+
[[tool.mypy.overrides]]
315+
module = "mcp_use.*"
316+
ignore_missing_imports = true
310317

311318
[tool.towncrier]
312319
directory = "changelog.d"

src/datu/app_config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from pydantic_settings import BaseSettings, SettingsConfigDict
1616

1717
from datu.integrations.config import IntegrationConfigs
18+
from datu.mcp.config import MCPConfig
1819
from datu.services.config import SchemaRAGConfig
1920

2021

@@ -72,7 +73,11 @@ class DatuConfig(BaseSettings):
7273
simulate_llm_response (str): Whether to simulate LLM responses.
7374
schema_sample_limit (int): The maximum number of rows to sample from the schema.
7475
schema_categorical_threshold (int): The threshold for categorical columns in the schema.
76+
enable_mcp (bool): Whether to enable MCP integration.
77+
mcp (MCPConfig | None): Configuration settings for MCP integration.
7578
enable_schema_rag (bool): Enable RAG for schema extraction.
79+
schema_rag (SchemaRAGConfig | None): Configuration settings for schema RAG.
80+
7681
7782
"""
7883

@@ -94,6 +99,11 @@ class DatuConfig(BaseSettings):
9499
schema_categorical_detection: bool = True
95100
schema_sample_limit: int = 1000
96101
schema_categorical_threshold: int = 10
102+
enable_mcp: bool = True
103+
mcp: MCPConfig | None = Field(
104+
default_factory=MCPConfig,
105+
description="Configuration settings for MCP integration.",
106+
)
97107
enable_schema_rag: bool = False
98108
schema_rag: SchemaRAGConfig | None = Field(
99109
default_factory=SchemaRAGConfig,

src/datu/base/chat_schema.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from typing import List, Optional
2+
3+
from pydantic import BaseModel
4+
5+
6+
class ChatMessage(BaseModel):
    """A single message exchanged within a chat conversation.

    Attributes:
        role (str): Who sent the message, e.g. "user" or "assistant".
        content (str): The text body of the message.
    """

    role: str
    content: str
16+
17+
18+
class ChatRequest(BaseModel):
    """A chat request: the conversation so far plus an optional system prompt.

    Attributes:
        messages (list[ChatMessage]): Ordered messages of the conversation.
        system_prompt (str | None): Optional system prompt giving the model
            context for the conversation.
    """

    # Builtin generics and PEP 604 unions match the annotation style already
    # used elsewhere in the package (e.g. `str | None` in base.llm_client).
    messages: list[ChatMessage]
    system_prompt: str | None = None

src/datu/base/llm_client.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,34 @@
22

33
from abc import ABC, abstractmethod
44

5+
from mcp_use import MCPClient
6+
7+
from datu.app_config import settings
8+
59

610
class BaseLLMClient(ABC):
711
"""BaseLLMClient class to provide a common interface for LLM clients.
812
This class serves as an abstract base class for all LLM clients,
913
providing a common interface and shared functionality.
1014
"""
1115

16+
def __init__(self):
    """Initialize shared LLM-client state.

    Sets up placeholders for the concrete LLM client and, when MCP is
    enabled in the application settings, constructs an ``MCPClient`` from
    the configured MCP config file.

    Attributes:
        client: The concrete LLM client instance (set by subclasses).
        mcp_client: The MCP client instance, or None when MCP is disabled.
        agent: The agent instance, or None until a subclass creates one.

    Raises:
        RuntimeError: If MCP is enabled but ``settings.mcp`` is None.
    """
    self.client = None
    self.mcp_client = None
    if settings.enable_mcp:
        # settings.mcp is declared `MCPConfig | None`; fail fast with a clear
        # error instead of an AttributeError on `settings.mcp.config_file`.
        if settings.mcp is None:
            raise RuntimeError("MCP is enabled but settings.mcp is not configured.")
        self.mcp_client = MCPClient.from_config_file(settings.mcp.config_file)
    self.agent = None
30+
1231
@abstractmethod
13-
def chat_completion(self, messages: list, system_prompt: str | None = None) -> str:
32+
async def chat_completion(self, messages: list, system_prompt: str | None = None) -> str:
1433
"""Given a conversation (and an optional system prompt), returns the assistant's text response."""
1534

1635
@abstractmethod

src/datu/llm_clients/openai_client.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from langchain_community.chat_message_histories import ChatMessageHistory
99
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
1010
from langchain_openai import ChatOpenAI
11+
from mcp_use import MCPAgent
1112

1213
from datu.app_config import get_logger, settings
1314
from datu.base.llm_client import BaseLLMClient
@@ -65,15 +66,37 @@ class OpenAIClient(BaseLLMClient):
6566
"""
6667

6768
def __init__(self):
    """Initialize the OpenAI-backed LLM client.

    Builds the ``ChatOpenAI`` client from application settings and, when
    MCP is enabled, wraps it in an ``MCPAgent`` driven by the MCP client
    created by the base class.

    Raises:
        RuntimeError: If MCP is enabled but the MCP client or the MCP
            settings are missing.
        Exception: Re-raised when MCPAgent construction fails, so a
            misconfiguration surfaces immediately instead of silently
            degrading behavior.
    """
    super().__init__()
    self.model = getattr(settings, "openai_model", "gpt-4o-mini")
    self.client = ChatOpenAI(
        api_key=settings.openai_api_key,
        model=self.model,
        temperature=settings.llm_temperature,
    )
    self.history = ChatMessageHistory()
    if settings.enable_mcp:
        if not self.mcp_client:
            raise RuntimeError("MCP is enabled but mcp_client was not initialized.")
        # settings.mcp is `MCPConfig | None`; guard before reading its fields.
        if settings.mcp is None:
            raise RuntimeError("MCP is enabled but settings.mcp is not configured.")
        try:
            self.agent = MCPAgent(
                llm=self.client,
                client=self.mcp_client,
                max_steps=settings.mcp.max_steps,
                use_server_manager=settings.mcp.use_server_manager,
            )
        except Exception:
            # Prefer failing early so misconfig doesn't silently degrade behavior.
            logger.exception("Failed to construct MCPAgent with provided MCP settings.")
            raise
91+
92+
async def chat(self, input_text: str) -> str:
    """Run the MCP agent on the given input and return its final answer.

    Args:
        input_text (str): The prompt text to send to the agent.

    Returns:
        str: The agent's response.

    Raises:
        RuntimeError: If no agent was constructed (MCP disabled).
    """
    if self.agent is None:
        raise RuntimeError("MCP agent is not initialized; enable MCP to use chat().")
    # Do not pass a hard-coded max_steps here: the agent was constructed
    # with settings.mcp.max_steps, and overriding it per call would
    # silently conflict with that configuration.
    response = await self.agent.run(input_text)
    return response
7598

76-
def chat_completion(self, messages: list[BaseMessage], system_prompt: str | None = None) -> str:
99+
async def chat_completion(self, messages: list[BaseMessage], system_prompt: str | None = None) -> str:
77100
if settings.simulate_llm_response:
78101
return create_simulated_llm_response()
79102
if not messages:
@@ -114,9 +137,20 @@ def chat_completion(self, messages: list[BaseMessage], system_prompt: str | None
114137
)
115138

116139
self.history.add_message(HumanMessage(content=last_user_message))
117-
response = self.client.invoke(self.history.messages)
118-
self.history.add_message(response)
119-
return response.content if response else ""
140+
# Convert entire history messages to plain text for chat()
141+
# Adjust this if your llm_with_tools expects different format
142+
input_text = "\n".join(msg.content for msg in self.history.messages if hasattr(msg, "content"))
143+
144+
response = await self.chat(input_text)
145+
146+
# Assuming response is a BaseMessage or similar with 'content'
147+
if hasattr(response, "content"):
148+
self.history.add_message(response)
149+
return response.content
150+
else:
151+
# If response is plain text string
152+
self.history.add_message(HumanMessage(content=response))
153+
return response
120154

121155
def fix_sql_error(self, sql_code: str, error_msg: str, loop_count: int) -> str:
122156
"""Generates a corrected SQL query based on the provided SQL code and error message.

src/datu/main.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,14 @@
1111
from fastapi.staticfiles import StaticFiles
1212

1313
from datu.app_config import get_logger, settings
14-
from datu.mcp.launcher import launch_mcp_server
1514
from datu.routers import chat, metadata, transformations
1615
from datu.schema_extractor.schema_cache import load_schema_cache
1716

1817
logger = get_logger(__name__)
1918

2019
# Optionally load schema and graph-rag in cache for use in prompts or logging
2120
if settings.app_environment != "test":
22-
if settings.enable_schema_rag:
23-
launch_mcp_server("schema_rag_server")
24-
else:
25-
schema_data = load_schema_cache()
21+
schema_data = load_schema_cache()
2622

2723

2824
# Create the FastAPI application instance.

src/datu/mcp/client.py

Lines changed: 0 additions & 66 deletions
This file was deleted.

src/datu/mcp/config.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from pydantic import Field
2+
from pydantic_settings import BaseSettings
3+
4+
5+
class MCPConfig(BaseSettings):
    """Settings that control MCP integration.

    Backed by Pydantic's ``BaseSettings`` so every value can be supplied
    via environment variables as well as in code.

    Attributes:
        max_steps (int): Maximum number of steps MCPAgent may run.
        use_server_manager (bool): Whether to enable the Server Manager.
        config_file (str): Path to the MCP configuration file.
    """

    max_steps: int = Field(default=30, description="Maximum number of steps MCPAgent can run.")
    use_server_manager: bool = Field(default=True, description="Whether to enable the Server Manager.")
    config_file: str = Field(default="mcp_config.json", description="Path to the MCP configuration file.")

src/datu/mcp/launcher.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)