Skip to content

Commit 7bf9e21

Browse files
committed
feat: Add workflows
1 parent fb3e17b commit 7bf9e21

31 files changed

+835
-279
lines changed

cmd/smallflow/main.go

Lines changed: 71 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,83 +3,109 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"github.com/alitto/pond/v2"
6+
"github.com/morebec/go-misas/misas"
7+
"github.com/morebec/go-misas/mpostgres"
78
"github.com/morebec/go-misas/muuid"
89
"github.com/morebec/go-misas/mx"
9-
"github.com/morebec/smallflow/internal/core"
10-
"github.com/morebec/smallflow/internal/core/adapters"
1110
"github.com/morebec/smallflow/internal/orchestrator"
1211
adapters2 "github.com/morebec/smallflow/internal/orchestrator/adapters"
12+
"github.com/morebec/smallflow/internal/workflowmgmt"
13+
"github.com/morebec/smallflow/internal/workflowmgmt/adapters"
1314
"time"
1415
)
1516

1617
func main() {
1718
fmt.Println("Build, run, and observe workflows without the overhead!")
1819

19-
eventStore := &adapters.InMemoryEventStore{}
2020
clock := mx.NewRealTimeClock(time.UTC)
21+
22+
dbConn, err := mpostgres.OpenConn("postgres://smallflow:smallflow@localhost:5432/postgres?sslmode=disable")
23+
if err != nil {
24+
panic(err)
25+
}
26+
27+
var eventStore misas.EventStore
28+
eventStore, err = mpostgres.NewEventStore(clock, dbConn)
29+
if err != nil {
30+
panic(err)
31+
}
32+
33+
eventRegistry := mx.NewMessageRegistry[misas.EventTypeName, misas.Event]()
34+
35+
eventRegistry.Register(workflowmgmt.WorkflowEnabledEventTypeName, workflowmgmt.WorkflowEnabledEvent{})
36+
eventRegistry.Register(workflowmgmt.WorkflowDisabledEventTypeName, workflowmgmt.WorkflowDisabledEvent{})
37+
eventRegistry.Register(workflowmgmt.WorkflowTriggeredEventTypeName, workflowmgmt.WorkflowTriggeredEvent{})
38+
eventRegistry.Register(workflowmgmt.WorkflowStartedEventTypeName, workflowmgmt.WorkflowStartedEvent{})
39+
eventRegistry.Register(workflowmgmt.WorkflowEndedEventTypeName, workflowmgmt.WorkflowEndedEvent{})
40+
eventRegistry.Register(workflowmgmt.StepStartedEventTypeName, workflowmgmt.StepStartedEvent{})
41+
eventRegistry.Register(workflowmgmt.StepEndedEventTypeName, workflowmgmt.StepEndedEvent{})
42+
43+
eventStore = mx.NewEventStoreDeserializerDecorator(eventStore, eventRegistry)
44+
2145
workflowRepo := &adapters.EventStoreWorkflowRepository{
22-
EventStore: eventStore,
46+
EventStore: eventStore,
47+
EventRegistry: eventRegistry,
48+
UUIDGenerator: muuid.NewRandomUUIDGenerator(),
2349
}
2450
runRepo := &adapters.EventStoreRunRepository{
25-
EventStore: eventStore,
51+
EventStore: eventStore,
52+
EventRegistry: eventRegistry,
53+
UUIDGenerator: muuid.NewRandomUUIDGenerator(),
2654
}
2755

28-
api := core.NewSubsystem(clock, workflowRepo, runRepo, muuid.NewRandomUUIDGenerator()).API
29-
orch := &orchestrator.WorkflowOrchestrator{
30-
Clock: clock,
31-
API: api,
32-
LeaseManager: orchestrator.WorkflowLeaseManager{
33-
Clock: clock,
34-
Repository: adapters2.NewInMemoryWorkflowLeaseRepository(),
35-
},
36-
Pool: pond.NewPool(10),
56+
api := workflowmgmt.NewSubsystem(clock, workflowRepo, runRepo, muuid.NewRandomUUIDGenerator()).API
57+
workflowLeaseRepository, err := adapters2.NewPostgresWorkflowLeaseRepository(dbConn)
58+
if err != nil {
59+
panic(err)
3760
}
3861

39-
ctx := context.Background()
40-
41-
fmt.Println("Enabling workflow...")
42-
if result := api.HandleCommand(ctx, core.EnableWorkflowCommand{
43-
WorkflowID: "my-workflow",
44-
}); result.Error != nil {
45-
panic(result.Error)
62+
leaseManager := orchestrator.WorkflowLeaseManager{
63+
Clock: clock,
64+
Repository: workflowLeaseRepository,
4665
}
4766

48-
fmt.Println("Triggering workflow...")
49-
if result := api.HandleCommand(ctx, core.TriggerWorkflowCommand{
50-
WorkflowID: "my-workflow",
51-
RunID: muuid.NewRandomUUIDGenerator().Generate().String(),
52-
}); result.Error != nil {
53-
panic(result.Error)
67+
checkpointStore, err := mpostgres.NewPostgreSQLCheckpointStore(dbConn)
68+
if err != nil {
69+
panic(err)
5470
}
71+
orch := orchestrator.NewWorkflowOrchestrator(
72+
clock,
73+
api,
74+
leaseManager,
75+
muuid.NewRandomUUIDGenerator(),
76+
eventStore,
77+
checkpointStore,
78+
)
79+
orch.Start()
80+
defer orch.Stop()
81+
82+
ctx := context.Background()
5583

56-
fmt.Println("Disabling workflow...")
57-
if result := api.HandleCommand(ctx, core.DisableWorkflowCommand{
84+
fmt.Println("Enabling workflow...")
85+
if result := api.HandleCommand(ctx, workflowmgmt.EnableWorkflowCommand{
5886
WorkflowID: "my-workflow",
5987
}); result.Error != nil {
6088
panic(result.Error)
6189
}
6290

63-
fmt.Println("Dispatching events through the orchestrator...")
64-
orch.Start()
65-
defer orch.Stop()
66-
67-
for _, event := range eventStore.Events() {
68-
if err := orch.HandleEvent(ctx, event); err != nil {
69-
panic(err)
91+
for i := range 1 {
92+
fmt.Printf("Triggering workflow #%d...\n", i+1)
93+
if result := api.HandleCommand(ctx, workflowmgmt.TriggerWorkflowCommand{
94+
WorkflowID: "my-workflow",
95+
RunID: muuid.NewRandomUUIDGenerator().Generate().String(),
96+
}); result.Error != nil {
97+
panic(result.Error)
7098
}
7199
}
72100

73-
for orch.IsRunning() {
74-
select {
75-
case <-time.After(15 * time.Second):
76-
fmt.Println("Stopping orchestrator after 15 seconds...")
77-
orch.Stop()
78-
}
101+
<-time.After(30 * time.Second)
102+
fmt.Println("Current events in the event store:")
103+
stream, err := eventStore.ReadFromStream(ctx, eventStore.GlobalStreamID(), misas.ReadFromEventStreamOptions{}.FromStart().Forward())
104+
if err != nil {
105+
panic(err)
79106
}
80107

81-
fmt.Println("Current events in the event store:")
82-
for i, event := range eventStore.Events() {
108+
for i, event := range stream.Events {
83109
fmt.Printf("Event %d: %T → %+v\n", i, event, event)
84110
}
85111
}

compose.debug.yaml

Lines changed: 0 additions & 8 deletions
This file was deleted.

compose.yaml

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
services:
2-
smallflow:
3-
image: smallflow
4-
build:
5-
context: .
6-
dockerfile: ./Dockerfile
2+
# smallflow:
3+
# image: smallflow
4+
# build:
5+
# context: .
6+
# dockerfile: ./Dockerfile
7+
# ports:
8+
# - 3000:3000
9+
10+
postgres:
11+
image: postgres:18-alpine
12+
environment:
13+
POSTGRES_USER: smallflow
14+
POSTGRES_PASSWORD: smallflow
15+
POSTGRES_DB: smallflow
716
ports:
8-
- 3000:3000
17+
- "5432:5432"
18+
volumes:
19+
- postgres:/var/lib/postgresql/data
20+
21+
volumes:
22+
postgres:

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ replace github.com/morebec/go-misas => ./../go-misas-back
77
require (
88
github.com/alitto/pond/v2 v2.5.0
99
github.com/morebec/go-misas v0.0.0-00010101000000-000000000000
10+
github.com/samber/lo v1.49.1
1011
)
1112

1213
require (
1314
github.com/google/uuid v1.6.0 // indirect
14-
github.com/samber/lo v1.49.1 // indirect
15+
github.com/lib/pq v1.10.9 // indirect
1516
go.opentelemetry.io/otel v1.35.0 // indirect
1617
go.opentelemetry.io/otel/trace v1.35.0 // indirect
1718
golang.org/x/text v0.24.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
2+
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
13
github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw=
24
github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
35
github.com/cucumber/gherkin/go/v26 v26.2.0 h1:EgIjePLWiPeslwIWmNQ3XHcypPsWAHoMCz/YEBKP4GI=
@@ -20,6 +22,8 @@ github.com/hashicorp/go-memdb v1.3.4 h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8Io
2022
github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg=
2123
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
2224
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
25+
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
26+
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
2327
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2428
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2529
github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew=

internal/core/adapters/es.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

internal/core/adapters/run_repo.go

Lines changed: 0 additions & 45 deletions
This file was deleted.

internal/core/adapters/workflow_repo.go

Lines changed: 0 additions & 65 deletions
This file was deleted.

internal/orchestrator/adapters/lease.go

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)