Skip to content

Commit 93c91ca

Browse files
authored
Turbopack: handle task cancelation (#76831)
### What? * handle task cancelation * wait for stopping turbo-tasks before running exit handlers * sort all output manifests
1 parent d6dcf36 commit 93c91ca

File tree

7 files changed

+83
-6
lines changed

7 files changed

+83
-6
lines changed

crates/napi/src/next_api/project.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,8 +543,8 @@ async fn project_on_exit_internal(project: &ProjectInstance) {
543543
pub async fn project_shutdown(
544544
#[napi(ts_arg_type = "{ __napiType: \"Project\" }")] project: External<ProjectInstance>,
545545
) {
546-
project_on_exit_internal(&project).await;
547546
project.turbo_tasks.stop_and_wait().await;
547+
project_on_exit_internal(&project).await;
548548
}
549549

550550
#[napi(object)]

packages/next/src/shared/lib/turbopack/manifest-loader.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,16 @@ export class TurbopackManifestLoader {
200200
mergeActionIds(manifest.node, m.node)
201201
mergeActionIds(manifest.edge, m.edge)
202202
}
203+
for (const key in manifest.node) {
204+
const entry = manifest.node[key]
205+
entry.workers = sortObjectByKey(entry.workers)
206+
entry.layer = sortObjectByKey(entry.layer)
207+
}
208+
for (const key in manifest.edge) {
209+
const entry = manifest.edge[key]
210+
entry.workers = sortObjectByKey(entry.workers)
211+
entry.layer = sortObjectByKey(entry.layer)
212+
}
203213

204214
return manifest
205215
}
@@ -247,6 +257,7 @@ export class TurbopackManifestLoader {
247257
for (const m of manifests) {
248258
Object.assign(manifest.pages, m.pages)
249259
}
260+
manifest.pages = sortObjectByKey(manifest.pages)
250261
return manifest
251262
}
252263

@@ -398,6 +409,7 @@ export class TurbopackManifestLoader {
398409
// polyfillFiles should always be the same, so we can overwrite instead of actually merging
399410
if (m.polyfillFiles.length) manifest.polyfillFiles = m.polyfillFiles
400411
}
412+
manifest.pages = sortObjectByKey(manifest.pages) as BuildManifest['pages']
401413
return manifest
402414
}
403415

@@ -550,6 +562,8 @@ export class TurbopackManifestLoader {
550562
manifest.pagesUsingSizeAdjust =
551563
manifest.pagesUsingSizeAdjust || m.pagesUsingSizeAdjust
552564
}
565+
manifest.app = sortObjectByKey(manifest.app)
566+
manifest.pages = sortObjectByKey(manifest.pages)
553567
return manifest
554568
}
555569

@@ -620,6 +634,8 @@ export class TurbopackManifestLoader {
620634
instrumentation = m.instrumentation
621635
}
622636
}
637+
manifest.functions = sortObjectByKey(manifest.functions)
638+
manifest.middleware = sortObjectByKey(manifest.middleware)
623639
const updateFunctionDefinition = (
624640
fun: EdgeFunctionDefinition
625641
): EdgeFunctionDefinition => {
@@ -696,7 +712,7 @@ export class TurbopackManifestLoader {
696712
for (const m of manifests) {
697713
Object.assign(manifest, m)
698714
}
699-
return manifest
715+
return sortObjectByKey(manifest)
700716
}
701717

702718
private async writePagesManifest(): Promise<void> {
@@ -733,3 +749,15 @@ export class TurbopackManifestLoader {
733749
}
734750
}
735751
}
752+
753+
function sortObjectByKey(obj: Record<string, any>) {
754+
return Object.keys(obj)
755+
.sort()
756+
.reduce(
757+
(acc, key) => {
758+
acc[key] = obj[key]
759+
return acc
760+
},
761+
{} as Record<string, any>
762+
)
763+
}

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
447447
this: &TurboTasksBackendInner<B>,
448448
task: &impl TaskGuard,
449449
reader: Option<TaskId>,
450+
ctx: &impl ExecuteContext<'_>,
450451
) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
451452
{
452453
match get!(task, InProgress) {
@@ -457,10 +458,18 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
457458
marked_as_completed,
458459
done_event,
459460
..
460-
})) if !*marked_as_completed => {
461-
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
461+
})) => {
462+
if !*marked_as_completed {
463+
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
464+
} else {
465+
None
466+
}
462467
}
463-
_ => None,
468+
Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
469+
"{} was canceled",
470+
ctx.get_task_description(task.id())
471+
))),
472+
None => None,
464473
}
465474
}
466475

@@ -545,7 +554,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
545554
}
546555
}
547556

548-
if let Some(value) = check_in_progress(self, &task, reader) {
557+
if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
549558
return value;
550559
}
551560

@@ -755,6 +764,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
755764
InProgressState::Scheduled { .. } => {
756765
// Already scheduled
757766
}
767+
InProgressState::Canceled => {
768+
bail!("{} was canceled", ctx.get_task_description(task_id));
769+
}
758770
}
759771
} else if task.add(CachedDataItem::new_scheduled(
760772
self.get_task_desc_fn(task_id),
@@ -1077,6 +1089,27 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
10771089
.map(|task_type| task_type.fn_type)
10781090
}
10791091

1092+
fn task_execution_canceled(
1093+
&self,
1094+
task_id: TaskId,
1095+
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
1096+
) {
1097+
let mut ctx = self.execute_context(turbo_tasks);
1098+
let mut task = ctx.task(task_id, TaskDataCategory::Data);
1099+
if let Some(in_progress) = remove!(task, InProgress) {
1100+
match in_progress {
1101+
InProgressState::Scheduled { done_event } => done_event.notify(usize::MAX),
1102+
InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
1103+
done_event.notify(usize::MAX)
1104+
}
1105+
InProgressState::Canceled => {}
1106+
}
1107+
}
1108+
task.add_new(CachedDataItem::InProgress {
1109+
value: InProgressState::Canceled,
1110+
});
1111+
}
1112+
10801113
fn try_start_task_execution(
10811114
&self,
10821115
task_id: TaskId,
@@ -2036,6 +2069,10 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
20362069
type TaskState = ();
20372070
fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}
20382071

2072+
fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
2073+
self.0.task_execution_canceled(task, turbo_tasks)
2074+
}
2075+
20392076
fn try_start_task_execution(
20402077
&self,
20412078
task_id: TaskId,

turbopack/crates/turbo-tasks-backend/src/data.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ pub struct InProgressStateInner {
320320
pub enum InProgressState {
321321
Scheduled { done_event: Event },
322322
InProgress(Box<InProgressStateInner>),
323+
Canceled,
323324
}
324325

325326
transient_traits!(InProgressState);

turbopack/crates/turbo-tasks-memory/src/memory_backend.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,14 @@ impl Backend for MemoryBackend {
395395
}
396396
}
397397

398+
fn task_execution_canceled(
399+
&self,
400+
_task: TaskId,
401+
_turbo_tasks: &dyn TurboTasksBackendApi<Self>,
402+
) {
403+
todo!()
404+
}
405+
398406
fn try_start_task_execution<'a>(
399407
&'a self,
400408
task: TaskId,

turbopack/crates/turbo-tasks/src/backend.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,8 @@ pub trait Backend: Sync + Send {
453453
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
454454
) -> Option<TaskExecutionSpec<'a>>;
455455

456+
fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);
457+
456458
fn task_execution_result(
457459
&self,
458460
task: TaskId,

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
647647
)));
648648
let single_execution_future = async {
649649
if this.stopped.load(Ordering::Acquire) {
650+
this.backend.task_execution_canceled(task_id, &*this);
650651
return false;
651652
}
652653

0 commit comments

Comments
 (0)