Skip to content

Commit 7243ce4

Browse files
authored
Merge branch 'main' into cschleiden/check-params
2 parents 79bb1b8 + ae76e98 commit 7243ce4

36 files changed

+181
-181
lines changed

backend/backend.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const TracerName = "go-workflow"
2222
//go:generate mockery --name=Backend --inpackage
2323
type Backend interface {
2424
// CreateWorkflowInstance creates a new workflow instance
25-
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error
25+
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error
2626

2727
// CancelWorkflowInstance cancels a running workflow instance
2828
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error
@@ -32,12 +32,12 @@ type Backend interface {
3232

3333
// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
3434
// is given, only events after that event are returned. Otherwise the full history is returned.
35-
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]history.Event, error)
35+
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)
3636

3737
// SignalWorkflow signals a running workflow instance
3838
//
3939
// If the given instance does not exist, it will return an error
40-
SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error
40+
SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
4141

4242
// GetWorkflowInstance returns a pending workflow task or nil if there are no pending worflow executions
4343
GetWorkflowTask(ctx context.Context) (*task.Workflow, error)
@@ -52,13 +52,13 @@ type Backend interface {
5252
// completed or other workflow instances.
5353
CompleteWorkflowTask(
5454
ctx context.Context, task *task.Workflow, instance *workflow.Instance, state core.WorkflowInstanceState,
55-
executedEvents, activityEvents, timerEvents []history.Event, workflowEvents []history.WorkflowEvent) error
55+
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error
5656

5757
// GetActivityTask returns a pending activity task or nil if there are no pending activities
5858
GetActivityTask(ctx context.Context) (*task.Activity, error)
5959

6060
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
61-
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event history.Event) error
61+
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error
6262

6363
// ExtendActivityTask extends the lock of an activity task
6464
ExtendActivityTask(ctx context.Context, activityID string) error

backend/mock_Backend.go

Lines changed: 13 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/events.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ import (
88
"github.com/cschleiden/go-workflows/internal/history"
99
)
1010

11-
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []history.Event) error {
11+
func insertPendingEvents(ctx context.Context, tx *sql.Tx, instanceID string, newEvents []*history.Event) error {
1212
return insertEvents(ctx, tx, "pending_events", instanceID, newEvents)
1313
}
1414

15-
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instanceID string, historyEvents []history.Event) error {
15+
func insertHistoryEvents(ctx context.Context, tx *sql.Tx, instanceID string, historyEvents []*history.Event) error {
1616
return insertEvents(ctx, tx, "history", instanceID, historyEvents)
1717
}
1818

19-
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID string, events []history.Event) error {
19+
func insertEvents(ctx context.Context, tx *sql.Tx, tableName string, instanceID string, events []*history.Event) error {
2020
const batchSize = 20
2121
for batchStart := 0; batchStart < len(events); batchStart += batchSize {
2222
batchEnd := batchStart + batchSize

backend/mysql/mysql.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type mysqlBackend struct {
6363
}
6464

6565
// CreateWorkflowInstance creates a new workflow instance
66-
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error {
66+
func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
6767
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
6868
Isolation: sql.LevelReadCommitted,
6969
})
@@ -78,7 +78,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
7878
}
7979

8080
// Initial history is empty, store only new events
81-
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
81+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []*history.Event{event}); err != nil {
8282
return fmt.Errorf("inserting new event: %w", err)
8383
}
8484

@@ -127,14 +127,14 @@ func (b *mysqlBackend) CancelWorkflowInstance(ctx context.Context, instance *wor
127127
return err
128128
}
129129

130-
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{*event}); err != nil {
130+
if err := insertPendingEvents(ctx, tx, instanceID, []*history.Event{event}); err != nil {
131131
return fmt.Errorf("inserting cancellation event: %w", err)
132132
}
133133

134134
return tx.Commit()
135135
}
136136

137-
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]history.Event, error) {
137+
func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error) {
138138
tx, err := b.db.BeginTx(ctx, nil)
139139
if err != nil {
140140
return nil, err
@@ -160,13 +160,13 @@ func (b *mysqlBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
160160
return nil, fmt.Errorf("getting history: %w", err)
161161
}
162162

163-
h := make([]history.Event, 0)
163+
h := make([]*history.Event, 0)
164164

165165
for historyEvents.Next() {
166166
var instanceID string
167167
var attributes []byte
168168

169-
historyEvent := history.Event{}
169+
historyEvent := &history.Event{}
170170

171171
if err := historyEvents.Scan(
172172
&historyEvent.ID,
@@ -260,7 +260,7 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
260260
}
261261

262262
// SignalWorkflow signals a running workflow instance
263-
func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, event history.Event) error {
263+
func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error {
264264
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
265265
Isolation: sql.LevelReadCommitted,
266266
})
@@ -275,7 +275,7 @@ func (b *mysqlBackend) SignalWorkflow(ctx context.Context, instanceID string, ev
275275
return backend.ErrInstanceNotFound
276276
}
277277

278-
if err := insertPendingEvents(ctx, tx, instanceID, []history.Event{event}); err != nil {
278+
if err := insertPendingEvents(ctx, tx, instanceID, []*history.Event{event}); err != nil {
279279
return fmt.Errorf("inserting signal event: %w", err)
280280
}
281281

@@ -365,7 +365,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
365365
WorkflowInstance: wfi,
366366
WorkflowInstanceState: core.WorkflowInstanceStateActive,
367367
Metadata: metadata,
368-
NewEvents: []history.Event{},
368+
NewEvents: []*history.Event{},
369369
}
370370

371371
// Get new events
@@ -383,7 +383,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
383383
var instanceID string
384384
var attributes []byte
385385

386-
historyEvent := history.Event{}
386+
historyEvent := &history.Event{}
387387

388388
if err := events.Scan(
389389
&historyEvent.ID,
@@ -440,7 +440,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
440440
task *task.Workflow,
441441
instance *workflow.Instance,
442442
state core.WorkflowInstanceState,
443-
executedEvents, activityEvents, timerEvents []history.Event,
443+
executedEvents, activityEvents, timerEvents []*history.Event,
444444
workflowEvents []history.WorkflowEvent,
445445
) error {
446446
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
@@ -537,7 +537,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
537537
}
538538
}
539539

540-
historyEvents := []history.Event{}
540+
historyEvents := []*history.Event{}
541541
for _, m := range events {
542542
historyEvents = append(historyEvents, m.HistoryEvent)
543543
}
@@ -611,7 +611,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
611611
var instanceID, executionID string
612612
var attributes []byte
613613
var metadataJson sql.NullString
614-
event := history.Event{}
614+
event := &history.Event{}
615615

616616
if err := res.Scan(
617617
&id, &event.ID, &instanceID, &executionID, &metadataJson, &event.Type,
@@ -660,7 +660,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
660660
}
661661

662662
// CompleteActivityTask completes a activity task retrieved using GetActivityTask
663-
func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, id string, event history.Event) error {
663+
func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workflow.Instance, id string, event *history.Event) error {
664664
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
665665
Isolation: sql.LevelReadCommitted,
666666
})
@@ -691,7 +691,7 @@ func (b *mysqlBackend) CompleteActivityTask(ctx context.Context, instance *workf
691691
}
692692

693693
// Insert new event generated during this workflow execution
694-
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []history.Event{event}); err != nil {
694+
if err := insertPendingEvents(ctx, tx, instance.InstanceID, []*history.Event{event}); err != nil {
695695
return fmt.Errorf("inserting new events for completed activity: %w", err)
696696
}
697697

@@ -730,7 +730,7 @@ func (b *mysqlBackend) ExtendActivityTask(ctx context.Context, activityID string
730730
return tx.Commit()
731731
}
732732

733-
func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event history.Event) error {
733+
func scheduleActivity(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance, event *history.Event) error {
734734
a, err := history.SerializeAttributes(event.Attributes)
735735
if err != nil {
736736
return err

backend/mysql/mysql_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestMySqlBackendE2E(t *testing.T) {
9999

100100
var _ test.TestBackend = (*mysqlBackend)(nil)
101101

102-
func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]history.Event, error) {
102+
func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
103103
tx, err := mb.db.BeginTx(ctx, nil)
104104
if err != nil {
105105
return nil, err
@@ -115,13 +115,13 @@ func (mb *mysqlBackend) GetFutureEvents(ctx context.Context) ([]history.Event, e
115115
return nil, fmt.Errorf("getting history: %w", err)
116116
}
117117

118-
f := make([]history.Event, 0)
118+
f := make([]*history.Event, 0)
119119

120120
for futureEvents.Next() {
121121
var instanceID string
122122
var attributes []byte
123123

124-
fe := history.Event{}
124+
fe := &history.Event{}
125125

126126
if err := futureEvents.Scan(
127127
&fe.ID,

backend/redis/activity.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ func (rb *redisBackend) ExtendActivityTask(ctx context.Context, activityID strin
4343
return err
4444
}
4545

46-
func (rb *redisBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, event history.Event) error {
46+
func (rb *redisBackend) CompleteActivityTask(ctx context.Context, instance *core.WorkflowInstance, activityID string, event *history.Event) error {
4747
p := rb.rdb.TxPipeline()
4848

49-
if err := rb.addWorkflowInstanceEventP(ctx, p, instance, &event); err != nil {
49+
if err := rb.addWorkflowInstanceEventP(ctx, p, instance, event); err != nil {
5050
return err
5151
}
5252

backend/redis/events.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ var addEventsToStreamCmd = redis.NewScript(`
3737
return msgID
3838
`)
3939

40-
func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []history.Event) error {
40+
func addEventsToHistoryStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
4141
eventsData := make([]string, 0)
4242
for _, event := range events {
4343
eventData, err := json.Marshal(event)

backend/redis/instance.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/go-redis/redis/v8"
1414
)
1515

16-
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event history.Event) error {
16+
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
1717
state, err := readInstance(ctx, rb.rdb, instance.InstanceID)
1818
if err != nil && err != backend.ErrInstanceNotFound {
1919
return err
@@ -57,7 +57,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
5757
return nil
5858
}
5959

60-
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]history.Event, error) {
60+
func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error) {
6161
start := "-"
6262

6363
if lastSequenceID != nil {
@@ -69,9 +69,9 @@ func (rb *redisBackend) GetWorkflowInstanceHistory(ctx context.Context, instance
6969
return nil, err
7070
}
7171

72-
var events []history.Event
72+
var events []*history.Event
7373
for _, msg := range msgs {
74-
var event history.Event
74+
var event *history.Event
7575
if err := json.Unmarshal([]byte(msg.Values["event"].(string)), &event); err != nil {
7676
return nil, fmt.Errorf("unmarshaling event: %w", err)
7777
}

backend/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ type redisBackend struct {
102102
type activityData struct {
103103
Instance *core.WorkflowInstance `json:"instance,omitempty"`
104104
ID string `json:"id,omitempty"`
105-
Event history.Event `json:"event,omitempty"`
105+
Event *history.Event `json:"event,omitempty"`
106106
}
107107

108108
func (rb *redisBackend) Logger() log.Logger {

backend/redis/redis_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ var _ log.Logger = (*nullLogger)(nil)
128128
var _ test.TestBackend = (*redisBackend)(nil)
129129

130130
// GetFutureEvents
131-
func (rb *redisBackend) GetFutureEvents(ctx context.Context) ([]history.Event, error) {
131+
func (rb *redisBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
132132
r, err := rb.rdb.ZRangeByScore(ctx, futureEventsKey(), &redis.ZRangeBy{
133133
Min: "-inf",
134134
Max: "+inf",
@@ -138,15 +138,15 @@ func (rb *redisBackend) GetFutureEvents(ctx context.Context) ([]history.Event, e
138138
return nil, fmt.Errorf("getting future events: %w", err)
139139
}
140140

141-
events := make([]history.Event, 0)
141+
events := make([]*history.Event, 0)
142142

143143
for _, eventID := range r {
144144
eventStr, err := rb.rdb.HGet(ctx, eventID, "event").Result()
145145
if err != nil {
146146
return nil, fmt.Errorf("getting event %v: %w", eventID, err)
147147
}
148148

149-
var event history.Event
149+
var event *history.Event
150150
if err := json.Unmarshal([]byte(eventStr), &event); err != nil {
151151
return nil, fmt.Errorf("unmarshaling event %v: %w", eventID, err)
152152
}

0 commit comments

Comments
 (0)