Skip to content

Commit c798be9

Browse files
authored
Merge pull request #351 from cschleiden/queues
Add support for workflow and activity queues
2 parents 9d0dd35 + 97c903b commit c798be9

File tree

90 files changed

+2292
-820
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+2292
-820
lines changed

backend/backend.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package backend
33
import (
44
"context"
55
"errors"
6-
"log/slog"
76

8-
"github.com/cschleiden/go-workflows/backend/converter"
97
"github.com/cschleiden/go-workflows/backend/history"
108
"github.com/cschleiden/go-workflows/backend/metrics"
119
"github.com/cschleiden/go-workflows/core"
@@ -42,47 +40,47 @@ type Backend interface {
4240
// If the given instance does not exist, it will return an error
4341
SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
4442

43+
// PrepareWorkflowQueues prepares workflow queues for later consumption using this backend instane
44+
PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error
45+
46+
// PrepareActivityQueues prepares activity queues for later consumption using this backend instance
47+
PrepareActivityQueues(ctx context.Context, queues []workflow.Queue) error
48+
4549
// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
46-
GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)
50+
GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*WorkflowTask, error)
4751

4852
// ExtendWorkflowTask extends the lock of a workflow task
49-
ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error
53+
ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error
5054

5155
// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
5256
//
5357
// This checkpoints the execution. events are new events from the last workflow execution
5458
// which will be added to the workflow instance history. workflowEvents are new events for the
5559
// completed or other workflow instances.
5660
CompleteWorkflowTask(
57-
ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState,
61+
ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState,
5862
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error
5963

6064
// GetActivityTask returns a pending activity task or nil if there are no pending activities
61-
GetActivityTask(ctx context.Context) (*ActivityTask, error)
62-
63-
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
64-
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error
65+
GetActivityTask(ctx context.Context, queues []workflow.Queue) (*ActivityTask, error)
6566

6667
// ExtendActivityTask extends the lock of an activity task
67-
ExtendActivityTask(ctx context.Context, activityID string) error
68+
ExtendActivityTask(ctx context.Context, task *ActivityTask) error
69+
70+
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
71+
CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error
6872

6973
// GetStats returns stats about the backend
7074
GetStats(ctx context.Context) (*Stats, error)
7175

72-
// Logger returns the configured logger for the backend
73-
Logger() *slog.Logger
74-
7576
// Tracer returns the configured trace provider for the backend
7677
Tracer() trace.Tracer
7778

7879
// Metrics returns the configured metrics client for the backend
7980
Metrics() metrics.Client
8081

81-
// Converter returns the configured converter for the backend
82-
Converter() converter.Converter
83-
84-
// ContextPropagators returns the configured context propagators for the backend
85-
ContextPropagators() []workflow.ContextPropagator
82+
// Options returns the configured options for the backend
83+
Options() *Options
8684

8785
// Close closes any underlying resources
8886
Close() error

backend/history/activity_scheduled.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package history
33
import (
44
"github.com/cschleiden/go-workflows/backend/metadata"
55
"github.com/cschleiden/go-workflows/backend/payload"
6+
"github.com/cschleiden/go-workflows/core"
67
)
78

89
type ActivityScheduledAttributes struct {
@@ -13,4 +14,6 @@ type ActivityScheduledAttributes struct {
1314
Inputs []payload.Payload `json:"inputs,omitempty"`
1415

1516
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`
17+
18+
Queue core.Queue `json:"queue,omitempty"`
1619
}

backend/history/subworkflow_scheduled.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
)
88

99
type SubWorkflowScheduledAttributes struct {
10+
SubWorkflowQueue core.Queue `json:"sub_workflow_queue,omitempty"`
11+
1012
SubWorkflowInstance *core.WorkflowInstance `json:"sub_workflow_instance,omitempty"`
1113

1214
Name string `json:"name,omitempty"`

backend/history/workflow_started.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package history
33
import (
44
"github.com/cschleiden/go-workflows/backend/metadata"
55
"github.com/cschleiden/go-workflows/backend/payload"
6+
"github.com/cschleiden/go-workflows/core"
67
)
78

89
type ExecutionStartedAttributes struct {
10+
Queue core.Queue `json:"queue,omitempty"`
11+
912
Name string `json:"name,omitempty"`
1013

1114
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`

backend/mock_Backend.go

Lines changed: 77 additions & 87 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)