Skip to content

Commit df5d91e

Browse files
committed
Fix event grouping when delivering multiple events to a workflow instance in one checkpoint
1 parent 4581b48 commit df5d91e

File tree

6 files changed

+47
-36
lines changed

6 files changed

+47
-36
lines changed

backend/mysql/mysql.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -510,22 +510,27 @@ func (b *mysqlBackend) CompleteWorkflowTask(
510510
}
511511

512512
// Insert new workflow events
513-
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
513+
groupedEvents := history.EventsByWorkflowInstanceID(workflowEvents)
514514

515-
for targetInstance, events := range groupedEvents {
516-
for _, event := range events {
517-
if event.Type == history.EventType_WorkflowExecutionStarted {
518-
a := event.Attributes.(*history.ExecutionStartedAttributes)
515+
for targetInstanceID, events := range groupedEvents {
516+
for _, m := range events {
517+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
518+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
519519
// Create new instance
520-
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
520+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
521521
return err
522522
}
523523

524524
break
525525
}
526526
}
527527

528-
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
528+
historyEvents := []history.Event{}
529+
for _, m := range events {
530+
historyEvents = append(historyEvents, m.HistoryEvent)
531+
}
532+
533+
if err := insertPendingEvents(ctx, tx, targetInstanceID, historyEvents); err != nil {
529534
return fmt.Errorf("inserting messages: %w", err)
530535
}
531536
}

backend/redis/workflow.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,29 +201,29 @@ func (rb *redisBackend) CompleteWorkflowTask(
201201
}
202202

203203
// Send new workflow events to the respective streams
204-
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
205-
for targetInstance, events := range groupedEvents {
204+
groupedEvents := history.EventsByWorkflowInstanceID(workflowEvents)
205+
for targetInstanceID, events := range groupedEvents {
206206
// Insert pending events for target instance
207-
for _, event := range events {
208-
event := event
207+
for _, m := range events {
208+
m := m
209209

210-
if event.Type == history.EventType_WorkflowExecutionStarted {
210+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
211211
// Create new instance
212-
a := event.Attributes.(*history.ExecutionStartedAttributes)
213-
if err := createInstanceP(ctx, p, &targetInstance, a.Metadata, true); err != nil {
212+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
213+
if err := createInstanceP(ctx, p, m.WorkflowInstance, a.Metadata, true); err != nil {
214214
return err
215215
}
216216
}
217217

218218
// Add pending event to stream
219-
if err := addEventToStreamP(ctx, p, pendingEventsKey(targetInstance.InstanceID), &event); err != nil {
219+
if err := addEventToStreamP(ctx, p, pendingEventsKey(targetInstanceID), &m.HistoryEvent); err != nil {
220220
return err
221221
}
222222
}
223223

224224
// Try to queue workflow task
225-
if targetInstance != *instance {
226-
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstance.InstanceID, nil); err != nil {
225+
if targetInstanceID != instance.InstanceID {
226+
if err := rb.workflowQueue.Enqueue(ctx, p, targetInstanceID, nil); err != nil {
227227
return fmt.Errorf("enqueuing workflow task: %w", err)
228228
}
229229
}

backend/sqlite/sqlite.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -399,14 +399,14 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
399399
}
400400

401401
// Insert new workflow events
402-
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
402+
groupedEvents := history.EventsByWorkflowInstanceID(workflowEvents)
403403

404-
for targetInstance, events := range groupedEvents {
405-
for _, event := range events {
406-
if event.Type == history.EventType_WorkflowExecutionStarted {
407-
a := event.Attributes.(*history.ExecutionStartedAttributes)
404+
for targetInstanceID, events := range groupedEvents {
405+
for _, m := range events {
406+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
407+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
408408
// Create new instance
409-
if err := createInstance(ctx, tx, &targetInstance, a.Metadata, true); err != nil {
409+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
410410
return err
411411
}
412412

@@ -415,7 +415,11 @@ func (sb *sqliteBackend) CompleteWorkflowTask(
415415
}
416416

417417
// Insert pending events for target instance
418-
if err := insertPendingEvents(ctx, tx, targetInstance.InstanceID, events); err != nil {
418+
historyEvents := []history.Event{}
419+
for _, m := range events {
420+
historyEvents = append(historyEvents, m.HistoryEvent)
421+
}
422+
if err := insertPendingEvents(ctx, tx, targetInstanceID, historyEvents); err != nil {
419423
return fmt.Errorf("inserting messages: %w", err)
420424
}
421425
}

backend/test/e2e.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,9 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
298298
}
299299
wf := func(ctx workflow.Context) (int, error) {
300300
id, _ := workflow.SideEffect(ctx, func(ctx workflow.Context) string {
301-
return uuid.New().String()
301+
id := uuid.New().String()
302+
workflow.Logger(ctx).Warn("side effect", "id", id)
303+
return id
302304
}).Get(ctx)
303305

304306
f := workflow.CreateSubWorkflowInstance[int](ctx, workflow.SubWorkflowOptions{

internal/history/grouping.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
package history
22

3-
import "github.com/cschleiden/go-workflows/internal/core"
4-
5-
func EventsByWorkflowInstance(events []WorkflowEvent) map[core.WorkflowInstance][]Event {
6-
groupedEvents := make(map[core.WorkflowInstance][]Event)
3+
func EventsByWorkflowInstanceID(events []WorkflowEvent) map[string][]WorkflowEvent {
4+
groupedEvents := make(map[string][]WorkflowEvent)
75

86
for _, m := range events {
97
instance := *m.WorkflowInstance
108

11-
if _, ok := groupedEvents[instance]; !ok {
12-
groupedEvents[instance] = []Event{}
9+
if _, ok := groupedEvents[instance.InstanceID]; !ok {
10+
groupedEvents[instance.InstanceID] = []WorkflowEvent{}
1311
}
1412

15-
groupedEvents[instance] = append(groupedEvents[instance], m.HistoryEvent)
13+
groupedEvents[instance.InstanceID] = append(groupedEvents[instance.InstanceID], m)
1614
}
1715

1816
return groupedEvents

internal/history/grouping_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@ import (
1212
func TestGrouping_MultipleEventsSameInstance(t *testing.T) {
1313
id := uuid.NewString()
1414

15-
r := EventsByWorkflowInstance([]WorkflowEvent{
15+
r := EventsByWorkflowInstanceID([]WorkflowEvent{
1616
{
1717
WorkflowInstance: core.NewWorkflowInstance(id, "exid"),
1818
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),
1919
},
2020
{
21-
WorkflowInstance: core.NewWorkflowInstance(id, "exid"),
22-
HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}),
21+
WorkflowInstance: core.NewWorkflowInstance(id, ""),
22+
HistoryEvent: NewPendingEvent(time.Now(), EventType_SignalReceived, &SubWorkflowScheduledAttributes{}),
2323
},
2424
})
2525

2626
require.Len(t, r, 1)
27-
require.Len(t, r[*core.NewWorkflowInstance(id, "exid")], 2)
27+
require.Len(t, r[id], 2)
28+
require.Equal(t, r[id][0].HistoryEvent.Type, EventType_SubWorkflowScheduled)
29+
require.Equal(t, r[id][1].HistoryEvent.Type, EventType_SignalReceived)
2830
}

0 commit comments

Comments
 (0)