Skip to content

Commit 24c2a1c

Browse files
committed
Fix/improve event grouping
1 parent 2fb0a2c commit 24c2a1c

File tree

5 files changed

+39
-9
lines changed

5 files changed

+39
-9
lines changed

backend/mysql/mysql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ func (b *mysqlBackend) CompleteWorkflowTask(
517517
if event.Type == history.EventType_WorkflowExecutionStarted {
518518
a := event.Attributes.(*history.ExecutionStartedAttributes)
519519
// Create new instance
520-
if err := createInstance(ctx, tx, targetInstance, a.Metadata, true); err != nil {
520+
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
521521
return err
522522
}
523523

backend/redis/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
210210
if event.Type == history.EventType_WorkflowExecutionStarted {
211211
// Create new instance
212212
a := event.Attributes.(*history.ExecutionStartedAttributes)
213-
if err := createInstanceP(ctx, p, targetInstance, a.Metadata, true); err != nil {
213+
if err := createInstanceP(ctx, p, &targetInstance, a.Metadata, true); err != nil {
214214
return err
215215
}
216216
}
@@ -222,7 +222,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
222222
}
223223

224224
// Try to queue workflow task
225-
if targetInstance != instance {
225+
if targetInstance != *instance {
226226
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
227227
return fmt.Errorf("enqueuing workflow task: %w", err)
228228
}

backend/sqlite/sqlite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
406406
if event.Type == history.EventType_WorkflowExecutionStarted {
407407
a := event.Attributes.(*history.ExecutionStartedAttributes)
408408
// Create new instance
409-
if err := createInstance(ctx, tx, targetInstance, a.Metadata, true); err != nil {
409+
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
410410
return err
411411
}
412412

internal/history/grouping.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@ package history
22

33
import "github.com/cschleiden/go-workflows/internal/core"
44

5-
func EventsByWorkflowInstance(events []WorkflowEvent) map[*core.WorkflowInstance][]Event {
6-
groupedEvents := make(map[*core.WorkflowInstance][]Event)
5+
func EventsByWorkflowInstance(events []WorkflowEvent) map[core.WorkflowInstance][]Event {
6+
groupedEvents := make(map[core.WorkflowInstance][]Event)
77

88
for _, m := range events {
9-
if _, ok := groupedEvents[m.WorkflowInstance]; !ok {
10-
groupedEvents[m.WorkflowInstance] = []Event{}
9+
instance := *m.WorkflowInstance
10+
11+
if _, ok := groupedEvents[instance]; !ok {
12+
groupedEvents[instance] = []Event{}
1113
}
1214

13-
groupedEvents[m.WorkflowInstance] = append(groupedEvents[m.WorkflowInstance], m.HistoryEvent)
15+
groupedEvents[instance] = append(groupedEvents[instance], m.HistoryEvent)
1416
}
1517

1618
return groupedEvents

internal/history/grouping_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package history
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/cschleiden/go-workflows/internal/core"
8+
"github.com/google/uuid"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestGrouping_MultipleEventsSameInstance(t *testing.T) {
13+
id := uuid.NewString()
14+
15+
r := EventsByWorkflowInstance([]WorkflowEvent{
16+
{
17+
WorkflowInstance: core.NewWorkflowInstance(id, "exid"),
18+
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),
19+
},
20+
{
21+
WorkflowInstance: core.NewWorkflowInstance(id, "exid"),
22+
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),
23+
},
24+
})
25+
26+
require.Len(t, r, 1)
27+
require.Len(t, r[*core.NewWorkflowInstance(id, "exid")], 2)
28+
}

0 commit comments

Comments
 (0)