Skip to content

Commit ad6eca1

Browse files
Native: Complete server-client cycle
1 parent 1a58044 commit ad6eca1

File tree

8 files changed

+106
-22
lines changed

8 files changed

+106
-22
lines changed

application/apps/indexer/gui/application/src/session/service/mod.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tokio::{
88
use uuid::Uuid;
99

1010
use mcp::{client::McpClient, server::tasks::Tasks, types::Prompt};
11-
use processor::grabber::LineRange;
11+
use processor::{grabber::LineRange, search::filter::SearchFilter};
1212
use session_core::session::Session;
1313
use stypes::{CallbackEvent, ComputationError, ObserveOptions, ObserveOrigin, Transport};
1414

@@ -122,7 +122,38 @@ impl SessionService {
122122
},
123123

124124
Ok(task) = self.mcp_task_rx.recv() => {
125-
println!("🔵 Session Service {} received MCP task: {:?}", self.session_id(), task);
125+
match task {
126+
Tasks::ApplySearchFilter { session_id, filters, task_result_tx } => {
127+
if self.session_id() == session_id {
128+
let cmd = SessionCommand::ApplySearchFilter { operation_id: session_id, filters: filters.clone() };
129+
println!("🔵 Forwarding ApplySearchFilter command to Session Service {}: {:?}", self.session_id(), cmd);
130+
match self.handle_command(cmd, prompt_tx.clone()).await {
131+
Ok(ControlFlow::Break(())) => {
132+
println!("✅ Successfully handled ApplySearchFilter command with BREACK for session {}", self.session_id());
133+
134+
if let Err(err) = task_result_tx.send(Ok(())).await {
135+
log::error!("Failed to send task result for ApplySearchFilter: {err:?}");
136+
}
137+
break
138+
},
139+
Ok(ControlFlow::Continue(())) => {
140+
println!("✅ Successfully handled ApplySearchFilter command with CONTINUE for session {}", self.session_id());
141+
142+
if let Err(err) = task_result_tx.send(Ok(())).await {
143+
log::error!("Failed to send task result for ApplySearchFilter: {err:?}");
144+
}
145+
},
146+
Err(error) => {
147+
println!("❌ Error while handling ApplySearchFilter command: {error:?}");
148+
self.send_error(error).await;
149+
}
150+
}
151+
}
152+
},
153+
_ => {
154+
todo!();
155+
}
156+
}
126157
},
127158

128159
// Callback receiver won't be dropped when session is dropped.
@@ -190,7 +221,13 @@ impl SessionService {
190221
operation_id,
191222
filters,
192223
} => {
193-
self.session.apply_search_filters(operation_id, filters)?;
224+
let filts = vec![SearchFilter::new("time=13".to_string(), false, true, false)];
225+
println!(
226+
"☑️ Session::SessionService::handle_command: Applying search filters for session {}: {:?}",
227+
self.session_id(),
228+
filters
229+
);
230+
self.session.apply_search_filters(operation_id, filts)?;
194231
}
195232
SessionCommand::DropSearch { operation_id } => {
196233
if let Some(filter_op) = operation_id {

application/apps/indexer/gui/application/src/session/ui/bottom_panel/search/search_bar.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl SearchBar {
7070

7171
// Apply temp filter on pressing enter.
7272
if enter_pressed && !self.query.is_empty() {
73+
println!("🔍 SearchBar: Applying search with query '{}'", self.query);
7374
if self.temp_filter.is_some() {
7475
self.drop_search(shared, actions);
7576
}
@@ -86,6 +87,7 @@ impl SearchBar {
8687
operation_id,
8788
filters: vec![filter.clone()],
8889
};
90+
8991
if actions.try_send_command(&self.cmd_tx, cmd) {
9092
shared.search.set_search_operation(operation_id);
9193
self.temp_filter = Some(filter);

application/apps/indexer/mcp/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ rmcp = { version = "0.11", features = [
2020
] }
2121
anyhow.workspace = true
2222
log.workspace = true
23+
processor.workspace = true
2324
reqwest = { version = "0.12.25", features = ["json"] }
2425
schemars = "1.1"
2526
serde_json.workspace = true

application/apps/indexer/mcp/src/agents/ollama.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct Ollama {
2020
impl Default for Ollama {
2121
fn default() -> Self {
2222
Self {
23-
model: String::from("llama3.2"),
23+
model: String::from("qwen3:8b"),
2424
url: String::from("http://localhost:11434"),
2525
api_key: None,
2626
}

application/apps/indexer/mcp/src/client/mod.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ impl McpClient {
9494
message: e.to_string(),
9595
})?;
9696

97-
warn!(
98-
"🟢 MCP client connected to MCP server: {:?}",
99-
mcp_service.peer_info()
100-
);
101-
10297
// let llm = Llm::from_config(self.llm_config);
10398

10499
tokio::spawn(async move {
@@ -113,8 +108,6 @@ impl McpClient {
113108
error!("MCP client event loop ended: {:?}", e);
114109
}
115110
});
116-
warn!("✅ MCP client started");
117-
118111
Ok(())
119112
}
120113

@@ -131,6 +124,7 @@ impl McpClient {
131124
select! {
132125
Some(prompt) = prompt_rx.recv() => {
133126
let tools = mcp_service.list_tools(Default::default()).await?;
127+
let message = format!("User prompt: {}\n\n session_id:\n{}", prompt.message, prompt.id);
134128

135129
let response = match std::env::var("LLM_AGENT")
136130
.ok()
@@ -139,12 +133,12 @@ impl McpClient {
139133
{
140134
Some("openai") => {
141135
agents::open_ai::OpenAI::default()
142-
.send_chat_message(prompt.message.clone(), &mut history, tools)
136+
.send_chat_message(message.clone(), &mut history, tools)
143137
.await
144138
}
145139
_ => {
146140
agents::ollama::Ollama::default()
147-
.send_chat_message(prompt.message.clone(), &mut history, tools)
141+
.send_chat_message(message.clone(), &mut history, tools)
148142
.await
149143
}
150144
};
@@ -176,8 +170,8 @@ impl McpClient {
176170
}
177171
}
178172
},
179-
_ => {
180-
error!("🔴 MCP Client failed to get mock prompt response:");
173+
Err(err) => {
174+
error!("🔴 MCP Client failed to get mock prompt response: {err:?}");
181175
}
182176
}
183177
// println!("Chipmunk request {chipmunk_request:?}");

application/apps/indexer/mcp/src/server/mod.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ use rmcp::{
99
model::{CallToolResult, Content, ErrorCode, ServerCapabilities, ServerInfo},
1010
tool, tool_handler, tool_router,
1111
transport::streamable_http_server::{
12-
session::local::LocalSessionManager,
12+
session::{self, local::LocalSessionManager},
1313
tower::{StreamableHttpServerConfig, StreamableHttpService},
1414
},
1515
};
1616
use tokio::sync::{broadcast, mpsc};
1717

18-
use crate::{errors::McpError, types::*};
18+
use crate::errors::McpError;
19+
use crate::types::*;
1920
use tasks::{Tasks, Tasks::*};
2021

2122
#[derive(Clone, Debug)]
@@ -78,15 +79,19 @@ impl McpServer {
7879

7980
#[tool(description = r#"Generate SearchFilter objects for filtering logs.
8081
81-
This tool accepts one or more filter specifications and returns a list of SearchFilter objects.
82+
This tool accepts one or more filter specifications along with session_id and returns a list of SearchFilter objects.
8283
Each filter can be customized with flags for regex matching, case sensitivity, and word boundaries.
84+
Session id is present in the prompt and can be used by the LLM Model and sent back in the parameters.
85+
This will help the MCP server to identify which session has requested for the filter and apply it to the correct session.
86+
8387
8488
**Input Parameters:**
8589
- `filters`: An list of filter objects, where each object contains:
8690
- `value` (string): The text or pattern to search for
8791
- `is_regex` (boolean): true if the filter is a regular expression pattern
88-
- `ignore_case` (boolean): true for case-insensitive matching
92+
- `ignore_case` (boolean): true for case-insensitive matching, mostly it is false.
8993
- `is_word` (boolean): true to match whole words only (word boundary matching)
94+
- `session_id` (string): Unique identifier for the session requesting the filter, used to apply the filter to the correct session. Present in Chat message.
9095
9196
**Usage Examples:**
9297
@@ -123,10 +128,31 @@ When the user provides natural language instructions, interpret them as follows:
123128
Parameters(params): Parameters<SearchFilters>,
124129
) -> Result<CallToolResult, RmcpError> {
125130
let (task_result_tx, task_result_rx) = mpsc::channel(1);
131+
let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| {
132+
RmcpError::new(
133+
ErrorCode(503),
134+
format!("Invalid session_id format: {}", e),
135+
None,
136+
)
137+
})?;
138+
let filters: Vec<processor::search::filter::SearchFilter> = params
139+
.filters
140+
.iter()
141+
.map(|f| {
142+
processor::search::filter::SearchFilter::new(
143+
f.value.clone(),
144+
f.is_regex,
145+
f.ignore_case,
146+
f.is_word,
147+
)
148+
})
149+
.collect();
126150
let task = ApplySearchFilter {
127-
filters: params.filters.clone(),
151+
session_id,
128152
task_result_tx,
153+
filters,
129154
};
155+
println!("🔵 Received task from LLM {task:?}");
130156
// Send task over communication channel in a separate thread,
131157
// in future, we can skip match over task spawn
132158
match self.task_tx.send(task) {
@@ -160,9 +186,17 @@ Returns a histogram of matches for the current search within an optional range.
160186
) -> Result<CallToolResult, RmcpError> {
161187
let (task_result_tx, task_result_rx) = mpsc::channel(1);
162188
let range = params.range.map(|r| (r.start, r.end));
189+
let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| {
190+
RmcpError::new(
191+
ErrorCode(503),
192+
format!("Invalid session_id format: {}", e),
193+
None,
194+
)
195+
})?;
163196
let task = Tasks::GetChartHistogram {
164197
dataset_len: params.dataset_len,
165198
range,
199+
session_id,
166200
task_result_tx,
167201
};
168202
self.task_tx.send(task).map_err(|e| {
@@ -193,9 +227,17 @@ Returns point data for line plots based on extracted values within an optional r
193227
) -> Result<CallToolResult, RmcpError> {
194228
let (task_result_tx, task_result_rx) = mpsc::channel(1);
195229
let range = params.range.map(|r| (r.start, r.end));
230+
let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| {
231+
RmcpError::new(
232+
ErrorCode(503),
233+
format!("Invalid session_id format: {}", e),
234+
None,
235+
)
236+
})?;
196237
let task = Tasks::GetChartLinePlots {
197238
dataset_len: params.dataset_len,
198239
range,
240+
session_id,
199241
task_result_tx,
200242
};
201243
self.task_tx.send(task).map_err(|e| {
Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
1-
use tokio::sync::{mpsc, oneshot};
1+
use tokio::sync::mpsc;
2+
use uuid::Uuid;
23

34
use crate::errors::McpError;
4-
use crate::types::SearchFilter;
5+
use processor::search::filter::SearchFilter;
56

67
#[derive(Debug, Clone)]
78
pub enum Tasks {
89
ApplySearchFilter {
10+
session_id: Uuid,
911
filters: Vec<SearchFilter>,
1012
task_result_tx: mpsc::Sender<Result<(), McpError>>,
1113
},
1214
GetChartHistogram {
15+
session_id: Uuid,
1316
dataset_len: u16,
1417
range: Option<(u64, u64)>,
1518
task_result_tx: mpsc::Sender<Result<(), McpError>>,
1619
},
1720
GetChartLinePlots {
21+
session_id: Uuid,
1822
dataset_len: u16,
1923
range: Option<(u64, u64)>,
2024
task_result_tx: mpsc::Sender<Result<(), McpError>>,
2125
},
2226
GenericTask {
27+
session_id: Uuid,
2328
task_result_tx: mpsc::Sender<Result<(), McpError>>,
2429
},
2530
}

application/apps/indexer/mcp/src/types.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub struct RangeU64 {
3131
#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)]
3232
pub struct SearchFilters {
3333
pub filters: Vec<SearchFilter>,
34+
pub session_id: String,
3435
}
3536

3637
#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)]
@@ -42,12 +43,14 @@ pub struct SearchValuesFilters {
4243
pub struct MapRequest {
4344
pub dataset_len: u16,
4445
pub range: Option<RangeU64>,
46+
pub session_id: String,
4547
}
4648

4749
#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)]
4850
pub struct ValuesRequest {
4951
pub dataset_len: u16,
5052
pub range: Option<RangeU64>,
53+
pub session_id: String,
5154
}
5255

5356
#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)]

0 commit comments

Comments
 (0)