Skip to content

Commit 119e817

Browse files
MCP server skeleton
1 parent 1446028 commit 119e817

File tree

3 files changed

+156
-4
lines changed

3 files changed

+156
-4
lines changed

application/apps/indexer/mcp/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ edition = "2024"
77
workspace = true
88

99
[dependencies]
10+
axum = { version = "0.7", features = ["macros"] }
11+
rmcp = { version = "0.11", features = [
12+
"server",
13+
"transport-io",
14+
"schemars",
15+
"transport-streamable-http-server-session",
16+
"transport-streamable-http-server",
17+
] }
18+
schemars = "1.1"
1019
tokio.workspace = true
1120
tokio-util.workspace = true
1221
log.workspace = true
22+
serde.workspace = true
23+
anyhow.workspace = true

application/apps/indexer/mcp/src/server/messages.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use rmcp::schemars::JsonSchema;
2+
use serde::{Deserialize, Serialize};
13
use tokio::sync::oneshot;
24

35
use crate::types::McpError;
@@ -12,7 +14,7 @@ pub enum McpServerToChipmunk {
1214
}
1315

1416
// TODO: MOCK
15-
#[derive(Debug)]
17+
#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)]
1618
pub struct SearchFilter {
1719
pub value: String,
1820
pub is_regex: bool,

application/apps/indexer/mcp/src/server/mod.rs

Lines changed: 142 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,90 @@
1+
use anyhow::Result;
12
use log::{error, warn};
3+
use rmcp::{
4+
ErrorData as RmcpError,
5+
handler::server::wrapper::Parameters,
6+
handler::server::{ServerHandler, tool::ToolRouter},
7+
model::{CallToolResult, Content, ErrorCode},
8+
model::{ServerCapabilities, ServerInfo},
9+
tool, tool_handler, tool_router,
10+
transport::streamable_http_server::{
11+
session::local::LocalSessionManager,
12+
tower::{StreamableHttpServerConfig, StreamableHttpService},
13+
},
14+
};
215
use tokio::{
316
sync::{mpsc, oneshot},
417
time::{self, sleep},
518
};
619

720
pub mod messages;
821

22+
pub const BIND_ADDRESS: &str = "127.0.0.1:8181";
23+
924
use messages::McpServerToChipmunk;
1025

1126
use crate::server::messages::SearchFilter;
1227

13-
#[derive(Debug)]
28+
#[derive(Clone, Debug)]
1429
pub struct McpServer {
1530
server_to_chipmunk_tx: mpsc::Sender<McpServerToChipmunk>,
31+
pub tool_router: ToolRouter<Self>,
1632
}
1733

34+
#[tool_handler]
35+
impl ServerHandler for McpServer {
36+
fn get_info(&self) -> ServerInfo {
37+
ServerInfo {
38+
instructions: Some("Chipmunk MCP Server".to_string()),
39+
capabilities: ServerCapabilities::builder()
40+
.enable_tools()
41+
.enable_resources()
42+
.enable_prompts()
43+
.build(),
44+
..Default::default()
45+
}
46+
}
47+
}
48+
49+
#[tool_router]
1850
impl McpServer {
1951
pub fn new() -> (Self, mpsc::Receiver<McpServerToChipmunk>) {
2052
let (server_to_chipmunk_tx, server_to_chipmunk_rx) = mpsc::channel(32);
2153

2254
(
2355
Self {
2456
server_to_chipmunk_tx,
57+
tool_router: Self::tool_router(),
2558
},
2659
server_to_chipmunk_rx,
2760
)
2861
}
2962

30-
pub fn start(self) {
31-
tokio::spawn(self.run());
63+
pub async fn start(self) -> Result<()> {
64+
let ct = tokio_util::sync::CancellationToken::new();
65+
let (mcp_server, _task_rx_inner) = McpServer::new();
66+
67+
let service = StreamableHttpService::new(
68+
move || Ok(mcp_server.clone()),
69+
LocalSessionManager::default().into(),
70+
StreamableHttpServerConfig {
71+
cancellation_token: ct.child_token(),
72+
..Default::default()
73+
},
74+
);
75+
let router = axum::Router::new().nest_service("/mcp", service);
76+
let tcp_listener = tokio::net::TcpListener::bind(BIND_ADDRESS).await?;
77+
78+
tokio::spawn(async move {
79+
if let Err(server_err) = axum::serve(tcp_listener, router).await {
80+
eprintln!("MCP Server error: {:?}", server_err);
81+
}
82+
});
83+
84+
Ok(())
3285
}
3386

87+
#[allow(dead_code)]
3488
async fn run(self) {
3589
// TODO: Send a mock message after 1 seconds
3690
warn!("🔅 MCP: sleep timer started");
@@ -73,4 +127,89 @@ impl McpServer {
73127
}
74128
}
75129
}
130+
131+
#[tool(description = r#"Generate SearchFilter objects for filtering logs.
132+
133+
This tool accepts one or more filter specifications and returns a list of SearchFilter objects.
134+
Each filter can be customized with flags for regex matching, case sensitivity, and word boundaries.
135+
136+
**Input Parameters:**
137+
- `filters`: An array of filter objects, where each object contains:
138+
- `filter` (string): The text or pattern to search for
139+
- `is_regex` (boolean): true if the filter is a regular expression pattern
140+
- `ignore_case` (boolean): true for case-insensitive matching
141+
- `is_word` (boolean): true to match whole words only (word boundary matching)
142+
143+
**Usage Examples:**
144+
145+
Single filter:
146+
- Input: [{"filter": "error", "is_regex": false, "ignore_case": false, "is_word": false}]
147+
- Use case: Find exact matches of "error"
148+
149+
Multiple filters:
150+
- Input: [
151+
{"filter": "ERROR", "is_regex": false, "ignore_case": true, "is_word": false},
152+
{"filter": "\\d{4}-\\d{2}-\\d{2}", "is_regex": true, "ignore_case": false, "is_word": false}
153+
]
154+
- Use case: Find "ERROR" (any case) OR date patterns
155+
156+
Common patterns:
157+
- Case-insensitive word: {"filter": "warning", "is_regex": false, "ignore_case": true, "is_word": true}
158+
- Regex pattern: {"filter": "\\b(error|fail|exception)\\b", "is_regex": true, "ignore_case": false, "is_word": false}
159+
- Exact match: {"filter": "timeout", "is_regex": false, "ignore_case": false, "is_word": false}
160+
161+
**Natural Language Interpretation:**
162+
When the user provides natural language instructions, interpret them as follows:
163+
- "error" → single filter for "error"
164+
- "error or warning" → two filters, one for "error" and one for "warning"
165+
- "case-insensitive ERROR" → set ignore_case: true
166+
- "match the word 'timeout'" → set is_word: true
167+
- "regex pattern \\d+" → set is_regex: true
168+
- "find ERROR, WARNING, and CRITICAL" → three separate filters
169+
"#)]
170+
async fn apply_search_filter(
171+
&self,
172+
Parameters(params): Parameters<Vec<SearchFilter>>,
173+
) -> Result<CallToolResult, RmcpError> {
174+
log::info!(
175+
"Received apply_search_filter tool call with params: {:?}",
176+
params
177+
);
178+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
179+
let task = McpServerToChipmunk::ApplyFilter {
180+
filters: vec![],
181+
response_tx,
182+
};
183+
let task_tx_clone = self.server_to_chipmunk_tx.clone();
184+
// Send task over communication channel in a separate thread,
185+
// in future, we can skip match over task spawn
186+
match tokio::spawn(async move { task_tx_clone.send(task).await }).await {
187+
Ok(_) => log::info!("Sent Search task to MCP server"),
188+
Err(err) => log::error!(
189+
"Failed to send Search task to MCP server: ApplyFilter: {}",
190+
err
191+
),
192+
};
193+
194+
// Wait for the response from task over communication channel
195+
// based on the response send back the JSON response to client
196+
response_rx
197+
.await
198+
.map(|task_response| match task_response {
199+
Ok(()) => Ok(CallToolResult::success(vec![Content::json(
200+
"Server task finished successfully",
201+
)?])),
202+
Err(err) => {
203+
let err_msg = format!("Error while applying the task: {err}");
204+
Ok(CallToolResult::error(vec![Content::json(err_msg)?]))
205+
}
206+
})
207+
.map_err(|err| {
208+
RmcpError::new(
209+
ErrorCode::INTERNAL_ERROR,
210+
format!("Did not receive the response from search filter task {err:?}"),
211+
None,
212+
)
213+
})?
214+
}
76215
}

0 commit comments

Comments
 (0)