Skip to content

Commit 794c790

Browse files
authored
add in-memory WorkflowEngine layer (#5771)
1 parent 079975c commit 794c790

File tree

15 files changed

+876
-228
lines changed

15 files changed

+876
-228
lines changed

.changeset/bitter-maps-float.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/rpc": patch
3+
---
4+
5+
fix Rpc success types that sub-class Stream

.changeset/eighty-walls-dig.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/workflow": patch
3+
---
4+
5+
add in-memory WorkflowEngine layer

.changeset/late-parents-smoke.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
backport Entity keep alive from effect 4.0

.changeset/petite-areas-cough.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@effect/workflow": minor
3+
"@effect/cluster": patch
4+
---
5+
6+
add WorkflowEngine.makeUnsafe, which abstracts the serialization boundary

packages/cluster/src/ClusterWorkflowEngine.ts

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ import { DurableDeferred } from "@effect/workflow"
77
import * as Activity from "@effect/workflow/Activity"
88
import * as DurableClock from "@effect/workflow/DurableClock"
99
import * as Workflow from "@effect/workflow/Workflow"
10-
import { WorkflowEngine, WorkflowInstance } from "@effect/workflow/WorkflowEngine"
10+
import { makeUnsafe, WorkflowEngine, WorkflowInstance } from "@effect/workflow/WorkflowEngine"
1111
import * as Arr from "effect/Array"
1212
import * as Cause from "effect/Cause"
1313
import * as Context from "effect/Context"
1414
import * as DateTime from "effect/DateTime"
1515
import * as Duration from "effect/Duration"
1616
import * as Effect from "effect/Effect"
17-
import type * as Exit from "effect/Exit"
17+
import * as Exit from "effect/Exit"
1818
import * as Fiber from "effect/Fiber"
1919
import * as FiberId from "effect/FiberId"
2020
import * as Layer from "effect/Layer"
@@ -35,6 +35,7 @@ import { EntityId } from "./EntityId.js"
3535
import { EntityType } from "./EntityType.js"
3636
import { MessageStorage } from "./MessageStorage.js"
3737
import type { WithExitEncoded } from "./Reply.js"
38+
import * as Reply from "./Reply.js"
3839
import * as Sharding from "./Sharding.js"
3940
import * as Snowflake from "./Snowflake.js"
4041

@@ -252,11 +253,9 @@ export const make = Effect.gen(function*() {
252253
yield* sharding.reset(requestId.value)
253254
}, Effect.scoped)
254255

255-
return WorkflowEngine.of({
256-
register(workflow, execute) {
257-
// eslint-disable-next-line @typescript-eslint/no-this-alias
258-
const engine = this
259-
return Effect.suspend(() =>
256+
const engine = makeUnsafe({
257+
register: (workflow, execute) =>
258+
Effect.suspend(() =>
260259
sharding.registerEntity(
261260
ensureEntity(workflow),
262261
Effect.gen(function*() {
@@ -277,8 +276,8 @@ export const make = Effect.gen(function*() {
277276
return parent ? ensureSuccess(sendResumeParent(parent)) : Effect.void
278277
}
279278
return engine.deferredResult(InterruptSignal).pipe(
280-
Effect.flatMap((maybeResult) => {
281-
if (Option.isNone(maybeResult)) {
279+
Effect.flatMap((maybeExit) => {
280+
if (maybeExit === undefined) {
282281
return Effect.void
283282
}
284283
instance.suspended = false
@@ -354,10 +353,9 @@ export const make = Effect.gen(function*() {
354353
}
355354
})
356355
) as Effect.Effect<void, never, Scope.Scope>
357-
)
358-
},
356+
),
359357

360-
execute: ({ discard, executionId, parent, payload, workflow }) => {
358+
execute: (workflow, { discard, executionId, parent, payload }) => {
361359
ensureEntity(workflow)
362360
return RcMap.get(clients, workflow.name).pipe(
363361
Effect.flatMap((make) =>
@@ -376,7 +374,7 @@ export const make = Effect.gen(function*() {
376374
)
377375
},
378376

379-
poll: Effect.fnUntraced(function*({ executionId, workflow }) {
377+
poll: Effect.fnUntraced(function*(workflow, executionId) {
380378
const entity = ensureEntity(workflow)
381379
const exitSchema = Rpc.exitSchema(entity.protocol.requests.get("run")!)
382380
const oreply = yield* requestReply({
@@ -395,7 +393,7 @@ export const make = Effect.gen(function*() {
395393
}, Effect.orDie),
396394

397395
interrupt: Effect.fnUntraced(
398-
function*(this: WorkflowEngine["Type"], workflow, executionId) {
396+
function*(workflow, executionId) {
399397
ensureEntity(workflow)
400398
const oreply = yield* requestReply({
401399
workflow,
@@ -411,11 +409,11 @@ export const make = Effect.gen(function*() {
411409
return
412410
}
413411

414-
yield* this.deferredDone({
412+
yield* engine.deferredDone(InterruptSignal, {
415413
workflowName: workflow.name,
416414
executionId,
417415
deferredName: InterruptSignal.name,
418-
exit: { _tag: "Success", value: void 0 }
416+
exit: Exit.void
419417
})
420418
},
421419
Effect.retry({
@@ -429,7 +427,7 @@ export const make = Effect.gen(function*() {
429427
resume: (workflow, executionId) => ensureSuccess(resume(workflow, executionId)),
430428

431429
activityExecute: Effect.fnUntraced(
432-
function*({ activity, attempt }) {
430+
function*(activity, attempt) {
433431
const runtime = yield* Effect.runtime<WorkflowInstance>()
434432
const context = runtime.context
435433
const instance = Context.get(context, WorkflowInstance)
@@ -466,7 +464,6 @@ export const make = Effect.gen(function*() {
466464

467465
deferredResult: (deferred) =>
468466
WorkflowInstance.pipe(
469-
Effect.tap((instance) => Effect.annotateCurrentSpan("executionId", instance.executionId)),
470467
Effect.flatMap((instance) =>
471468
requestReply({
472469
workflow: instance.workflow,
@@ -476,11 +473,16 @@ export const make = Effect.gen(function*() {
476473
id: deferred.name
477474
})
478475
),
479-
Effect.map(Option.map((reply) =>
480-
reply.exit._tag === "Success"
481-
? reply.exit.value as any as Schema.ExitEncoded<unknown, unknown, unknown>
482-
: reply.exit
483-
)),
476+
Effect.map((oreply) => {
477+
if (Option.isNone(oreply)) {
478+
return undefined
479+
}
480+
const reply = oreply.value
481+
const decoded = decodeDeferredWithExit(reply as any)
482+
return decoded.exit._tag === "Success"
483+
? decoded.exit.value
484+
: decoded.exit
485+
}),
484486
Effect.retry({
485487
while: (e) => e._tag === "PersistenceError",
486488
times: 3,
@@ -502,20 +504,22 @@ export const make = Effect.gen(function*() {
502504
Effect.scoped
503505
),
504506

505-
scheduleClock(options) {
507+
scheduleClock(workflow, options) {
506508
const client = clockClient(options.executionId)
507509
return DateTime.now.pipe(
508510
Effect.flatMap((now) =>
509511
client.run({
510512
name: options.clock.name,
511-
workflowName: options.workflow.name,
513+
workflowName: workflow.name,
512514
wakeUp: DateTime.addDuration(now, options.clock.duration)
513515
}, { discard: true })
514516
),
515517
Effect.orDie
516518
)
517519
}
518520
})
521+
522+
return engine
519523
})
520524

521525
const retryPolicy = Schedule.exponential(200, 1.5).pipe(
@@ -569,11 +573,11 @@ const makeWorkflowEntity = (workflow: Workflow.Any) =>
569573
ActivityRpc
570574
]).annotateContext(workflow.annotations)
571575

572-
const ExitUnknown = Schema.encodedSchema(Schema.Exit({
576+
const ExitUnknown = Schema.Exit({
573577
success: Schema.Unknown,
574578
failure: Schema.Unknown,
575579
defect: Schema.Defect
576-
}))
580+
})
577581

578582
const DeferredRpc = Rpc.make("deferred", {
579583
payload: {
@@ -586,6 +590,8 @@ const DeferredRpc = Rpc.make("deferred", {
586590
.annotate(ClusterSchema.Persisted, true)
587591
.annotate(ClusterSchema.Uninterruptible, true)
588592

593+
const decodeDeferredWithExit = Schema.decodeSync(Reply.WithExit.schema(DeferredRpc))
594+
589595
const ResumeRpc = Rpc.make("resume", {
590596
payload: {},
591597
primaryKey: () => ""
@@ -628,11 +634,11 @@ const ClockEntityLayer = ClockEntity.toLayer(Effect.gen(function*() {
628634
return {
629635
run(request) {
630636
const deferred = DurableClock.make({ name: request.payload.name, duration: Duration.zero }).deferred
631-
return ensureSuccess(engine.deferredDone({
637+
return ensureSuccess(engine.deferredDone(deferred, {
632638
workflowName: request.payload.workflowName,
633639
executionId,
634640
deferredName: deferred.name,
635-
exit: { _tag: "Success", value: void 0 }
641+
exit: Exit.void
636642
}))
637643
}
638644
}

packages/cluster/src/Entity.ts

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/**
22
* @since 1.0.0
33
*/
4-
import type * as Rpc from "@effect/rpc/Rpc"
4+
import * as Headers from "@effect/platform/Headers"
5+
import * as Rpc from "@effect/rpc/Rpc"
56
import * as RpcClient from "@effect/rpc/RpcClient"
67
import * as RpcGroup from "@effect/rpc/RpcGroup"
78
import * as RpcServer from "@effect/rpc/RpcServer"
@@ -25,12 +26,14 @@ import { Scope } from "effect/Scope"
2526
import type * as Stream from "effect/Stream"
2627
import type { AlreadyProcessingMessage, MailboxFull, PersistenceError } from "./ClusterError.js"
2728
import { ShardGroup } from "./ClusterSchema.js"
29+
import * as ClusterSchema from "./ClusterSchema.js"
2830
import { EntityAddress } from "./EntityAddress.js"
2931
import type { EntityId } from "./EntityId.js"
3032
import { EntityType } from "./EntityType.js"
3133
import * as Envelope from "./Envelope.js"
3234
import { hashString } from "./internal/hash.js"
3335
import { ResourceMap } from "./internal/resourceMap.js"
36+
import * as Message from "./Message.js"
3437
import type * as Reply from "./Reply.js"
3538
import { RunnerAddress } from "./RunnerAddress.js"
3639
import * as ShardId from "./ShardId.js"
@@ -584,3 +587,66 @@ export const makeTestClient: <Type extends string, Rpcs extends Rpc.Any, LA, LE,
584587

585588
return (entityId: string) => map.get(entityId)
586589
})
590+
591+
/**
592+
* @since 1.0.0
593+
* @category Keep alive
594+
*/
595+
export const keepAlive: (
596+
enabled: boolean
597+
) => Effect.Effect<
598+
void,
599+
never,
600+
Sharding | CurrentAddress
601+
> = Effect.fnUntraced(function*(enabled: boolean) {
602+
const olatch = yield* Effect.serviceOption(KeepAliveLatch)
603+
if (olatch._tag === "None") return
604+
if (!enabled) {
605+
yield* olatch.value.open
606+
return
607+
}
608+
const sharding = yield* shardingTag
609+
const address = yield* CurrentAddress
610+
const requestId = yield* sharding.getSnowflake
611+
const span = yield* Effect.orDie(Effect.currentSpan)
612+
yield* Effect.orDie(sharding.sendOutgoing(
613+
new Message.OutgoingRequest({
614+
rpc: KeepAliveRpc,
615+
context: Context.empty() as any,
616+
envelope: Envelope.makeRequest({
617+
requestId,
618+
address,
619+
tag: KeepAliveRpc._tag,
620+
payload: void 0,
621+
headers: Headers.empty,
622+
traceId: span.traceId,
623+
spanId: span.spanId,
624+
sampled: span.sampled
625+
}),
626+
lastReceivedReply: Option.none(),
627+
respond: () => Effect.void
628+
}),
629+
true
630+
))
631+
}, (effect, enabled) =>
632+
Effect.withSpan(
633+
effect,
634+
"Entity/keepAlive",
635+
{ attributes: { enabled }, captureStackTrace: false }
636+
))
637+
638+
/**
639+
* @since 1.0.0
640+
* @category Keep alive
641+
*/
642+
export const KeepAliveRpc = Rpc.make("Cluster/Entity/keepAlive")
643+
.annotate(ClusterSchema.Persisted, true)
644+
.annotate(ClusterSchema.Uninterruptible, true)
645+
646+
/**
647+
* @since 1.0.0
648+
* @category Keep alive
649+
*/
650+
export class KeepAliveLatch extends Context.Tag(
651+
"effect/cluster/Entity/KeepAliveLatch"
652+
)<KeepAliveLatch, Effect.Latch>() {}

0 commit comments

Comments
 (0)