Skip to content

Commit 0ee3530

Browse files
authored
Merge pull request #240 from lovromazgon/monoprocess-backend
Monoprocess backend
2 parents 07a9af1 + 90442ea commit 0ee3530

File tree

5 files changed

+289
-56
lines changed

5 files changed

+289
-56
lines changed

backend/monoprocess/diagnostics.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package monoprocess
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/cschleiden/go-workflows/core"
8+
"github.com/cschleiden/go-workflows/diag"
9+
)
10+
11+
var _ diag.Backend = (*monoprocessBackend)(nil)
12+
13+
func (b *monoprocessBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
14+
if diagBackend, ok := b.Backend.(diag.Backend); ok {
15+
return diagBackend.GetWorkflowInstance(ctx, instance)
16+
}
17+
return nil, errors.New("not implemented")
18+
}
19+
20+
func (b *monoprocessBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
21+
if diagBackend, ok := b.Backend.(diag.Backend); ok {
22+
return diagBackend.GetWorkflowInstances(ctx, afterInstanceID, afterExecutionID, count)
23+
}
24+
return nil, errors.New("not implemented")
25+
}
26+
27+
func (b *monoprocessBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
28+
if diagBackend, ok := b.Backend.(diag.Backend); ok {
29+
return diagBackend.GetWorkflowTree(ctx, instance)
30+
}
31+
return nil, errors.New("not implemented")
32+
}

backend/monoprocess/monoprocess.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package monoprocess
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"reflect"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/backend/history"
11+
"github.com/cschleiden/go-workflows/core"
12+
"github.com/cschleiden/go-workflows/workflow"
13+
)
14+
15+
type monoprocessBackend struct {
16+
backend.Backend
17+
18+
workflowSignal chan struct{}
19+
activitySignal chan struct{}
20+
signalTimeout time.Duration
21+
22+
logger *slog.Logger
23+
}
24+
25+
var _ backend.Backend = (*monoprocessBackend)(nil)
26+
27+
// NewMonoprocessBackend wraps an existing backend and improves its responsiveness
28+
// in case the backend and worker are running in the same process. This backend
29+
// uses channels to notify the worker every time there is a new task ready to be
30+
// worked on. Note that only one worker will be notified.
31+
// IMPORTANT: Only use this backend when the backend and worker are running in
32+
// the same process.
33+
func NewMonoprocessBackend(b backend.Backend) *monoprocessBackend {
34+
mb := &monoprocessBackend{
35+
Backend: b,
36+
workflowSignal: make(chan struct{}, 1),
37+
activitySignal: make(chan struct{}, 1),
38+
logger: b.Logger(),
39+
}
40+
return mb
41+
}
42+
43+
func (b *monoprocessBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) {
44+
// loop until either we find a task or the context is cancelled
45+
for {
46+
if w, err := b.Backend.GetWorkflowTask(ctx); w != nil || err != nil {
47+
return w, err
48+
}
49+
b.logger.DebugContext(ctx, "worker waiting for workflow task signal")
50+
select {
51+
case <-ctx.Done():
52+
return nil, ctx.Err()
53+
case <-b.workflowSignal:
54+
b.logger.DebugContext(ctx, "worker got a workflow task signal")
55+
}
56+
}
57+
}
58+
59+
func (b *monoprocessBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTask, error) {
60+
// loop until either we find a task or the context is cancelled
61+
for {
62+
if a, err := b.Backend.GetActivityTask(ctx); a != nil || err != nil {
63+
return a, err
64+
}
65+
b.logger.DebugContext(ctx, "worker waiting for activity task signal")
66+
select {
67+
case <-ctx.Done():
68+
return nil, ctx.Err()
69+
case <-b.activitySignal:
70+
b.logger.DebugContext(ctx, "worker got an activity task signal")
71+
}
72+
}
73+
}
74+
75+
func (b *monoprocessBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
76+
if err := b.Backend.CreateWorkflowInstance(ctx, instance, event); err != nil {
77+
return err
78+
}
79+
b.notifyWorkflowWorker(ctx)
80+
return nil
81+
}
82+
83+
func (b *monoprocessBackend) CompleteWorkflowTask(
84+
ctx context.Context,
85+
task *backend.WorkflowTask,
86+
instance *workflow.Instance,
87+
state core.WorkflowInstanceState,
88+
executedEvents, activityEvents, timerEvents []*history.Event,
89+
workflowEvents []history.WorkflowEvent,
90+
) error {
91+
if err := b.Backend.CompleteWorkflowTask(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents); err != nil {
92+
return err
93+
}
94+
95+
if len(activityEvents) > 0 {
96+
b.notifyActivityWorker(ctx)
97+
}
98+
99+
for _, e := range timerEvents {
100+
attr, ok := e.Attributes.(*history.TimerFiredAttributes)
101+
if !ok {
102+
b.logger.WarnContext(ctx, "unknown attributes type in timer event", "type", reflect.TypeOf(e.Attributes).String())
103+
continue
104+
}
105+
b.logger.DebugContext(ctx, "scheduling timer to notify workflow worker")
106+
// Note that the worker will be notified even if the timer event gets
107+
// cancelled. This is ok, because the poller will simply find no task
108+
// and continue.
109+
time.AfterFunc(attr.At.Sub(time.Now()), func() { b.notifyWorkflowWorker(context.Background()) })
110+
}
111+
112+
b.notifyWorkflowWorker(ctx)
113+
return nil
114+
}
115+
116+
func (b *monoprocessBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error {
117+
if err := b.Backend.CompleteActivityTask(ctx, instance, activityID, event); err != nil {
118+
return err
119+
}
120+
b.notifyWorkflowWorker(ctx)
121+
return nil
122+
}
123+
124+
func (b *monoprocessBackend) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error {
125+
if err := b.Backend.CancelWorkflowInstance(ctx, instance, cancelEvent); err != nil {
126+
return err
127+
}
128+
b.notifyWorkflowWorker(ctx)
129+
return nil
130+
}
131+
132+
func (b *monoprocessBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error {
133+
if err := b.Backend.SignalWorkflow(ctx, instanceID, event); err != nil {
134+
return err
135+
}
136+
b.notifyWorkflowWorker(ctx)
137+
return nil
138+
}
139+
140+
func (b *monoprocessBackend) notifyActivityWorker(ctx context.Context) {
141+
select {
142+
case b.activitySignal <- struct{}{}:
143+
b.logger.DebugContext(ctx, "signalled a new activity task to worker")
144+
default:
145+
// the signal channel already contains a signal, no need to add another
146+
}
147+
}
148+
149+
func (b *monoprocessBackend) notifyWorkflowWorker(ctx context.Context) {
150+
select {
151+
case b.workflowSignal <- struct{}{}:
152+
b.logger.DebugContext(ctx, "signalled a new workflow task to worker")
153+
default:
154+
// the signal channel already contains a signal, no need to add another
155+
}
156+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package monoprocess
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/backend/history"
10+
"github.com/cschleiden/go-workflows/backend/sqlite"
11+
"github.com/cschleiden/go-workflows/backend/test"
12+
)
13+
14+
func Test_MonoprocessBackend(t *testing.T) {
15+
if testing.Short() {
16+
t.Skip()
17+
}
18+
19+
test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
20+
// Disable sticky workflow behavior for the test execution
21+
options = append(options, backend.WithStickyTimeout(0))
22+
23+
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
24+
}, nil)
25+
}
26+
27+
func Test_EndToEndMonoprocessBackend(t *testing.T) {
28+
if testing.Short() {
29+
t.Skip()
30+
}
31+
32+
test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
33+
// Disable sticky workflow behavior for the test execution
34+
options = append(options, backend.WithStickyTimeout(0))
35+
36+
return NewMonoprocessBackend(sqlite.NewInMemoryBackend(options...))
37+
}, nil)
38+
}
39+
40+
var _ test.TestBackend = (*monoprocessBackend)(nil)
41+
42+
func (b *monoprocessBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
43+
if testBackend, ok := b.Backend.(test.TestBackend); ok {
44+
return testBackend.GetFutureEvents(ctx)
45+
}
46+
return nil, errors.New("not implemented")
47+
}

backend/sqlite/events.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,64 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/cschleiden/go-workflows/backend/test"
1011
"github.com/cschleiden/go-workflows/backend/history"
1112
"github.com/cschleiden/go-workflows/core"
1213
)
1314

15+
var _ test.TestBackend = (*sqliteBackend)(nil)
16+
17+
func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
18+
tx, err := sb.db.BeginTx(ctx, nil)
19+
if err != nil {
20+
return nil, err
21+
}
22+
defer tx.Rollback()
23+
24+
// There is no index on `visible_at`, but this is okay for test only usage.
25+
futureEvents, err := tx.QueryContext(
26+
ctx,
27+
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
28+
)
29+
if err != nil {
30+
return nil, fmt.Errorf("getting history: %w", err)
31+
}
32+
33+
f := make([]*history.Event, 0)
34+
35+
for futureEvents.Next() {
36+
var instanceID, executionID string
37+
var attributes []byte
38+
39+
fe := &history.Event{}
40+
41+
if err := futureEvents.Scan(
42+
&fe.ID,
43+
&fe.SequenceID,
44+
&instanceID,
45+
&executionID,
46+
&fe.Type,
47+
&fe.Timestamp,
48+
&fe.ScheduleEventID,
49+
&attributes,
50+
&fe.VisibleAt,
51+
); err != nil {
52+
return nil, fmt.Errorf("scanning event: %w", err)
53+
}
54+
55+
a, err := history.DeserializeAttributes(fe.Type, attributes)
56+
if err != nil {
57+
return nil, fmt.Errorf("deserializing attributes: %w", err)
58+
}
59+
60+
fe.Attributes = a
61+
62+
f = append(f, fe)
63+
}
64+
65+
return f, nil
66+
}
67+
1468
func getPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance) ([]*history.Event, error) {
1569
now := time.Now()
1670
events, err := tx.QueryContext(

backend/sqlite/sqlite_test.go

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package sqlite
22

33
import (
4-
"context"
5-
"fmt"
64
"testing"
75

86
"github.com/cschleiden/go-workflows/backend"
9-
"github.com/cschleiden/go-workflows/backend/history"
107
"github.com/cschleiden/go-workflows/backend/test"
118
)
129

@@ -35,56 +32,3 @@ func Test_EndToEndSqliteBackend(t *testing.T) {
3532
return NewInMemoryBackend(options...)
3633
}, nil)
3734
}
38-
39-
var _ test.TestBackend = (*sqliteBackend)(nil)
40-
41-
func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
42-
tx, err := sb.db.BeginTx(ctx, nil)
43-
if err != nil {
44-
return nil, err
45-
}
46-
defer tx.Rollback()
47-
48-
// There is no index on `visible_at`, but this is okay for test only usage.
49-
futureEvents, err := tx.QueryContext(
50-
ctx,
51-
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
52-
)
53-
if err != nil {
54-
return nil, fmt.Errorf("getting history: %w", err)
55-
}
56-
57-
f := make([]*history.Event, 0)
58-
59-
for futureEvents.Next() {
60-
var instanceID, executionID string
61-
var attributes []byte
62-
63-
fe := &history.Event{}
64-
65-
if err := futureEvents.Scan(
66-
&fe.ID,
67-
&fe.SequenceID,
68-
&instanceID,
69-
&executionID,
70-
&fe.Type,
71-
&fe.Timestamp,
72-
&fe.ScheduleEventID,
73-
&attributes,
74-
&fe.VisibleAt,
75-
); err != nil {
76-
return nil, fmt.Errorf("scanning event: %w", err)
77-
}
78-
79-
a, err := history.DeserializeAttributes(fe.Type, attributes)
80-
if err != nil {
81-
return nil, fmt.Errorf("deserializing attributes: %w", err)
82-
}
83-
84-
fe.Attributes = a
85-
86-
f = append(f, fe)
87-
}
88-
89-
return f, nil
90-
}

0 commit comments

Comments
 (0)