Skip to content

Commit cf50eb4

Browse files
authored
add WorkflowEngine interruptUnsafe (#1907)
1 parent 734e486 commit cf50eb4

File tree

3 files changed

+104
-38
lines changed

3 files changed

+104
-38
lines changed

.changeset/public-deer-ring.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
add WorkflowEngine interruptUnsafe

packages/effect/src/unstable/cluster/ClusterWorkflowEngine.ts

Lines changed: 77 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import * as Entity from "./Entity.ts"
3030
import * as EntityAddress from "./EntityAddress.ts"
3131
import * as EntityId from "./EntityId.ts"
3232
import * as EntityType from "./EntityType.ts"
33+
import * as Envelope from "./Envelope.ts"
34+
import * as Message from "./Message.ts"
3335
import { MessageStorage } from "./MessageStorage.ts"
3436
import type { WithExitEncoded } from "./Reply.ts"
3537
import * as Reply from "./Reply.ts"
@@ -131,22 +133,30 @@ export const make = Effect.gen(function*() {
131133
})
132134
const clockClient = yield* ClockEntity.client
133135

134-
const requestIdFor = Effect.fnUntraced(function*(options: {
136+
const entityAddressFor = (options: {
135137
readonly workflow: Workflow.Any
136138
readonly entityType: string
137139
readonly executionId: string
138-
readonly tag: string
139-
readonly id: string
140-
}) {
140+
}) => {
141141
const shardGroup = ServiceMap.get(options.workflow.annotations, ClusterSchema.ShardGroup)(
142142
options.executionId as EntityId.EntityId
143143
)
144144
const entityId = EntityId.make(options.executionId)
145-
const address = EntityAddress.make({
145+
return EntityAddress.make({
146146
entityType: EntityType.make(options.entityType),
147147
entityId,
148148
shardId: sharding.getShardId(entityId, shardGroup)
149149
})
150+
}
151+
152+
const requestIdFor = Effect.fnUntraced(function*(options: {
153+
readonly workflow: Workflow.Any
154+
readonly entityType: string
155+
readonly executionId: string
156+
readonly tag: string
157+
readonly id: string
158+
}) {
159+
const address = entityAddressFor(options)
150160
return yield* storage.requestIdForPrimaryKey({ address, tag: options.tag, id: options.id })
151161
})
152162

@@ -253,6 +263,46 @@ export const make = Effect.gen(function*() {
253263
yield* sharding.reset(requestId.value)
254264
}, Effect.scoped)
255265

266+
const interrupt = Effect.fnUntraced(
267+
function*(workflow: Workflow.Any, executionId: string) {
268+
ensureEntity(workflow)
269+
const requestId = yield* requestIdFor({
270+
workflow,
271+
entityType: `Workflow/${workflow.name}`,
272+
executionId,
273+
tag: "run",
274+
id: ""
275+
})
276+
if (Option.isNone(requestId)) {
277+
return Option.none()
278+
}
279+
const reply = yield* replyForRequestId(requestId.value)
280+
281+
const nonSuspendedReply = Option.filter(
282+
reply,
283+
(reply) => reply.exit._tag !== "Success" || reply.exit.value._tag !== "Suspended"
284+
)
285+
if (Option.isSome(nonSuspendedReply)) {
286+
return Option.none()
287+
}
288+
289+
yield* engine.deferredDone(InterruptSignal, {
290+
workflowName: workflow.name,
291+
executionId,
292+
deferredName: InterruptSignal.name,
293+
exit: Exit.void
294+
})
295+
296+
return requestId
297+
},
298+
Effect.retry({
299+
while: (e) => e._tag === "PersistenceError",
300+
times: 3,
301+
schedule: Schedule.exponential(250)
302+
}),
303+
Effect.orDie
304+
)
305+
256306
const engine = WorkflowEngine.makeUnsafe({
257307
register: (workflow, execute) =>
258308
Effect.suspend(() =>
@@ -386,39 +436,28 @@ export const make = Effect.gen(function*() {
386436
return Option.some(yield* exit)
387437
}, Effect.orDie),
388438

389-
interrupt: Effect.fnUntraced(
390-
function*(workflow, executionId) {
391-
ensureEntity(workflow)
392-
const reply = yield* requestReply({
393-
workflow,
394-
entityType: `Workflow/${workflow.name}`,
395-
executionId,
396-
tag: "run",
397-
id: ""
398-
})
399-
400-
const nonSuspendedReply = Option.filter(
401-
reply,
402-
(reply) => reply.exit._tag !== "Success" || reply.exit.value._tag !== "Suspended"
403-
)
404-
if (Option.isSome(nonSuspendedReply)) {
405-
return
406-
}
407-
408-
yield* engine.deferredDone(InterruptSignal, {
409-
workflowName: workflow.name,
410-
executionId,
411-
deferredName: InterruptSignal.name,
412-
exit: Exit.void
413-
})
414-
},
415-
Effect.retry({
416-
while: (e) => e._tag === "PersistenceError",
417-
times: 3,
418-
schedule: Schedule.exponential(250)
419-
}),
420-
Effect.orDie
421-
),
439+
interrupt: (workflow, executionId) => Effect.asVoid(interrupt(workflow, executionId)),
440+
interruptUnsafe: Effect.fnUntraced(function*(workflow, executionId) {
441+
const requestId = yield* interrupt(workflow, executionId)
442+
if (Option.isNone(requestId)) return
443+
const entity = ensureEntity(workflow)
444+
const runRpc = entity.protocol.requests.get("run")!
445+
yield* Effect.orDie(sharding.sendOutgoing(
446+
new Message.OutgoingEnvelope({
447+
rpc: runRpc,
448+
envelope: new Envelope.Interrupt({
449+
id: yield* sharding.getSnowflake,
450+
address: entityAddressFor({
451+
workflow,
452+
entityType: `Workflow/${workflow.name}`,
453+
executionId
454+
}),
455+
requestId: requestId.value
456+
})
457+
}),
458+
false
459+
))
460+
}),
422461

423462
resume: (workflow, executionId) => ensureSuccess(resume(workflow, executionId)),
424463

packages/effect/src/unstable/workflow/WorkflowEngine.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ export class WorkflowEngine extends ServiceMap.Service<
111111
executionId: string
112112
) => Effect.Effect<void>
113113

114+
/**
115+
* Unsafely interrupt a registered workflow, potentially ignoring
116+
* compensation finalizers and orphaning child workflows.
117+
*/
118+
readonly interruptUnsafe: (
119+
workflow: Workflow.Any,
120+
executionId: string
121+
) => Effect.Effect<void>
122+
114123
/**
115124
* Resume a registered workflow.
116125
*/
@@ -282,6 +291,10 @@ export interface Encoded {
282291
workflow: Workflow.Any,
283292
executionId: string
284293
) => Effect.Effect<void>
294+
readonly interruptUnsafe: (
295+
workflow: Workflow.Any,
296+
executionId: string
297+
) => Effect.Effect<void>
285298
readonly resume: (
286299
workflow: Workflow.Any,
287300
executionId: string
@@ -416,6 +429,7 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Service"] =>
416429
}),
417430
poll: options.poll,
418431
interrupt: options.interrupt,
432+
interruptUnsafe: options.interruptUnsafe,
419433
resume: options.resume,
420434
activityExecute: Effect.fnUntraced(function*<
421435
Success extends Schema.Top,
@@ -617,6 +631,14 @@ export const layerMemory: Layer.Layer<WorkflowEngine> = Layer.effect(WorkflowEng
617631
state.instance.interrupted = true
618632
yield* resume(executionId)
619633
}),
634+
interruptUnsafe: Effect.fnUntraced(function*(_workflow, executionId) {
635+
const state = executions.get(executionId)
636+
if (!state) return
637+
state.instance.interrupted = true
638+
if (state.fiber) {
639+
yield* Fiber.interrupt(state.fiber)
640+
}
641+
}),
620642
resume(_workflow, executionId) {
621643
return resume(executionId)
622644
},

0 commit comments

Comments
 (0)