Skip to content
This repository was archived by the owner on Sep 23, 2025. It is now read-only.

Commit d417a18

Browse files
committed
Implement Marco-Polo discovery protocol with unified message structure
- Add Marco, Polo, Goodbye message types to IPCMessageType enum - Create PoloPayload, GoodbyePayload, and ResponsePayload structures - Unify IPCMessage and IPCResponse into single IPCMessage structure - Add discovery message methods: send_marco(), send_polo(), send_goodbye() - Implement Marco message handling in reader task with Polo responses - Add unsolicited Polo message on MCP server startup - Add graceful shutdown with Goodbye message via cloned IPC communicator - Update message handling to use unified structure throughout Phase 2 of issue #20 (Multi-window message bus) - 50% complete Core discovery protocol implemented, ready for extension-side integration
1 parent e625b3c commit d417a18

File tree

4 files changed

+203
-39
lines changed

4 files changed

+203
-39
lines changed

server/src/ipc.rs

Lines changed: 150 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
//! Ports the logic from server/src/ipc.ts to Rust with cross-platform support.
55
66
use crate::types::{
7-
GetSelectionResult, IPCMessage, IPCMessageType, IPCResponse, LogLevel, LogParams,
8-
PresentReviewParams, PresentReviewResult,
7+
GetSelectionResult, GoodbyePayload, IPCMessage, IPCMessageType, LogLevel, LogParams,
8+
PoloPayload, PresentReviewParams, PresentReviewResult, ResponsePayload,
99
};
1010
use futures::FutureExt;
1111
use serde_json;
@@ -73,7 +73,7 @@ struct IPCCommunicatorInner {
7373
/// Tracks outgoing requests awaiting responses from VSCode extension
7474
/// Key: unique message ID (UUID), Value: channel to send response back to caller
7575
/// Enables concurrent request/response handling with proper correlation
76-
pending_requests: HashMap<String, oneshot::Sender<IPCResponse>>,
76+
pending_requests: HashMap<String, oneshot::Sender<ResponsePayload>>,
7777

7878
/// Flag to track if we have an active connection and reader task
7979
/// When true, ensure_connection() is a no-op
@@ -258,12 +258,96 @@ impl IPCCommunicator {
258258
}
259259
}
260260

261+
/// Send Marco discovery message. In normal workflow,
262+
/// this is actually sent by the *extension* to broadcast
263+
/// "who's out there?" -- but we include it for testing purposes.
264+
pub async fn send_marco(&self) -> Result<()> {
265+
if self.test_mode {
266+
info!("Marco discovery message sent (test mode)");
267+
return Ok(());
268+
}
269+
270+
let message = IPCMessage {
271+
message_type: IPCMessageType::Marco,
272+
payload: serde_json::json!({}),
273+
id: Uuid::new_v4().to_string(),
274+
};
275+
276+
debug!("Sending Marco discovery message");
277+
self.send_message_without_reply(message).await
278+
}
279+
280+
/// Send Polo discovery message (MCP server announces presence with shell PID)
281+
pub async fn send_polo(&self, terminal_shell_pid: u32) -> Result<()> {
282+
if self.test_mode {
283+
info!(
284+
"Polo discovery message sent (test mode) with shell PID: {}",
285+
terminal_shell_pid
286+
);
287+
return Ok(());
288+
}
289+
290+
let payload = PoloPayload { terminal_shell_pid };
291+
let message = IPCMessage {
292+
message_type: IPCMessageType::Polo,
293+
payload: serde_json::to_value(payload)?,
294+
id: Uuid::new_v4().to_string(),
295+
};
296+
297+
debug!(
298+
"Sending Polo discovery message with shell PID: {}",
299+
terminal_shell_pid
300+
);
301+
self.send_message_without_reply(message).await
302+
}
303+
304+
/// Send Goodbye discovery message (MCP server announces departure with shell PID)
305+
pub async fn send_goodbye(&self, terminal_shell_pid: u32) -> Result<()> {
306+
if self.test_mode {
307+
info!(
308+
"Goodbye discovery message sent (test mode) with shell PID: {}",
309+
terminal_shell_pid
310+
);
311+
return Ok(());
312+
}
313+
314+
let payload = GoodbyePayload { terminal_shell_pid };
315+
let message = IPCMessage {
316+
message_type: IPCMessageType::Goodbye,
317+
payload: serde_json::to_value(payload)?,
318+
id: Uuid::new_v4().to_string(),
319+
};
320+
321+
debug!(
322+
"Sending Goodbye discovery message with shell PID: {}",
323+
terminal_shell_pid
324+
);
325+
self.send_message_without_reply(message).await
326+
}
327+
328+
/// Gracefully shutdown the IPC communicator, sending Goodbye discovery message
329+
pub async fn shutdown(&self) -> Result<()> {
330+
if self.test_mode {
331+
info!("IPC shutdown (test mode)");
332+
return Ok(());
333+
}
334+
335+
let shell_pid = {
336+
let inner_guard = self.inner.lock().await;
337+
inner_guard.terminal_shell_pid
338+
};
339+
340+
self.send_goodbye(shell_pid).await?;
341+
info!("Sent Goodbye discovery message during shutdown");
342+
Ok(())
343+
}
344+
261345
/// Sends an IPC message and waits for a response from VSCode extension
262346
///
263347
/// Sets up response correlation using the message UUID and waits up to 5 seconds
264348
/// for the background reader task to deliver the matching response.
265349
/// Uses the underlying `write_message` primitive to send the data.
266-
async fn send_message_with_reply(&self, message: IPCMessage) -> Result<IPCResponse> {
350+
async fn send_message_with_reply(&self, message: IPCMessage) -> Result<ResponsePayload> {
267351
debug!(
268352
"Sending IPC message with ID: {} (PID: {})",
269353
message.id,
@@ -512,7 +596,7 @@ impl IPCCommunicator {
512596
}
513597
};
514598

515-
Self::handle_response_message(&inner, &message_str).await;
599+
Self::handle_incoming_message(&inner, &message_str).await;
516600
}
517601
Err(e) => {
518602
error!("Error reading from IPC connection: {}", e);
@@ -535,39 +619,81 @@ impl IPCCommunicator {
535619
.boxed()
536620
}
537621

538-
/// Processes incoming response messages from VSCode extension
539-
/// Matches responses to pending requests by ID and sends results back to callers
540-
async fn handle_response_message(inner: &Arc<Mutex<IPCCommunicatorInner>>, message_str: &str) {
622+
/// Processes incoming messages from the daemon
623+
/// Handles both responses to our requests and incoming messages (like Marco)
624+
async fn handle_incoming_message(inner: &Arc<Mutex<IPCCommunicatorInner>>, message_str: &str) {
541625
debug!(
542-
"Received IPC response (PID: {}): {}",
626+
"Received IPC message (PID: {}): {}",
543627
std::process::id(),
544628
message_str
545629
);
546630

547-
// Parse the response message
548-
let response: IPCResponse = match serde_json::from_str(message_str) {
549-
Ok(r) => r,
631+
// Parse as unified IPCMessage
632+
let message: IPCMessage = match serde_json::from_str(message_str) {
633+
Ok(msg) => msg,
550634
Err(e) => {
551635
error!(
552-
"Failed to parse IPC response: {} - Message: {}",
636+
"Failed to parse incoming message: {} - Message: {}",
553637
e, message_str
554638
);
555639
return;
556640
}
557641
};
558642

559-
// Find the pending request and send the response
560-
let mut inner_guard = inner.lock().await;
561-
if let Some(sender) = inner_guard.pending_requests.remove(&response.id) {
562-
if let Err(_) = sender.send(response) {
563-
warn!("Failed to send response to caller - receiver dropped");
643+
match message.message_type {
644+
IPCMessageType::Response => {
645+
// Handle response to our request
646+
let response_payload: ResponsePayload =
647+
match serde_json::from_value(message.payload) {
648+
Ok(payload) => payload,
649+
Err(e) => {
650+
error!("Failed to parse response payload: {}", e);
651+
return;
652+
}
653+
};
654+
655+
let mut inner_guard = inner.lock().await;
656+
if let Some(sender) = inner_guard.pending_requests.remove(&message.id) {
657+
if let Err(_) = sender.send(response_payload) {
658+
warn!("Failed to send response to caller - receiver dropped");
659+
}
660+
} else {
661+
// Every message (including the ones we send...) gets rebroadcast to everyone,
662+
// so this is (hopefully) to some other MCP server. Just ignore it.
663+
debug!(
664+
"Received response for unknown request ID: {} (PID: {})",
665+
message.id,
666+
std::process::id()
667+
);
668+
}
669+
}
670+
IPCMessageType::Marco => {
671+
info!("Received Marco discovery message, responding with Polo");
672+
673+
// Get shell PID from inner state
674+
let shell_pid = {
675+
let inner_guard = inner.lock().await;
676+
inner_guard.terminal_shell_pid
677+
};
678+
679+
// Create a temporary IPCCommunicator to send Polo response
680+
let temp_communicator = IPCCommunicator {
681+
inner: Arc::clone(inner),
682+
test_mode: false,
683+
};
684+
685+
if let Err(e) = temp_communicator.send_polo(shell_pid).await {
686+
error!("Failed to send Polo response to Marco: {}", e);
687+
}
688+
}
689+
_ => {
690+
// Every message (including the ones we send...) gets rebroadcast to everyone,
691+
// so we can just ignore anything else.
692+
debug!(
693+
"Received unhandled message type: {:?}",
694+
message.message_type
695+
);
564696
}
565-
} else {
566-
warn!(
567-
"Received response for unknown request ID: {} (PID: {})",
568-
response.id,
569-
std::process::id()
570-
);
571697
}
572698
}
573699
}

server/src/main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ async fn main() -> Result<()> {
106106

107107
// Create our server instance
108108
let server = DialecticServer::new().await?;
109+
110+
// Clone the IPC communicator for shutdown handling
111+
let ipc_for_shutdown = server.ipc().clone();
109112

110113
// Start the MCP server with stdio transport
111114
let service = server.serve(stdio()).await.inspect_err(|e| {
@@ -118,6 +121,11 @@ async fn main() -> Result<()> {
118121
service.waiting().await?;
119122

120123
info!("Dialectic MCP Server shutting down");
124+
125+
// Send Goodbye discovery message before shutdown
126+
if let Err(e) = ipc_for_shutdown.shutdown().await {
127+
error!("Error during IPC shutdown: {}", e);
128+
}
121129
}
122130
}
123131
std::mem::drop(flush_guard);

server/src/server.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,21 @@ impl DialecticServer {
4949
ipc.initialize().await?;
5050
info!("IPC communication with message bus daemon initialized");
5151

52+
// Send unsolicited Polo message to announce our presence
53+
ipc.send_polo(shell_pid).await?;
54+
info!("Sent Polo discovery message with shell PID: {}", shell_pid);
55+
5256
Ok(Self {
5357
ipc,
5458
tool_router: Self::tool_router(),
5559
})
5660
}
5761

62+
/// Get a reference to the IPC communicator
63+
pub fn ipc(&self) -> &IPCCommunicator {
64+
&self.ipc
65+
}
66+
5867
/// Ensure the message bus daemon is running for the given VSCode PID
5968
async fn ensure_daemon_running(vscode_pid: u32) -> Result<()> {
6069
crate::daemon::spawn_daemon_process(vscode_pid).await

server/src/types.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,33 @@ pub struct GetSelectionResult {
112112
pub message: Option<String>,
113113
}
114114

115+
/// Payload for Polo discovery messages (MCP server announces presence)
116+
#[derive(Debug, Clone, Deserialize, Serialize)]
117+
pub struct PoloPayload {
118+
/// Shell PID of the terminal where this MCP server is running
119+
pub terminal_shell_pid: u32,
120+
}
121+
122+
/// Payload for Goodbye discovery messages (MCP server announces departure)
123+
#[derive(Debug, Clone, Deserialize, Serialize)]
124+
pub struct GoodbyePayload {
125+
/// Shell PID of the terminal where this MCP server was running
126+
pub terminal_shell_pid: u32,
127+
}
128+
129+
/// Payload for Response messages (replaces IPCResponse struct)
130+
#[derive(Debug, Clone, Deserialize, Serialize)]
131+
pub struct ResponsePayload {
132+
/// Whether the operation succeeded
133+
pub success: bool,
134+
135+
/// Optional error message
136+
pub error: Option<String>,
137+
138+
/// Optional data payload for responses like get_selection
139+
pub data: Option<serde_json::Value>,
140+
}
141+
115142
/// IPC message sent from MCP server to VSCode extension
116143
#[derive(Debug, Clone, Deserialize, Serialize)]
117144
pub struct IPCMessage {
@@ -133,20 +160,14 @@ pub enum IPCMessageType {
133160
PresentReview,
134161
Log,
135162
GetSelection,
163+
/// Extension broadcasts "who's out there?" to discover active MCP servers
164+
Marco,
165+
/// MCP server announces presence with shell PID (response to Marco or unsolicited)
166+
Polo,
167+
/// MCP server announces departure with shell PID
168+
Goodbye,
169+
/// Response to any message (replaces IPCResponse struct)
170+
Response,
136171
}
137172

138-
/// IPC response sent from VSCode extension back to MCP server
139-
#[derive(Debug, Clone, Deserialize, Serialize)]
140-
pub struct IPCResponse {
141-
/// Response to message with this ID
142-
pub id: String,
143-
144-
/// Whether the operation succeeded
145-
pub success: bool,
146-
147-
/// Optional error message
148-
pub error: Option<String>,
149-
150-
/// Optional data payload for get_selection responses
151-
pub data: Option<serde_json::Value>,
152-
}
173+

0 commit comments

Comments
 (0)