
Commit d0b0649

szymondudycz and olruas authored and committed
Template with MCP server for llm-app (#9527)
Co-authored-by: Olivier Ruas <olivier@pathway.com> GitOrigin-RevId: fb737a2c9ba933f56ccf0efa57ee533b820786be
1 parent a56172f commit d0b0649

17 files changed: +371 -0 lines changed
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
PATHWAY_LICENSE_KEY="YOUR PATHWAY KEY" # can be obtained here: https://pathway.com/user/license
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
FROM pathwaycom/pathway:latest

WORKDIR /app

RUN apt-get update \
    && apt-get install -y python3-opencv tesseract-ocr-eng \
    && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*

COPY requirements.txt .
RUN pip install -U --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8068

CMD ["python", "app.py"]
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
<p align="center" class="flex items-center gap-1 justify-center flex-wrap">
    <img src="../../assets/gcp-logo.svg?raw=true" alt="GCP Logo" height="20" width="20">
    <a href="https://pathway.com/developers/user-guide/deployment/gcp-deploy">Deploy with GCP</a> |
    <img src="../../assets/aws-fargate-logo.svg?raw=true" alt="AWS Logo" height="20" width="20">
    <a href="https://pathway.com/developers/user-guide/deployment/aws-fargate-deploy">Deploy with AWS</a> |
    <img src="../../assets/azure-logo.svg?raw=true" alt="Azure Logo" height="20" width="20">
    <a href="https://pathway.com/developers/user-guide/deployment/azure-aci-deploy">Deploy with Azure</a> |
    <img src="../../assets/render.png?raw=true" alt="Render Logo" height="20" width="20">
    <a href="https://pathway.com/developers/user-guide/deployment/render-deploy"> Deploy with Render </a>
</p>

# MCP Server with Realtime Document Indexing

This is a template for exposing a real-time document indexing pipeline powered by [Pathway](https://github.com/pathwaycom/pathway) as a Model Context Protocol (MCP) server.

The [Model Context Protocol (MCP)](https://modelcontextprotocol.io/docs/getting-started/intro) standardizes the way applications interact with large language models (LLMs). It acts as a universal connector between AI models and external data sources and tools, so that AI workflows and agents can work with real-world data and functionality.

The capabilities of the pipeline include:

- Real-time document indexing from Microsoft 365 SharePoint, Google Drive, or a local directory;
- Similarity search by user query;
- Filtering by metadata according to a condition given in [JMESPath format](https://jmespath.org/);
- Access to the documents through a standardized MCP server.

## Summary of the Pipeline

This example spawns an MCP server that has three tools:
- `retrieve_query` to perform similarity search on the indexed documents,
- `statistics_query` to get the basic stats about the indexer's health,
- `inputs_query` to retrieve the metadata of all files currently processed by the indexer.

You can get the specification of these tools by calling `list_tools` on the MCP server.

## How It Works

This pipeline uses several Pathway connectors to read data from a local drive, Google Drive, or Microsoft SharePoint. The connectors poll the sources for changes with low latency and track modifications, so when a tracked file changes, the change is reflected in the internal collections. The contents are read into a single Pathway Table as binary objects.
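
To make the idea of polling with modification tracking concrete, here is a minimal standard-library sketch. It is not how Pathway's connectors are implemented; the helper names `snapshot` and `diff_snapshots` are hypothetical, and the sketch only assumes that a change shows up as a new modification time or file size.

```python
import os
import tempfile


def snapshot(folder):
    """Map every file path under `folder` to its (mtime_ns, size) signature."""
    state = {}
    for root, _dirs, files in os.walk(folder):
        for name in files:
            path = os.path.join(root, name)
            stat = os.stat(path)
            state[path] = (stat.st_mtime_ns, stat.st_size)
    return state


def diff_snapshots(old, new):
    """Return (added, modified, deleted) path lists between two snapshots."""
    added = sorted(p for p in new if p not in old)
    deleted = sorted(p for p in old if p not in new)
    modified = sorted(p for p in new if p in old and new[p] != old[p])
    return added, modified, deleted


if __name__ == "__main__":
    # Demo on a throwaway folder: one scan before and one after adding a file.
    with tempfile.TemporaryDirectory() as folder:
        before = snapshot(folder)
        with open(os.path.join(folder, "report.txt"), "w") as f:
            f.write("first version")
        after = snapshot(folder)
        print(diff_snapshots(before, after))
```

A polling loop would call `snapshot` at a fixed interval and forward each reported difference to the indexing stage.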

These binary objects are then parsed with the [Docling](https://www.docling.ai/) library and split into chunks. The pipeline embeds the resulting chunks with the [SentenceTransformer](https://www.sbert.net/) embedder.
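
The chunking step can be illustrated with a simplified sketch. It assumes whitespace-separated words as a stand-in for tokens, whereas the splitter configured in `app.yaml` counts real model tokens, so chunk boundaries will differ in practice. The function name `split_into_chunks` is hypothetical.

```python
def split_into_chunks(text, max_tokens=400):
    """Greedily pack whitespace-separated words into chunks of at most max_tokens words."""
    if max_tokens < 1:
        raise ValueError("max_tokens must be at least 1")
    words = text.split()
    return [
        " ".join(words[start:start + max_tokens])
        for start in range(0, len(words), max_tokens)
    ]


if __name__ == "__main__":
    # Small limit so the chunk boundaries are easy to see.
    for chunk in split_into_chunks("one two three four five", max_tokens=2):
        print(chunk)
```

Each chunk is then embedded separately, which is why the chunk size limit matters for retrieval quality.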

Finally, the embeddings are indexed using Pathway's machine-learning library. You can then query the index by connecting to the MCP server with an MCP client.
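
As a rough illustration of what a similarity query over the index computes, the sketch below ranks chunks by cosine similarity with a brute-force scan. This is an assumption-laden toy: the template configures a USearch-based KNN index with the cosine metric in `app.yaml`, which is a different implementation, and the vectors here are made up rather than produced by an embedder.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vector, indexed, k=3):
    """Return the texts of the k chunks most similar to the query.

    `indexed` is a list of (chunk_text, vector) pairs.
    """
    ranked = sorted(
        indexed,
        key=lambda item: cosine_similarity(query_vector, item[1]),
        reverse=True,
    )
    return [text for text, _vector in ranked[:k]]


if __name__ == "__main__":
    # Toy two-dimensional "embeddings" for three chunks.
    index = [("chunk a", [1.0, 0.0]), ("chunk b", [0.0, 1.0]), ("chunk c", [1.0, 1.0])]
    print(top_k([1.0, 0.0], index, k=2))
```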

## Pipeline Organization

This folder contains the following files:
- `app.py`, the pipeline code using Pathway and written in Python;
- `app.yaml`, the file containing the configuration of the pipeline, such as the embedding model, the sources, or the server address;
- `requirements.txt`, the text file listing the pip dependencies for running this pipeline. It can be passed to `pip install -r requirements.txt` to install everything that is needed to launch the pipeline locally;
- `Dockerfile`, the Docker configuration for running the pipeline in a container;
- `docker-compose.yml`, the docker-compose configuration for running the pipeline in a container with `files-for-indexing/` mounted from the host;
- `files-for-indexing/`, a folder with example files that can be used for test runs.

## Customizing the pipeline

The pipeline can be customized by editing the `app.yaml` configuration file. To read more about YAML files used in Pathway templates, read [our guide](https://pathway.com/developers/templates/configure-yaml).

In the `app.yaml` file we define:
- input connectors
- embedder
- index

Any of these can be replaced or, if no longer needed, removed. For the components that can be used, check the Pathway [LLM xpack](https://pathway.com/developers/user-guide/llm-xpack/overview), or implement your own.

Here are some examples of what can be modified.

### Embedding Model

By default, this template uses a locally run model, `mixedbread-ai/mxbai-embed-large-v1`. You can replace it with any other model by changing `$embedder` in `app.yaml`. For example, to use the OpenAI embedder, set:
```yaml
$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
  model: "text-embedding-3-small"
  cache_strategy: !pw.udfs.DefaultCache {}
  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy {}
```

If you choose a provider that requires an API key, remember to set the appropriate environment variables (you can also set them in the `.env` file). For example, to use OpenAI embedders, set the `OPENAI_API_KEY` variable.

### Webserver

You can configure the name, the host, and the port of the MCP server.
Here is the default configuration:
```yaml
mcp_http: !pw.xpacks.llm.mcp_server.PathwayMcp
  name: "Streamable MCP Server"
  transport: "streamable-http"
  host: "0.0.0.0"
  port: 8068
  serve:
    - $document_store
```

### Cache

You can configure whether to enable caching or persistence (to avoid repeated API calls) and where the cache is stored.
Default values:
```yaml
persistence_mode: !pw.PersistenceMode.UDF_CACHING
persistence_backend: !pw.persistence.Backend.filesystem
  path: "./Cache"
```

### Data sources

You can configure the data sources by changing `$sources` in `app.yaml`.
You can add as many data sources as you want, including several sources of the same kind, for instance several local sources reading from different folders.
The sections below describe how to configure local, Google Drive, and SharePoint sources, and you can check the examples of YAML configuration in our [user guide](https://pathway.com/developers/templates/yaml-snippets/data-sources-examples/). Although they are not described in this section, you can also use any other input [connector](https://pathway.com/developers/user-guide/connecting-to-data/connectors) from the Pathway package.

By default, the app uses a local data source to read documents from the `files-for-indexing` folder.

#### Local Data Source

The local data source is configured with a map tagged `!pw.io.fs.read`. Set `path` to the folder containing the files to be indexed.

#### Google Drive Data Source

The Google Drive data source is enabled with a map tagged `!pw.io.gdrive.read`. The map must contain two main parameters:
- `object_id`, containing the ID of the folder that needs to be indexed. It can be found in the URL in the web interface, where it is the last part of the address. For example, the publicly available demo folder in Google Drive has the URL `https://drive.google.com/drive/folders/1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs`. The last part of this address is `1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs`, so this is the `object_id` you would need to specify.
- `service_user_credentials_file`, containing the path to the credentials file for the Google [service account](https://cloud.google.com/iam/docs/service-account-overview). For more details on setting up the service account and getting credentials, refer to [this tutorial](https://pathway.com/developers/user-guide/connectors/gdrive-connector#setting-up-google-drive).
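
If you prefer to derive the `object_id` programmatically rather than copying it by hand, the rule above (take the last part of the folder URL) can be sketched with the standard library. The helper name `drive_folder_id` is hypothetical, and the sketch assumes the folder ID is the last non-empty path segment, as in the demo URL.

```python
from urllib.parse import urlparse


def drive_folder_id(url):
    """Return the last non-empty path segment of a Google Drive folder URL."""
    segments = [s for s in urlparse(url).path.split("/") if s]
    if not segments:
        raise ValueError(f"no path segments in URL: {url!r}")
    return segments[-1]


if __name__ == "__main__":
    demo = "https://drive.google.com/drive/folders/1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs"
    print(drive_folder_id(demo))
```

Because only the URL path is inspected, query strings appended by the web interface do not affect the result.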

To speed up the indexing process, you may also want to specify the `refresh_interval` parameter, an integer number of seconds. It is the interval between two consecutive folder scans. If unset, it defaults to 30 seconds.

For the full list of available parameters, refer to the Google Drive connector [documentation](https://pathway.com/developers/api-docs/pathway-io/gdrive#pathway.io.gdrive.read).

#### SharePoint Data Source

This data source requires a Scale or Enterprise [license key](https://pathway.com/pricing). You can obtain a free Scale key on the [Pathway website](https://pathway.com/get-license).

To use it, set the map tag to `!pw.xpacks.connectors.sharepoint.read`, and then provide values for `url`, `tenant`, `client_id`, `cert_path`, `thumbprint`, and `root_path`. To read about the meaning of these arguments, check the SharePoint connector [documentation](https://pathway.com/developers/api-docs/pathway-xpacks-sharepoint#pathway.xpacks.connectors.sharepoint.read).

## Running the Template

### Pathway License Key
The Pathway MCP server requires a Pathway license key, so you need to set it before you run the template. This template is available for free via [Pathway Scale](https://pathway.com/features), for which you can get a license key [here](https://pathway.com/user/license). Once you have your license key, create a `.env` file and set `PATHWAY_LICENSE_KEY` in it to your license key. See `.env.example` for an example of a `.env` file.
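
Before launching, it can be handy to check that the key was actually filled in. The sketch below is a simplified stand-in, not the parsing the template uses: it handles only plain `KEY=VALUE` lines with optional quotes and trailing comments. The function names are hypothetical, `abc-123` is a made-up key, and the placeholder string is the one that appears in `.env.example`.

```python
PLACEHOLDER = "YOUR PATHWAY KEY"  # placeholder value shipped in .env.example


def parse_env_line(line):
    """Parse one KEY=VALUE line; return (key, value) or None for blanks and comments."""
    line = line.strip()
    if not line or line.startswith("#") or "=" not in line:
        return None
    key, _, value = line.partition("=")
    value = value.strip()
    if value[:1] in ('"', "'"):
        # Quoted value: keep everything up to the matching closing quote.
        closing = value.find(value[0], 1)
        value = value[1:closing] if closing != -1 else value[1:]
    else:
        # Unquoted value: drop a trailing " # comment" if present.
        value = value.split(" #", 1)[0].strip()
    return key.strip(), value


def license_key_is_set(env_text):
    """True if PATHWAY_LICENSE_KEY is present, non-empty, and not the placeholder."""
    values = dict(filter(None, map(parse_env_line, env_text.splitlines())))
    key = values.get("PATHWAY_LICENSE_KEY", "")
    return bool(key) and key != PLACEHOLDER


if __name__ == "__main__":
    print(license_key_is_set('PATHWAY_LICENSE_KEY="abc-123"'))
```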

### Locally

This template can be run locally by executing `python app.py` in this directory. Note that a local run requires the `Pathway` library and the other dependencies to be installed. They can be installed with pip:

```bash
pip install "pathway[all]"
pip install -r requirements.txt
```

### With Docker

To run the MCP server together with real-time document indexing, execute:

```bash
docker compose up --build
```

The `docker-compose.yml` file declares a [volume bind mount](https://docs.docker.com/reference/cli/docker/container/run/#volume) that makes changes to files under `files-for-indexing/` made on your host computer visible inside the Docker container. If the index does not react to file changes, check that the bind mount works by running `docker compose exec pathway_mcp_server ls -l /app/files-for-indexing/` and verifying that all files are visible.


## Querying the Template with an MCP client

To test the template, you need an MCP client that connects to your MCP server. You can use the `fastmcp` package to define a client as follows:

```python
import asyncio
from fastmcp import Client

# Change the URL if you change the default values in the app.yaml
PATHWAY_MCP_URL = "http://localhost:8068/mcp/"

client = Client(PATHWAY_MCP_URL)


async def main():
    # List the tools exposed by the MCP server.
    async with client:
        tools = await client.list_tools()
    print(tools)

    # Run a similarity search for the 3 closest chunks.
    async with client:
        result = await client.call_tool(
            name="retrieve_query",
            arguments={"query": "How to create a webserver in Pathway?", "k": 3},
        )
    print(result)


asyncio.run(main())
```

You can list the tools available on the MCP server with the client's `list_tools` method. To call a given tool, use the `call_tool` method with the `name` and `arguments` parameters. `arguments` should be a dict of the argument values: in this case, the `retrieve_query` tool has two required arguments, `query` and `k`.
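
Since a malformed `arguments` dict only fails once the request reaches the server, you may want to validate it on the client side first. The helper below is a hypothetical convenience, not part of `fastmcp` or Pathway, and it covers only the two required arguments named above.

```python
def build_retrieve_arguments(query, k=3):
    """Build the `arguments` dict for the `retrieve_query` tool.

    Raises ValueError if `query` is empty or `k` is not a positive integer.
    """
    if not isinstance(query, str) or not query.strip():
        raise ValueError("query must be a non-empty string")
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(k, bool) or not isinstance(k, int) or k < 1:
        raise ValueError("k must be a positive integer")
    return {"query": query, "k": k}


if __name__ == "__main__":
    print(build_retrieve_arguments("How to create a webserver in Pathway?", k=3))
```

The returned dict can be passed directly as `arguments=` to `client.call_tool(name="retrieve_query", ...)`.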

## Using MCP server in Claude Desktop
To use the MCP server created by this template in Claude Desktop, follow the [guide in Pathway's documentation](https://pathway.com/developers/user-guide/llm-xpack/pathway-mcp-claude-desktop).

## Adding Files to Index

To test index updates, add more files to the `files-for-indexing` folder if the local data source is used.
If you are using Google Drive, upload your files to the folder configured in the `app.yaml` file.

Then you can query the updated index with the `retrieve_query` tool and check the indexer's state with the `statistics_query` and `inputs_query` tools described above.

templates/document_store_mcp_server/__init__.py

Whitespace-only changes.
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
import logging

import pathway as pw
from dotenv import load_dotenv
from pathway.xpacks.llm.mcp_server import PathwayMcp
from pydantic import BaseModel, ConfigDict

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

load_dotenv()


class App(BaseModel):
    mcp_http: PathwayMcp
    host: str = "0.0.0.0"
    port: int = 8000

    terminate_on_error: bool = False
    persistence_backend: pw.persistence.Backend | None = None
    persistence_mode: pw.PersistenceMode | None = pw.PersistenceMode.UDF_CACHING

    def run(self) -> None:
        if self.persistence_mode is not None:
            if self.persistence_backend is None:
                persistence_backend = pw.persistence.Backend.filesystem("./Cache")
            else:
                persistence_backend = self.persistence_backend
            persistence_config = pw.persistence.Config(
                persistence_backend,
                persistence_mode=self.persistence_mode,
            )
        else:
            persistence_config = None
        pw.run(
            terminate_on_error=self.terminate_on_error,
            persistence_config=persistence_config,
            monitoring_level=pw.MonitoringLevel.NONE,
        )

    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)


if __name__ == "__main__":
    with open("app.yaml") as f:
        config = pw.load_yaml(f)
    print(config)
    app = App(**config)
    app.run()
Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
# This YAML configuration file is used to set up and configure the Document indexing RAG template.
# It defines various components such as data sources, embedders, splitters, parsers, and retrievers.
# Each section is configured to specify how the template should process and handle data for answering the queries.
# You can learn more about the YAML syntax here: https://pathway.com/developers/templates/configure-yaml


# $sources defines the data sources used to read the data which will be indexed in the RAG.
# You can learn more about how to configure data sources here:
# https://pathway.com/developers/templates/yaml-examples/data-sources-examples

$sources:
  # File System connector, reading data locally.
  - !pw.io.fs.read
    path: files-for-indexing
    format: binary
    with_metadata: true

  # Uncomment to use the SharePoint connector
  # - !pw.xpacks.connectors.sharepoint.read
  #   url: $SHAREPOINT_URL
  #   tenant: $SHAREPOINT_TENANT
  #   client_id: $SHAREPOINT_CLIENT_ID
  #   cert_path: sharepointcert.pem
  #   thumbprint: $SHAREPOINT_THUMBPRINT
  #   root_path: $SHAREPOINT_ROOT
  #   with_metadata: true
  #   refresh_interval: 30

  # Uncomment to use the Google Drive connector
  # - !pw.io.gdrive.read
  #   object_id: $DRIVE_ID
  #   service_user_credentials_file: gdrive_indexer.json
  #   file_name_pattern:
  #     - "*.pdf"
  #     - "*.pptx"
  #   object_size_limit: null
  #   with_metadata: true
  #   refresh_interval: 30

# Model used for embedding
$embedding_model: "mixedbread-ai/mxbai-embed-large-v1"

# Specifies the embedder model for converting text into embeddings.
$embedder: !pw.xpacks.llm.embedders.SentenceTransformerEmbedder
  model: $embedding_model
  call_kwargs:
    show_progress_bar: False

# Defines the splitter settings for dividing text into smaller chunks.
$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  max_tokens: 400

# Configures the parser for processing and extracting information from documents.
$parser: !pw.xpacks.llm.parsers.DoclingParser
  async_mode: "fully_async"
  chunk: false
  cache_strategy: !pw.udfs.DefaultCache {}

# Sets up the retriever factory for indexing and retrieving documents.
$retriever_factory: !pw.indexing.UsearchKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.indexing.USearchMetricKind.COS

# Manages the storage and retrieval of documents for the RAG template.
$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory

# Streamable MCP server, can be proxied
mcp_http: !pw.xpacks.llm.mcp_server.PathwayMcp
  name: "Streamable MCP Server"
  transport: "streamable-http"
  host: "0.0.0.0"
  port: 8068
  serve:
    - $document_store

# By default, caching is enabled for UDFs with cache_strategy set.
# You can disable it by uncommenting the following line.
# persistence_mode: null
# You can also set persistence_mode to !pw.PersistenceMode.PERSISTING to enable persistence
# across restarts.
# By default, when enabled, the cache is stored in the ./Cache directory.
# You can customize the location by uncommenting and modifying the following lines:
# persistence_backend: !pw.persistence.Backend.filesystem
#   path: ".Cache"

# If `terminate_on_error` is true then the program will terminate whenever any error is encountered.
# Defaults to false, uncomment the following line if you want to set it to true
# terminate_on_error: true
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
version: "3.8"
services:
  pathway_mcp_server:
    build:
      context: .
    ports:
      - "8068:8068"
    environment:
      PATHWAY_LICENSE_KEY: $PATHWAY_LICENSE_KEY
    volumes:
      - "./files-for-indexing:/app/files-for-indexing"
Binary file not shown.
Binary file not shown.
