Skip to content

Commit 221b064

Browse files
feat(clp-mcp-server): Introduce session-based MCP capabilities: (y-scope#1401)
- Add `get_instructions` MCP tool call to retrieve the system prompt. - Add `get_nth_page` MCP tool call to retrieve paginated query results cached from the previous search. - Introduce `SessionManager` for managing asynchronous session-level states. Co-authored-by: LinZhihao-723 <[email protected]>
1 parent 9e991ab commit 221b064

File tree

8 files changed

+887
-253
lines changed

8 files changed

+887
-253
lines changed

components/clp-mcp-server/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ For more details on Claude Desktop MCP setup, see the
4949

5050
> **Warning:** 🚧 This section is still under construction.
5151
52+
## Testing
53+
54+
Use the following command to run all unit tests:
55+
56+
```shell
57+
uv test pytest
58+
```
59+
5260
[claude-desktop]: https://claude.ai/download
5361
[claude-desktop-mcp-doc]: https://modelcontextprotocol.io/docs/develop/connect-local-servers
5462
[mcp]: https://modelcontextprotocol.io/docs/getting-started/intro
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Constants for CLP MCP server."""
2+
3+
EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS = 600 # 10 minutes
4+
NUM_ITEMS_PER_PAGE = 10
5+
MAX_CACHED_RESULTS = 1000
6+
# 10 minutes
7+
SESSION_TTL_SECONDS = 600
8+
9+
SERVER_NAME = "clp-mcp-server"
10+
11+
# fmt: off
12+
SYSTEM_PROMPT = (
13+
"You are an AI assistant that helps users query a log database using KQL (Kibana Query Language)."
14+
" You should generate a KQL query that accurately expresses the user's intent. The generated KQL"
15+
" query should be as specific as possible to minimize the number of log messages returned.\n\n"
16+
"You should consider the following guidelines to generate KQL queries efficiently:\n"
17+
"- Use specific field names and values to narrow down the search.\n"
18+
"- Avoid using wildcards (`*`) unless absolutely necessary, as they can lead to large result"
19+
" sets.\n"
20+
"- Use logical operators (`AND`, `OR`, `NOT`) to combine one or more key-value searches.\n"
21+
"- Consider the time range of the logs you are searching. If the user specifies a time range,"
22+
" include it in the KQL query.\n"
23+
"- If the user query is ambiguous or lacks detail, ask clarifying questions to better understand"
24+
" their intent before generating the KQL query.\n"
25+
"- Always ensure that the generated KQL query is syntactically correct and can be executed without"
26+
" errors."
27+
)
28+
# fmt: on

components/clp-mcp-server/clp_mcp_server/server/server.py

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,10 @@
22

33
from typing import Any
44

5-
from fastmcp import FastMCP
5+
from fastmcp import Context, FastMCP
66

7-
8-
class ProtocolConstant:
9-
"""Constants for the CLP MCP Server."""
10-
11-
SERVER_NAME = "clp-mcp-server"
12-
13-
# Tool names
14-
TOOL_HELLO_WORLD = "hello_world"
15-
TOOL_GET_SERVER_INFO = "get_server_info"
16-
17-
@classmethod
18-
def get_capabilities(cls) -> list[str]:
19-
"""
20-
Gets the capabilities of the server.
21-
:return: A list of tool names supported by the server.
22-
"""
23-
return [cls.TOOL_HELLO_WORLD, cls.TOOL_GET_SERVER_INFO]
7+
from . import constants
8+
from .session_manager import SessionManager
249

2510

2611
def create_mcp_server() -> FastMCP:
@@ -31,22 +16,44 @@ def create_mcp_server() -> FastMCP:
3116
:raise: Propagates `FastMCP.__init__`'s exceptions.
3217
:raise: Propagates `FastMCP.tool`'s exceptions.
3318
"""
34-
mcp = FastMCP(name=ProtocolConstant.SERVER_NAME)
19+
mcp = FastMCP(name=constants.SERVER_NAME)
20+
21+
session_manager = SessionManager(session_ttl_seconds=constants.SESSION_TTL_SECONDS)
3522

36-
@mcp.tool()
37-
def get_server_info() -> dict[str, Any]:
23+
@mcp.tool
24+
async def get_instructions(ctx: Context) -> str:
3825
"""
39-
Gets the MCP server's information.
26+
Gets a pre-defined "system prompt" that guides the LLM behavior.
27+
This function must be invoked before any other `FastMCP.tool`.
4028
41-
:return: The server's information with a list of capabilities.
29+
:param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
30+
:return: A string of "system prompt".
4231
"""
43-
return {
44-
"name": ProtocolConstant.SERVER_NAME,
45-
"capabilities": ProtocolConstant.get_capabilities(),
46-
"status": "running",
47-
}
32+
await session_manager.start()
33+
return session_manager.get_or_create_session(ctx.session_id).get_instructions()
34+
35+
@mcp.tool
36+
async def get_nth_page(page_index: int, ctx: Context) -> dict[str, Any]:
37+
"""
38+
Retrieves the n-th page of a paginated response with the paging metadata from the previous
39+
query.
40+
41+
:param page_index: Zero-based index, e.g., 0 for the first page.
42+
:param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
43+
:return: A dictionary containing the following key-value pairs on success:
44+
- "items": A list of log entries in the requested page.
45+
- "num_total_pages": Total number of pages available from the query as an integer.
46+
- "num_total_items": Total number of log entries available from the query as an integer.
47+
- "num_items_per_page": Number of log entries per page.
48+
- "has_next": Whether a page exists after the returned one.
49+
- "has_previous": Whether a page exists before the returned one.
50+
:return: A dictionary with the following key-value pair on failures:
51+
- "Error": An error message describing the failure.
52+
"""
53+
await session_manager.start()
54+
return session_manager.get_nth_page(ctx.session_id, page_index)
4855

49-
@mcp.tool()
56+
@mcp.tool
5057
def hello_world(name: str = "clp-mcp-server user") -> dict[str, Any]:
5158
"""
5259
Provides a simple hello world greeting.
@@ -56,7 +63,7 @@ def hello_world(name: str = "clp-mcp-server user") -> dict[str, Any]:
5663
"""
5764
return {
5865
"message": f"Hello World, {name.strip()}!",
59-
"server": ProtocolConstant.SERVER_NAME,
66+
"server": constants.SERVER_NAME,
6067
"status": "running",
6168
}
6269

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
"""Session management for CLP MCP Server."""
2+
3+
import asyncio
4+
from dataclasses import dataclass, field
5+
from datetime import datetime, timedelta, timezone
6+
from typing import Any, ClassVar
7+
8+
from paginate import Page
9+
10+
from . import constants
11+
12+
13+
class PaginatedQueryResult:
14+
"""Paginates the cached log entries returned from a query's response."""
15+
16+
def __init__(self, log_entries: list[str], num_items_per_page: int) -> None:
17+
"""
18+
:param log_entries: Log entries to paginate.
19+
:param num_items_per_page:
20+
:raise: ValueError if the number of cached results or `num_items_per_page` is invalid.
21+
"""
22+
if len(log_entries) > constants.MAX_CACHED_RESULTS:
23+
err_msg = (
24+
"PaginatedQueryResult exceeds maximum allowed cached results:"
25+
f" {len(log_entries)} > {constants.MAX_CACHED_RESULTS}."
26+
)
27+
raise ValueError(err_msg)
28+
29+
if num_items_per_page <= 0:
30+
err_msg = (
31+
f"Invalid num_items_per_page: {num_items_per_page}, it must be a positive integer."
32+
)
33+
raise ValueError(err_msg)
34+
35+
self._num_items_per_page: int = num_items_per_page
36+
self._num_pages: int = (len(log_entries) + num_items_per_page - 1) // num_items_per_page
37+
self._log_entries: list[str] = log_entries
38+
39+
def get_page(self, page_index: int) -> Page | None:
40+
"""
41+
:param page_index: Zero-based index, e.g., 0 for the first page.
42+
:return: A `Page` object for the specified page.
43+
:return: None if `page_index` is out of bounds.
44+
"""
45+
# Convert zero-based to one-based
46+
page_number = page_index + 1
47+
if page_number <= 0 or self._num_pages < page_number:
48+
return None
49+
50+
return Page(
51+
self._log_entries,
52+
page=page_number,
53+
items_per_page=self._num_items_per_page,
54+
)
55+
56+
57+
@dataclass
58+
class SessionState:
59+
"""
60+
Represents the state of a user session, following the same concurrency model as
61+
`SessionManager`.
62+
63+
NOTE: All methods are intended to be executed exclusively by coroutines within the same event
64+
loop in a single-threaded context.
65+
"""
66+
67+
_num_items_per_page: int
68+
_session_id: str
69+
_session_ttl_seconds: float
70+
71+
_cached_query_result: PaginatedQueryResult | None = None
72+
_last_accessed: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
73+
_is_instructions_retrieved: bool = False
74+
75+
_GET_INSTRUCTIONS_NOT_RUN_ERROR: ClassVar[dict[str, str]] = {
76+
"Error": "Please call `get_instructions()` first to understand how to use this MCP server."
77+
}
78+
79+
def cache_query_result_and_get_first_page(
80+
self,
81+
results: list[str],
82+
) -> dict[str, Any]:
83+
"""
84+
:param results: Log entries from the query to cache.
85+
:return: Forwards `SessionState.get_page_data`'s return values.
86+
:return: _GET_INSTRUCTIONS_NOT_RUN_ERROR if `get_instructions` has not been called in this
87+
session.
88+
"""
89+
if self._is_instructions_retrieved is False:
90+
return self._GET_INSTRUCTIONS_NOT_RUN_ERROR.copy()
91+
92+
self._cached_query_result = PaginatedQueryResult(
93+
log_entries=results, num_items_per_page=self._num_items_per_page
94+
)
95+
96+
return self.get_page_data(0)
97+
98+
def get_instructions(self) -> str:
99+
"""
100+
Gets a pre-defined "system prompt" that guides the LLM behavior.
101+
102+
:return: A string of "system prompt".
103+
"""
104+
self._is_instructions_retrieved = True
105+
return constants.SYSTEM_PROMPT
106+
107+
def get_page_data(self, page_index: int) -> dict[str, Any]:
108+
"""
109+
Retrieves the n-th page of a paginated response with the paging metadata from the previous
110+
query.
111+
112+
NOTE: This docstring must be synchronized with `get_nth_page`'s MCP tool call.
113+
114+
:param page_index: Zero-based index, e.g., 0 for the first page.
115+
:return: A dictionary containing the following key-value pairs on success:
116+
- "items": A list of log entries in the requested page.
117+
- "num_total_pages": Total number of pages available from the query as an integer.
118+
- "num_total_items": Total number of log entries available from the query as an integer.
119+
- "num_items_per_page": Number of log entries per page.
120+
- "has_next": Whether a page exists after the returned one.
121+
- "has_previous": Whether a page exists before the returned one.
122+
:return: A dictionary with the following key-value pair on failures:
123+
- "Error": An error message describing the failure.
124+
:return: _GET_INSTRUCTIONS_NOT_RUN_ERROR if `get_instructions` has not been called in this
125+
session.
126+
"""
127+
if self._is_instructions_retrieved is False:
128+
return self._GET_INSTRUCTIONS_NOT_RUN_ERROR.copy()
129+
130+
if self._cached_query_result is None:
131+
return {"Error": "No previous paginated response in this session."}
132+
133+
page = self._cached_query_result.get_page(page_index)
134+
if page is None:
135+
return {"Error": "Page index is out of bounds."}
136+
137+
return {
138+
"items": list(page),
139+
"num_total_pages": page.page_count,
140+
"num_total_items": page.item_count,
141+
"num_items_per_page": page.items_per_page,
142+
"has_next": page.next_page is not None,
143+
"has_previous": page.previous_page is not None,
144+
}
145+
146+
def is_expired(self) -> bool:
147+
""":return: Whether the session has expired."""
148+
time_diff = datetime.now(timezone.utc) - self._last_accessed
149+
return time_diff > timedelta(seconds=self._session_ttl_seconds)
150+
151+
def update_access_time(self) -> None:
152+
"""Updates the last accessed timestamp."""
153+
self._last_accessed = datetime.now(timezone.utc)
154+
155+
156+
class SessionManager:
157+
"""
158+
Manages concurrent user sessions, following the same concurrency model as `FastMCP`.
159+
160+
All data structures managed by this class are accessed exclusively by coroutines running
161+
within the same event loop in a single-thread context. This design implies:
162+
163+
- No thread-level concurrency control (e.g., `threading.Lock`) is necessary, since operations
164+
occur in a single-threaded async context.
165+
- Async synchronization primitives (e.g., `asyncio.Lock`) are only necessary when an `await`
166+
occurs inside a critical section, to prevent interleaving of coroutine execution.
167+
"""
168+
169+
def __init__(self, session_ttl_seconds: float) -> None:
170+
""":param session_ttl_seconds: Session time-to-live in seconds."""
171+
self.sessions: dict[str, SessionState] = {}
172+
self._session_ttl_seconds: float = session_ttl_seconds
173+
self._cleanup_task: asyncio.Task | None = None
174+
175+
async def start(self) -> None:
176+
"""Starts the asynchronous cleanup task."""
177+
if self._cleanup_task is None:
178+
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
179+
180+
async def _cleanup_loop(self) -> None:
181+
"""Cleans up all expired sessions periodically as an async task."""
182+
while True:
183+
await asyncio.sleep(constants.EXPIRED_SESSION_SWEEP_INTERVAL_SECONDS)
184+
self.cleanup_expired_sessions()
185+
186+
def cleanup_expired_sessions(self) -> None:
187+
"""Cleans up all expired sessions."""
188+
expired_sessions = [sid for sid, session in self.sessions.items() if session.is_expired()]
189+
190+
for sid in expired_sessions:
191+
del self.sessions[sid]
192+
193+
def cache_query_result_and_get_first_page(
194+
self, session_id: str, query_results: list[str]
195+
) -> dict[str, Any]:
196+
"""
197+
:param session_id:
198+
:param query_results: Log entries from the query to cache.
199+
:return: Forwards `SessionState.cache_query_result_and_get_first_page`'s return values.
200+
"""
201+
session = self.get_or_create_session(session_id)
202+
203+
return session.cache_query_result_and_get_first_page(results=query_results)
204+
205+
def get_nth_page(self, session_id: str, page_index: int) -> dict[str, Any]:
206+
"""
207+
Retrieves the n-th page of a paginated response with the paging metadata from the previous
208+
query.
209+
210+
:param session_id:
211+
:param page_index: Index of the page to retrieve (zero-based, e.g., 0 is the first page).
212+
:return: Forwards `SessionState.get_page_data`'s return values.
213+
"""
214+
session = self.get_or_create_session(session_id)
215+
216+
return session.get_page_data(page_index)
217+
218+
def get_or_create_session(self, session_id: str) -> SessionState:
219+
"""
220+
Gets an existing session or creates a new one.
221+
222+
:param session_id:
223+
:return: The `SessionState` object for the given `session_id`.
224+
"""
225+
if session_id in self.sessions and self.sessions[session_id].is_expired():
226+
del self.sessions[session_id]
227+
228+
if session_id not in self.sessions:
229+
self.sessions[session_id] = SessionState(
230+
constants.NUM_ITEMS_PER_PAGE, session_id, self._session_ttl_seconds
231+
)
232+
233+
session = self.sessions[session_id]
234+
235+
session.update_access_time()
236+
return session

0 commit comments

Comments
 (0)