Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-persistence/src/value_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl ValueBuffer<'_> {
match self {
ValueBuffer::Borrowed(b) => b.into(),
ValueBuffer::Vec(v) => v.into_boxed_slice(),
ValueBuffer::SmallVec(sv) => sv.into_vec().into_boxed_slice(),
ValueBuffer::SmallVec(sv) => sv.into_boxed_slice(),
}
}
}
Expand Down
273 changes: 162 additions & 111 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2421,6 +2421,7 @@ impl AggregationUpdateQueue {
&& !task.has_persistent_task_type()
{
let _ = task.set_persistent_task_type(task_type);
task.set_new_persistent_task(true);
}
let state = task.get_activeness_mut_or_insert_with(|| ActivenessState::new(task_id));
let is_new = state.is_empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl ConnectChildOperation {
&& !child_task.has_persistent_task_type()
{
child_task.set_persistent_task_type(child_task_type.into());
child_task.set_new_persistent_task(true);
}

if !child_task.has_output()
Expand Down
338 changes: 167 additions & 171 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ struct TaskStorageSchema {
#[field(storage = "flag", category = "transient")]
prefetched: bool,

/// Whether this task is new and needs its type persisted to the task cache.
/// Set when task is created, cleared after persisting.
#[field(storage = "flag", category = "transient")]
new_persistent_task: bool,

// =========================================================================
// CHILDREN & AGGREGATION (meta)
// =========================================================================
Expand Down
51 changes: 17 additions & 34 deletions turbopack/crates/turbo-tasks-backend/src/backing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ use std::{any::type_name, sync::Arc};

use anyhow::Result;
use either::Either;
use smallvec::SmallVec;
use turbo_bincode::TurboBincodeBuffer;
use turbo_tasks::{TaskId, backend::CachedTaskType};

use crate::{
backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage},
utils::chunked_vec::ChunkedVec,
};
use crate::backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage};

/// A single item yielded by the snapshot iterator during persistence.
pub struct SnapshotItem {
pub task_id: TaskId,
/// Serialized task meta data, if modified
pub meta: Option<TurboBincodeBuffer>,
/// Serialized task data, if modified
pub data: Option<TurboBincodeBuffer>,
/// Task type for new tasks that need to be added to the task cache
pub task_type: Option<Arc<CachedTaskType>>,
}

/// Represents types accepted by [`TurboTasksBackend::new`]. Typically this is the value returned by
/// [`default_backing_storage`] or [`noop_backing_storage`].
Expand Down Expand Up @@ -44,21 +52,9 @@ pub trait BackingStorageSealed: 'static + Send + Sync {
fn next_free_task_id(&self) -> Result<TaskId>;
fn uncompleted_operations(&self) -> Result<Vec<AnyOperation>>;

fn save_snapshot<I>(
&self,
operations: Vec<Arc<AnyOperation>>,
task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
snapshots: Vec<I>,
) -> Result<()>
fn save_snapshot<I>(&self, operations: Vec<Arc<AnyOperation>>, snapshots: Vec<I>) -> Result<()>
where
I: Iterator<
Item = (
TaskId,
Option<SmallVec<[u8; 16]>>,
Option<SmallVec<[u8; 16]>>,
),
> + Send
+ Sync;
I: Iterator<Item = SnapshotItem> + Send + Sync;
fn start_read_transaction(&self) -> Option<Self::ReadTransaction<'_>>;
/// # Safety
///
Expand Down Expand Up @@ -121,25 +117,12 @@ where
either::for_both!(self, this => this.uncompleted_operations())
}

fn save_snapshot<I>(
&self,
operations: Vec<Arc<AnyOperation>>,
task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
snapshots: Vec<I>,
) -> Result<()>
fn save_snapshot<I>(&self, operations: Vec<Arc<AnyOperation>>, snapshots: Vec<I>) -> Result<()>
where
I: Iterator<
Item = (
TaskId,
Option<SmallVec<[u8; 16]>>,
Option<SmallVec<[u8; 16]>>,
),
> + Send
+ Sync,
I: Iterator<Item = SnapshotItem> + Send + Sync,
{
either::for_both!(self, this => this.save_snapshot(
operations,
task_cache_updates,
snapshots,
))
}
Expand Down
Loading
Loading