diff --git a/apps/staged/src-tauri/src/lib.rs b/apps/staged/src-tauri/src/lib.rs index f277605ec..f849b8b91 100644 --- a/apps/staged/src-tauri/src/lib.rs +++ b/apps/staged/src-tauri/src/lib.rs @@ -17,6 +17,7 @@ pub mod image_commands; pub mod migrations; pub mod note_commands; pub mod paths; +pub mod pr_poll_scheduler; pub mod project_commands; pub mod project_mcp; pub mod prs; @@ -2002,6 +2003,11 @@ pub fn run() { let compat = store::check_db_compatibility(&db_path) .map_err(|e| format!("Cannot check database: {e}"))?; let session_registry = Arc::new(session_runner::SessionRegistry::new()); + // Backend-owned PR-poll scheduler. Managed unconditionally so the + // interest/hint commands resolve even before the store exists (e.g. + // during the needs-reset prompt); the tick loop is only spawned once + // the store is ready (the `Ok` branch below). + let pr_scheduler = Arc::new(pr_poll_scheduler::PrPollScheduler::new()); let (store_slot, reset_info) = match compat { store::DbCompatibility::Ok => { @@ -2024,6 +2030,13 @@ pub fn run() { } // Start the tiered background sync service for all cloned repos. background_sync::spawn(Arc::clone(&store_arc), app.handle().clone()); + // Start the backend PR-poll scheduler — it owns polling + // cadence/concurrency; the frontend only sends interest hints. + pr_poll_scheduler::spawn( + Arc::clone(&pr_scheduler), + Arc::clone(&store_arc), + app.handle().clone(), + ); // `fsmonitor-v1` only flips `.git/config` flags on stale // clones the user may not visit this session — per-project // `ensure_local_clone` already re-applies the same config @@ -2069,6 +2082,7 @@ pub fn run() { app.manage(store_slot); app.manage(session_registry); + app.manage(pr_scheduler); app.manage(Arc::new(actions::ActionExecutor::new())); app.manage(Arc::new(actions::ActionRegistry::new())); app.manage(ShutdownState::default()); @@ -2238,6 +2252,12 @@ pub fn run() { prs::squash_commits, prs::clear_branch_pr_status, prs::recover_branch_pr, + // PR poll scheduler (frontend interest/hint layer) + pr_poll_scheduler::set_foreground_project, + pr_poll_scheduler::set_focus, + pr_poll_scheduler::set_branch_pending, + pr_poll_scheduler::refresh_now, + pr_poll_scheduler::disconnect_client, // Utilities util_commands::open_url, util_commands::is_sq_available, diff --git a/apps/staged/src-tauri/src/pr_poll_scheduler.rs b/apps/staged/src-tauri/src/pr_poll_scheduler.rs new file mode 100644 index 000000000..e7ff97732 --- /dev/null +++ b/apps/staged/src-tauri/src/pr_poll_scheduler.rs @@ -0,0 +1,1039 @@ +//! Backend-owned PR-status poll scheduler. +//! +//! Owns the *cadence and concurrency* of PR-status polling that used to live in +//! the frontend `prPollingService`. A single long-lived tick loop decides which +//! projects are due, dedups in-flight work, and drives every refresh through one +//! bounded pool (shared via [`crate::prs::refresh_project_pr_statuses`]). +//! +//! Frontends shrink to a thin *interest/hint* layer: they tell the backend which +//! project is foregrounded, which branches have pending checks, and whether a +//! window is focused, via the [`set_foreground_project`], [`set_branch_pending`], +//! and [`set_focus`] commands. The effective tier for a project is the union of +//! interest (any foregrounding ⇒ selected; any pending ⇒ fast; nothing focused ⇒ +//! pause). +//! +//! ## Per-client interest (Phase 2) +//! +//! Interest is tracked **per connected client** — the native Tauri window plus +//! each WebSocket browser session — keyed by a frontend-supplied `client_id`. +//! The cadence for a project is the union across all clients ([`PollState::any_focused`], +//! [`PollState::is_foreground`], [`PollState::project_has_pending`]), so a project +//! that any client cares about is polled at the appropriate tier; the *work* +//! bookkeeping (`last_polled_at`/`failures`/`stale`/`forced`) stays project-keyed +//! and shared, so N clients still trigger only one poll per project per tier. +//! +//! Clients are evicted on disconnect (clean WS close ⇒ [`PrPollScheduler::disconnect_client`]) +//! and via a [`CLIENT_TTL_MS`] fallback for dirty drops ([`PollState::evict_stale_clients`], +//! swept each tick). The native window uses the fixed [`TAURI_CLIENT_ID`], which +//! is pre-seeded at launch and exempt from TTL eviction (process death is its +//! teardown), so single-client behaviour stays byte-for-byte equivalent to Phase 1. +//! +//! Poll-state (last-polled timestamps, failure counts) is intentionally **not +//! persisted** — on restart everything is "due", matching the frontend's +//! previous behaviour where the service started fresh each app launch. +//! +//! ## Testing seam +//! +//! All due/dedup/backoff *decisions* live in [`PollState`], whose methods take an +//! explicit `now` (ms since epoch) and in-flight set. The tick loop is the only +//! place that touches the real clock ([`crate::store::now_timestamp`]) and the +//! real fetcher, so the decision logic is unit-testable without `gh` calls or +//! wall-clock sleeps (see the tests at the bottom of this file). + +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::store::Store; + +// --------------------------------------------------------------------------- +// Interval tiers (milliseconds) +// --------------------------------------------------------------------------- +// +// Moved verbatim from the frontend `prPollingService` so the backend is the +// single owner of polling cadence. + +/// Any project with a branch that has pending CI checks (fastest tier). +const PENDING_INTERVAL_MS: i64 = 15_000; +/// The foregrounded/selected project, no pending checks. +const SELECTED_INTERVAL_MS: i64 = 60_000; +/// Background (non-selected, no pending checks). +const BACKGROUND_INTERVAL_MS: i64 = 5 * 60_000; +/// Consecutive failures before a project is reported as stale to the frontend. +const MAX_CONSECUTIVE_FAILURES: u32 = 3; + +/// How often the tick loop wakes up to re-evaluate what is due. Kept well below +/// the fastest tier (15s) so the loop's granularity adds only a small bounded +/// jitter to each tier; interest changes and `refresh_now` nudges additionally +/// wake the loop immediately, so this only bounds the *periodic* re-poll delay. +const TICK_INTERVAL_SECS: u64 = 5; + +/// Well-known id for the native Tauri window. It has no WS heartbeat (the +/// process dying is its teardown), so it is pre-seeded at launch and exempt from +/// TTL eviction. Must match `TAURI_CLIENT_ID` in `prPollingService.ts`. +const TAURI_CLIENT_ID: &str = "tauri-main"; + +/// How long a client's interest survives without a heartbeat before the tick +/// loop evicts it — the dirty-drop fallback for WS clients that vanish without a +/// clean close. Set to ≈3× the expected WS keepalive (≤~30s), so it tolerates +/// transient lag while bounding spurious pending-tier polls from a dead-but- +/// counted client to ≲6. The Tauri id is exempt. +const CLIENT_TTL_MS: i64 = 90_000; + +// --------------------------------------------------------------------------- +// Poll-state — pure decision logic, no clock / store / Tauri handles +// --------------------------------------------------------------------------- + +/// One connected client's interest. The effective cadence for a project is the +/// *union* of these across all clients (see [`PollState::any_focused`], +/// [`PollState::is_foreground`], [`PollState::project_has_pending`]). +#[derive(Default)] +struct ClientInterest { + /// This client's foregrounded/selected project (→ selected tier). + foreground_project: Option, + /// Whether this client's window is focused. + focused: bool, + /// branch_id → project_id for branches this client sees as having pending CI + /// checks (→ pending tier). Mirrors the granularity the frontend tracks via + /// `updateChecksStatus`. + pending_branches: HashMap, + /// Last heartbeat (ms since epoch). Drives TTL eviction of dirty-dropped + /// clients; the Tauri id is exempt regardless of this value. + last_seen: i64, +} + +/// The poll-state and interest the scheduler owns. Pure: every method that needs +/// the current time takes `now` (ms since epoch) as a parameter, so the +/// due/dedup/backoff logic can be unit-tested deterministically. +struct PollState { + /// When each project was last polled (ms since epoch). Absent ⇒ never + /// polled ⇒ immediately due. Not persisted across restarts. + last_polled_at: HashMap, + /// Consecutive failure count per project. + failures: HashMap, + /// Projects currently reported as stale (failures ≥ threshold). Tracked so a + /// stale event is only emitted on transitions. + stale: HashSet, + /// Projects explicitly nudged via `refresh_now`; due on the next tick + /// regardless of interval or focus. Project-keyed and global (a nudge is + /// about the *project*, not the observer), so it is shared across clients + /// and survives the forcing client disconnecting. + forced: HashSet, + /// Per-connected-client interest, keyed by `client_id`. The cadence for a + /// project is the union across these (see the union helpers). Only the + /// *interest* is per-client; the work bookkeeping above stays project-keyed + /// so N clients still trigger only one poll per project per tier. + clients: HashMap, +} + +impl PollState { + fn new() -> Self { + let mut clients = HashMap::new(); + // Pre-seed the native client as focused so the very first immediate tick + // polls at launch, matching Phase 1's `focused: true` default. The Tauri + // frontend's later hints update this same entry, and `last_seen: 0` is + // fine because the Tauri id is exempt from TTL eviction. + clients.insert( + TAURI_CLIENT_ID.to_string(), + ClientInterest { + focused: true, + last_seen: 0, + ..Default::default() + }, + ); + Self { + last_polled_at: HashMap::new(), + failures: HashMap::new(), + stale: HashSet::new(), + forced: HashSet::new(), + clients, + } + } + + /// Whether any connected client's window is focused. No focused client ⇒ + /// periodic polling pauses. + fn any_focused(&self) -> bool { + self.clients.values().any(|c| c.focused) + } + + /// Whether any client has this project foregrounded/selected. + fn is_foreground(&self, project_id: &str) -> bool { + self.clients + .values() + .any(|c| c.foreground_project.as_deref() == Some(project_id)) + } + + /// Whether any client sees a pending branch in this project. + fn project_has_pending(&self, project_id: &str) -> bool { + self.clients + .values() + .any(|c| c.pending_branches.values().any(|p| p == project_id)) + } + + /// The polling interval for a project, as the union of current interest + /// across all clients. Mirrors the frontend `getProjectInterval`. + fn interval_for(&self, project_id: &str) -> i64 { + if self.project_has_pending(project_id) { + PENDING_INTERVAL_MS + } else if self.is_foreground(project_id) { + SELECTED_INTERVAL_MS + } else { + BACKGROUND_INTERVAL_MS + } + } + + /// Compute which of `project_ids` should be polled right now. + /// + /// A project is due when it is not already in flight (dedup) and either it + /// has been explicitly forced (`refresh_now` — bypasses focus and interval), + /// or a client is focused and its tier interval has elapsed. + fn due(&self, project_ids: &[String], now: i64, in_flight: &HashSet) -> Vec { + let mut due = Vec::new(); + for id in project_ids { + if in_flight.contains(id) { + continue; // dedup: a refresh for this project is already running + } + if self.forced.contains(id) { + due.push(id.clone()); + continue; + } + if !self.any_focused() { + continue; // no focused client ⇒ pause periodic polling + } + let last = self.last_polled_at.get(id).copied().unwrap_or(0); + if now.saturating_sub(last) >= self.interval_for(id) { + due.push(id.clone()); + } + } + due + } + + /// Drop tracking for projects/branches that no longer exist. Project-keyed + /// work bookkeeping is pruned by project membership; each client's interest + /// is pruned in place. Client *lifecycle* eviction (disconnect / TTL) is + /// separate — that drops whole clients, this drops dead project/branch + /// interest from surviving clients. + fn prune(&mut self, known_projects: &HashSet<&str>, known_branches: &HashSet<&str>) { + self.last_polled_at + .retain(|k, _| known_projects.contains(k.as_str())); + self.failures + .retain(|k, _| known_projects.contains(k.as_str())); + self.stale.retain(|k| known_projects.contains(k.as_str())); + self.forced.retain(|k| known_projects.contains(k.as_str())); + for client in self.clients.values_mut() { + client.pending_branches.retain(|branch_id, project_id| { + known_branches.contains(branch_id.as_str()) + && known_projects.contains(project_id.as_str()) + }); + if let Some(fg) = &client.foreground_project { + if !known_projects.contains(fg.as_str()) { + client.foreground_project = None; + } + } + } + } + + /// Record a successful poll. Returns `true` if the project transitioned out + /// of the stale state (so the caller should emit a stale-cleared event). + fn record_success(&mut self, project_id: &str, now: i64) -> bool { + self.last_polled_at.insert(project_id.to_string(), now); + self.failures.remove(project_id); + self.stale.remove(project_id) + } + + /// Record a failed poll. Returns `true` if the project just transitioned + /// into the stale state (so the caller should emit a stale event). + /// + /// `last_polled_at` is advanced so a persistently failing project retries on + /// its normal tier cadence rather than every tick. (The old frontend left + /// `lastPolledAt` untouched on failure, which retried failing projects about + /// once a second — exactly the kind of churn the backend should damp.) + fn record_failure(&mut self, project_id: &str, now: i64) -> bool { + self.last_polled_at.insert(project_id.to_string(), now); + let count = self.failures.entry(project_id.to_string()).or_insert(0); + *count += 1; + if *count == MAX_CONSECUTIVE_FAILURES { + self.stale.insert(project_id.to_string()) + } else { + false + } + } + + fn set_foreground(&mut self, client_id: &str, project_id: Option, now: i64) { + let client = self.clients.entry(client_id.to_string()).or_default(); + client.foreground_project = project_id; + client.last_seen = now; + } + + fn set_focus(&mut self, client_id: &str, focused: bool, now: i64) { + let client = self.clients.entry(client_id.to_string()).or_default(); + client.focused = focused; + client.last_seen = now; + } + + fn set_branch_pending( + &mut self, + client_id: &str, + branch_id: String, + project_id: String, + pending: bool, + now: i64, + ) { + let client = self.clients.entry(client_id.to_string()).or_default(); + if pending { + client.pending_branches.insert(branch_id, project_id); + } else { + client.pending_branches.remove(&branch_id); + } + client.last_seen = now; + } + + /// `refresh_now` nudge. Project-keyed and global — independent of which + /// client asked, so it survives that client disconnecting. + fn force(&mut self, project_id: String) { + self.forced.insert(project_id); + } + + // -- Client lifecycle ------------------------------------------------- + + /// Heartbeat: create the client entry if absent and bump its `last_seen` so + /// it survives the next TTL sweep. Called on WS connect and each WS ping. + fn touch(&mut self, client_id: &str, now: i64) { + self.clients + .entry(client_id.to_string()) + .or_default() + .last_seen = now; + } + + /// Clean disconnect: drop the client and all its interest. + fn disconnect_client(&mut self, client_id: &str) { + self.clients.remove(client_id); + } + + /// Dirty-drop fallback: evict clients not heard from within `ttl_ms`. The + /// Tauri id is exempt (the native window has no WS heartbeat; the process + /// dying tears it down). + fn evict_stale_clients(&mut self, now: i64, ttl_ms: i64) { + self.clients + .retain(|id, c| id == TAURI_CLIENT_ID || now.saturating_sub(c.last_seen) <= ttl_ms); + } +} + +// --------------------------------------------------------------------------- +// Scheduler — managed state shared between the tick loop and the hint commands +// --------------------------------------------------------------------------- + +/// Long-lived scheduler state, stored in Tauri managed state as +/// `Arc`. The interest/hint commands mutate [`PollState`] and +/// wake the loop; the loop reads it to decide what to poll. +pub struct PrPollScheduler { + state: Mutex, + /// Projects with an in-flight refresh, for dedup across overlapping ticks + /// and `refresh_now` nudges. + in_flight: Mutex>, + /// Wakes the tick loop when interest changes or a `refresh_now` arrives so + /// it re-evaluates promptly instead of waiting out the periodic tick. + notify: tokio::sync::Notify, +} + +impl PrPollScheduler { + pub fn new() -> Self { + Self { + state: Mutex::new(PollState::new()), + in_flight: Mutex::new(HashSet::new()), + notify: tokio::sync::Notify::new(), + } + } + + // These wrappers are the only place that reads the real clock for interest + // updates: they stamp `now` and delegate to the pure [`PollState`] methods, + // then wake the loop so the union is recomputed promptly. `pub` so the web + // server's `dispatch` / `handle_ws` can drive them via the managed + // `Arc` for WebSocket clients. + + pub fn set_foreground(&self, client_id: String, project_id: Option) { + let now = crate::store::now_timestamp(); + self.state + .lock() + .unwrap() + .set_foreground(&client_id, project_id, now); + self.notify.notify_one(); + } + + pub fn set_focus(&self, client_id: String, focused: bool) { + let now = crate::store::now_timestamp(); + self.state + .lock() + .unwrap() + .set_focus(&client_id, focused, now); + self.notify.notify_one(); + } + + pub fn set_branch_pending( + &self, + client_id: String, + branch_id: String, + project_id: String, + pending: bool, + ) { + let now = crate::store::now_timestamp(); + self.state + .lock() + .unwrap() + .set_branch_pending(&client_id, branch_id, project_id, pending, now); + self.notify.notify_one(); + } + + pub fn force(&self, project_id: String) { + self.state.lock().unwrap().force(project_id); + self.notify.notify_one(); + } + + /// Heartbeat for a client (WS connect / ping). Keeps it alive past the TTL. + pub fn touch(&self, client_id: String) { + let now = crate::store::now_timestamp(); + self.state.lock().unwrap().touch(&client_id, now); + self.notify.notify_one(); + } + + /// Clean disconnect for a client (WS close). Wakes the loop so a vanished + /// focus/foreground/pending recomputes the union promptly (it may now pause + /// or slow polling). + pub fn disconnect_client(&self, client_id: String) { + self.state.lock().unwrap().disconnect_client(&client_id); + self.notify.notify_one(); + } +} + +impl Default for PrPollScheduler { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// Tick loop +// --------------------------------------------------------------------------- + +/// Spawn the PR-poll loop. Call once during app setup, after the store is +/// initialised. Takes the store directly (like `background_sync::spawn`); a DB +/// reset only happens via a restart-gated flow, so the loop's store handle stays +/// valid for the process lifetime. +pub fn spawn(scheduler: Arc, store: Arc, app_handle: tauri::AppHandle) { + tauri::async_runtime::spawn(async move { + poll_loop(scheduler, store, app_handle).await; + }); +} + +async fn poll_loop( + scheduler: Arc, + store: Arc, + app_handle: tauri::AppHandle, +) { + // One bounded pool shared across every project the scheduler refreshes, so a + // tick that finds many projects due still caps total concurrent `gh` + // subprocesses — this is what tames the focus-regain / launch herd. + let semaphore = Arc::new(tokio::sync::Semaphore::new( + crate::prs::PR_REFRESH_CONCURRENCY, + )); + + let mut interval = tokio::time::interval(Duration::from_secs(TICK_INTERVAL_SECS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + // The first `interval.tick()` resolves immediately, so the loop runs an + // initial tick at startup (everything is due ⇒ initial poll). + tokio::select! { + _ = interval.tick() => {} + _ = scheduler.notify.notified() => {} + } + tick(&scheduler, &store, &app_handle, &semaphore).await; + } +} + +async fn tick( + scheduler: &Arc, + store: &Arc, + app_handle: &tauri::AppHandle, + semaphore: &Arc, +) { + // Re-derive the project list from the DB each tick (cheap indexed read) so + // the backend owns the set of projects to poll without a frontend hint — + // this is what replaces the old `prPollingService.setProjects`. + let project_ids: Vec = match store.list_projects() { + Ok(projects) => projects.into_iter().map(|p| p.id).collect(), + Err(e) => { + log::warn!("[pr_poll] failed to list projects: {e}"); + return; + } + }; + let branch_ids: Vec = match store.list_branch_ids() { + Ok(branch_ids) => branch_ids, + Err(e) => { + log::warn!("[pr_poll] failed to list branches: {e}"); + return; + } + }; + + let now = crate::store::now_timestamp(); + + // Decide what to poll while holding the locks (no `.await` inside), then + // mark those projects in flight. Lock order here (in_flight → state) is the + // only nesting site; completion handlers below take the two locks + // sequentially, never nested, so there is no ordering hazard. + let due = { + let known_projects: HashSet<&str> = project_ids.iter().map(|s| s.as_str()).collect(); + let known_branches: HashSet<&str> = branch_ids.iter().map(|s| s.as_str()).collect(); + let mut in_flight = scheduler.in_flight.lock().unwrap(); + let mut state = scheduler.state.lock().unwrap(); + // Evict clients that dropped without a clean WS close before deciding + // what is due, so their stale interest stops inflating the union. + state.evict_stale_clients(now, CLIENT_TTL_MS); + state.prune(&known_projects, &known_branches); + let due = state.due(&project_ids, now, &in_flight); + for id in &due { + in_flight.insert(id.clone()); + // Consume the explicit nudge now that we're acting on it. + state.forced.remove(id); + } + due + }; + + for project_id in due { + emit_refresh_state(app_handle, &project_id, true); + + let scheduler = Arc::clone(scheduler); + let store = Arc::clone(store); + let app_handle = app_handle.clone(); + let semaphore = Arc::clone(semaphore); + + tauri::async_runtime::spawn(async move { + let result = crate::prs::refresh_project_pr_statuses( + &store, + &app_handle, + &project_id, + semaphore, + ) + .await; + let now = crate::store::now_timestamp(); + + // Update poll-state, then clear the in-flight marker. The two locks + // are taken sequentially (never nested) to stay deadlock-free. + let stale_change = { + let mut state = scheduler.state.lock().unwrap(); + match &result { + Ok(_) => state.record_success(&project_id, now).then_some(false), + Err(e) => { + log::warn!("[pr_poll] refresh failed for project {project_id}: {e}"); + state.record_failure(&project_id, now).then_some(true) + } + } + }; + scheduler.in_flight.lock().unwrap().remove(&project_id); + + emit_refresh_state(&app_handle, &project_id, false); + if let Some(stale) = stale_change { + emit_stale(&app_handle, &project_id, stale); + } + + // A refresh just finished — wake the loop so any project forced + // while this one was in flight is picked up without waiting out the + // periodic tick. + scheduler.notify.notify_one(); + }); + } +} + +// --------------------------------------------------------------------------- +// Events +// --------------------------------------------------------------------------- + +/// Per-project refresh lifecycle, so the frontend can show "checking right now". +/// Replaces the refresh-state the frontend used to track around its own poll +/// loop, which now lives here. +fn emit_refresh_state(app_handle: &tauri::AppHandle, project_id: &str, refreshing: bool) { + #[derive(Clone, serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct PrRefreshState { + project_id: String, + refreshing: bool, + } + + crate::web_server::emit_to_all( + app_handle, + "pr-refresh-state", + PrRefreshState { + project_id: project_id.to_string(), + refreshing, + }, + ); +} + +/// Project crossed (or recovered from) the consecutive-failure threshold, so the +/// frontend can show a stale-data indicator. +fn emit_stale(app_handle: &tauri::AppHandle, project_id: &str, stale: bool) { + #[derive(Clone, serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct PrStale { + project_id: String, + stale: bool, + } + + crate::web_server::emit_to_all( + app_handle, + "pr-status-stale", + PrStale { + project_id: project_id.to_string(), + stale, + }, + ); +} + +// --------------------------------------------------------------------------- +// Interest / hint commands +// --------------------------------------------------------------------------- + +/// Set a client's foregrounded/selected project (→ selected tier). `None` +/// clears it. The effective tier unions this across all connected clients. +#[tauri::command(rename_all = "camelCase")] +pub fn set_foreground_project( + scheduler: tauri::State<'_, Arc>, + client_id: String, + project_id: Option, +) { + scheduler.set_foreground(client_id, project_id); +} + +/// Report a client's window focus. With no client focused, periodic polling +/// pauses (an explicit `refresh_now` still fetches). +#[tauri::command(rename_all = "camelCase")] +pub fn set_focus( + scheduler: tauri::State<'_, Arc>, + client_id: String, + focused: bool, +) { + scheduler.set_focus(client_id, focused); +} + +/// Mark whether a branch has pending CI checks for a client (→ pending tier for +/// its project, unioned across clients). +#[tauri::command(rename_all = "camelCase")] +pub fn set_branch_pending( + scheduler: tauri::State<'_, Arc>, + client_id: String, + branch_id: String, + project_id: String, + pending: bool, +) { + scheduler.set_branch_pending(client_id, branch_id, project_id, pending); +} + +/// Explicitly nudge the scheduler to refresh a project now (e.g. just created or +/// pushed a PR). Folded into the scheduler's dedup rather than fetching directly. +/// The `client_id` is carried only to keep that client's heartbeat fresh; the +/// force itself is project-keyed and global. +#[tauri::command(rename_all = "camelCase")] +pub fn refresh_now( + scheduler: tauri::State<'_, Arc>, + client_id: String, + project_id: String, +) { + scheduler.touch(client_id); + scheduler.force(project_id); +} + +/// Drop a client's interest on clean disconnect. For the native app this fires +/// from `prPollingService.dispose()`; for web it fires on WS close. +#[tauri::command(rename_all = "camelCase")] +pub fn disconnect_client(scheduler: tauri::State<'_, Arc>, client_id: String) { + scheduler.disconnect_client(client_id); +} + +// --------------------------------------------------------------------------- +// Tests — pure due/dedup/backoff logic, no clock or `gh` +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + fn ids(v: &[&str]) -> Vec { + v.iter().map(|s| s.to_string()).collect() + } + + fn set(v: &[&str]) -> HashSet { + v.iter().map(|s| s.to_string()).collect() + } + + /// A `PollState` with the launch-seeded Tauri client removed, for + /// multi-client tests that want a clean slate of explicitly-added clients. + fn empty_state() -> PollState { + let mut st = PollState::new(); + st.disconnect_client(TAURI_CLIENT_ID); + st + } + + #[test] + fn due_respects_the_three_tiers() { + let mut st = PollState::new(); + st.set_foreground(TAURI_CLIENT_ID, Some("sel".into()), 0); + st.set_branch_pending(TAURI_CLIENT_ID, "b1".into(), "pend".into(), true, 0); + for id in ["sel", "pend", "bg"] { + st.last_polled_at.insert(id.into(), 0); + } + let projects = ids(&["sel", "pend", "bg"]); + let none = HashSet::new(); + + // Just after the pending interval: only the pending project is due. + assert_eq!( + st.due(&projects, PENDING_INTERVAL_MS, &none), + ids(&["pend"]) + ); + + // Past the selected interval: pending + selected (order follows input). + assert_eq!( + st.due(&projects, SELECTED_INTERVAL_MS, &none), + ids(&["sel", "pend"]) + ); + + // Past the background interval: all three. + assert_eq!( + st.due(&projects, BACKGROUND_INTERVAL_MS, &none), + ids(&["sel", "pend", "bg"]) + ); + } + + #[test] + fn unfocused_pauses_periodic_polling_but_not_forced() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + st.set_focus(TAURI_CLIENT_ID, false, 0); + let none = HashSet::new(); + + // Long past every interval, but no focused client ⇒ nothing due. + assert!(st + .due(&ids(&["p"]), BACKGROUND_INTERVAL_MS * 10, &none) + .is_empty()); + + // A forced project still polls while unfocused. + st.force("p".into()); + assert_eq!( + st.due(&ids(&["p"]), BACKGROUND_INTERVAL_MS * 10, &none), + ids(&["p"]) + ); + } + + #[test] + fn in_flight_projects_are_not_re_enqueued() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + let now = BACKGROUND_INTERVAL_MS * 10; + + // Due when nothing is in flight. + assert_eq!(st.due(&ids(&["p"]), now, &HashSet::new()), ids(&["p"])); + // Deduped when already in flight. + assert!(st.due(&ids(&["p"]), now, &set(&["p"])).is_empty()); + + // Even an explicit nudge does not double-fetch an in-flight project. + st.force("p".into()); + assert!(st.due(&ids(&["p"]), now, &set(&["p"])).is_empty()); + } + + #[test] + fn refresh_now_forces_a_poll_before_the_interval_elapses() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + + // Just polled ⇒ not yet interval-due. + assert!(st.due(&ids(&["p"]), 1_000, &HashSet::new()).is_empty()); + + // refresh_now folds in: due immediately regardless of interval. + st.force("p".into()); + assert_eq!(st.due(&ids(&["p"]), 1_000, &HashSet::new()), ids(&["p"])); + } + + #[test] + fn refresh_now_during_in_flight_refresh_survives_completion() { + for completion in [ + PollState::record_success as fn(&mut PollState, &str, i64) -> bool, + PollState::record_failure, + ] { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + + // First nudge is consumed when the tick schedules the refresh. + st.force("p".into()); + assert_eq!(st.due(&ids(&["p"]), 1_000, &HashSet::new()), ids(&["p"])); + st.forced.remove("p"); + + // A second nudge arrives while that refresh is still in flight, so + // it is deduped for now... + st.force("p".into()); + assert!(st.due(&ids(&["p"]), 1_000, &set(&["p"])).is_empty()); + + // ...but completing the original refresh must not clear the fresh + // nudge. Once in-flight clears, the project is due immediately. + completion(&mut st, "p", 2_000); + assert_eq!(st.due(&ids(&["p"]), 2_000, &HashSet::new()), ids(&["p"])); + } + } + + #[test] + fn failures_trip_stale_after_threshold_then_clear_on_success() { + let mut st = PollState::new(); + assert!(!st.record_failure("p", 1)); + assert!(!st.record_failure("p", 2)); + // Third consecutive failure crosses the threshold ⇒ transition to stale. + assert!(st.record_failure("p", 3)); + // Further failures stay stale (no new transition). + assert!(!st.record_failure("p", 4)); + // A success transitions back out of stale exactly once. + assert!(st.record_success("p", 5)); + assert!(!st.record_success("p", 6)); + } + + #[test] + fn failure_advances_last_polled_so_retries_use_the_tier_cadence() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + // Fails at now=1000; last_polled advances to 1000. + st.record_failure("p", 1_000); + // Not due again until a full background interval after the failure. + assert!(st + .due( + &ids(&["p"]), + 1_000 + BACKGROUND_INTERVAL_MS - 1, + &HashSet::new() + ) + .is_empty()); + assert_eq!( + st.due( + &ids(&["p"]), + 1_000 + BACKGROUND_INTERVAL_MS, + &HashSet::new() + ), + ids(&["p"]) + ); + } + + #[test] + fn prune_clears_per_client_foreground_and_pending() { + let mut st = PollState::new(); + st.last_polled_at.insert("gone".into(), 0); + st.failures.insert("gone".into(), 2); + st.stale.insert("gone".into()); + st.force("gone".into()); + st.set_foreground(TAURI_CLIENT_ID, Some("gone".into()), 0); + st.set_branch_pending(TAURI_CLIENT_ID, "b".into(), "gone".into(), true, 0); + + let known_projects: HashSet<&str> = ["alive"].into_iter().collect(); + let known_branches: HashSet<&str> = HashSet::new(); + st.prune(&known_projects, &known_branches); + + assert!(st.last_polled_at.is_empty()); + assert!(st.failures.is_empty()); + assert!(st.stale.is_empty()); + assert!(st.forced.is_empty()); + // The client itself survives prune (lifecycle is separate); only its + // interest in the now-gone project is cleared. + let client = &st.clients[TAURI_CLIENT_ID]; + assert!(client.pending_branches.is_empty()); + assert!(client.foreground_project.is_none()); + } + + #[test] + fn prune_clears_pending_for_deleted_branch_in_surviving_project() { + let mut st = PollState::new(); + st.set_branch_pending( + TAURI_CLIENT_ID, + "deleted-branch".into(), + "alive".into(), + true, + 0, + ); + assert!(st.project_has_pending("alive")); + assert_eq!(st.interval_for("alive"), PENDING_INTERVAL_MS); + + let known_projects: HashSet<&str> = ["alive"].into_iter().collect(); + let known_branches: HashSet<&str> = ["other-branch"].into_iter().collect(); + st.prune(&known_projects, &known_branches); + + assert!(!st.project_has_pending("alive")); + assert_eq!(st.interval_for("alive"), BACKGROUND_INTERVAL_MS); + } + + // -- Phase 2: per-client interest / union / lifecycle ------------------ + + #[test] + fn union_foreground_across_two_clients() { + let mut st = empty_state(); + st.set_focus("a", true, 0); + st.set_focus("b", true, 0); + st.set_foreground("a", Some("p1".into()), 0); + st.set_foreground("b", Some("p2".into()), 0); + for id in ["p1", "p2", "bg"] { + st.last_polled_at.insert(id.into(), 0); + } + // Each client's foreground reaches the selected tier; bg stays slow. + assert_eq!(st.interval_for("p1"), SELECTED_INTERVAL_MS); + assert_eq!(st.interval_for("p2"), SELECTED_INTERVAL_MS); + assert_eq!(st.interval_for("bg"), BACKGROUND_INTERVAL_MS); + // Both foregrounded projects are due at the selected interval; bg isn't. + assert_eq!( + st.due( + &ids(&["p1", "p2", "bg"]), + SELECTED_INTERVAL_MS, + &HashSet::new() + ), + ids(&["p1", "p2"]) + ); + } + + #[test] + fn union_pending_beats_foreground() { + let mut st = empty_state(); + // Client A sees a pending branch in p1; client B merely foregrounds p1. + st.set_branch_pending("a", "b1".into(), "p1".into(), true, 0); + st.set_foreground("b", Some("p1".into()), 0); + // Pending wins the union precedence. + assert_eq!(st.interval_for("p1"), PENDING_INTERVAL_MS); + } + + #[test] + fn union_focus() { + let mut st = empty_state(); + st.set_focus("a", false, 0); + st.set_focus("b", true, 0); + st.last_polled_at.insert("p".into(), 0); + let none = HashSet::new(); + + // Any client focused ⇒ active. + assert!(st.any_focused()); + assert_eq!( + st.due(&ids(&["p"]), BACKGROUND_INTERVAL_MS, &none), + ids(&["p"]) + ); + + // All clients unfocused ⇒ paused. + st.set_focus("b", false, 0); + assert!(!st.any_focused()); + assert!(st + .due(&ids(&["p"]), BACKGROUND_INTERVAL_MS * 10, &none) + .is_empty()); + } + + #[test] + fn disconnect_recomputes_union() { + let mut st = empty_state(); + st.set_foreground("a", Some("p1".into()), 0); + st.set_foreground("b", Some("p1".into()), 0); + assert_eq!(st.interval_for("p1"), SELECTED_INTERVAL_MS); + + // One client leaves: still selected (B still holds it). + st.disconnect_client("a"); + assert!(st.is_foreground("p1")); + assert_eq!(st.interval_for("p1"), SELECTED_INTERVAL_MS); + + // Last client holding it leaves: falls back to the background tier. + st.disconnect_client("b"); + assert!(!st.is_foreground("p1")); + assert_eq!(st.interval_for("p1"), BACKGROUND_INTERVAL_MS); + } + + #[test] + fn disconnect_drops_pending() { + let mut st = empty_state(); + st.set_branch_pending("a", "b1".into(), "p".into(), true, 0); + assert!(st.project_has_pending("p")); + assert_eq!(st.interval_for("p"), PENDING_INTERVAL_MS); + + st.disconnect_client("a"); + assert!(!st.project_has_pending("p")); + assert_eq!(st.interval_for("p"), BACKGROUND_INTERVAL_MS); + } + + #[test] + fn ttl_evicts_stale_clients_but_exempts_tauri() { + // Keep the launch-seeded Tauri client (last_seen = 0). + let mut st = PollState::new(); + st.set_focus("web", true, 0); + st.set_foreground("web", Some("p".into()), 0); + assert!(st.is_foreground("p")); + + // Sweep well past the TTL relative to last_seen = 0. + st.evict_stale_clients(CLIENT_TTL_MS + 1, CLIENT_TTL_MS); + + // The stale web client is gone; its interest no longer counts. + assert!(!st.clients.contains_key("web")); + assert!(!st.is_foreground("p")); + // The Tauri client is exempt despite last_seen = 0, and stays focused. + assert!(st.clients.contains_key(TAURI_CLIENT_ID)); + assert!(st.any_focused()); + } + + #[test] + fn touch_keeps_client_alive() { + let mut st = empty_state(); + st.set_focus("web", true, 0); + + // A heartbeat at the TTL boundary keeps the client alive through a sweep + // at the same instant. + let now = CLIENT_TTL_MS + 10; + st.touch("web", now); + st.evict_stale_clients(now, CLIENT_TTL_MS); + assert!(st.clients.contains_key("web")); + assert!(st.any_focused()); + + // Without a further touch, it is evicted once the TTL elapses past the + // last heartbeat. + st.evict_stale_clients(now + CLIENT_TTL_MS + 1, CLIENT_TTL_MS); + assert!(!st.clients.contains_key("web")); + } + + #[test] + fn forced_is_global_and_survives_disconnect() { + let mut st = empty_state(); + st.last_polled_at.insert("p".into(), 0); + + // Client A nudges a refresh, then disconnects. + st.force("p".into()); + st.disconnect_client("a"); + + // `forced` is project-keyed and global, so it outlives the forcing + // client and still bypasses the focus pause. + assert!(!st.any_focused()); + assert_eq!(st.due(&ids(&["p"]), 1_000, &HashSet::new()), ids(&["p"])); + } + + #[test] + fn single_client_equivalence_to_phase1() { + // One seeded Tauri client reproduces Phase 1's tier and pause behaviour. + let mut st = PollState::new(); + st.set_foreground(TAURI_CLIENT_ID, Some("sel".into()), 0); + st.set_branch_pending(TAURI_CLIENT_ID, "b1".into(), "pend".into(), true, 0); + for id in ["sel", "pend", "bg"] { + st.last_polled_at.insert(id.into(), 0); + } + let projects = ids(&["sel", "pend", "bg"]); + let none = HashSet::new(); + + assert_eq!( + st.due(&projects, PENDING_INTERVAL_MS, &none), + ids(&["pend"]) + ); + assert_eq!( + st.due(&projects, SELECTED_INTERVAL_MS, &none), + ids(&["sel", "pend"]) + ); + assert_eq!( + st.due(&projects, BACKGROUND_INTERVAL_MS, &none), + ids(&["sel", "pend", "bg"]) + ); + + // Unfocusing the lone client pauses periodic polling. + st.set_focus(TAURI_CLIENT_ID, false, 0); + assert!(st + .due(&projects, BACKGROUND_INTERVAL_MS * 10, &none) + .is_empty()); + } +} diff --git a/apps/staged/src-tauri/src/prs.rs b/apps/staged/src-tauri/src/prs.rs index 9cf1af050..667872215 100644 --- a/apps/staged/src-tauri/src/prs.rs +++ b/apps/staged/src-tauri/src/prs.rs @@ -824,7 +824,20 @@ pub async fn refresh_pr_status( Ok(()) } +/// Max concurrent PR-status fetches inside a single project refresh. Each fetch +/// spawns a `gh` subprocess + GitHub round-trip, so we cap the fan-out to avoid +/// a subprocess thundering herd while still resolving a project's PRs in ~1 +/// round-trip's wall-clock instead of N (fully serial). The backend PR-poll +/// scheduler reuses this cap for a single pool shared across all the projects +/// it refreshes (see `pr_poll_scheduler`). +pub(crate) const PR_REFRESH_CONCURRENCY: usize = 6; + /// Refresh PR status for all branches in a project. +/// +/// Thin command wrapper around [`refresh_project_pr_statuses`]; the same core is +/// also driven on a cadence by the backend PR-poll scheduler. Each command call +/// gets its own bounded pool (the scheduler instead shares one pool across +/// projects). #[tauri::command(rename_all = "camelCase")] pub async fn refresh_all_pr_statuses( store: tauri::State<'_, Mutex>>>, @@ -832,23 +845,46 @@ pub async fn refresh_all_pr_statuses( project_id: String, ) -> Result { let store = get_store(&store)?; + let semaphore = Arc::new(tokio::sync::Semaphore::new(PR_REFRESH_CONCURRENCY)); + refresh_project_pr_statuses(&store, &app_handle, &project_id, semaphore).await +} + +/// Core implementation shared by the `refresh_all_pr_statuses` command and the +/// backend PR-poll scheduler. +/// +/// Fans the per-branch fetches out across the bounded `semaphore` pool instead +/// of awaiting them one at a time. Repo resolution is a cheap local DB read so +/// it stays on this task; only the network fetch + DB write + per-branch +/// `pr-status-changed` emit move into the spawned tasks, gated by the semaphore. +/// A final `pr-statuses-refreshed` event is emitted and the number of branches +/// refreshed is returned. +/// +/// The semaphore is passed in (rather than created per call) so the scheduler +/// can share a single pool across every project it refreshes — a tick that +/// finds many projects due still caps total concurrent `gh` subprocesses. +pub(crate) async fn refresh_project_pr_statuses( + store: &Arc, + app_handle: &tauri::AppHandle, + project_id: &str, + semaphore: Arc, +) -> Result { let project = store - .get_project(&project_id) + .get_project(project_id) .map_err(|e| e.to_string())? .ok_or_else(|| format!("Project not found: {project_id}"))?; let branches = store - .list_branches_for_project(&project_id) + .list_branches_for_project(project_id) .map_err(|e| e.to_string())?; let branches_with_prs: Vec<_> = branches .into_iter() .filter(|b| b.pr_number.is_some()) .collect(); - let mut refreshed_count = 0u32; + let mut tasks = Vec::new(); for branch in branches_with_prs { let pr_number = branch.pr_number.unwrap(); - let github_repo = match resolve_branch_repo_and_subpath(&store, &project, &branch) { + let github_repo = match resolve_branch_repo_and_subpath(store, &project, &branch) { Ok((repo, _)) => repo, Err(e) => { log::warn!( @@ -861,64 +897,125 @@ pub async fn refresh_all_pr_statuses( } }; - let pr_result = { - let github_repo = github_repo.clone(); - tauri::async_runtime::spawn_blocking(move || { + let store = Arc::clone(store); + let app_handle = app_handle.clone(); + let semaphore = Arc::clone(&semaphore); + let branch_id = branch.id.clone(); + + tasks.push(tauri::async_runtime::spawn(async move { + // Hold a permit for the whole fetch so no more than + // PR_REFRESH_CONCURRENCY `gh` round-trips are in flight at once. + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| format!("refresh_project_pr_statuses semaphore closed: {e}"))?; + + let pr_result = tauri::async_runtime::spawn_blocking(move || { git::fetch_pr_status_for_repo(&github_repo, pr_number) }) .await - .map_err(|e| format!("refresh_all_pr_statuses task failed: {e}"))? - }; - match pr_result { - Ok(pr_status) => { - let mergeable = pr_status.mergeable == "MERGEABLE"; - let pr_fetched_at = store::now_timestamp(); - - if let Err(e) = store.update_branch_pr_status( - &branch.id, - Some(pr_status.state.clone()), - Some(pr_status.checks_summary.state.clone()), - pr_status.review_decision.clone(), - Some(mergeable), - Some(pr_status.is_draft), - None, - None, - pr_status.head_sha.clone(), - ) { - log::warn!("Failed to update PR status for branch {}: {}", branch.id, e); - continue; + .map_err(|e| format!("refresh_project_pr_statuses task failed: {e}"))?; + + match pr_result { + Ok(pr_status) => { + let mergeable = pr_status.mergeable == "MERGEABLE"; + let pr_fetched_at = store::now_timestamp(); + + if let Err(e) = store.update_branch_pr_status( + &branch_id, + Some(pr_status.state.clone()), + Some(pr_status.checks_summary.state.clone()), + pr_status.review_decision.clone(), + Some(mergeable), + Some(pr_status.is_draft), + None, + None, + pr_status.head_sha.clone(), + ) { + log::warn!("Failed to update PR status for branch {}: {}", branch_id, e); + return Ok::(false); + } + + crate::web_server::emit_to_all( + &app_handle, + "pr-status-changed", + PrStatusEvent { + branch_id: branch_id.clone(), + pr_state: pr_status.state, + pr_checks_status: pr_status.checks_summary.state, + pr_review_decision: pr_status.review_decision, + pr_mergeable: mergeable, + pr_draft: pr_status.is_draft, + pr_head_sha: pr_status.head_sha, + pr_fetched_at, + failed_checks: pr_status.failed_checks, + }, + ); + + Ok(true) + } + Err(e) => { + log::warn!( + "Failed to fetch PR status for branch {} (PR #{}): {}", + branch_id, + pr_number, + e + ); + Ok(false) } + } + })); + } - refreshed_count += 1; - - crate::web_server::emit_to_all( - &app_handle, - "pr-status-changed", - PrStatusEvent { - branch_id: branch.id.clone(), - pr_state: pr_status.state, - pr_checks_status: pr_status.checks_summary.state, - pr_review_decision: pr_status.review_decision, - pr_mergeable: mergeable, - pr_draft: pr_status.is_draft, - pr_head_sha: pr_status.head_sha, - pr_fetched_at, - failed_checks: pr_status.failed_checks, - }, - ); + let refreshed_count = collect_branch_refresh_results(tasks).await?; + + crate::web_server::emit_to_all(app_handle, "pr-statuses-refreshed", project_id); + + Ok(refreshed_count) +} + +async fn collect_branch_refresh_results(tasks: Vec) -> Result +where + T: std::future::Future, E>>, + E: std::fmt::Display, +{ + let mut refreshed_count = 0u32; + let mut failed_branch_count = 0u32; + let mut task_errors = Vec::new(); + + for task in tasks { + match task.await { + Ok(Ok(refreshed)) => { + if refreshed { + refreshed_count += 1; + } else { + failed_branch_count += 1; + } + } + Ok(Err(e)) => { + log::warn!("PR status refresh task failed: {e}"); + task_errors.push(e); } Err(e) => { - log::warn!( - "Failed to fetch PR status for branch {} (PR #{}): {}", - branch.id, - pr_number, - e - ); + let message = format!("PR status refresh task join failed: {e}"); + log::warn!("{message}"); + task_errors.push(message); } } } - crate::web_server::emit_to_all(&app_handle, "pr-statuses-refreshed", &project_id); + if refreshed_count == 0 && (failed_branch_count > 0 || !task_errors.is_empty()) { + let mut errors = Vec::new(); + if failed_branch_count > 0 { + errors.push(format!("{failed_branch_count} branch refreshes failed")); + } + errors.extend(task_errors); + + return Err(format!( + "all PR status refresh tasks failed: {}", + errors.join("; ") + )); + } Ok(refreshed_count) } @@ -1259,6 +1356,53 @@ mod tests { } } + #[tokio::test] + async fn collect_branch_refresh_results_tolerates_partial_task_failure() { + let tasks = vec![ + tokio::spawn(async { Ok::(true) }), + tokio::spawn(async { + panic!("simulated branch task panic"); + #[allow(unreachable_code)] + Ok::(true) + }), + tokio::spawn(async { Ok::(false) }), + ]; + + let refreshed_count = collect_branch_refresh_results(tasks).await.unwrap(); + + assert_eq!(refreshed_count, 1); + } + + #[tokio::test] + async fn collect_branch_refresh_results_fails_when_all_tasks_fail() { + let tasks = vec![ + tokio::spawn(async { Err::("simulated semaphore failure".to_string()) }), + tokio::spawn(async { + panic!("simulated branch task panic"); + #[allow(unreachable_code)] + Ok::(true) + }), + ]; + + let err = collect_branch_refresh_results(tasks).await.unwrap_err(); + + assert!(err.contains("all PR status refresh tasks failed")); + assert!(err.contains("simulated semaphore failure")); + } + + #[tokio::test] + async fn collect_branch_refresh_results_fails_when_all_branches_fail() { + let tasks = vec![ + tokio::spawn(async { Ok::(false) }), + tokio::spawn(async { Ok::(false) }), + ]; + + let err = collect_branch_refresh_results(tasks).await.unwrap_err(); + + assert!(err.contains("all PR status refresh tasks failed")); + assert!(err.contains("2 branch refreshes failed")); + } + #[test] fn create_pr_pipeline_fetches_base_and_uses_origin_merge_base_for_context() { let steps = build_create_pr_pipeline_steps("pull request", "main", "", "feature-branch"); diff --git a/apps/staged/src-tauri/src/store/branches.rs b/apps/staged/src-tauri/src/store/branches.rs index 09c3267de..00d7e8fe0 100644 --- a/apps/staged/src-tauri/src/store/branches.rs +++ b/apps/staged/src-tauri/src/store/branches.rs @@ -82,6 +82,13 @@ impl Store { rows.collect::, _>>().map_err(Into::into) } + pub fn list_branch_ids(&self) -> Result, StoreError> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare("SELECT id FROM branches")?; + let rows = stmt.query_map([], |row| row.get::<_, String>(0))?; + rows.collect::, _>>().map_err(Into::into) + } + pub fn update_branch_base(&self, id: &str, base_branch: &str) -> Result<(), StoreError> { let conn = self.conn.lock().unwrap(); conn.execute( diff --git a/apps/staged/src-tauri/src/store/tests.rs b/apps/staged/src-tauri/src/store/tests.rs index ca1a593c0..c1a116eae 100644 --- a/apps/staged/src-tauri/src/store/tests.rs +++ b/apps/staged/src-tauri/src/store/tests.rs @@ -1,5 +1,6 @@ //! Tests for the store module. +use std::collections::HashSet; use std::path::Path; use super::models::*; @@ -311,6 +312,24 @@ fn test_list_branches_includes_both_types() { assert_eq!(remote_branch.workspace_name.as_deref(), Some("ws-1")); } +#[test] +fn test_list_branch_ids() { + let store = Store::in_memory().unwrap(); + let project = Project::new("test-owner/test-repo"); + store.create_project(&project).unwrap(); + + let local = Branch::new(&project.id, "local-feature", "main"); + let remote = Branch::new_remote(&project.id, "remote-feature", "main", "ws-1"); + store.create_branch(&local).unwrap(); + store.create_branch(&remote).unwrap(); + + let branch_ids: HashSet = store.list_branch_ids().unwrap().into_iter().collect(); + assert_eq!( + branch_ids, + HashSet::from([local.id.clone(), remote.id.clone()]) + ); +} + #[test] fn test_session_lifecycle() { let store = Store::in_memory().unwrap(); diff --git a/apps/staged/src/App.svelte b/apps/staged/src/App.svelte index d5c237946..7b756e918 100644 --- a/apps/staged/src/App.svelte +++ b/apps/staged/src/App.svelte @@ -45,7 +45,6 @@ import { listenForSessionStatus } from './lib/listeners/sessionStatusListener'; import { darkMode } from './lib/stores/isDark.svelte'; import * as prPollingService from './lib/services/prPollingService'; - import { projectsList } from './lib/features/projects/projectsSidebarState.svelte'; import { reposUiEnabled } from './lib/featureFlags'; import type { StoreIncompatibility } from './lib/types'; @@ -68,12 +67,11 @@ let storeError = $state(null); // ========================================================================= - // App-wide PR polling — sync project list and selected project reactively + // App-wide PR polling — the backend scheduler owns cadence/concurrency and + // derives the project list from the DB. The frontend only forwards the + // selected project as an interest hint (focus + lifecycle wiring is in + // prPollingService.init(), called from onMount). // ========================================================================= - $effect(() => { - prPollingService.setProjects(projectsList.current.map((p) => p.id)); - }); - $effect(() => { prPollingService.setSelectedProject(navigation.selectedProjectId); }); @@ -226,6 +224,8 @@ onMount(async () => { darkMode.init(); + // Wire up PR-polling interest hints (window focus + backend lifecycle events). + prPollingService.init(); document.addEventListener('keydown', handleKonamiKey); // Listen for the app menu Preferences item. @@ -412,6 +412,7 @@ }); onDestroy(() => { + prPollingService.dispose(); document.removeEventListener('keydown', handleKonamiKey); unregisterShortcuts?.(); unlistenSettings?.(); diff --git a/apps/staged/src/lib/commands.ts b/apps/staged/src/lib/commands.ts index ad973231b..99b1b8a85 100644 --- a/apps/staged/src/lib/commands.ts +++ b/apps/staged/src/lib/commands.ts @@ -1071,6 +1071,48 @@ export function refreshAllPrStatuses(projectId: string): Promise { return invokeCommand('refresh_all_pr_statuses', { projectId }); } +// --------------------------------------------------------------------------- +// PR poll scheduler — interest/hint commands +// +// The backend owns PR-polling cadence and concurrency. These commands feed it +// interest hints; the cadence, dedup, and failure backoff all live in the +// backend scheduler (see src-tauri/src/pr_poll_scheduler.rs). +// +// Interest is tracked per connected client, so every hint carries a `clientId` +// (see prPollingService.ts). The same id must also be sent on the WS connect +// query so the backend can correlate interest with disconnect. +// --------------------------------------------------------------------------- + +/** Tell the backend which project this client has foregrounded/selected (→ selected tier). */ +export function setForegroundProject(clientId: string, projectId: string | null): Promise { + return invokeCommand('set_foreground_project', { clientId, projectId }); +} + +/** Report this client's window focus to the backend. No focused client ⇒ polling pauses. */ +export function setPrPollFocus(clientId: string, focused: boolean): Promise { + return invokeCommand('set_focus', { clientId, focused }); +} + +/** Mark whether a branch has pending CI checks for this client (→ pending tier for its project). */ +export function setBranchPending( + clientId: string, + branchId: string, + projectId: string, + pending: boolean +): Promise { + return invokeCommand('set_branch_pending', { clientId, branchId, projectId, pending }); +} + +/** Nudge the backend to refresh a project's PR statuses now (folded into dedup). */ +export function refreshPrStatusesNow(clientId: string, projectId: string): Promise { + return invokeCommand('refresh_now', { clientId, projectId }); +} + +/** Tell the backend this client has disconnected so its interest is dropped. */ +export function disconnectPrPollClient(clientId: string): Promise { + return invokeCommand('disconnect_client', { clientId }); +} + // ============================================================================= // Images // ============================================================================= diff --git a/apps/staged/src/lib/features/projects/ProjectHome.svelte b/apps/staged/src/lib/features/projects/ProjectHome.svelte index 1107088b0..f3def0c6a 100644 --- a/apps/staged/src/lib/features/projects/ProjectHome.svelte +++ b/apps/staged/src/lib/features/projects/ProjectHome.svelte @@ -158,28 +158,55 @@ } ); - // Listen for PR status changes to update branch state - const unlistenPrStatus = listenToEvent('pr-status-changed', (payload) => { - // Find the project that contains this branch and update it - for (const [projectId, branches] of branchesByProject.entries()) { - const branchIndex = branches.findIndex((b) => b.id === payload.branchId); - if (branchIndex !== -1) { - // Update the branch with new PR status - const updatedBranches = [...branches]; - updatedBranches[branchIndex] = { - ...updatedBranches[branchIndex], - prState: payload.prState, - prChecksStatus: payload.prChecksStatus, - prReviewDecision: payload.prReviewDecision, - prMergeable: payload.prMergeable, - prDraft: payload.prDraft, - prHeadSha: payload.prHeadSha, - prFetchedAt: payload.prFetchedAt, - }; - branchesByProject = new Map(branchesByProject).set(projectId, updatedBranches); - break; + // Listen for PR status changes to update branch state. + // + // A PR-polling cycle emits one `pr-status-changed` per branch, so a storm + // of N branches arrives as N separate events. Rebuilding `branchesByProject` + // with a fresh `new Map(...)` per event means N allocations + N derivation + // re-runs, which can pile up on the main thread during a project switch. + // Buffer the events and apply a single rebuild per frame so a burst + // coalesces into one reactive flush without dropping any update. + let pendingPrStatusEvents: PrStatusChangedEvent[] = []; + let prStatusFlushHandle: number | null = null; + + const flushPrStatusEvents = () => { + prStatusFlushHandle = null; + if (pendingPrStatusEvents.length === 0) return; + const events = pendingPrStatusEvents; + pendingPrStatusEvents = []; + + // Apply every buffered event onto one fresh Map. Each event re-scans the + // in-progress map, so multiple updates to the same project compound + // correctly instead of clobbering one another. + const next = new Map(branchesByProject); + for (const payload of events) { + for (const [projectId, branches] of next) { + const branchIndex = branches.findIndex((b) => b.id === payload.branchId); + if (branchIndex !== -1) { + const updatedBranches = [...branches]; + updatedBranches[branchIndex] = { + ...updatedBranches[branchIndex], + prState: payload.prState, + prChecksStatus: payload.prChecksStatus, + prReviewDecision: payload.prReviewDecision, + prMergeable: payload.prMergeable, + prDraft: payload.prDraft, + prHeadSha: payload.prHeadSha, + prFetchedAt: payload.prFetchedAt, + }; + next.set(projectId, updatedBranches); + break; + } } } + branchesByProject = next; + }; + + const unlistenPrStatus = listenToEvent('pr-status-changed', (payload) => { + pendingPrStatusEvents.push(payload); + if (prStatusFlushHandle === null) { + prStatusFlushHandle = requestAnimationFrame(flushPrStatusEvents); + } }); // Refresh a project's branches when a commit session completes so the @@ -205,6 +232,11 @@ unlistenDetection(); unlistenProjectRepoAdded(); unlistenPrStatus(); + if (prStatusFlushHandle !== null) { + cancelAnimationFrame(prStatusFlushHandle); + prStatusFlushHandle = null; + } + pendingPrStatusEvents = []; unlistenSessionStatus(); workspaceLifecycle.stop(); projectRunActionsStore.stopListening(); diff --git a/apps/staged/src/lib/services/prPollingService.ts b/apps/staged/src/lib/services/prPollingService.ts index 0683e7321..6e2200d02 100644 --- a/apps/staged/src/lib/services/prPollingService.ts +++ b/apps/staged/src/lib/services/prPollingService.ts @@ -1,14 +1,61 @@ /** - * Centralized PR status polling service. + * PR status polling — frontend interest/hint layer. * - * Polls all projects app-wide. The selected project polls more frequently - * than background projects, and projects with pending CI checks poll fastest. + * The backend owns PR-polling cadence and concurrency (see + * `src-tauri/src/pr_poll_scheduler.rs`): it derives the project list from the + * DB, decides what is due across the pending/selected/background tiers, dedups + * in-flight work, and backs off failures — all on a single bounded pool. * - * The backend's `refreshAllPrStatuses` already emits per-branch - * `pr-status-changed` events, so components only need to listen for those. + * This module is now a thin shim that: + * - forwards UI interest to the backend as hints (selected project, pending + * checks, window focus, explicit refresh nudges); and + * - re-broadcasts the backend's per-project refresh/stale lifecycle events to + * local subscribers, preserving the `onRefreshing` / `onStale` / + * `isRefreshing` API that `BranchCardPrButton` relies on. + * + * The backend already emits per-branch `pr-status-changed` and a final + * `pr-statuses-refreshed`; components subscribe to those directly. */ -import { refreshAllPrStatuses } from '../commands'; +import { isTauri, listenToEvent, type UnlistenFn } from '../transport'; +import { + setForegroundProject, + setPrPollFocus, + setBranchPending, + refreshPrStatusesNow, + disconnectPrPollClient, +} from '../commands'; + +// --------------------------------------------------------------------------- +// Client identity +// --------------------------------------------------------------------------- +// +// The backend tracks PR-poll interest per connected client and unions the +// cadence across them (see src-tauri/src/pr_poll_scheduler.rs). This module +// owns a stable id for the lifetime of the page that is threaded through every +// interest hint. +// +// - Native (Tauri): the fixed well-known id `tauri-main` (must match +// `TAURI_CLIENT_ID` in pr_poll_scheduler.rs), so single-client behaviour is +// identical to before per-client interest existed. +// - Web (browser): a fresh UUID per page load (per tab). The same value must +// be appended to the WS connect URL (`?clientId=`) by the web transport +// so the backend correlates this client's interest (invoke channel) with +// its disconnect (WS close). See `getPrPollClientId`. + +const TAURI_CLIENT_ID = 'tauri-main'; + +const clientId: string = isTauri ? TAURI_CLIENT_ID : crypto.randomUUID(); + +/** + * This client's PR-poll id, stable for the page's lifetime. Exposed so the web + * transport can append the *same* id to the events WebSocket connect query + * (`?clientId=`) — the backend↔frontend contract requires the same id on + * both the invoke and WS channels. + */ +export function getPrPollClientId(): string { + return clientId; +} // --------------------------------------------------------------------------- // Types @@ -17,148 +64,37 @@ import { refreshAllPrStatuses } from '../commands'; type StaleCallback = (projectId: string, isStale: boolean) => void; type RefreshingCallback = (projectId: string, isRefreshing: boolean) => void; -// --------------------------------------------------------------------------- -// Intervals -// --------------------------------------------------------------------------- +interface PrRefreshStateEvent { + projectId: string; + refreshing: boolean; +} -const PENDING_INTERVAL = 15_000; // any project with pending CI checks -const SELECTED_INTERVAL = 60_000; // selected project, no pending checks -const BACKGROUND_INTERVAL = 5 * 60_000; // non-selected, no pending checks -const MAX_CONSECUTIVE_FAILURES = 3; +interface PrStaleEvent { + projectId: string; + stale: boolean; +} // --------------------------------------------------------------------------- // State // --------------------------------------------------------------------------- -/** All project IDs to poll. */ -const allProjectIds = new Set(); - -/** Currently selected (viewed) project. */ -let selectedProjectId: string | null = null; - -/** Branches with pending checks, keyed by branchId → projectId. */ -const pendingBranches = new Map(); - -/** When each project was last successfully polled. */ -const lastPolledAt = new Map(); - -/** Consecutive failure count per projectId. */ -const failures = new Map(); - /** Registered stale-data callbacks. */ const staleCallbacks = new Set(); /** Registered refresh-state callbacks. */ const refreshingCallbacks = new Set(); -/** Projects currently being refreshed. */ +/** Projects the backend currently reports as refreshing. */ const refreshingProjects = new Set(); -let timerId: ReturnType | null = null; -let refreshInFlight = false; -let windowFocused = true; -let listenersAttached = false; - -/** Project IDs queued for immediate refresh while another refresh is in-flight. */ -const pendingRefreshProjectIds = new Set(); +let initialized = false; +let unlistenRefreshState: UnlistenFn | null = null; +let unlistenStale: UnlistenFn | null = null; // --------------------------------------------------------------------------- // Internal helpers // --------------------------------------------------------------------------- -function projectHasPendingChecks(projectId: string): boolean { - for (const pId of pendingBranches.values()) { - if (pId === projectId) return true; - } - return false; -} - -function getProjectInterval(projectId: string): number { - if (projectHasPendingChecks(projectId)) return PENDING_INTERVAL; - if (projectId === selectedProjectId) return SELECTED_INTERVAL; - return BACKGROUND_INTERVAL; -} - -/** Return project IDs whose polling interval has elapsed. */ -function getProjectsDue(): string[] { - const now = Date.now(); - const due: string[] = []; - for (const projectId of allProjectIds) { - const interval = getProjectInterval(projectId); - const last = lastPolledAt.get(projectId) ?? 0; - if (now - last >= interval) { - due.push(projectId); - } - } - return due; -} - -async function poll() { - if (refreshInFlight || !windowFocused || allProjectIds.size === 0) { - // Don't reschedule here — the in-flight operation's `finally` block - // already calls scheduleNext(), and the other two cases (unfocused / - // empty) intentionally have no timer running. - return; - } - - refreshInFlight = true; - const due = getProjectsDue(); - - for (const projectId of due) { - setProjectRefreshing(projectId, true); - try { - await refreshAllPrStatuses(projectId); - lastPolledAt.set(projectId, Date.now()); - // Reset failure counter on success - const prev = failures.get(projectId) ?? 0; - if (prev > 0) { - failures.set(projectId, 0); - notifyStale(projectId, false); - } - } catch (e) { - const count = (failures.get(projectId) ?? 0) + 1; - failures.set(projectId, count); - console.error( - `[PrPollingService] refreshAllPrStatuses failed for project=${projectId} (attempt ${count}):`, - e - ); - if (count === MAX_CONSECUTIVE_FAILURES) { - notifyStale(projectId, true); - } - } finally { - setProjectRefreshing(projectId, false); - } - } - - refreshInFlight = false; - scheduleNext(); -} - -function scheduleNext() { - stopTimer(); - if (allProjectIds.size === 0 || !windowFocused) return; - - const now = Date.now(); - let minDelay = Infinity; - for (const projectId of allProjectIds) { - const interval = getProjectInterval(projectId); - const last = lastPolledAt.get(projectId) ?? 0; - const remaining = Math.max(0, interval - (now - last)); - minDelay = Math.min(minDelay, remaining); - } - - if (!Number.isFinite(minDelay)) return; - // Floor at 1s to avoid tight loops - timerId = setTimeout(poll, Math.max(1_000, minDelay)); -} - -function stopTimer() { - if (timerId !== null) { - clearTimeout(timerId); - timerId = null; - } -} - function notifyStale(projectId: string, isStale: boolean) { for (const cb of staleCallbacks) { try { @@ -196,88 +132,70 @@ function setProjectRefreshing(projectId: string, isRefreshing: boolean) { // --------------------------------------------------------------------------- function handleFocus() { - windowFocused = true; - poll(); + void setPrPollFocus(clientId, true).catch(() => {}); } function handleBlur() { - windowFocused = false; - stopTimer(); + void setPrPollFocus(clientId, false).catch(() => {}); } -function ensureWindowListeners() { - if (listenersAttached) return; +// --------------------------------------------------------------------------- +// Lifecycle +// --------------------------------------------------------------------------- + +/** + * Wire up the interest/hint layer: forward window focus to the backend and + * subscribe to its refresh/stale lifecycle events. Idempotent; call once at app + * start. No-op in web mode (the transport is stubbed in this build). + */ +export function init(): void { + if (initialized || !isTauri) return; + initialized = true; + window.addEventListener('focus', handleFocus); window.addEventListener('blur', handleBlur); - listenersAttached = true; + // Seed the backend with the current focus state (it defaults to focused, so + // the initial poll already ran; this corrects it if we launched unfocused). + void setPrPollFocus(clientId, document.hasFocus()).catch(() => {}); + + unlistenRefreshState = listenToEvent('pr-refresh-state', (payload) => { + setProjectRefreshing(payload.projectId, payload.refreshing); + }); + unlistenStale = listenToEvent('pr-status-stale', (payload) => { + notifyStale(payload.projectId, payload.stale); + }); } -function removeWindowListeners() { - if (!listenersAttached) return; +/** Tear down listeners and subscriptions. */ +export function dispose(): void { + if (!initialized) return; + initialized = false; + window.removeEventListener('focus', handleFocus); window.removeEventListener('blur', handleBlur); - listenersAttached = false; -} - -// --------------------------------------------------------------------------- -// Public API -// --------------------------------------------------------------------------- + unlistenRefreshState?.(); + unlistenStale?.(); + unlistenRefreshState = null; + unlistenStale = null; -/** Set the full list of project IDs to poll. Starts/stops polling as needed. */ -export function setProjects(projectIds: string[]): void { - const newIds = new Set(projectIds); + // Drop this client's interest so the backend recomputes the union without it. + void disconnectPrPollClient(clientId).catch(() => {}); - // Short-circuit if the set of project IDs hasn't changed - if (newIds.size === allProjectIds.size && projectIds.every((id) => allProjectIds.has(id))) { - return; - } - - // Remove projects no longer in the list - for (const id of allProjectIds) { - if (!newIds.has(id)) { - allProjectIds.delete(id); - lastPolledAt.delete(id); - failures.delete(id); - setProjectRefreshing(id, false); - } - } - - // Clean up pending branches for removed projects - for (const [branchId, projectId] of pendingBranches) { - if (!newIds.has(projectId)) { - pendingBranches.delete(branchId); - } - } - - // Add new projects - for (const id of newIds) { - allProjectIds.add(id); - } - - if (allProjectIds.size > 0) { - ensureWindowListeners(); - // Trigger poll — new projects have no lastPolledAt so they'll be due - poll(); - } else { - stopTimer(); - removeWindowListeners(); - failures.clear(); - for (const projectId of [...refreshingProjects]) { - setProjectRefreshing(projectId, false); - } + for (const projectId of [...refreshingProjects]) { + setProjectRefreshing(projectId, false); } } +// --------------------------------------------------------------------------- +// Public API — interest hints (forwarded to the backend scheduler) +// --------------------------------------------------------------------------- + /** Set the currently selected project (polls more frequently). */ export function setSelectedProject(projectId: string | null): void { - if (selectedProjectId === projectId) return; - selectedProjectId = projectId; - if (projectId && allProjectIds.has(projectId)) { - // Selected project's interval just changed — trigger a poll if it's due - poll(); - } else { - scheduleNext(); - } + if (!isTauri) return; + void setForegroundProject(clientId, projectId).catch((e) => + console.error('[PrPollingService] set_foreground_project failed:', e) + ); } /** Update whether a branch has pending CI checks (affects its project's poll interval). */ @@ -286,17 +204,24 @@ export function updateChecksStatus( projectId: string, hasPendingChecks: boolean ): void { - const hadPending = pendingBranches.has(branchId); - if (hasPendingChecks) { - pendingBranches.set(branchId, projectId); - } else { - pendingBranches.delete(branchId); - } - if (hadPending !== hasPendingChecks) { - scheduleNext(); - } + if (!isTauri) return; + void setBranchPending(clientId, branchId, projectId, hasPendingChecks).catch((e) => + console.error('[PrPollingService] set_branch_pending failed:', e) + ); } +/** Trigger an immediate refresh for a specific project (e.g. after PR creation or push). */ +export function refreshNow(projectId: string): void { + if (!isTauri) return; + void refreshPrStatusesNow(clientId, projectId).catch((e) => + console.error(`[PrPollingService] refresh_now failed for project=${projectId}:`, e) + ); +} + +// --------------------------------------------------------------------------- +// Public API — UI state subscriptions (driven by backend lifecycle events) +// --------------------------------------------------------------------------- + /** Register a callback for stale-data notifications. Returns an unsubscribe function. */ export function onStale(callback: StaleCallback): () => void { staleCallbacks.add(callback); @@ -317,7 +242,7 @@ export function isRefreshing(projectId: string): boolean { } // --------------------------------------------------------------------------- -// PR recovery coordination +// PR recovery coordination (frontend-only dedup, unchanged) // --------------------------------------------------------------------------- /** Branch IDs for which recovery has already been attempted (or is in progress). */ @@ -343,43 +268,3 @@ export function shouldAttemptRecovery(branchId: string): boolean { export function clearRecoveryAttempt(branchId: string): void { recoveryAttempted.delete(branchId); } - -/** Trigger an immediate refresh for a specific project (e.g. after PR creation or push). */ -export function refreshNow(projectId: string): void { - if (refreshInFlight) { - // Queue so the project is refreshed as soon as the current operation finishes. - pendingRefreshProjectIds.add(projectId); - return; - } - refreshInFlight = true; - setProjectRefreshing(projectId, true); - refreshAllPrStatuses(projectId) - .then(() => { - lastPolledAt.set(projectId, Date.now()); - // Reset failure counter on success - const prev = failures.get(projectId) ?? 0; - if (prev > 0) { - failures.set(projectId, 0); - notifyStale(projectId, false); - } - }) - .catch((e) => - console.error(`[PrPollingService] immediate refresh failed for project=${projectId}:`, e) - ) - .finally(() => { - setProjectRefreshing(projectId, false); - refreshInFlight = false; - // Drain queued immediate-refresh requests one at a time. - if (pendingRefreshProjectIds.size > 0) { - const queued = [...pendingRefreshProjectIds]; - pendingRefreshProjectIds.clear(); - // Re-queue all but the first; they'll drain on the next finally cycle. - for (let i = 1; i < queued.length; i++) { - pendingRefreshProjectIds.add(queued[i]); - } - refreshNow(queued[0]); - } else { - scheduleNext(); - } - }); -}