✨ run MCP on Databricks Apps #25
Changes from 8 commits
@@ -0,0 +1,26 @@
bundle:
  name: mcp-on-apps

sync:
  include:
    - .build

artifacts:
  default:
    type: whl
    path: .
    build: uv build --wheel

resources:
  apps:
    mcp-on-apps:
      name: "mcp-on-apps"
      description: "MCP Server on Databricks Apps"
      source_code_path: ./.build
      config:
        command: ["unitycatalog-mcp-app"]

targets:
  dev:
    mode: development
    default: true
@@ -0,0 +1,49 @@
from typing import Any
from pathlib import Path
import shutil

from hatchling.builders.hooks.plugin.interface import BuildHookInterface


class AppsBuildHook(BuildHookInterface):
    """Hook to create a Databricks Apps-compatible build of the project.

    The following steps are performed:
    - Remove the ./.build folder if it exists.
    - Copy the built artifact to the ./.build folder.
    - Write the name of the artifact to a requirements.txt file in the ./.build folder.
    - Print the resulting build directory to the console.
    """

    def finalize(
        self, version: str, build_data: dict[str, Any], artifact_path: str
    ) -> None:
        self.app.display_info(
            f"Running Databricks Apps build hook for project {self.metadata.name} in directory {Path.cwd()}"
        )
        # remove the ./.build folder if it exists
        build_dir = Path(".build")
        self.app.display_info(f"Resulting build directory: {build_dir.absolute()}")

        if build_dir.exists():
            self.app.display_info(f"Removing {build_dir}")
            shutil.rmtree(build_dir)
            self.app.display_info(f"Removed {build_dir}")
        else:
            self.app.display_info(f"{build_dir} does not exist, skipping removal")

        # copy the built wheel into the ./.build folder
        build_dir.mkdir(exist_ok=True)
        self.app.display_info(f"Copying {artifact_path} to {build_dir}")
        shutil.copy(artifact_path, build_dir)

        # write the wheel filename to requirements.txt so Apps installs it by name
        requirements_file = build_dir / "requirements.txt"
        requirements_file.write_text(Path(artifact_path).name, encoding="utf-8")

        self.app.display_info(
            f"Apps-compatible build written to {build_dir.absolute()}"
        )
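To make the hook's contract concrete, here is a minimal smoke check. This is a sketch, not part of the PR; it assumes the build was run from the project root (e.g. via uv build --wheel) so that .build/ exists:

import sys
from pathlib import Path

build_dir = Path(".build")
# the hook copies exactly one wheel into .build/
wheels = sorted(build_dir.glob("*.whl"))
assert len(wheels) == 1, "expected exactly one wheel in .build/"

# requirements.txt should contain just the wheel's filename, so that
# Databricks Apps installs the freshly built wheel by name
requirements = (build_dir / "requirements.txt").read_text(encoding="utf-8").strip()
assert requirements == wheels[0].name, requirements
sys.stdout.write(f"OK: .build/ contains {wheels[0].name} and a matching requirements.txt\n")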
@@ -0,0 +1,147 @@
""" | ||
Collection of base utils for MCP servers. | ||
""" | ||
|
||
import contextlib | ||
import logging | ||
from collections import deque | ||
from dataclasses import dataclass | ||
from typing import AsyncIterator | ||
from uuid import uuid4 | ||
from starlette.applications import Starlette | ||
from starlette.routing import Mount | ||
from starlette.types import Receive, Scope, Send | ||
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager | ||
|
||
from mcp.server import Server | ||
|
||
from mcp.server.streamable_http import ( | ||
EventCallback, | ||
EventId, | ||
EventMessage, | ||
EventStore, | ||
StreamId, | ||
) | ||
from mcp.types import JSONRPCMessage | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@dataclass | ||
class EventEntry: | ||
""" | ||
Represents an event entry in the event store. | ||
""" | ||
|
||
event_id: EventId | ||
stream_id: StreamId | ||
message: JSONRPCMessage | ||
|
||
|
||
class InMemoryEventStore(EventStore): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious if we actually need any in-memory event storage for our server, it is stateless right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can keep memory for now, and remove it in future if needed. Apps have 6GB ram, ponentially we can also save into temp db |
||
""" | ||
Simple in-memory implementation of the EventStore interface for resumability. | ||
This is primarily intended for examples and testing, not for production use | ||
where a persistent storage solution would be more appropriate. | ||
|
||
This implementation keeps only the last N events per stream for memory efficiency. | ||
""" | ||
|
||
def __init__(self, max_events_per_stream: int = 100): | ||
"""Initialize the event store. | ||
|
||
Args: | ||
max_events_per_stream: Maximum number of events to keep per stream | ||
""" | ||
self.max_events_per_stream = max_events_per_stream | ||
# for maintaining last N events per stream | ||
self.streams: dict[StreamId, deque[EventEntry]] = {} | ||
# event_id -> EventEntry for quick lookup | ||
self.event_index: dict[EventId, EventEntry] = {} | ||
|
||
async def store_event( | ||
self, stream_id: StreamId, message: JSONRPCMessage | ||
) -> EventId: | ||
"""Stores an event with a generated event ID.""" | ||
event_id = str(uuid4()) | ||
event_entry = EventEntry( | ||
event_id=event_id, stream_id=stream_id, message=message | ||
) | ||
|
||
# Get or create deque for this stream | ||
if stream_id not in self.streams: | ||
self.streams[stream_id] = deque(maxlen=self.max_events_per_stream) | ||
|
||
# If deque is full, the oldest event will be automatically removed | ||
# We need to remove it from the event_index as well | ||
if len(self.streams[stream_id]) == self.max_events_per_stream: | ||
oldest_event = self.streams[stream_id][0] | ||
self.event_index.pop(oldest_event.event_id, None) | ||
|
||
# Add new event | ||
self.streams[stream_id].append(event_entry) | ||
self.event_index[event_id] = event_entry | ||
|
||
return event_id | ||
|
||
async def replay_events_after( | ||
self, | ||
last_event_id: EventId, | ||
send_callback: EventCallback, | ||
) -> StreamId | None: | ||
"""Replays events that occurred after the specified event ID.""" | ||
if last_event_id not in self.event_index: | ||
logger.warning(f"Event ID {last_event_id} not found in store") | ||
return None | ||
|
||
# Get the stream and find events after the last one | ||
last_event = self.event_index[last_event_id] | ||
stream_id = last_event.stream_id | ||
stream_events = self.streams.get(last_event.stream_id, deque()) | ||
|
||
# Events in deque are already in chronological order | ||
found_last = False | ||
for event in stream_events: | ||
if found_last: | ||
await send_callback(EventMessage(event.message, event.event_id)) | ||
elif event.event_id == last_event_id: | ||
found_last = True | ||
|
||
return stream_id | ||
|
||
|
||
def get_serveable_app(app: Server, json_response: bool = True) -> Starlette:
    # Plain (non-async) function: nothing in the body is awaited, and this
    # lets start_app() call it directly instead of receiving a coroutine.
    event_store = InMemoryEventStore()

    # Create the session manager with our app and event store
    session_manager = StreamableHTTPSessionManager(
        app=app,
        event_store=event_store,  # Enable resumability
        json_response=json_response,
    )

    # ASGI handler for streamable HTTP connections
    async def handle_streamable_http(
        scope: Scope, receive: Receive, send: Send
    ) -> None:
        await session_manager.handle_request(scope, receive, send)

    @contextlib.asynccontextmanager
    async def lifespan(app: Starlette) -> AsyncIterator[None]:
        """Context manager for managing the session manager lifecycle."""
        async with session_manager.run():
            logger.info("Application started with StreamableHTTP session manager!")
            try:
                yield
            finally:
                logger.info("Application shutting down...")

    # Create an ASGI application using the transport
    return Starlette(
        debug=True,
        routes=[
            Mount("/mcp", app=handle_streamable_http),
        ],
        lifespan=lifespan,
    )
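A small illustrative sketch of the resumability contract implemented above: replay_events_after delivers only events recorded after the given event ID, in order. It assumes this module is importable as databricks.labs.mcp.base (matching the import used by the entrypoint below); the stream name and the ping message are arbitrary:

import asyncio

from mcp.types import JSONRPCMessage, JSONRPCRequest

from databricks.labs.mcp.base import InMemoryEventStore


async def demo() -> None:
    store = InMemoryEventStore(max_events_per_stream=10)
    msg = JSONRPCMessage(JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"))

    first = await store.store_event("stream-1", msg)
    second = await store.store_event("stream-1", msg)

    replayed: list[str] = []

    async def collect(event_message) -> None:
        # EventCallback: receives an EventMessage with .message and .event_id
        replayed.append(event_message.event_id)

    # Only the event stored after `first` is replayed
    stream_id = await store.replay_events_after(first, collect)
    assert stream_id == "stream-1"
    assert replayed == [second]


asyncio.run(demo())

Whether the server needs resumability at all is the open question in the review thread above; dropping the event store would make every request independent.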
@@ -0,0 +1,35 @@
from mcp.server import Server
from mcp.types import Tool as ToolSpec
import uvicorn

from databricks.labs.mcp.base import get_serveable_app
from databricks.labs.mcp.servers.unity_catalog.tools import (
    Content,
)
from databricks.labs.mcp.servers.unity_catalog.cli import get_settings
from databricks.labs.mcp._version import __version__ as VERSION
from databricks.labs.mcp.servers.unity_catalog.server import get_tools_dict


app = Server(name="mcp-unitycatalog", version=VERSION)
tools_dict = get_tools_dict(settings=get_settings())


@app.list_tools()
async def list_tools() -> list[ToolSpec]:
    return [tool.tool_spec for tool in tools_dict.values()]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[Content]:
    tool = tools_dict[name]
    return tool.execute(**arguments)


def start_app():
    serveable = get_serveable_app(app)
    uvicorn.run(serveable, host="0.0.0.0", port=8000)


if __name__ == "__main__":
    start_app()
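For local testing, a client round-trip against this entrypoint might look like the following sketch. It assumes the MCP Python SDK's streamable HTTP client and the server started via start_app() listening on localhost:8000, with the /mcp mount from get_serveable_app:

import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def main() -> None:
    # Connect to the /mcp mount exposed by get_serveable_app
    async with streamablehttp_client("http://localhost:8000/mcp") as (
        read_stream,
        write_stream,
        _get_session_id,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])


if __name__ == "__main__":
    asyncio.run(main())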
[Review comment] We should move this into the UC server directory, to enable separately deploying individual servers. I can help with that.