Skip to content

Commit cc4d2c3

Browse files
authored
ensure no more Activites are attempted before suspending (#5880)
1 parent bd08028 commit cc4d2c3

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

.changeset/khaki-teeth-create.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/workflow": patch
3+
---
4+
5+
ensure no more Activites are attempted before suspending

packages/workflow/src/Workflow.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -561,12 +561,9 @@ export const wrapActivityResult = <A, E, R>(
561561
const instance = Context.get(context, InstanceTag)
562562
const state = instance.activityState
563563
if (instance.suspended) {
564-
return state.count > 0 ?
565-
state.latch.await.pipe(
566-
Effect.andThen(Effect.yieldNow()),
567-
Effect.andThen(suspend(instance))
568-
) :
569-
suspend(instance)
564+
return waitForZero(instance).pipe(
565+
Effect.andThen(suspend(instance))
566+
)
570567
}
571568
if (state.count === 0) state.latch.unsafeClose()
572569
state.count++
@@ -576,10 +573,23 @@ export const wrapActivityResult = <A, E, R>(
576573
if (Exit.isSuccess(exit) && isResult(exit.value) && exit.value._tag === "Suspended" && exit.value.cause) {
577574
instance.cause = instance.cause ? Cause.sequential(instance.cause, exit.value.cause) : exit.value.cause
578575
}
579-
return state.count === 0 ? state.latch.open : isSuspended ? state.latch.await : Effect.void
576+
return state.count === 0 ? state.latch.open : isSuspended ? waitForZero(instance) : Effect.void
580577
})
581578
})
582579

580+
const waitForZero = Effect.fnUntraced(function*(instance: WorkflowInstance["Type"]) {
581+
const state = instance.activityState
582+
while (true) {
583+
if (state.count > 0) {
584+
yield* state.latch.await
585+
yield* Effect.yieldNow()
586+
continue
587+
}
588+
yield* Effect.yieldNow()
589+
if (state.count === 0) return
590+
}
591+
})
592+
583593
/**
584594
* Accesses the workflow scope.
585595
*

0 commit comments

Comments
 (0)