Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 200 additions & 4 deletions pyrefly/lib/lsp/non_wasm/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::Instant;

use crossbeam_channel::Receiver;
Expand Down Expand Up @@ -91,6 +93,9 @@ use lsp_types::OneOf;
use lsp_types::Position;
use lsp_types::PositionEncodingKind;
use lsp_types::PrepareRenameResponse;
use lsp_types::ProgressParams;
use lsp_types::ProgressParamsValue;
use lsp_types::ProgressToken;
use lsp_types::PublishDiagnosticsParams;
use lsp_types::Range;
use lsp_types::ReferenceParams;
Expand Down Expand Up @@ -129,6 +134,11 @@ use lsp_types::UnregistrationParams;
use lsp_types::Url;
use lsp_types::VersionedTextDocumentIdentifier;
use lsp_types::WatchKind;
use lsp_types::WorkDoneProgress;
use lsp_types::WorkDoneProgressBegin;
use lsp_types::WorkDoneProgressCreateParams;
use lsp_types::WorkDoneProgressEnd;
use lsp_types::WorkDoneProgressReport;
use lsp_types::WorkspaceClientCapabilities;
use lsp_types::WorkspaceEdit;
use lsp_types::WorkspaceFoldersServerCapabilities;
Expand All @@ -145,6 +155,7 @@ use lsp_types::notification::DidSaveTextDocument;
use lsp_types::notification::Exit;
use lsp_types::notification::Initialized;
use lsp_types::notification::Notification as _;
use lsp_types::notification::Progress;
use lsp_types::notification::PublishDiagnostics;
use lsp_types::request::CallHierarchyIncomingCalls;
use lsp_types::request::CallHierarchyOutgoingCalls;
Expand Down Expand Up @@ -182,6 +193,7 @@ use lsp_types::request::TypeHierarchySubtypes;
use lsp_types::request::TypeHierarchySupertypes;
use lsp_types::request::UnregisterCapability;
use lsp_types::request::WillRenameFiles;
use lsp_types::request::WorkDoneProgressCreate;
use lsp_types::request::WorkspaceConfiguration;
use lsp_types::request::WorkspaceSymbolRequest;
use pyrefly_build::SourceDatabase;
Expand Down Expand Up @@ -289,6 +301,7 @@ use crate::lsp::wasm::notebook::DidSaveNotebookDocument;
use crate::lsp::wasm::provide_type::ProvideType;
use crate::lsp::wasm::provide_type::ProvideTypeResponse;
use crate::lsp::wasm::provide_type::provide_type;
use crate::state::load::Load;
use crate::state::load::LspFile;
use crate::state::lsp::DisplayTypeErrors;
use crate::state::lsp::FindDefinitionItemWithDocstring;
Expand All @@ -303,7 +316,9 @@ use crate::state::state::CancellableTransaction;
use crate::state::state::CommittingTransaction;
use crate::state::state::State;
use crate::state::state::Transaction;
use crate::state::subscriber::CompositeSubscriber;
use crate::state::subscriber::PublishDiagnosticsSubscriber;
use crate::state::subscriber::Subscriber;
use crate::types::class::ClassDefIndex;

pub struct InitializeInfo {
Expand Down Expand Up @@ -565,6 +580,145 @@ impl ServerConnection {
}
}

const PROGRESS_REPORT_INTERVAL: Duration = Duration::from_millis(100);

struct LspProgressSubscriber<'a> {
server: &'a Server,
token: ProgressToken,
title: &'static str,
state: Mutex<LspProgressState>,
}

struct LspProgressState {
started: u64,
finished: u64,
began: bool,
ended: bool,
last_report: Instant,
last_percentage: u32,
}

impl LspProgressState {
fn snapshot(&mut self) -> (String, u32) {
let mut percentage = if self.started == 0 {
0
} else {
((self.finished * 100) / self.started) as u32
};
if self.started > 0 {
percentage = percentage.min(99);
}
if percentage < self.last_percentage {
percentage = self.last_percentage;
}
self.last_percentage = percentage;
(format!("{}/{}", self.finished, self.started), percentage)
}
}

impl<'a> LspProgressSubscriber<'a> {
fn new(server: &'a Server, title: &'static str) -> Option<Self> {
if !server.supports_work_done_progress() {
return None;
}
let token = server.new_progress_token();
server.send_request::<WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
token: token.clone(),
});
Comment on lines +620 to +627
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

window/workDoneProgress/create is sent, but the server may start emitting $/progress notifications immediately (on the first start_work) without waiting for the create request to be acknowledged. Per LSP, clients may ignore progress updates until the token is created. Consider tracking the create request/response (e.g., store the request id/token in Server, mark it ready on the matching response, and have LspProgressSubscriber suppress Begin/Report until ready) so Begin cannot be sent before the create response.

Copilot uses AI. Check for mistakes.
Some(Self {
server,
token,
title,
state: Mutex::new(LspProgressState {
started: 0,
finished: 0,
began: false,
ended: false,
last_report: Instant::now(),
last_percentage: 0,
}),
})
}

fn send_progress(&self, value: WorkDoneProgress) {
let params = ProgressParams {
token: self.token.clone(),
value: ProgressParamsValue::WorkDone(value),
};
self.server
.connection
.send(Message::Notification(new_notification::<Progress>(params)));
}

fn event(&self, update: impl FnOnce(&mut LspProgressState)) {
let now = Instant::now();
let outcome = {
let mut state = self.state.lock();
if state.ended {
return;
}
update(&mut state);
if state.started == 0 {
return;
}
let send_begin = !state.began;
if send_begin {
state.began = true;
}
let should_report =
send_begin || now.duration_since(state.last_report) >= PROGRESS_REPORT_INTERVAL;
if !should_report {
return;
}
state.last_report = now;
let (message, percentage) = state.snapshot();
Some((send_begin, message, percentage))
};
if let Some((send_begin, message, percentage)) = outcome {
if send_begin {
self.send_progress(WorkDoneProgress::Begin(WorkDoneProgressBegin {
title: self.title.to_owned(),
cancellable: None,
message: Some(message),
percentage: Some(percentage),
}));
} else {
self.send_progress(WorkDoneProgress::Report(WorkDoneProgressReport {
cancellable: None,
message: Some(message),
percentage: Some(percentage),
}));
}
}
}
}

impl Subscriber for LspProgressSubscriber<'_> {
fn start_work(&self, _: &Handle) {
self.event(|state| state.started += 1);
}

fn finish_work(&self, _: &Transaction<'_>, _: &Handle, _: &Arc<Load>, _: bool) {
self.event(|state| state.finished += 1);
}
}

impl Drop for LspProgressSubscriber<'_> {
fn drop(&mut self) {
let message = {
let mut state = self.state.lock();
if state.ended || !state.began {
return;
}
state.ended = true;
format!("{}/{}", state.finished, state.started)
};
self.send_progress(WorkDoneProgress::End(WorkDoneProgressEnd {
message: Some(message),
}));
}
}

fn diagnostic_markdown_support(params: &Value) -> bool {
let text_document = match params
.get("capabilities")
Expand Down Expand Up @@ -739,6 +893,7 @@ pub struct Server {
completion_mru: Mutex<CompletionMru>,
outgoing_request_id: AtomicI32,
outgoing_requests: Mutex<HashMap<RequestId, Request>>,
next_progress_token_id: AtomicUsize,
filewatcher_registered: AtomicBool,
watched_patterns: Mutex<SmallSet<WatchPattern>>,
version_info: Mutex<HashMap<PathBuf, i32>>,
Expand Down Expand Up @@ -2227,6 +2382,7 @@ impl Server {
completion_mru: Mutex::new(CompletionMru::default()),
outgoing_request_id: AtomicI32::new(1),
outgoing_requests: Mutex::new(HashMap::new()),
next_progress_token_id: AtomicUsize::new(1),
filewatcher_registered: AtomicBool::new(false),
watched_patterns: Mutex::new(SmallSet::new()),
version_info: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -2307,6 +2463,20 @@ impl Server {
self.outgoing_requests.lock().insert(id, request);
}

fn supports_work_done_progress(&self) -> bool {
self.initialize_params
.capabilities
.window
.as_ref()
.and_then(|window| window.work_done_progress)
== Some(true)
}

fn new_progress_token(&self) -> ProgressToken {
let id = self.next_progress_token_id.fetch_add(1, Ordering::Relaxed);
ProgressToken::String(format!("pyrefly-progress-{id}"))
}

/// Run the transaction with the in-memory content of open files. Returns the handles of open files when the transaction is done.
fn validate_in_memory_for_transaction(
&self,
Expand Down Expand Up @@ -2735,10 +2905,23 @@ impl Server {
)
}
};
let subscriber = PublishDiagnosticsSubscriber { publish_callback };
let mut subscribers: Vec<Box<dyn Subscriber + '_>> = Vec::new();
subscribers.push(Box::new(PublishDiagnosticsSubscriber { publish_callback }));
if let Some(progress_subscriber) =
LspProgressSubscriber::new(server, "Pyrefly: Rechecking")
{
subscribers.push(Box::new(progress_subscriber));
}
let subscriber: Box<dyn Subscriber + '_> = if subscribers.len() == 1 {
subscribers
.pop()
.expect("subscriber list unexpectedly empty")
} else {
Box::new(CompositeSubscriber::new(subscribers))
};
let mut transaction = server
.state
.new_committable_transaction(Require::Exports, Some(Box::new(subscriber)));
.new_committable_transaction(Require::Exports, Some(subscriber));
let invalidate_start = Instant::now();
// Mark files as dirty
f(transaction.as_mut());
Expand Down Expand Up @@ -4716,10 +4899,23 @@ impl Server {
)
}
};
let subscriber = PublishDiagnosticsSubscriber { publish_callback };
let mut subscribers: Vec<Box<dyn Subscriber + '_>> = Vec::new();
subscribers.push(Box::new(PublishDiagnosticsSubscriber { publish_callback }));
if let Some(progress_subscriber) =
LspProgressSubscriber::new(server, "Pyrefly: Rechecking")
{
subscribers.push(Box::new(progress_subscriber));
}
let subscriber: Box<dyn Subscriber + '_> = if subscribers.len() == 1 {
subscribers
.pop()
.expect("subscriber list unexpectedly empty")
} else {
Box::new(CompositeSubscriber::new(subscribers))
};
let mut transaction = server
.state
.new_committable_transaction(Require::Exports, Some(Box::new(subscriber)));
.new_committable_transaction(Require::Exports, Some(subscriber));
let invalidate_start = Instant::now();
transaction.as_mut().invalidate_config();
telemetry_event.set_invalidate_duration(invalidate_start.elapsed());
Expand Down
35 changes: 35 additions & 0 deletions pyrefly/lib/state/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,38 @@ where
(self.publish_callback)(transaction, handle, exports_changed);
}
}

/// A subscriber that forwards all events to each of its inner subscribers.
pub struct CompositeSubscriber<'a> {
subscribers: Vec<Box<dyn Subscriber + 'a>>,
}

impl<'a> CompositeSubscriber<'a> {
pub fn new(subscribers: Vec<Box<dyn Subscriber + 'a>>) -> Self {
assert!(
!subscribers.is_empty(),
"CompositeSubscriber requires at least one subscriber"
);
Comment on lines +212 to +215
Copy link

Copilot AI Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompositeSubscriber::new is a public constructor that panics on an empty subscriber list. Since this is in a library module, it would be safer to avoid a hard panic (e.g., make the type/ctor pub(crate), return a Result<Self, ...>, or allow an empty list as a no-op subscriber) to prevent unexpected crashes if used elsewhere.

Suggested change
assert!(
!subscribers.is_empty(),
"CompositeSubscriber requires at least one subscriber"
);
// Allow an empty subscriber list; this results in a no-op composite subscriber.

Copilot uses AI. Check for mistakes.
Self { subscribers }
}
}

impl<'a> Subscriber for CompositeSubscriber<'a> {
fn start_work(&self, handle: &Handle) {
for subscriber in &self.subscribers {
subscriber.start_work(handle);
}
}

fn finish_work(
&self,
transaction: &Transaction<'_>,
handle: &Handle,
result: &Arc<Load>,
exports_changed: bool,
) {
for subscriber in &self.subscribers {
subscriber.finish_work(transaction, handle, result, exports_changed);
}
}
}
1 change: 1 addition & 0 deletions pyrefly/lib/test/lsp/lsp_interaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod notebook_signature_help;
mod notebook_sync;
mod notebook_tokens;
mod object_model;
mod progress;
mod provide_type;
mod pytorch_benchmark;
mod references;
Expand Down
Loading
Loading