Skip to content

Commit 8e566f3

Browse files
committed
Move tasks from task package to backend
1 parent 2047825 commit 8e566f3

File tree

15 files changed

+84
-98
lines changed

15 files changed

+84
-98
lines changed

backend/backend.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"log/slog"
77

8-
"github.com/cschleiden/go-workflows/backend/task"
98
"github.com/cschleiden/go-workflows/converter"
109
"github.com/cschleiden/go-workflows/internal/contextpropagation"
1110
core "github.com/cschleiden/go-workflows/internal/core"
@@ -45,7 +44,7 @@ type Backend interface {
4544
SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
4645

4746
// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
48-
GetWorkflowTask(ctx context.Context) (*task.Workflow, error)
47+
GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)
4948

5049
// ExtendWorkflowTask extends the lock of a workflow task
5150
ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error
@@ -56,11 +55,11 @@ type Backend interface {
5655
// which will be added to the workflow instance history. workflowEvents are new events for the
5756
// completed or other workflow instances.
5857
CompleteWorkflowTask(
59-
ctx context.Context, task *task.Workflow, instance *workflow.Instance, state core.WorkflowInstanceState,
58+
ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState,
6059
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error
6160

6261
// GetActivityTask returns a pending activity task or nil if there are no pending activities
63-
GetActivityTask(ctx context.Context) (*task.Activity, error)
62+
GetActivityTask(ctx context.Context) (*ActivityTask, error)
6463

6564
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
6665
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error

backend/mock_Backend.go

Lines changed: 12 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/mysql.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/cschleiden/go-workflows/backend"
15-
"github.com/cschleiden/go-workflows/backend/task"
1615
"github.com/cschleiden/go-workflows/converter"
1716
"github.com/cschleiden/go-workflows/internal/contextpropagation"
1817
"github.com/cschleiden/go-workflows/internal/core"
@@ -315,7 +314,7 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
315314
}
316315

317316
// GetWorkflowInstance returns a pending workflow task or nil if there are no pending worflow executions
318-
func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
317+
func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) {
319318
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
320319
Isolation: sql.LevelReadCommitted,
321320
})
@@ -393,7 +392,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
393392
}
394393
}
395394

396-
t := &task.Workflow{
395+
t := &backend.WorkflowTask{
397396
ID: wfi.InstanceID,
398397
WorkflowInstance: wfi,
399398
WorkflowInstanceState: core.WorkflowInstanceStateActive,
@@ -469,7 +468,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
469468
// completed or other workflow instances.
470469
func (b *mysqlBackend) CompleteWorkflowTask(
471470
ctx context.Context,
472-
task *task.Workflow,
471+
task *backend.WorkflowTask,
473472
instance *workflow.Instance,
474473
state core.WorkflowInstanceState,
475474
executedEvents, activityEvents, timerEvents []*history.Event,
@@ -617,7 +616,7 @@ func (b *mysqlBackend) ExtendWorkflowTask(ctx context.Context, taskID string, in
617616
}
618617

619618
// GetActivityTask returns a pending activity task or nil if there are no pending activities
620-
func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, error) {
619+
func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTask, error) {
621620
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
622621
Isolation: sql.LevelReadCommitted,
623622
})
@@ -671,7 +670,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
671670
return nil, fmt.Errorf("locking activity: %w", err)
672671
}
673672

674-
t := &task.Activity{
673+
t := &backend.ActivityTask{
675674
ID: event.ID,
676675
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
677676
Event: event,

backend/redis/activity.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package redis
33
import (
44
"context"
55

6-
"github.com/cschleiden/go-workflows/backend/task"
6+
"github.com/cschleiden/go-workflows/backend"
77
"github.com/cschleiden/go-workflows/internal/core"
88
"github.com/cschleiden/go-workflows/internal/history"
99
)
1010

11-
func (rb *redisBackend) GetActivityTask(ctx context.Context) (*task.Activity, error) {
11+
func (rb *redisBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTask, error) {
1212
activityTask, err := rb.activityQueue.Dequeue(ctx, rb.rdb, rb.options.ActivityLockTimeout, rb.options.BlockTimeout)
1313
if err != nil {
1414
return nil, err
@@ -18,7 +18,7 @@ func (rb *redisBackend) GetActivityTask(ctx context.Context) (*task.Activity, er
1818
return nil, nil
1919
}
2020

21-
return &task.Activity{
21+
return &backend.ActivityTask{
2222
WorkflowInstance: activityTask.Data.Instance,
2323
ID: activityTask.TaskID, // Use the queue generated ID here
2424
Event: activityTask.Data.Event,

backend/redis/workflow.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"strconv"
88
"time"
99

10-
"github.com/cschleiden/go-workflows/backend/task"
10+
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/internal/core"
1212
"github.com/cschleiden/go-workflows/internal/history"
1313
"github.com/cschleiden/go-workflows/internal/tracing"
@@ -54,7 +54,7 @@ var futureEventsCmd = redis.NewScript(`
5454
return #events
5555
`)
5656

57-
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
57+
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) {
5858
// Check for future events
5959
now := time.Now().UnixMilli()
6060
nowStr := strconv.FormatInt(now, 10)
@@ -101,7 +101,7 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
101101
newEvents = append(newEvents, event)
102102
}
103103

104-
return &task.Workflow{
104+
return &backend.WorkflowTask{
105105
ID: instanceTask.TaskID,
106106
WorkflowInstance: instanceState.Instance,
107107
WorkflowInstanceState: instanceState.State,
@@ -148,7 +148,7 @@ var requeueInstanceCmd = redis.NewScript(`
148148

149149
func (rb *redisBackend) CompleteWorkflowTask(
150150
ctx context.Context,
151-
task *task.Workflow,
151+
task *backend.WorkflowTask,
152152
instance *core.WorkflowInstance,
153153
state core.WorkflowInstanceState,
154154
executedEvents, activityEvents, timerEvents []*history.Event,

backend/sqlite/sqlite.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/cschleiden/go-workflows/backend"
15-
"github.com/cschleiden/go-workflows/backend/task"
1615
"github.com/cschleiden/go-workflows/converter"
1716
"github.com/cschleiden/go-workflows/internal/contextpropagation"
1817
"github.com/cschleiden/go-workflows/internal/core"
@@ -269,7 +268,7 @@ func (sb *sqliteBackend) SignalWorkflow(ctx context.Context, instanceID string,
269268
return tx.Commit()
270269
}
271270

272-
func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
271+
func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*backend.WorkflowTask, error) {
273272
tx, err := sb.db.BeginTx(ctx, nil)
274273
if err != nil {
275274
return nil, err
@@ -332,7 +331,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
332331
}
333332
}
334333

335-
t := &task.Workflow{
334+
t := &backend.WorkflowTask{
336335
ID: wfi.InstanceID,
337336
WorkflowInstance: wfi,
338337
WorkflowInstanceState: core.WorkflowInstanceStateActive,
@@ -371,7 +370,7 @@ func (sb *sqliteBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, e
371370

372371
func (sb *sqliteBackend) CompleteWorkflowTask(
373372
ctx context.Context,
374-
task *task.Workflow,
373+
task *backend.WorkflowTask,
375374
instance *workflow.Instance,
376375
state core.WorkflowInstanceState,
377376
executedEvents, activityEvents, timerEvents []*history.Event,
@@ -508,7 +507,7 @@ func (sb *sqliteBackend) ExtendWorkflowTask(ctx context.Context, taskID string,
508507
return tx.Commit()
509508
}
510509

511-
func (sb *sqliteBackend) GetActivityTask(ctx context.Context) (*task.Activity, error) {
510+
func (sb *sqliteBackend) GetActivityTask(ctx context.Context) (*backend.ActivityTask, error) {
512511
tx, err := sb.db.BeginTx(ctx, nil)
513512
if err != nil {
514513
return nil, err
@@ -572,7 +571,7 @@ func (sb *sqliteBackend) GetActivityTask(ctx context.Context) (*task.Activity, e
572571
return nil, fmt.Errorf("unmarshaling metadata: %w", err)
573572
}
574573

575-
t := &task.Activity{
574+
t := &backend.ActivityTask{
576575
ID: event.ID,
577576
WorkflowInstance: core.NewWorkflowInstance(instanceID, executionID),
578577
Event: event,

backend/task/activity.go

Lines changed: 0 additions & 14 deletions
This file was deleted.

backend/task/workflow.go renamed to backend/tasks.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
package task
1+
package backend
22

33
import (
44
"github.com/cschleiden/go-workflows/internal/core"
55
"github.com/cschleiden/go-workflows/internal/history"
66
)
77

8-
type Workflow struct {
8+
type WorkflowTask struct {
99
// ID is an identifier for this task. It's set by the backend
1010
ID string
1111

@@ -25,3 +25,11 @@ type Workflow struct {
2525
// Backend specific data, only the producer of the task should rely on this.
2626
CustomData any
2727
}
28+
29+
type ActivityTask struct {
30+
ID string
31+
32+
WorkflowInstance *core.WorkflowInstance
33+
34+
Event *history.Event
35+
}

internal/activity/executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"log/slog"
88
"reflect"
99

10-
"github.com/cschleiden/go-workflows/backend/task"
10+
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/converter"
1212
"github.com/cschleiden/go-workflows/internal/args"
1313
"github.com/cschleiden/go-workflows/internal/contextpropagation"
@@ -38,7 +38,7 @@ func NewExecutor(logger *slog.Logger, tracer trace.Tracer, converter converter.C
3838
}
3939
}
4040

41-
func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (payload.Payload, error) {
41+
func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTask) (payload.Payload, error) {
4242
a := task.Event.Attributes.(*history.ActivityScheduledAttributes)
4343

4444
activity, err := e.r.GetActivity(a.Name)

internal/activity/executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"testing"
88
"time"
99

10-
"github.com/cschleiden/go-workflows/backend/task"
10+
"github.com/cschleiden/go-workflows/backend"
1111
"github.com/cschleiden/go-workflows/converter"
1212
"github.com/cschleiden/go-workflows/internal/args"
1313
"github.com/cschleiden/go-workflows/internal/core"
@@ -116,7 +116,7 @@ func TestExecutor_ExecuteActivity(t *testing.T) {
116116
converter: converter.DefaultConverter,
117117
tracer: trace.NewNoopTracerProvider().Tracer(""),
118118
}
119-
got, err := e.ExecuteActivity(context.Background(), &task.Activity{
119+
got, err := e.ExecuteActivity(context.Background(), &backend.ActivityTask{
120120
ID: uuid.NewString(),
121121
WorkflowInstance: core.NewWorkflowInstance("instanceID", "executionID"),
122122
Event: history.NewHistoryEvent(1, time.Now(), history.EventType_ActivityScheduled, attr),

0 commit comments

Comments
 (0)