Skip to content

Commit cbe93c0

Browse files
committed
create monoprocess backend
1 parent 1b65795 commit cbe93c0

File tree

3 files changed

+305
-0
lines changed

3 files changed

+305
-0
lines changed

backend/monoprocess/diagnostics.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package monoprocess
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/cschleiden/go-workflows/diag"
8+
"github.com/cschleiden/go-workflows/internal/core"
9+
)
10+
11+
var _ diag.Backend = (*monoprocessBackend)(nil)
12+
13+
func (b *monoprocessBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
14+
if diagBackend, ok := b.Backend.(diag.Backend); ok {
15+
return diagBackend.GetWorkflowInstance(ctx, instance)
16+
}
17+
return nil, errors.New("not implemented")
18+
}
19+
20+
func (b *monoprocessBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
21+
if diagBackend, ok := b.Backend.(diag.Backend); ok {
22+
return diagBackend.GetWorkflowInstances(ctx, afterInstanceID, afterExecutionID, count)
23+
}
24+
return nil, errors.New("not implemented")
25+
}
26+
27+
func (b *monoprocessBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
28+
if diagBackend, ok := b.Backend.(diag.Backend); ok {
29+
return diagBackend.GetWorkflowTree(ctx, instance)
30+
}
31+
return nil, errors.New("not implemented")
32+
}

backend/monoprocess/monoprocess.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package monoprocess
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"reflect"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/internal/core"
11+
"github.com/cschleiden/go-workflows/internal/history"
12+
"github.com/cschleiden/go-workflows/internal/task"
13+
"github.com/cschleiden/go-workflows/workflow"
14+
)
15+
16+
type monoprocessBackend struct {
17+
backend.Backend
18+
19+
workflowSignal chan struct{}
20+
activitySignal chan struct{}
21+
signalTimeout time.Duration
22+
23+
logger *slog.Logger
24+
}
25+
26+
func NewMonoprocessBackend(b backend.Backend, signalBufferSize int, signalTimeout time.Duration) *monoprocessBackend {
27+
if signalTimeout <= 0 {
28+
signalTimeout = time.Second
29+
}
30+
mb := &monoprocessBackend{
31+
Backend: b,
32+
workflowSignal: make(chan struct{}, signalBufferSize),
33+
activitySignal: make(chan struct{}, signalBufferSize),
34+
signalTimeout: signalTimeout,
35+
logger: b.Logger(),
36+
}
37+
return mb
38+
}
39+
40+
func (b *monoprocessBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
41+
if w, err := b.Backend.GetWorkflowTask(ctx); w != nil || err != nil {
42+
return w, err
43+
}
44+
b.logger.Debug("worker waiting for workflow task signal")
45+
select {
46+
case <-ctx.Done():
47+
return nil, ctx.Err()
48+
case <-b.workflowSignal:
49+
b.logger.Debug("worker got a workflow task signal")
50+
return b.GetWorkflowTask(ctx)
51+
}
52+
}
53+
54+
func (b *monoprocessBackend) GetActivityTask(ctx context.Context) (*task.Activity, error) {
55+
if a, err := b.Backend.GetActivityTask(ctx); a != nil || err != nil {
56+
return a, err
57+
}
58+
b.logger.Debug("worker waiting for activity task signal")
59+
select {
60+
case <-ctx.Done():
61+
return nil, ctx.Err()
62+
case <-b.activitySignal:
63+
b.logger.Debug("worker got an activity task signal")
64+
return b.GetActivityTask(ctx)
65+
}
66+
}
67+
68+
func (b *monoprocessBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
69+
if err := b.Backend.CreateWorkflowInstance(ctx, instance, event); err != nil {
70+
return err
71+
}
72+
b.notifyWorkflowWorker(ctx)
73+
return nil
74+
}
75+
76+
func (b *monoprocessBackend) CompleteWorkflowTask(
77+
ctx context.Context,
78+
task *task.Workflow,
79+
instance *workflow.Instance,
80+
state core.WorkflowInstanceState,
81+
executedEvents, activityEvents, timerEvents []*history.Event,
82+
workflowEvents []history.WorkflowEvent,
83+
) error {
84+
if err := b.Backend.CompleteWorkflowTask(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents); err != nil {
85+
return err
86+
}
87+
for _, e := range activityEvents {
88+
if e.Type != history.EventType_ActivityScheduled {
89+
continue
90+
}
91+
if !b.notifyActivityWorker(ctx) {
92+
break // no reason to notify more
93+
}
94+
}
95+
for _, e := range timerEvents {
96+
attr, ok := e.Attributes.(*history.TimerFiredAttributes)
97+
if !ok {
98+
b.logger.Warn("unknown attributes type in timer event", "type", reflect.TypeOf(e.Attributes).String())
99+
continue
100+
}
101+
b.logger.Debug("scheduling timer to notify workflow worker")
102+
time.AfterFunc(attr.At.Sub(time.Now()), func() { b.notifyWorkflowWorker(ctx) }) // TODO: cancel timer if the event gets cancelled
103+
}
104+
for _, e := range workflowEvents {
105+
if e.HistoryEvent.Type != history.EventType_WorkflowExecutionStarted &&
106+
e.HistoryEvent.Type != history.EventType_SubWorkflowCompleted &&
107+
e.HistoryEvent.Type != history.EventType_WorkflowExecutionCanceled {
108+
continue
109+
}
110+
if !b.notifyWorkflowWorker(ctx) {
111+
break // no reason to notify more
112+
}
113+
}
114+
return nil
115+
}
116+
117+
func (b *monoprocessBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error {
118+
if err := b.Backend.CompleteActivityTask(ctx, instance, activityID, event); err != nil {
119+
return err
120+
}
121+
b.notifyWorkflowWorker(ctx)
122+
return nil
123+
}
124+
125+
func (b *monoprocessBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error {
126+
if err := b.Backend.CancelWorkflowInstance(ctx, instance, cancelEvent); err != nil {
127+
return err
128+
}
129+
b.notifyWorkflowWorker(ctx)
130+
return nil
131+
}
132+
133+
func (b *monoprocessBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error {
134+
if err := b.Backend.SignalWorkflow(ctx, instanceID, event); err != nil {
135+
return err
136+
}
137+
b.notifyWorkflowWorker(ctx)
138+
return nil
139+
}
140+
141+
func (b *monoprocessBackend) notifyActivityWorker(ctx context.Context) bool {
142+
select {
143+
case <-ctx.Done():
144+
// we didn't manage to notify the worker that there is a new task, it
145+
// will pick it up after the poll timeout
146+
b.logger.Debug("failed to signal activity task to worker, context cancelled")
147+
case <-time.After(b.signalTimeout):
148+
// we didn't manage to notify the worker that there is a new task, it
149+
// will pick it up after the poll timeout
150+
b.logger.Debug("failed to signal activity task to worker, timeout")
151+
case b.activitySignal <- struct{}{}:
152+
b.logger.Debug("signalled a new activity task to worker")
153+
return true
154+
}
155+
return false
156+
}
157+
158+
func (b *monoprocessBackend) notifyWorkflowWorker(ctx context.Context) bool {
159+
select {
160+
case <-ctx.Done():
161+
// we didn't manage to notify the worker that there is a new task, it
162+
// will pick it up after the poll timeout
163+
b.logger.Debug("failed to signal workflow task to worker, context cancelled")
164+
case <-time.After(b.signalTimeout):
165+
// we didn't manage to notify the worker that there is a new task, it
166+
// will pick it up after the poll timeout
167+
b.logger.Debug("failed to signal workflow task to worker, timeout")
168+
case b.workflowSignal <- struct{}{}:
169+
b.logger.Debug("signalled a new workflow task to worker")
170+
return true
171+
}
172+
return false
173+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package monoprocess
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"reflect"
8+
"testing"
9+
"time"
10+
"unsafe"
11+
12+
"github.com/cschleiden/go-workflows/backend"
13+
"github.com/cschleiden/go-workflows/backend/sqlite"
14+
"github.com/cschleiden/go-workflows/backend/test"
15+
"github.com/cschleiden/go-workflows/internal/history"
16+
)
17+
18+
func Test_MonoprocessBackend(t *testing.T) {
19+
if testing.Short() {
20+
t.Skip()
21+
}
22+
23+
test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
24+
// Disable sticky workflow behavior for the test execution
25+
options = append(options, backend.WithStickyTimeout(0))
26+
27+
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...), 0, time.Millisecond)
28+
}, nil)
29+
}
30+
31+
func Test_EndToEndMonoprocessBackend(t *testing.T) {
32+
if testing.Short() {
33+
t.Skip()
34+
}
35+
36+
test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
37+
// Disable sticky workflow behavior for the test execution
38+
options = append(options, backend.WithStickyTimeout(0))
39+
40+
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...), 0, 0)
41+
}, nil)
42+
}
43+
44+
var _ test.TestBackend = (*monoprocessBackend)(nil)
45+
46+
func (b *monoprocessBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
47+
// Reflection hack to access db in private field of sqlite.sqliteBackend
48+
privateDbField := reflect.ValueOf(b.Backend).Elem().FieldByName("db")
49+
hackedDbField := reflect.NewAt(privateDbField.Type(), unsafe.Pointer(privateDbField.UnsafeAddr())).Elem()
50+
db := hackedDbField.Interface().(*sql.DB)
51+
52+
tx, err := db.BeginTx(ctx, nil)
53+
if err != nil {
54+
return nil, err
55+
}
56+
defer tx.Rollback()
57+
58+
// There is no index on `visible_at`, but this is okay for test only usage.
59+
futureEvents, err := tx.QueryContext(
60+
ctx,
61+
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
62+
)
63+
if err != nil {
64+
return nil, fmt.Errorf("getting history: %w", err)
65+
}
66+
67+
f := make([]*history.Event, 0)
68+
69+
for futureEvents.Next() {
70+
var instanceID, executionID string
71+
var attributes []byte
72+
73+
fe := &history.Event{}
74+
75+
if err := futureEvents.Scan(
76+
&fe.ID,
77+
&fe.SequenceID,
78+
&instanceID,
79+
&executionID,
80+
&fe.Type,
81+
&fe.Timestamp,
82+
&fe.ScheduleEventID,
83+
&attributes,
84+
&fe.VisibleAt,
85+
); err != nil {
86+
return nil, fmt.Errorf("scanning event: %w", err)
87+
}
88+
89+
a, err := history.DeserializeAttributes(fe.Type, attributes)
90+
if err != nil {
91+
return nil, fmt.Errorf("deserializing attributes: %w", err)
92+
}
93+
94+
fe.Attributes = a
95+
96+
f = append(f, fe)
97+
}
98+
99+
return f, nil
100+
}

0 commit comments

Comments
 (0)