Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ If changes are needed in these files, that must happen in the separate Positron

- When a `match` expression is the last expression in a function, omit `return` keywords in match arms. Let the expression evaluate to the function's return value.

- For error messages and logging, prefer direct formatting syntax: `Err(anyhow!("Message: {err}"))` instead of `Err(anyhow!("Message: {}", err))`. This also applies to `log::error!` and `log::warn!` and `log::info!` macros.
- For error messages and logging, prefer direct formatting syntax: `Err(anyhow!("Message: {err}"))` instead of `Err(anyhow!("Message: {}", err))`. This also applies to `log::error!` and `log::warn!` and `log::info!` macros. For logging errors specifically, use Debug formatting `{err:?}` to get more detailed error information.

- Use `log::trace!` instead of `log::debug!`.

Expand Down
188 changes: 108 additions & 80 deletions crates/ark/src/dap/dap_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crossbeam::channel::unbounded;
use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use crossbeam::select;
use dap::errors::ServerError;
use dap::events::*;
use dap::prelude::*;
use dap::requests::*;
Expand Down Expand Up @@ -150,9 +151,18 @@ fn listen_dap_events<W: Write>(
loop {
select!(
recv(backend_events_rx) -> event => {
let event = match event {
Ok(event) => event,
Err(err) => {
// Channel closed, sender dropped
log::info!("DAP: Event channel closed: {err:?}");
return;
Comment on lines +154 to +159
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is it getting dropped on the sender side when the ext host restarts suddenly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we've hit that code path, it's a defensive fix. It would happen if the DAP comm closed before the DAP connection I think.

},
};

log::trace!("DAP: Got event from backend: {:?}", event);

let event = match event.unwrap() {
let event = match event {
DapBackendEvent::Continued => {
Event::Continued(ContinuedEventBody {
thread_id: THREAD_ID,
Expand Down Expand Up @@ -191,7 +201,10 @@ fn listen_dap_events<W: Write>(
};

let mut output = output.lock().unwrap();
output.send_event(event).unwrap();
if let Err(err) = output.send_event(event) {
log::warn!("DAP: Failed to send event, closing: {err:?}");
return;
}
},

// Break the loop and terminate the thread
Expand Down Expand Up @@ -229,91 +242,83 @@ impl<R: Read, W: Write> DapServer<R, W> {

pub fn serve(&mut self) -> bool {
log::trace!("DAP: Polling");
let req = match self.server.poll_request().unwrap() {
Some(req) => req,
None => return false,
let req = match self.server.poll_request() {
Ok(Some(req)) => req,
Ok(None) => return false,
Err(err) => {
log::warn!("DAP: Connection closed: {err:?}");
return false;
},
};
log::trace!("DAP: Got request: {:#?}", req);

let cmd = req.command.clone();

match cmd {
Command::Initialize(args) => {
self.handle_initialize(req, args);
},
Command::Attach(args) => {
self.handle_attach(req, args);
},
Command::Disconnect(args) => {
self.handle_disconnect(req, args);
},
Command::Restart(args) => {
self.handle_restart(req, args);
},
Command::Threads => {
self.handle_threads(req);
},
Command::SetBreakpoints(args) => {
self.handle_set_breakpoints(req, args);
},
let result = match cmd {
Command::Initialize(args) => self.handle_initialize(req, args),
Command::Attach(args) => self.handle_attach(req, args),
Command::Disconnect(args) => self.handle_disconnect(req, args),
Command::Restart(args) => self.handle_restart(req, args),
Command::Threads => self.handle_threads(req),
Command::SetBreakpoints(args) => self.handle_set_breakpoints(req, args),
Command::SetExceptionBreakpoints(args) => {
self.handle_set_exception_breakpoints(req, args);
},
Command::StackTrace(args) => {
self.handle_stacktrace(req, args);
},
Command::Source(args) => {
self.handle_source(req, args);
},
Command::Scopes(args) => {
self.handle_scopes(req, args);
},
Command::Variables(args) => {
self.handle_variables(req, args);
self.handle_set_exception_breakpoints(req, args)
},
Command::StackTrace(args) => self.handle_stacktrace(req, args),
Command::Source(args) => self.handle_source(req, args),
Command::Scopes(args) => self.handle_scopes(req, args),
Command::Variables(args) => self.handle_variables(req, args),
Command::Continue(args) => {
let resp = ResponseBody::Continue(ContinueResponse {
all_threads_continued: Some(true),
});
self.handle_step(req, args, DebugRequest::Continue, resp);
self.handle_step(req, args, DebugRequest::Continue, resp)
},
Command::Next(args) => {
self.handle_step(req, args, DebugRequest::Next, ResponseBody::Next);
self.handle_step(req, args, DebugRequest::Next, ResponseBody::Next)
},
Command::StepIn(args) => {
self.handle_step(req, args, DebugRequest::StepIn, ResponseBody::StepIn);
self.handle_step(req, args, DebugRequest::StepIn, ResponseBody::StepIn)
},
Command::StepOut(args) => {
self.handle_step(req, args, DebugRequest::StepOut, ResponseBody::StepOut);
self.handle_step(req, args, DebugRequest::StepOut, ResponseBody::StepOut)
},
_ => {
log::warn!("DAP: Unknown request");
let rsp = req.error("Ark DAP: Unknown request");
self.server.respond(rsp).unwrap();
self.respond(rsp)
},
};

if let Err(err) = result {
log::warn!("DAP: Handler failed, closing connection: {err:?}");
return false;
}

true
}

fn respond(&mut self, rsp: Response) {
fn respond(&mut self, rsp: Response) -> Result<(), ServerError> {
log::trace!("DAP: Responding to request: {rsp:#?}");
self.server.respond(rsp).unwrap();
self.server.respond(rsp)
}

fn send_event(&mut self, event: Event) {
fn send_event(&mut self, event: Event) -> Result<(), ServerError> {
log::trace!("DAP: Sending event: {event:#?}");
self.server.send_event(event).unwrap();
self.server.send_event(event)
}

fn handle_initialize(&mut self, req: Request, _args: InitializeArguments) {
fn handle_initialize(
&mut self,
req: Request,
_args: InitializeArguments,
) -> Result<(), ServerError> {
let rsp = req.success(ResponseBody::Initialize(types::Capabilities {
supports_restart_request: Some(true),
..Default::default()
}));
self.respond(rsp);

self.send_event(Event::Initialized);
self.respond(rsp)?;
self.send_event(Event::Initialized)
}

// Handle SetBreakpoints requests from the frontend.
Expand All @@ -331,12 +336,15 @@ impl<R: Read, W: Write> DapServer<R, W> {
// - When a user unchecks a breakpoint, it appears as a deletion (omitted
// from the request). We preserve verified breakpoints as Disabled so we
// can restore their state when re-enabled without requiring re-sourcing.
fn handle_set_breakpoints(&mut self, req: Request, args: SetBreakpointsArguments) {
fn handle_set_breakpoints(
&mut self,
req: Request,
args: SetBreakpointsArguments,
) -> Result<(), ServerError> {
let Some(path) = args.source.path.as_ref() else {
// We don't currently have virtual documents managed via source references
log::warn!("Missing a path to set breakpoints for.");
self.respond(req.error("Missing a path to set breakpoints for"));
return;
return self.respond(req.error("Missing a path to set breakpoints for"));
};

// We currently only support "path" URIs as Positron never sends URIs.
Expand All @@ -349,8 +357,7 @@ impl<R: Read, W: Write> DapServer<R, W> {
let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse {
breakpoints: vec![],
}));
self.respond(rsp);
return;
return self.respond(rsp);
},
};

Expand All @@ -361,10 +368,9 @@ impl<R: Read, W: Write> DapServer<R, W> {
Ok(content) => content,
Err(err) => {
// TODO: What do we do with breakpoints in virtual documents?
log::error!("Failed to read file '{path}': {err}");
log::warn!("Failed to read file '{path}': {err:?}");
let rsp = req.error(&format!("Failed to read file: {path}"));
self.respond(rsp);
return;
return self.respond(rsp);
},
};

Expand Down Expand Up @@ -502,31 +508,39 @@ impl<R: Read, W: Write> DapServer<R, W> {
breakpoints: response_breakpoints,
}));

self.respond(rsp);
self.respond(rsp)
}

fn handle_attach(&mut self, req: Request, _args: AttachRequestArguments) {
fn handle_attach(
&mut self,
req: Request,
_args: AttachRequestArguments,
) -> Result<(), ServerError> {
let rsp = req.success(ResponseBody::Attach);
self.respond(rsp);
self.respond(rsp)?;

self.send_event(Event::Thread(ThreadEventBody {
reason: ThreadEventReason::Started,
thread_id: THREAD_ID,
}));
}))
}

fn handle_disconnect(&mut self, req: Request, _args: DisconnectArguments) {
fn handle_disconnect(
&mut self,
req: Request,
_args: DisconnectArguments,
) -> Result<(), ServerError> {
// Only send `Q` if currently in a debugging session.
let is_debugging = { self.state.lock().unwrap().is_debugging };
if is_debugging {
self.send_command(DebugRequest::Quit);
}

let rsp = req.success(ResponseBody::Disconnect);
self.respond(rsp);
self.respond(rsp)
}

fn handle_restart<T>(&mut self, req: Request, _args: T) {
fn handle_restart<T>(&mut self, req: Request, _args: T) -> Result<(), ServerError> {
// If connected to Positron, forward the restart command to the
// frontend. Otherwise ignore it.
if let Some(tx) = &self.comm_tx {
Expand All @@ -535,35 +549,39 @@ impl<R: Read, W: Write> DapServer<R, W> {
}

let rsp = req.success(ResponseBody::Restart);
self.respond(rsp);
self.respond(rsp)
}

// All servers must respond to `Threads` requests, possibly with
// a dummy thread as is the case here
fn handle_threads(&mut self, req: Request) {
fn handle_threads(&mut self, req: Request) -> Result<(), ServerError> {
let rsp = req.success(ResponseBody::Threads(ThreadsResponse {
threads: vec![Thread {
id: THREAD_ID,
name: String::from("R console"),
}],
}));
self.respond(rsp);
self.respond(rsp)
}

fn handle_set_exception_breakpoints(
&mut self,
req: Request,
_args: SetExceptionBreakpointsArguments,
) {
) -> Result<(), ServerError> {
let rsp = req.success(ResponseBody::SetExceptionBreakpoints(
SetExceptionBreakpointsResponse {
breakpoints: None, // TODO
},
));
self.respond(rsp);
self.respond(rsp)
}

fn handle_stacktrace(&mut self, req: Request, args: StackTraceArguments) {
fn handle_stacktrace(
&mut self,
req: Request,
args: StackTraceArguments,
) -> Result<(), ServerError> {
let state = self.state.lock().unwrap();
let stack = &state.stack;
let fallback_sources = &state.fallback_sources;
Expand Down Expand Up @@ -597,17 +615,17 @@ impl<R: Read, W: Write> DapServer<R, W> {
}));

drop(state);
self.respond(rsp);
self.respond(rsp)
}

fn handle_source(&mut self, req: Request, _args: SourceArguments) {
fn handle_source(&mut self, req: Request, _args: SourceArguments) -> Result<(), ServerError> {
let message = "Unsupported `source` request: {req:?}";
log::error!("{message}");
let rsp = req.error(message);
self.respond(rsp);
self.respond(rsp)
}

fn handle_scopes(&mut self, req: Request, args: ScopesArguments) {
fn handle_scopes(&mut self, req: Request, args: ScopesArguments) -> Result<(), ServerError> {
let state = self.state.lock().unwrap();
let frame_id_to_variables_reference = &state.frame_id_to_variables_reference;

Expand Down Expand Up @@ -637,15 +655,19 @@ impl<R: Read, W: Write> DapServer<R, W> {
let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes }));

drop(state);
self.respond(rsp);
self.respond(rsp)
}

fn handle_variables(&mut self, req: Request, args: VariablesArguments) {
fn handle_variables(
&mut self,
req: Request,
args: VariablesArguments,
) -> Result<(), ServerError> {
let variables_reference = args.variables_reference;
let variables = self.collect_r_variables(variables_reference);
let variables = self.into_variables(variables);
let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables }));
self.respond(rsp);
self.respond(rsp)
}

fn collect_r_variables(&self, variables_reference: i64) -> Vec<RVariable> {
Expand Down Expand Up @@ -708,10 +730,16 @@ impl<R: Read, W: Write> DapServer<R, W> {
out
}

fn handle_step<A>(&mut self, req: Request, _args: A, cmd: DebugRequest, resp: ResponseBody) {
fn handle_step<A>(
&mut self,
req: Request,
_args: A,
cmd: DebugRequest,
resp: ResponseBody,
) -> Result<(), ServerError> {
self.send_command(cmd);
let rsp = req.success(resp);
self.respond(rsp);
self.respond(rsp)
}

fn send_command(&mut self, cmd: DebugRequest) {
Expand Down