Skip to content

Commit 4277a8a

Browse files
authored
Merge pull request #288 from cschleiden/duplicate-instances
Prevent multiple active instances with the same id
2 parents b7e6351 + 7bd93ba commit 4277a8a

File tree

14 files changed

+684
-333
lines changed

14 files changed

+684
-333
lines changed

backend/mysql/mysql.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cschleiden/go-workflows/backend/metrics"
2020
"github.com/cschleiden/go-workflows/core"
2121
"github.com/cschleiden/go-workflows/internal/metrickeys"
22+
"github.com/cschleiden/go-workflows/internal/workflowerrors"
2223
"github.com/cschleiden/go-workflows/workflow"
2324
_ "github.com/go-sql-driver/mysql"
2425
"github.com/google/uuid"
@@ -146,7 +147,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
146147
defer tx.Rollback()
147148

148149
// Create workflow instance
149-
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
150+
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
150151
return err
151152
}
152153

@@ -304,7 +305,13 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
304305
return state, nil
305306
}
306307

307-
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
308+
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
309+
// Check for existing instance
310+
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
311+
Scan(new(int)); err != sql.ErrNoRows {
312+
return backend.ErrInstanceAlreadyExists
313+
}
314+
308315
var parentInstanceID, parentExecutionID *string
309316
var parentEventID *int64
310317
if wfi.SubWorkflow() {
@@ -318,9 +325,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
318325
return fmt.Errorf("marshaling metadata: %w", err)
319326
}
320327

321-
res, err := tx.ExecContext(
328+
_, err = tx.ExecContext(
322329
ctx,
323-
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
330+
"INSERT INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
324331
wfi.InstanceID,
325332
wfi.ExecutionID,
326333
parentInstanceID,
@@ -333,17 +340,6 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
333340
return fmt.Errorf("inserting workflow instance: %w", err)
334341
}
335342

336-
if !ignoreDuplicate {
337-
rows, err := res.RowsAffected()
338-
if err != nil {
339-
return err
340-
}
341-
342-
if rows != 1 {
343-
return backend.ErrInstanceAlreadyExists
344-
}
345-
}
346-
347343
return nil
348344
}
349345

@@ -624,23 +620,33 @@ func (b *mysqlBackend) CompleteWorkflowTask(
624620
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
625621

626622
for targetInstance, events := range groupedEvents {
627-
for _, m := range events {
628-
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
629-
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
630-
// Create new instance
631-
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
632-
return err
623+
// Are we creating a new sub-workflow instance?
624+
m := events[0]
625+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
626+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
627+
// Create new instance
628+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil {
629+
if err == backend.ErrInstanceAlreadyExists {
630+
if err := insertPendingEvents(ctx, tx, instance, []*history.Event{
631+
history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{
632+
Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists),
633+
}, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)),
634+
}); err != nil {
635+
return fmt.Errorf("inserting sub-workflow failed event: %w", err)
636+
}
637+
638+
continue
633639
}
634640

635-
break
641+
return fmt.Errorf("creating sub-workflow instance: %w", err)
636642
}
637643
}
638644

645+
// Insert pending events for target instance
639646
historyEvents := []*history.Event{}
640647
for _, m := range events {
641648
historyEvents = append(historyEvents, m.HistoryEvent)
642649
}
643-
644650
if err := insertPendingEvents(ctx, tx, &targetInstance, historyEvents); err != nil {
645651
return fmt.Errorf("inserting messages: %w", err)
646652
}

backend/redis/events.go

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"strconv"
87

98
"github.com/cschleiden/go-workflows/backend/history"
109
"github.com/cschleiden/go-workflows/core"
@@ -73,89 +72,3 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string,
7372
},
7473
}).Err()
7574
}
76-
77-
// addEventsToStream adds the given events to the given event stream. If successful, the message id of the last event added
78-
// is returned
79-
// KEYS[1] - stream key
80-
// ARGV[1] - event data as serialized strings
81-
var addEventsToStreamCmd = redis.NewScript(`
82-
local msgID = ""
83-
for i = 1, #ARGV, 2 do
84-
msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1])
85-
end
86-
return msgID
87-
`)
88-
89-
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
90-
eventsData := make([]string, 0)
91-
for _, event := range events {
92-
eventData, err := marshalEventWithoutAttributes(event)
93-
if err != nil {
94-
return err
95-
}
96-
97-
// log.Println("addEventsToHistoryStreamP:", event.SequenceID, string(eventData))
98-
99-
eventsData = append(eventsData, historyID(event.SequenceID))
100-
eventsData = append(eventsData, string(eventData))
101-
}
102-
103-
addEventsToStreamCmd.Run(ctx, p, []string{streamKey}, eventsData)
104-
105-
return nil
106-
}
107-
108-
// Adds an event to be delivered in the future. Not cluster-safe.
109-
// KEYS[1] - future event zset key
110-
// KEYS[2] - future event key
111-
// KEYS[3] - instance payload key
112-
// ARGV[1] - timestamp
113-
// ARGV[2] - Instance segment
114-
// ARGV[3] - event id
115-
// ARGV[4] - event data
116-
// ARGV[5] - event payload
117-
var addFutureEventCmd = redis.NewScript(`
118-
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
119-
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
120-
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
121-
return 0
122-
`)
123-
124-
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
125-
eventData, err := marshalEventWithoutAttributes(event)
126-
if err != nil {
127-
return err
128-
}
129-
130-
payloadEventData, err := json.Marshal(event.Attributes)
131-
if err != nil {
132-
return err
133-
}
134-
135-
return addFutureEventCmd.Run(
136-
ctx, p,
137-
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)},
138-
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
139-
instanceSegment(instance),
140-
event.ID,
141-
string(eventData),
142-
string(payloadEventData),
143-
).Err()
144-
}
145-
146-
// Remove a scheduled future event. Not cluster-safe.
147-
// KEYS[1] - future event zset key
148-
// KEYS[2] - future event key
149-
// KEYS[3] - instance payload key
150-
var removeFutureEventCmd = redis.NewScript(`
151-
redis.call("ZREM", KEYS[1], KEYS[2])
152-
local eventID = redis.call("HGET", KEYS[2], "id")
153-
redis.call("HDEL", KEYS[3], eventID)
154-
return redis.call("DEL", KEYS[2])
155-
`)
156-
157-
// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
158-
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
159-
key := futureEventKey(instance, event.ScheduleEventID)
160-
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)})
161-
}

backend/redis/events_future.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strconv"
8+
"time"
9+
10+
"github.com/cschleiden/go-workflows/backend/history"
11+
"github.com/cschleiden/go-workflows/core"
12+
redis "github.com/redis/go-redis/v9"
13+
)
14+
15+
// Adds an event to be delivered in the future. Not cluster-safe.
16+
// KEYS[1] - future event zset key
17+
// KEYS[2] - future event key
18+
// KEYS[3] - instance payload key
19+
// ARGV[1] - timestamp/score for set
20+
// ARGV[2] - Instance segment
21+
// ARGV[3] - event id
22+
// ARGV[4] - event data
23+
// ARGV[5] - event payload
24+
var addFutureEventCmd = redis.NewScript(`
25+
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
26+
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
27+
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
28+
return 0
29+
`)
30+
31+
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
32+
eventData, err := marshalEventWithoutAttributes(event)
33+
if err != nil {
34+
return err
35+
}
36+
37+
payloadEventData, err := json.Marshal(event.Attributes)
38+
if err != nil {
39+
return err
40+
}
41+
42+
return addFutureEventCmd.Run(
43+
ctx, p,
44+
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)},
45+
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
46+
instanceSegment(instance),
47+
event.ID,
48+
string(eventData),
49+
string(payloadEventData),
50+
).Err()
51+
}
52+
53+
// Remove a scheduled future event. Not cluster-safe.
54+
// KEYS[1] - future event zset key
55+
// KEYS[2] - future event key
56+
// KEYS[3] - instance payload key
57+
var removeFutureEventCmd = redis.NewScript(`
58+
redis.call("ZREM", KEYS[1], KEYS[2])
59+
local eventID = redis.call("HGET", KEYS[2], "id")
60+
redis.call("HDEL", KEYS[3], eventID)
61+
return redis.call("DEL", KEYS[2])
62+
`)
63+
64+
// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
65+
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
66+
key := futureEventKey(instance, event.ScheduleEventID)
67+
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)})
68+
}
69+
70+
// Find all due future events. For each event:
71+
// - Look up event data
72+
// - Add to pending event stream for workflow instance
73+
// - Try to queue workflow task for workflow instance
74+
// - Remove event from future event set and delete event data
75+
//
76+
// KEYS[1] - future event set key
77+
// KEYS[2] - workflow task queue stream
78+
// KEYS[3] - workflow task queue set
79+
// ARGV[1] - current timestamp for zrange
80+
//
81+
// Note: this does not work with Redis Cluster since not all keys are passed into the script.
82+
var futureEventsCmd = redis.NewScript(`
83+
-- Find events which should become visible now
84+
local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
85+
for i = 1, #events do
86+
local instanceSegment = redis.call("HGET", events[i], "instance")
87+
88+
-- Add event to pending event stream
89+
local eventData = redis.call("HGET", events[i], "event")
90+
local pending_events_key = "pending-events:" .. instanceSegment
91+
redis.call("XADD", pending_events_key, "*", "event", eventData)
92+
93+
-- Try to queue workflow task
94+
local already_queued = redis.call("SADD", KEYS[3], instanceSegment)
95+
if already_queued ~= 0 then
96+
redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "")
97+
end
98+
99+
-- Delete event hash data
100+
redis.call("DEL", events[i])
101+
redis.call("ZREM", KEYS[1], events[i])
102+
end
103+
104+
return #events
105+
`)
106+
107+
func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error {
108+
now := time.Now().UnixMilli()
109+
nowStr := strconv.FormatInt(now, 10)
110+
111+
queueKeys := rb.workflowQueue.Keys()
112+
113+
if _, err := futureEventsCmd.Run(ctx, rb.rdb, []string{
114+
futureEventsKey(),
115+
queueKeys.StreamKey,
116+
queueKeys.SetKey,
117+
}, nowStr).Result(); err != nil && err != redis.Nil {
118+
return fmt.Errorf("checking future events: %w", err)
119+
}
120+
121+
return nil
122+
}

backend/redis/instance.go

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ import (
1515
)
1616

1717
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
18-
state, err := readInstance(ctx, rb.rdb, instanceKey(instance))
18+
activeInstance, err := readActiveInstanceExecution(ctx, rb.rdb, instance.InstanceID)
1919
if err != nil && err != backend.ErrInstanceNotFound {
2020
return err
2121
}
2222

23-
if state != nil {
23+
if activeInstance != nil {
2424
return backend.ErrInstanceAlreadyExists
2525
}
2626

2727
p := rb.rdb.TxPipeline()
2828

29-
if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
29+
if err := createInstanceP(ctx, p, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
3030
return err
3131
}
3232

@@ -145,7 +145,7 @@ type instanceState struct {
145145
LastSequenceID int64 `json:"last_sequence_id,omitempty"`
146146
}
147147

148-
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata, ignoreDuplicate bool) error {
148+
func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, metadata *metadata.WorkflowMetadata) error {
149149
key := instanceKey(instance)
150150

151151
createdAt := time.Now()
@@ -175,25 +175,6 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
175175
return nil
176176
}
177177

178-
func updateInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, state *instanceState) error {
179-
key := instanceKey(instance)
180-
181-
b, err := json.Marshal(state)
182-
if err != nil {
183-
return fmt.Errorf("marshaling instance state: %w", err)
184-
}
185-
186-
p.Set(ctx, key, string(b), 0)
187-
188-
if state.State != core.WorkflowInstanceStateActive {
189-
p.SRem(ctx, instancesActive(), instanceSegment(instance))
190-
}
191-
192-
// CreatedAt does not change, so skip updating the instancesByCreation() ZSET
193-
194-
return nil
195-
}
196-
197178
func readInstance(ctx context.Context, rdb redis.UniversalClient, instanceKey string) (*instanceState, error) {
198179
p := rdb.Pipeline()
199180

@@ -255,9 +236,3 @@ func setActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instanc
255236

256237
return p.Set(ctx, key, string(b), 0).Err()
257238
}
258-
259-
func removeActiveInstanceExecutionP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance) error {
260-
key := activeInstanceExecutionKey(instance.InstanceID)
261-
262-
return p.Del(ctx, key).Err()
263-
}

0 commit comments

Comments
 (0)