Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-persistence/src/value_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl ValueBuffer<'_> {
match self {
ValueBuffer::Borrowed(b) => b.into(),
ValueBuffer::Vec(v) => v.into_boxed_slice(),
ValueBuffer::SmallVec(sv) => sv.into_vec().into_boxed_slice(),
ValueBuffer::SmallVec(sv) => sv.into_boxed_slice(),
}
}
}
Expand Down
224 changes: 113 additions & 111 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,17 @@ use crate::{
storage::Storage,
storage_schema::{TaskStorage, TaskStorageAccessors},
},
backing_storage::BackingStorage,
backing_storage::{BackingStorage, SnapshotItem},
data::{
ActivenessState, CellRef, CollectibleRef, CollectiblesRef, Dirtyness, InProgressCellState,
InProgressState, InProgressStateInner, OutputValue, TransientTask,
},
utils::{
arc_or_owned::ArcOrOwned,
chunked_vec::ChunkedVec,
dash_map_drop_contents::drop_contents,
dash_map_raw_entry::{RawEntry, raw_entry},
ptr_eq_arc::PtrEqArc,
shard_amount::compute_shard_amount,
sharded::Sharded,
swap_retain,
},
};

Expand Down Expand Up @@ -164,8 +161,6 @@ pub enum TurboTasksBackendJob {

pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

type TaskCacheLog = Sharded<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>;

struct TurboTasksBackendInner<B: BackingStorage> {
options: BackendOptions,

Expand All @@ -174,7 +169,6 @@ struct TurboTasksBackendInner<B: BackingStorage> {
persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
transient_task_id_factory: IdFactoryWithReuse<TaskId>,

persisted_task_cache_log: Option<TaskCacheLog>,
task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,

storage: Storage,
Expand Down Expand Up @@ -231,10 +225,6 @@ impl<B: BackingStorage> TurboTasksBackend<B> {
impl<B: BackingStorage> TurboTasksBackendInner<B> {
pub fn new(mut options: BackendOptions, backing_storage: B) -> Self {
let shard_amount = compute_shard_amount(options.num_workers, options.small_preallocation);
let need_log = matches!(
options.storage_mode,
Some(StorageMode::ReadWrite) | Some(StorageMode::ReadWriteOnShutdown)
);
if !options.dependency_tracking {
options.active_tracking = false;
}
Expand All @@ -253,7 +243,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
TaskId::MAX,
),
persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
task_cache: FxDashMap::default(),
local_is_partial: AtomicBool::new(next_task_id != TaskId::MIN),
storage: Storage::new(shard_amount, small_preallocation),
Expand Down Expand Up @@ -991,11 +980,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.collect::<Vec<_>>();
}
self.storage.start_snapshot();
let mut persisted_task_cache_log = self
.persisted_task_cache_log
.as_ref()
.map(|l| l.take(|i| i))
.unwrap_or_default();
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = false;
self.in_progress_operations
Expand Down Expand Up @@ -1066,9 +1050,40 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let task_cache_stats: Mutex<FxHashMap<_, TaskCacheStats>> =
Mutex::new(FxHashMap::default());

// Helper to encode task data and handle errors/stats
let encode_category = |task_id: TaskId,
data: &TaskStorage,
category: SpecificTaskDataCategory,
buffer: &mut TurboBincodeBuffer|
-> Option<TurboBincodeBuffer> {
match encode_task_data(task_id, data, category, buffer) {
Ok(encoded) => {
#[cfg(feature = "print_cache_item_size")]
{
let mut stats = task_cache_stats.lock();
let entry = stats.entry(self.debug_get_task_name(task_id)).or_default();
match category {
SpecificTaskDataCategory::Meta => entry.add_meta(&encoded),
SpecificTaskDataCategory::Data => entry.add_data(&encoded),
}
}
Some(encoded)
}
Err(err) => {
println!(
"Serializing task {} failed ({:?}): {:?}",
self.debug_get_task_description(task_id),
category,
err
);
None
}
}
};

let preprocess = |task_id: TaskId, inner: &TaskStorage| {
if task_id.is_transient() {
return (None, None);
return (None, None, None);
}

let meta_restored = inner.flags.meta_restored();
Expand All @@ -1078,10 +1093,22 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let meta = meta_restored.then(|| inner.clone_meta_snapshot());
let data = data_restored.then(|| inner.clone_data_snapshot());

(meta, data)
// Capture task type if this is a new task that needs cache persistence
let task_type = inner.flags.new_persistent_task().then(|| {
inner
.get_persistent_task_type()
.expect("new tasks must have a persistent_task_type")
.clone()
});

(meta, data, task_type)
};
let process = |task_id: TaskId,
(meta, data): (Option<TaskStorage>, Option<TaskStorage>),
(meta, data, task_type): (
Option<TaskStorage>,
Option<TaskStorage>,
Option<Arc<CachedTaskType>>,
),
buffer: &mut TurboBincodeBuffer| {
#[cfg(feature = "print_cache_item_size")]
if let Some(ref m) = meta {
Expand All @@ -1091,38 +1118,67 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.or_default()
.add_counts(m);
}
(
let meta = meta
.and_then(|d| encode_category(task_id, &d, SpecificTaskDataCategory::Meta, buffer));
let data = data
.and_then(|d| encode_category(task_id, &d, SpecificTaskDataCategory::Data, buffer));
SnapshotItem {
task_id,
meta.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Meta, buffer)),
data.map(|d| encode_task_data(task_id, &d, SpecificTaskDataCategory::Data, buffer)),
)
meta,
data,
task_type,
}
};
let process_snapshot =
|task_id: TaskId, inner: Box<TaskStorage>, buffer: &mut TurboBincodeBuffer| {
if task_id.is_transient() {
return (task_id, None, None);
}
let process_snapshot = |task_id: TaskId,
inner: Box<TaskStorage>,
buffer: &mut TurboBincodeBuffer| {
if task_id.is_transient() {
return SnapshotItem {
task_id,
meta: None,
data: None,
task_type: None,
};
}

#[cfg(feature = "print_cache_item_size")]
if inner.flags.meta_modified() {
task_cache_stats
.lock()
.entry(self.debug_get_task_name(task_id))
.or_default()
.add_counts(&inner);
}
#[cfg(feature = "print_cache_item_size")]
if inner.flags.meta_modified() {
task_cache_stats
.lock()
.entry(self.debug_get_task_name(task_id))
.or_default()
.add_counts(&inner);
}

// Encode meta/data directly from TaskStorage snapshot
(
task_id,
inner.flags.meta_modified().then(|| {
encode_task_data(task_id, &inner, SpecificTaskDataCategory::Meta, buffer)
}),
inner.flags.data_modified().then(|| {
encode_task_data(task_id, &inner, SpecificTaskDataCategory::Data, buffer)
}),
)
};
// Capture task type if this is a new task (from snapshot's modified flags)
let task_type = inner.flags.new_persistent_task().then(|| {
inner
.get_persistent_task_type()
.expect(
"new_persistent_task can only be set if there is a persistent_task_type",
)
.clone()
});

// Encode meta/data directly from TaskStorage snapshot
let meta = inner
.flags
.meta_modified()
.then(|| encode_category(task_id, &inner, SpecificTaskDataCategory::Meta, buffer))
.flatten();
let data = inner
.flags
.data_modified()
.then(|| encode_category(task_id, &inner, SpecificTaskDataCategory::Data, buffer))
.flatten();

SnapshotItem {
task_id,
meta,
data,
task_type,
}
};

let snapshot = self
.storage
Expand All @@ -1132,75 +1188,26 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.into_iter()
.filter_map(|iter| {
let mut iter = iter
.filter_map(
|(task_id, meta, data): (
_,
Option<Result<SmallVec<_>>>,
Option<Result<SmallVec<_>>>,
)| {
let meta = match meta {
Some(Ok(meta)) => {
#[cfg(feature = "print_cache_item_size")]
task_cache_stats
.lock()
.entry(self.debug_get_task_name(task_id))
.or_default()
.add_meta(&meta);
Some(meta)
}
None => None,
Some(Err(err)) => {
println!(
"Serializing task {} failed (meta): {:?}",
self.debug_get_task_description(task_id),
err
);
None
}
};
let data = match data {
Some(Ok(data)) => {
#[cfg(feature = "print_cache_item_size")]
task_cache_stats
.lock()
.entry(self.debug_get_task_name(task_id))
.or_default()
.add_data(&data);
Some(data)
}
None => None,
Some(Err(err)) => {
println!(
"Serializing task {} failed (data): {:?}",
self.debug_get_task_description(task_id),
err
);
None
}
};
(meta.is_some() || data.is_some()).then_some((task_id, meta, data))
},
)
.filter(|item| {
item.meta.is_some() || item.data.is_some() || item.task_type.is_some()
})
.peekable();
iter.peek().is_some().then_some(iter)
})
.collect::<Vec<_>>();

swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

drop(snapshot_span);

if persisted_task_cache_log.is_empty() && task_snapshots.is_empty() {
if task_snapshots.is_empty() {
return Some((snapshot_time, false));
}

let _span = tracing::info_span!(parent: parent_span, "persist", reason = reason).entered();
{
if let Err(err) = self.backing_storage.save_snapshot(
suspended_operations,
persisted_task_cache_log,
task_snapshots,
) {
if let Err(err) = self
.backing_storage
.save_snapshot(suspended_operations, task_snapshots)
{
println!("Persisting failed: {err:?}");
return None;
}
Expand Down Expand Up @@ -1459,7 +1466,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
} else {
// Task doesn't exist in memory cache or backing storage
// So we might need to create a new task
let (task_id, mut task_type) = match raw_entry(&self.task_cache, &task_type) {
let (task_id, task_type) = match raw_entry(&self.task_cache, &task_type) {
RawEntry::Occupied(e) => {
let task_id = *e.get();
drop(e);
Expand All @@ -1474,11 +1481,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
(task_id, ArcOrOwned::Arc(task_type))
}
};
if let Some(log) = &self.persisted_task_cache_log {
let task_type_arc: Arc<CachedTaskType> = Arc::from(task_type);
log.lock(task_id).push((task_type_arc.clone(), task_id));
task_type = ArcOrOwned::Arc(task_type_arc);
}
(task_id, task_type)
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2421,6 +2421,7 @@ impl AggregationUpdateQueue {
&& !task.has_persistent_task_type()
{
let _ = task.set_persistent_task_type(task_type);
task.set_new_persistent_task(true);
}
let state = task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
let is_new = state.is_empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl ConnectChildOperation {
&& !child_task.has_persistent_task_type()
{
child_task.set_persistent_task_type(child_task_type.into());
child_task.set_new_persistent_task(true);
}

if !child_task.has_output()
Expand Down
Loading
Loading