Skip to content

Commit fbb1332

Browse files
authored
Merge pull request #348 from cschleiden/namespaces
Pass workflow events as pointers
2 parents 1e0a1df + 4b922d1 commit fbb1332

File tree

16 files changed

+30
-26
lines changed

16 files changed

+30
-26
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type Backend interface {
5555
// completed or other workflow instances.
5656
CompleteWorkflowTask(
5757
ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState,
58-
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error
58+
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error
5959

6060
// GetActivityTask returns a pending activity task or nil if there are no pending activities
6161
GetActivityTask(ctx context.Context) (*ActivityTask, error)

backend/history/grouping.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package history
22

33
import "github.com/cschleiden/go-workflows/core"
44

5-
func EventsByWorkflowInstance(events []WorkflowEvent) map[core.WorkflowInstance][]WorkflowEvent {
6-
groupedEvents := make(map[core.WorkflowInstance][]WorkflowEvent)
5+
func EventsByWorkflowInstance(events []*WorkflowEvent) map[core.WorkflowInstance][]*WorkflowEvent {
6+
groupedEvents := make(map[core.WorkflowInstance][]*WorkflowEvent)
77

88
for _, m := range events {
99
instance := *m.WorkflowInstance
1010

1111
if _, ok := groupedEvents[instance]; !ok {
12-
groupedEvents[instance] = []WorkflowEvent{}
12+
groupedEvents[instance] = []*WorkflowEvent{}
1313
}
1414

1515
groupedEvents[instance] = append(groupedEvents[instance], m)

backend/history/grouping_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func TestGrouping_MultipleEventsSameInstance(t *testing.T) {
1313
id := uuid.NewString()
1414
instance := core.NewWorkflowInstance(id, "exid")
1515

16-
r := EventsByWorkflowInstance([]WorkflowEvent{
16+
r := EventsByWorkflowInstance([]*WorkflowEvent{
1717
{
1818
WorkflowInstance: instance,
1919
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),

backend/mock_Backend.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/monoprocess/monoprocess.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
8585
instance *workflow.Instance,
8686
state core.WorkflowInstanceState,
8787
executedEvents, activityEvents, timerEvents []*history.Event,
88-
workflowEvents []history.WorkflowEvent,
88+
workflowEvents []*history.WorkflowEvent,
8989
) error {
9090
if err := b.Backend.CompleteWorkflowTask(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents); err != nil {
9191
return err

backend/mysql/mysql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
535535
instance *workflow.Instance,
536536
state core.WorkflowInstanceState,
537537
executedEvents, activityEvents, timerEvents []*history.Event,
538-
workflowEvents []history.WorkflowEvent,
538+
workflowEvents []*history.WorkflowEvent,
539539
) error {
540540
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
541541
Isolation: sql.LevelReadCommitted,

backend/options.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ type Options struct {
3737
// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
3838
// by that timeframe, it's considered abandoned and another worker might pick it up
3939
ActivityLockTimeout time.Duration
40+
41+
WorkflowNamespace string
42+
43+
ActivityNamespace string
4044
}
4145

4246
var DefaultOptions Options = Options{

backend/redis/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
9797
instance *core.WorkflowInstance,
9898
state core.WorkflowInstanceState,
9999
executedEvents, activityEvents, timerEvents []*history.Event,
100-
workflowEvents []history.WorkflowEvent,
100+
workflowEvents []*history.WorkflowEvent,
101101
) error {
102102
keys := make([]string, 0)
103103
args := make([]interface{}, 0)

backend/sqlite/sqlite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
459459
instance *workflow.Instance,
460460
state core.WorkflowInstanceState,
461461
executedEvents, activityEvents, timerEvents []*history.Event,
462-
workflowEvents []history.WorkflowEvent,
462+
workflowEvents []*history.WorkflowEvent,
463463
) error {
464464
tx, err := sb.db.BeginTx(ctx, nil)
465465
if err != nil {

backend/test/backendtest.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,11 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test
174174
require.NotNil(t, tk)
175175

176176
// Complete workflow task
177-
err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{})
177+
err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{})
178178
require.NoError(t, err)
179179

180180
// Task is already completed, this should error
181-
err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{})
181+
err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{})
182182
require.Error(t, err)
183183
},
184184
},
@@ -212,7 +212,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test
212212
activityScheduledEvent,
213213
}
214214

215-
workflowEvents := []history.WorkflowEvent{}
215+
workflowEvents := []*history.WorkflowEvent{}
216216

217217
err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateActive, events, activityEvents, []*history.Event{}, workflowEvents)
218218
require.NoError(t, err)
@@ -257,7 +257,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test
257257
events[i].SequenceID = sequenceID
258258
}
259259

260-
err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateFinished, events, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{})
260+
err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateFinished, events, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{})
261261
require.NoError(t, err)
262262

263263
time.Sleep(time.Second)
@@ -285,7 +285,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test
285285
// Simulate context and sub-workflow cancellation
286286
task, err := b.GetWorkflowTask(ctx)
287287
require.NoError(t, err)
288-
err = b.CompleteWorkflowTask(ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{
288+
err = b.CompleteWorkflowTask(ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{
289289
{
290290
WorkflowInstance: subInstance1,
291291
HistoryEvent: history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionCanceled, &history.SubWorkflowCancellationRequestedAttributes{
@@ -374,6 +374,6 @@ func startWorkflow(t *testing.T, ctx context.Context, b backend.Backend, c *clie
374374
require.NoError(t, err)
375375

376376
err = b.CompleteWorkflowTask(
377-
ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{})
377+
ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{})
378378
require.NoError(t, err)
379379
}

0 commit comments

Comments
 (0)