From b303a35d765c55869acb98b1ad3cc4d7ae1f4685 Mon Sep 17 00:00:00 2001 From: QuartzLibrary <81446760+QuartzLibrary@users.noreply.github.com> Date: Sat, 27 Sep 2025 21:41:01 +0100 Subject: [PATCH 1/3] feat: `ImmediateEffect::new_mut_scoped` --- reactive_graph/src/effect/immediate.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/reactive_graph/src/effect/immediate.rs b/reactive_graph/src/effect/immediate.rs index b9f752bb01..0f4e49ea23 100644 --- a/reactive_graph/src/effect/immediate.rs +++ b/reactive_graph/src/effect/immediate.rs @@ -95,6 +95,7 @@ impl ImmediateEffect { /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. /// /// NOTE: this requires a `Fn` function because it might recurse. + /// Use [Self::new_mut_scoped] to pass a `FnMut` function, it'll panic on recursion. /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed. #[track_caller] pub fn new_scoped(fun: impl Fn() + Send + Sync + 'static) { @@ -102,6 +103,18 @@ impl ImmediateEffect { on_cleanup(move || effect.dispose()); } + /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// + /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed. + /// + /// # Panics + /// Panics on recursion or if triggered in parallel. Also see [Self::new_scoped] + #[track_caller] + pub fn new_mut_scoped(fun: impl FnMut() + Send + Sync + 'static) { + let effect = Self::new_mut(fun); + + on_cleanup(move || effect.dispose()); + } /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. /// From d13936cab5919031203e5ac0fe934fe0a42c0b87 Mon Sep 17 00:00:00 2001 From: QuartzLibrary <81446760+QuartzLibrary@users.noreply.github.com> Date: Sat, 27 Sep 2025 21:42:31 +0100 Subject: [PATCH 2/3] fix: `ImmediateEffect` debug info --- reactive_graph/src/effect/immediate.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reactive_graph/src/effect/immediate.rs b/reactive_graph/src/effect/immediate.rs index 0f4e49ea23..ee82aa34ce 100644 --- a/reactive_graph/src/effect/immediate.rs +++ b/reactive_graph/src/effect/immediate.rs @@ -215,6 +215,8 @@ mod inner { fun: impl Fn() + Send + Sync + 'static, ) -> Arc> { let owner = Owner::new(); + #[cfg(any(debug_assertions, leptos_debuginfo))] + let defined_at = Location::caller(); Arc::new_cyclic(|weak| { let any_subscriber = AnySubscriber( @@ -224,7 +226,7 @@ mod inner { RwLock::new(EffectInner { #[cfg(any(debug_assertions, leptos_debuginfo))] - defined_at: Location::caller(), + defined_at, owner, state: ReactiveNodeState::Dirty, run_count_start: 0, From 7a3556bf34f3b7b24384ccb94cebddb61e780192 Mon Sep 17 00:00:00 2001 From: QuartzLibrary <81446760+QuartzLibrary@users.noreply.github.com> Date: Sat, 27 Sep 2025 22:48:28 +0100 Subject: [PATCH 3/3] feat: `effect::immediate::batch` --- reactive_graph/src/effect/immediate.rs | 56 ++++++++++++++++++++++++ reactive_graph/tests/effect_immediate.rs | 35 +++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/reactive_graph/src/effect/immediate.rs b/reactive_graph/src/effect/immediate.rs index ee82aa34ce..f0d01c3404 100644 --- a/reactive_graph/src/effect/immediate.rs +++ b/reactive_graph/src/effect/immediate.rs @@ -65,6 +65,7 @@ impl Dispose for ImmediateEffect { impl ImmediateEffect { /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// (Unless [batch] is used.) /// /// NOTE: this requires a `Fn` function because it might recurse. /// Use [Self::new_mut] to pass a `FnMut` function, it'll panic on recursion. @@ -82,6 +83,7 @@ impl ImmediateEffect { Self { inner: Some(inner) } } /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// (Unless [batch] is used.) /// /// # Panics /// Panics on recursion or if triggered in parallel. Also see [Self::new] @@ -93,6 +95,7 @@ impl ImmediateEffect { Self::new(move || fun.try_lock().expect(MSG)()) } /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// (Unless [batch] is used.) /// /// NOTE: this requires a `Fn` function because it might recurse. /// Use [Self::new_mut_scoped] to pass a `FnMut` function, it'll panic on recursion. @@ -104,6 +107,7 @@ impl ImmediateEffect { on_cleanup(move || effect.dispose()); } /// Creates a new effect which runs immediately, then again as soon as any tracked signal changes. + /// (Unless [batch] is used.) /// /// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed. /// @@ -143,6 +147,41 @@ impl DefinedAt for ImmediateEffect { } } +/// Defers any [ImmediateEffect]s from running until the end of the function. +/// +/// NOTE: this affects only [ImmediateEffect]s, not other effects. +/// +/// NOTE: this is rarely needed, but it is useful for example when multiple signals +/// need to be updated atomically (for example a double-bound signal tree). +pub fn batch(f: impl FnOnce() -> T) -> T { + struct ExecuteOnDrop; + impl Drop for ExecuteOnDrop { + fn drop(&mut self) { + let effects = { + let mut batch = inner::BATCH.write().or_poisoned(); + batch.take().unwrap().into_inner().expect("lock poisoned") + }; + // TODO: Should we skip the effects if it's panicking? + for effect in effects { + effect.update_if_necessary(); + } + } + } + let mut execute_on_drop = None; + { + let mut batch = inner::BATCH.write().or_poisoned(); + if batch.is_none() { + execute_on_drop = Some(ExecuteOnDrop); + } else { + // Nested batching has no effect. + } + *batch = Some(batch.take().unwrap_or_default()); + } + let ret = f(); + drop(execute_on_drop); + ret +} + mod inner { use crate::{ graph::{ @@ -153,6 +192,7 @@ mod inner { owner::Owner, traits::DefinedAt, }; + use indexmap::IndexSet; use or_poisoned::OrPoisoned; use std::{ panic::Location, @@ -160,6 +200,11 @@ mod inner { thread::{self, ThreadId}, }; + /// Only the [super::batch] function ever writes to the outer RwLock. + /// While the effects will write to the inner one. + pub(super) static BATCH: RwLock>>> = + RwLock::new(None); + /// Handles subscription logic for effects. /// /// To handle parallelism and recursion we assign ordered (1..) ids to each run. @@ -275,6 +320,17 @@ mod inner { ReactiveNodeState::Dirty => true, }; + { + if let Some(batch) = &*BATCH.read().or_poisoned() { + let mut batch = batch.write().or_poisoned(); + let subscriber = + self.read().or_poisoned().any_subscriber.clone(); + + batch.insert(subscriber); + return needs_update; + } + } + if needs_update { let mut guard = self.write().or_poisoned(); diff --git a/reactive_graph/tests/effect_immediate.rs b/reactive_graph/tests/effect_immediate.rs index a5f1cfef63..d0502deafa 100644 --- a/reactive_graph/tests/effect_immediate.rs +++ b/reactive_graph/tests/effect_immediate.rs @@ -225,3 +225,38 @@ fn threaded_chaos_effect() { let values: Vec<_> = signals.iter().map(|s| s.get_untracked()).collect(); println!("FINAL: {values:?}"); } + +#[cfg(feature = "effects")] +#[test] +fn test_batch() { + use imports::*; + use reactive_graph::{effect::batch, owner::StoredValue}; + + let owner = Owner::new(); + owner.set(); + + let a = RwSignal::new(0); + let b = RwSignal::new(0); + + let values = StoredValue::new(Vec::new()); + + ImmediateEffect::new_scoped(move || { + println!("{} = {}", a.get(), b.get()); + values.write_value().push((a.get(), b.get())); + }); + + a.set(1); + b.set(1); + + batch(move || { + a.set(2); + b.set(2); + + batch(move || { + a.set(3); + b.set(3); + }); + }); + + assert_eq!(values.get_value(), vec![(0, 0), (1, 0), (1, 1), (3, 3)]); +}