Skip to content

Commit b059fa0

Browse files
feat(clp-mcp-server): Add search_by_kql MCP tool call. (#1436)
Co-authored-by: Lin Zhihao <[email protected]>
1 parent 8e77a81 commit b059fa0

File tree

8 files changed

+320
-16
lines changed

8 files changed

+320
-16
lines changed

components/clp-mcp-server/clp_mcp_server/clp_connector.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,11 @@ async def submit_query(
5050
:param begin_ts: The beginning timestamp of the query range.
5151
:param end_ts: The end timestamp of the query range.
5252
:return: The ID assigned to the query.
53-
:raise ValueError: If ``end_ts`` is smaller than ``begin_ts``.
54-
:raise aiomysql.Error: If there is an error connecting to or querying MariaDB.
55-
:raise pymongo.errors.PyMongoError: If there is an error interacting with MongoDB.
56-
:raise Exception: For any other unexpected errors.
53+
:raise: ValueError if `end_ts` is smaller than `begin_ts`.
54+
:raise: RuntimeError if it fails to retrieve the ID of the submitted query.
55+
:raise: aiomysql.Error if there is an error connecting to or querying MariaDB.
56+
:raise: pymongo.errors.PyMongoError if there is an error interacting with MongoDB.
57+
:raise: Exception for any other unexpected errors.
5758
"""
5859
if begin_ts is not None and end_ts is not None and end_ts < begin_ts:
5960
err_msg = f"end_ts {end_ts} is smaller than begin_ts {begin_ts}."
@@ -123,9 +124,10 @@ async def wait_query_completion(self, query_id: str, timeout: float | None = Non
123124
124125
:param query_id: The ID of the query.
125126
:param timeout: Maximum time to wait in seconds, or None for no timeout.
126-
:raise aiomysql.Error: If there is an error connecting to or querying MariaDB.
127-
:raise ValueError: When the query is not found.
128-
:raise RuntimeError: When the query fails or is cancelled.
127+
:raise: aiomysql.Error if there is an error connecting to or querying MariaDB.
128+
:raise: ValueError if the query is not found.
129+
:raise: RuntimeError if the query fails or is cancelled.
130+
:raise: TimeoutError if the timeout is reached before the query completes.
129131
"""
130132
waiting_states = {QueryJobStatus.PENDING, QueryJobStatus.RUNNING, QueryJobStatus.CANCELLING}
131133
error_states = {QueryJobStatus.FAILED, QueryJobStatus.CANCELLED, QueryJobStatus.KILLED}

components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import logging
55
import socket
66
import sys
7+
from pathlib import Path
78

89
import click
10+
from clp_py_utils.clp_config import CLPConfig
11+
from clp_py_utils.core import read_yaml_config_file
12+
from pydantic import ValidationError
913

1014
from .server import create_mcp_server
1115

@@ -15,22 +19,32 @@
1519
"--host", type=str, default="127.0.0.1", help="The server's host address (default: 127.0.0.1)."
1620
)
1721
@click.option("--port", type=int, default=8000, help="The server's port number (default: 8000).")
18-
def main(host: str, port: int) -> None:
22+
@click.option(
23+
"--config-path",
24+
type=click.Path(exists=True),
25+
default="/etc/clp-config.yml",
26+
help="The path to server's configuration file (default: /etc/clp-config.yml).",
27+
)
28+
def main(host: str, port: int, config_path: Path) -> int:
1929
"""
2030
Runs the CLP MCP server with HTTP transport.
2131
2232
:param host: The server's host address (IP address or hostname).
2333
:param port: The server's port number (1-65535).
34+
:param config_path: The path to server's configuration file.
35+
:return: Exit code (0 for success, non-zero for failure).
2436
"""
2537
logging.basicConfig(
2638
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
2739
)
2840
logger = logging.getLogger(__name__)
2941

42+
exit_code = 0
43+
3044
# Validate host and port
3145
if len(host.strip()) == 0:
3246
logger.error("Host cannot be empty.")
33-
sys.exit(1)
47+
exit_code = 1
3448

3549
# Validate host format (IP address or resolvable hostname)
3650
try:
@@ -44,21 +58,32 @@ def main(host: str, port: int) -> None:
4458
"Host validation failed: '%s' is not a valid IP address and DNS resolution failed.",
4559
host,
4660
)
47-
sys.exit(1)
61+
exit_code = 1
4862

4963
max_port = 65535
5064
if port <= 0 or port > max_port:
5165
logger.error("Port must be between 1 and %d, got: %d.", max_port, port)
52-
sys.exit(1)
66+
exit_code = 1
5367

5468
try:
55-
mcp = create_mcp_server()
69+
clp_config = CLPConfig.model_validate(read_yaml_config_file(config_path))
70+
except ValidationError:
71+
logger.exception("Configuration validation failed.")
72+
exit_code = 1
73+
except Exception:
74+
logger.exception("Failed to load configuration.")
75+
exit_code = 1
76+
77+
try:
78+
mcp = create_mcp_server(clp_config)
5679
logger.info("Starting CLP MCP Server on %s:%d.", host, port)
5780
mcp.run(transport="streamable-http", host=host, port=port)
5881
except Exception:
5982
logger.exception("Failed to start MCP server.")
60-
sys.exit(1)
83+
exit_code = 1
84+
85+
return exit_code
6186

6287

6388
if __name__ == "__main__":
64-
main()
89+
sys.exit(main())

components/clp-mcp-server/clp_mcp_server/server/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
# 10 minutes
77
SESSION_TTL_SECONDS = 600
88

9+
TIMESTAMP_NOT_AVAILABLE = "N/A"
10+
911
SERVER_NAME = "clp-mcp-server"
1012

1113
# fmt: off

components/clp-mcp-server/clp_mcp_server/server/server.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@
22

33
from typing import Any
44

5+
from clp_py_utils.clp_config import CLPConfig
56
from fastmcp import Context, FastMCP
67

8+
from clp_mcp_server.clp_connector import ClpConnector
9+
710
from . import constants
811
from .session_manager import SessionManager
12+
from .utils import format_query_results, sort_by_timestamp
913

1014

11-
def create_mcp_server() -> FastMCP:
15+
def create_mcp_server(clp_config: CLPConfig) -> FastMCP:
1216
"""
1317
Creates and defines API tool calls for the CLP MCP server.
1418
19+
:param clp_config:
1520
:return: A configured `FastMCP` instance.
1621
:raise: Propagates `FastMCP.__init__`'s exceptions.
1722
:raise: Propagates `FastMCP.tool`'s exceptions.
@@ -20,6 +25,8 @@ def create_mcp_server() -> FastMCP:
2025

2126
session_manager = SessionManager(session_ttl_seconds=constants.SESSION_TTL_SECONDS)
2227

28+
connector = ClpConnector(clp_config)
29+
2330
@mcp.tool
2431
async def get_instructions(ctx: Context) -> str:
2532
"""
@@ -67,4 +74,38 @@ def hello_world(name: str = "clp-mcp-server user") -> dict[str, Any]:
6774
"status": "running",
6875
}
6976

77+
@mcp.tool
78+
async def search_by_kql(kql_query: str, ctx: Context) -> dict[str, Any]:
79+
"""
80+
Searches log events that match the given Kibana Query Language (KQL) query. The resulting
81+
events are ordered by timestamp in descending order (latest to oldest), cached for
82+
subsequent pagination, and returned with the first page of results.
83+
84+
:param kql_query:
85+
:param ctx: The `FastMCP` context containing the metadata of the underlying MCP session.
86+
:return: A dictionary containing the following key-value pairs on success:
87+
- "items": A list of log entries in the requested page.
88+
- "num_total_pages": Total number of pages available from the query as an integer.
89+
- "num_total_items": Total number of log entries available from the query as an integer.
90+
- "num_items_per_page": Number of log entries per page.
91+
- "has_next": Whether a page exists after the returned one.
92+
- "has_previous": Whether a page exists before the returned one.
93+
:return: A dictionary with the following key-value pair on failures:
94+
- "Error": An error message describing the failure.
95+
"""
96+
await session_manager.start()
97+
98+
try:
99+
query_id = await connector.submit_query(kql_query)
100+
await connector.wait_query_completion(query_id)
101+
results = await connector.read_results(query_id)
102+
except (ValueError, RuntimeError, TimeoutError) as e:
103+
return {"Error": str(e)}
104+
105+
sorted_results = sort_by_timestamp(results)
106+
formatted_results = format_query_results(sorted_results)
107+
return session_manager.cache_query_result_and_get_first_page(
108+
ctx.session_id, formatted_results
109+
)
110+
70111
return mcp

components/clp-mcp-server/clp_mcp_server/server/session_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ def get_page_data(self, page_index: int) -> dict[str, Any]:
109109
Retrieves the n-th page of a paginated response with the paging metadata from the previous
110110
query.
111111
112-
NOTE: This docstring must be synchronized with `get_nth_page`'s MCP tool call.
112+
NOTE: This docstring must be synchronized with MCP tool calls: `get_nth_page` and
113+
`search_by_kql`.
113114
114115
:param page_index: Zero-based index, e.g., 0 for the first page.
115116
:return: A dictionary containing the following key-value pairs on success:
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Utility functions for CLP MCP server."""
2+
3+
import logging
4+
from datetime import datetime, timezone
5+
from typing import Any
6+
7+
from .constants import TIMESTAMP_NOT_AVAILABLE
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def convert_epoch_to_date_string(epoch_ts: int) -> str:
    """
    Renders a millisecond Unix epoch timestamp as a UTC date string.

    :param epoch_ts: Unix epoch timestamp in milliseconds.
    :return: ISO 8601 formatted date string with millisecond precision (YYYY-MM-DDTHH:mm:ss.fffZ).
    :raise: ValueError if `epoch_ts` cannot be converted to a valid date string.
    """
    try:
        # `datetime.fromtimestamp` expects seconds, so scale the milliseconds down first. The
        # division stays inside the `try` because a huge integer can overflow the float conversion.
        moment = datetime.fromtimestamp(epoch_ts / 1000.0, tz=timezone.utc)
        iso_text = moment.isoformat(timespec="milliseconds")
    except (ValueError, OSError, OverflowError) as exc:
        # Collapse every conversion failure into a single exception type for callers.
        raise ValueError(f"Invalid timestamp {epoch_ts}: {exc}.") from exc

    # Use the compact "Z" designator in place of the explicit UTC offset.
    return iso_text.replace("+00:00", "Z")
25+
26+
27+
def format_query_results(query_results: list[dict[str, Any]]) -> list[str]:
    """
    Turns kv-pair log events into human-readable strings of the form
    `timestamp: <date string>, message: <message>`.

    The following kv-pairs are read from each log event:
    - "timestamp": An integer representing the epoch timestamp in milliseconds. When it is missing,
      not an integer, or fails conversion, `TIMESTAMP_NOT_AVAILABLE` is used as the date string.
    - "message": The log message. Log events with a missing or empty message are skipped with a
      warning.

    :param query_results: A list of dictionaries representing kv-pair log events.
    :return: A list of strings representing formatted log events.
    """
    rendered: list[str] = []
    for event in query_results:
        raw_ts = event.get("timestamp")
        date_text = TIMESTAMP_NOT_AVAILABLE

        if isinstance(raw_ts, int):
            try:
                date_text = convert_epoch_to_date_string(raw_ts)
            except (TypeError, ValueError) as exc:
                # A bad timestamp is not fatal; the placeholder date string is kept.
                logger.warning(
                    "Failed to convert epoch timestamp=%s to date string: %s.", raw_ts, exc
                )

        body = event.get("message", "")
        if body:
            rendered.append(f"timestamp: {date_text}, message: {body}")
        else:
            logger.warning("Empty message attached to a log event: %s.", event)

    return rendered
58+
59+
60+
def sort_by_timestamp(query_results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Sorts the query results in-place by timestamp in descending order (latest to oldest).

    Note:
    - Timestamp is expected to be an integer representing the epoch timestamp in milliseconds,
      stored under the `timestamp` key.
    - If `timestamp` is missing or not an integer, the log event is ordered after every log event
      that has an integer timestamp, including negative (pre-1970) timestamps.
    - Log events whose sort keys compare equal keep their relative input order.

    :param query_results: A list of dictionaries representing kv-pair log events.
    :return: The input list sorted in-place.
    """

    def _key(log_entry: dict[str, Any]) -> tuple[bool, int]:
        ts = log_entry.get("timestamp")
        # The leading flag ranks every valid timestamp above invalid ones, so no magic sentinel
        # value (e.g. -1) can collide with a real, possibly negative, epoch timestamp.
        if isinstance(ts, int):
            return (True, ts)
        return (False, 0)

    query_results.sort(key=_key, reverse=True)

    return query_results
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Tests for server module."""

0 commit comments

Comments
 (0)