Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion reactive_graph/src/effect/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Dispose for ImmediateEffect {

impl ImmediateEffect {
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// NOTE: this requires a `Fn` function because it might recurse.
/// Use [Self::new_mut] to pass a `FnMut` function, it'll panic on recursion.
Expand All @@ -82,6 +83,7 @@ impl ImmediateEffect {
Self { inner: Some(inner) }
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// # Panics
/// Panics on recursion or if triggered in parallel. Also see [Self::new]
Expand All @@ -93,15 +95,30 @@ impl ImmediateEffect {
Self::new(move || fun.try_lock().expect(MSG)())
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// NOTE: this requires a `Fn` function because it might recurse.
/// Use [Self::new_mut_scoped] to pass a `FnMut` function, it'll panic on recursion.
/// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
#[track_caller]
pub fn new_scoped(fun: impl Fn() + Send + Sync + 'static) {
let effect = Self::new(fun);

on_cleanup(move || effect.dispose());
}
/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
/// (Unless [batch] is used.)
///
/// NOTE: this effect is automatically cleaned up when the current owner is cleared or disposed.
///
/// # Panics
/// Panics on recursion or if triggered in parallel. Also see [Self::new_scoped]
#[track_caller]
pub fn new_mut_scoped(fun: impl FnMut() + Send + Sync + 'static) {
let effect = Self::new_mut(fun);

on_cleanup(move || effect.dispose());
}

/// Creates a new effect which runs immediately, then again as soon as any tracked signal changes.
///
Expand Down Expand Up @@ -130,6 +147,41 @@ impl DefinedAt for ImmediateEffect {
}
}

/// Defers any [ImmediateEffect]s from running until the end of the function.
///
/// NOTE: this affects only [ImmediateEffect]s, not other effects.
///
/// NOTE: this is rarely needed, but it is useful for example when multiple signals
/// need to be updated atomically (for example a double-bound signal tree).
pub fn batch<T>(f: impl FnOnce() -> T) -> T {
struct ExecuteOnDrop;
impl Drop for ExecuteOnDrop {
fn drop(&mut self) {
let effects = {
let mut batch = inner::BATCH.write().or_poisoned();
batch.take().unwrap().into_inner().expect("lock poisoned")
};
// TODO: Should we skip the effects if it's panicking?
for effect in effects {
effect.update_if_necessary();
}
}
}
let mut execute_on_drop = None;
{
let mut batch = inner::BATCH.write().or_poisoned();
if batch.is_none() {
execute_on_drop = Some(ExecuteOnDrop);
} else {
// Nested batching has no effect.
}
*batch = Some(batch.take().unwrap_or_default());
}
let ret = f();
drop(execute_on_drop);
ret
}

mod inner {
use crate::{
graph::{
Expand All @@ -140,13 +192,19 @@ mod inner {
owner::Owner,
traits::DefinedAt,
};
use indexmap::IndexSet;
use or_poisoned::OrPoisoned;
use std::{
panic::Location,
sync::{Arc, RwLock, Weak},
thread::{self, ThreadId},
};

/// Only the [super::batch] function ever writes to the outer RwLock.
/// While the effects will write to the inner one.
pub(super) static BATCH: RwLock<Option<RwLock<IndexSet<AnySubscriber>>>> =
RwLock::new(None);

/// Handles subscription logic for effects.
///
/// To handle parallelism and recursion we assign ordered (1..) ids to each run.
Expand Down Expand Up @@ -202,6 +260,8 @@ mod inner {
fun: impl Fn() + Send + Sync + 'static,
) -> Arc<RwLock<EffectInner>> {
let owner = Owner::new();
#[cfg(any(debug_assertions, leptos_debuginfo))]
let defined_at = Location::caller();

Arc::new_cyclic(|weak| {
let any_subscriber = AnySubscriber(
Expand All @@ -211,7 +271,7 @@ mod inner {

RwLock::new(EffectInner {
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
defined_at,
owner,
state: ReactiveNodeState::Dirty,
run_count_start: 0,
Expand Down Expand Up @@ -260,6 +320,17 @@ mod inner {
ReactiveNodeState::Dirty => true,
};

{
if let Some(batch) = &*BATCH.read().or_poisoned() {
let mut batch = batch.write().or_poisoned();
let subscriber =
self.read().or_poisoned().any_subscriber.clone();

batch.insert(subscriber);
return needs_update;
}
}

if needs_update {
let mut guard = self.write().or_poisoned();

Expand Down
35 changes: 35 additions & 0 deletions reactive_graph/tests/effect_immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,38 @@ fn threaded_chaos_effect() {
let values: Vec<_> = signals.iter().map(|s| s.get_untracked()).collect();
println!("FINAL: {values:?}");
}

#[cfg(feature = "effects")]
#[test]
fn test_batch() {
use imports::*;
use reactive_graph::{effect::batch, owner::StoredValue};

let owner = Owner::new();
owner.set();

let a = RwSignal::new(0);
let b = RwSignal::new(0);

let values = StoredValue::new(Vec::new());

ImmediateEffect::new_scoped(move || {
println!("{} = {}", a.get(), b.get());
values.write_value().push((a.get(), b.get()));
});

a.set(1);
b.set(1);

batch(move || {
a.set(2);
b.set(2);

batch(move || {
a.set(3);
b.set(3);
});
});

assert_eq!(values.get_value(), vec![(0, 0), (1, 0), (1, 1), (3, 3)]);
}
Loading