Skip to content

Commit 92d1f7f

Browse files
Merge branch 'main' into fix_redis_diag_panic
2 parents 9bc9acd + 5096b37 commit 92d1f7f

File tree

26 files changed

+656
-370
lines changed

26 files changed

+656
-370
lines changed

backend/monoprocess/monoprocess.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ type monoprocessBackend struct {
1717

1818
workflowSignal chan struct{}
1919
activitySignal chan struct{}
20-
signalTimeout time.Duration
2120

2221
logger *slog.Logger
2322
}
@@ -106,7 +105,9 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
106105
// Note that the worker will be notified even if the timer event gets
107106
// cancelled. This is ok, because the poller will simply find no task
108107
// and continue.
109-
time.AfterFunc(attr.At.Sub(time.Now()), func() { b.notifyWorkflowWorker(context.Background()) })
108+
time.AfterFunc(time.Until(attr.At), func() {
109+
b.notifyWorkflowWorker(ctx)
110+
})
110111
}
111112

112113
b.notifyWorkflowWorker(ctx)

backend/mysql/mysql.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cschleiden/go-workflows/backend/metrics"
2020
"github.com/cschleiden/go-workflows/core"
2121
"github.com/cschleiden/go-workflows/internal/metrickeys"
22+
"github.com/cschleiden/go-workflows/internal/workflowerrors"
2223
"github.com/cschleiden/go-workflows/workflow"
2324
_ "github.com/go-sql-driver/mysql"
2425
"github.com/google/uuid"
@@ -146,7 +147,7 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, instance *wor
146147
defer tx.Rollback()
147148

148149
// Create workflow instance
149-
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata, false); err != nil {
150+
if err := createInstance(ctx, tx, instance, event.Attributes.(*history.ExecutionStartedAttributes).Metadata); err != nil {
150151
return err
151152
}
152153

@@ -304,7 +305,13 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
304305
return state, nil
305306
}
306307

307-
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata, ignoreDuplicate bool) error {
308+
func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, metadata *workflow.Metadata) error {
309+
// Check for existing instance
310+
if err := tx.QueryRowContext(ctx, "SELECT 1 FROM `instances` WHERE instance_id = ? AND state = ? LIMIT 1", wfi.InstanceID, core.WorkflowInstanceStateActive).
311+
Scan(new(int)); err != sql.ErrNoRows {
312+
return backend.ErrInstanceAlreadyExists
313+
}
314+
308315
var parentInstanceID, parentExecutionID *string
309316
var parentEventID *int64
310317
if wfi.SubWorkflow() {
@@ -318,9 +325,9 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
318325
return fmt.Errorf("marshaling metadata: %w", err)
319326
}
320327

321-
res, err := tx.ExecContext(
328+
_, err = tx.ExecContext(
322329
ctx,
323-
"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
330+
"INSERT INTO `instances` (instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, metadata, state) VALUES (?, ?, ?, ?, ?, ?, ?)",
324331
wfi.InstanceID,
325332
wfi.ExecutionID,
326333
parentInstanceID,
@@ -333,17 +340,6 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, met
333340
return fmt.Errorf("inserting workflow instance: %w", err)
334341
}
335342

336-
if !ignoreDuplicate {
337-
rows, err := res.RowsAffected()
338-
if err != nil {
339-
return err
340-
}
341-
342-
if rows != 1 {
343-
return backend.ErrInstanceAlreadyExists
344-
}
345-
}
346-
347343
return nil
348344
}
349345

@@ -624,23 +620,33 @@ func (b *mysqlBackend) CompleteWorkflowTask(
624620
groupedEvents := history.EventsByWorkflowInstance(workflowEvents)
625621

626622
for targetInstance, events := range groupedEvents {
627-
for _, m := range events {
628-
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
629-
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
630-
// Create new instance
631-
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata, true); err != nil {
632-
return err
623+
// Are we creating a new sub-workflow instance?
624+
m := events[0]
625+
if m.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted {
626+
a := m.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes)
627+
// Create new instance
628+
if err := createInstance(ctx, tx, m.WorkflowInstance, a.Metadata); err != nil {
629+
if err == backend.ErrInstanceAlreadyExists {
630+
if err := insertPendingEvents(ctx, tx, instance, []*history.Event{
631+
history.NewPendingEvent(time.Now(), history.EventType_SubWorkflowFailed, &history.SubWorkflowFailedAttributes{
632+
Error: workflowerrors.FromError(backend.ErrInstanceAlreadyExists),
633+
}, history.ScheduleEventID(m.WorkflowInstance.ParentEventID)),
634+
}); err != nil {
635+
return fmt.Errorf("inserting sub-workflow failed event: %w", err)
636+
}
637+
638+
continue
633639
}
634640

635-
break
641+
return fmt.Errorf("creating sub-workflow instance: %w", err)
636642
}
637643
}
638644

645+
// Insert pending events for target instance
639646
historyEvents := []*history.Event{}
640647
for _, m := range events {
641648
historyEvents = append(historyEvents, m.HistoryEvent)
642649
}
643-
644650
if err := insertPendingEvents(ctx, tx, &targetInstance, historyEvents); err != nil {
645651
return fmt.Errorf("inserting messages: %w", err)
646652
}

backend/redis/delete.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import (
1111
// KEYS[1] - instance key
1212
// KEYS[2] - pending events key
1313
// KEYS[3] - history key
14-
// KEYS[4] - instances-by-creation key
14+
// KEYS[4] - payload key
15+
// KEYS[5] - active-instance-execution key
16+
// KEYS[6] - instances-by-creation key
1517
// ARGV[1] - instance segment
1618
var deleteCmd = redis.NewScript(
17-
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3])
18-
return redis.call("ZREM", KEYS[4], ARGV[1])`)
19+
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5])
20+
return redis.call("ZREM", KEYS[6], ARGV[1])`)
1921

2022
// deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending
2123
// workflow tasks. It's assumed that the instance is in the finished state.
@@ -26,6 +28,8 @@ func deleteInstance(ctx context.Context, rdb redis.UniversalClient, instance *co
2628
instanceKey(instance),
2729
pendingEventsKey(instance),
2830
historyKey(instance),
31+
payloadKey(instance),
32+
activeInstanceExecutionKey(instance.InstanceID),
2933
instancesByCreation(),
3034
}, instanceSegment(instance)).Err(); err != nil {
3135
return fmt.Errorf("failed to delete instance: %w", err)

backend/redis/events.go

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"strconv"
87

98
"github.com/cschleiden/go-workflows/backend/history"
109
"github.com/cschleiden/go-workflows/core"
@@ -73,89 +72,3 @@ func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string,
7372
},
7473
}).Err()
7574
}
76-
77-
// addEventsToStream adds the given events to the given event stream. If successful, the message id of the last event added
78-
// is returned
79-
// KEYS[1] - stream key
80-
// ARGV[1] - event data as serialized strings
81-
var addEventsToStreamCmd = redis.NewScript(`
82-
local msgID = ""
83-
for i = 1, #ARGV, 2 do
84-
msgID = redis.call("XADD", KEYS[1], ARGV[i], "event", ARGV[i + 1])
85-
end
86-
return msgID
87-
`)
88-
89-
func addEventsToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, events []*history.Event) error {
90-
eventsData := make([]string, 0)
91-
for _, event := range events {
92-
eventData, err := marshalEventWithoutAttributes(event)
93-
if err != nil {
94-
return err
95-
}
96-
97-
// log.Println("addEventsToHistoryStreamP:", event.SequenceID, string(eventData))
98-
99-
eventsData = append(eventsData, historyID(event.SequenceID))
100-
eventsData = append(eventsData, string(eventData))
101-
}
102-
103-
addEventsToStreamCmd.Run(ctx, p, []string{streamKey}, eventsData)
104-
105-
return nil
106-
}
107-
108-
// Adds an event to be delivered in the future. Not cluster-safe.
109-
// KEYS[1] - future event zset key
110-
// KEYS[2] - future event key
111-
// KEYS[3] - instance payload key
112-
// ARGV[1] - timestamp
113-
// ARGV[2] - Instance segment
114-
// ARGV[3] - event id
115-
// ARGV[4] - event data
116-
// ARGV[5] - event payload
117-
var addFutureEventCmd = redis.NewScript(`
118-
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
119-
redis.call("HSET", KEYS[2], "instance", ARGV[2], "id", ARGV[3], "event", ARGV[4])
120-
redis.call("HSETNX", KEYS[3], ARGV[3], ARGV[5])
121-
return 0
122-
`)
123-
124-
func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) error {
125-
eventData, err := marshalEventWithoutAttributes(event)
126-
if err != nil {
127-
return err
128-
}
129-
130-
payloadEventData, err := json.Marshal(event.Attributes)
131-
if err != nil {
132-
return err
133-
}
134-
135-
return addFutureEventCmd.Run(
136-
ctx, p,
137-
[]string{futureEventsKey(), futureEventKey(instance, event.ScheduleEventID), payloadKey(instance)},
138-
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
139-
instanceSegment(instance),
140-
event.ID,
141-
string(eventData),
142-
string(payloadEventData),
143-
).Err()
144-
}
145-
146-
// Remove a scheduled future event. Not cluster-safe.
147-
// KEYS[1] - future event zset key
148-
// KEYS[2] - future event key
149-
// KEYS[3] - instance payload key
150-
var removeFutureEventCmd = redis.NewScript(`
151-
redis.call("ZREM", KEYS[1], KEYS[2])
152-
local eventID = redis.call("HGET", KEYS[2], "id")
153-
redis.call("HDEL", KEYS[3], eventID)
154-
return redis.call("DEL", KEYS[2])
155-
`)
156-
157-
// removeFutureEvent removes a scheduled future event for the given event. Events are associated via their ScheduleEventID
158-
func removeFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, event *history.Event) {
159-
key := futureEventKey(instance, event.ScheduleEventID)
160-
removeFutureEventCmd.Run(ctx, p, []string{futureEventsKey(), key, payloadKey(instance)})
161-
}

backend/redis/events_future.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
redis "github.com/redis/go-redis/v9"
10+
)
11+
12+
// Find all due future events. For each event:
13+
// - Look up event data
14+
// - Add to pending event stream for workflow instance
15+
// - Try to queue workflow task for workflow instance
16+
// - Remove event from future event set and delete event data
17+
//
18+
// KEYS[1] - future event set key
19+
// KEYS[2] - workflow task queue stream
20+
// KEYS[3] - workflow task queue set
21+
// ARGV[1] - current timestamp for zrange
22+
//
23+
// Note: this does not work with Redis Cluster since not all keys are passed into the script.
24+
var futureEventsCmd = redis.NewScript(`
25+
-- Find events which should become visible now
26+
local events = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
27+
for i = 1, #events do
28+
local instanceSegment = redis.call("HGET", events[i], "instance")
29+
30+
-- Add event to pending event stream
31+
local eventData = redis.call("HGET", events[i], "event")
32+
local pending_events_key = "pending-events:" .. instanceSegment
33+
redis.call("XADD", pending_events_key, "*", "event", eventData)
34+
35+
-- Try to queue workflow task
36+
local already_queued = redis.call("SADD", KEYS[3], instanceSegment)
37+
if already_queued ~= 0 then
38+
redis.call("XADD", KEYS[2], "*", "id", instanceSegment, "data", "")
39+
end
40+
41+
-- Delete event hash data
42+
redis.call("DEL", events[i])
43+
redis.call("ZREM", KEYS[1], events[i])
44+
end
45+
46+
return #events
47+
`)
48+
49+
func scheduleFutureEvents(ctx context.Context, rb *redisBackend) error {
50+
now := time.Now().UnixMilli()
51+
nowStr := strconv.FormatInt(now, 10)
52+
53+
queueKeys := rb.workflowQueue.Keys()
54+
55+
if _, err := futureEventsCmd.Run(ctx, rb.rdb, []string{
56+
futureEventsKey(),
57+
queueKeys.StreamKey,
58+
queueKeys.SetKey,
59+
}, nowStr).Result(); err != nil && err != redis.Nil {
60+
return fmt.Errorf("checking future events: %w", err)
61+
}
62+
63+
return nil
64+
}

backend/redis/expire.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
// KEYS[3] - instance key
1818
// KEYS[4] - pending events key
1919
// KEYS[5] - history key
20+
// KEYS[6] - payload key
2021
// ARGV[1] - current timestamp
2122
// ARGV[2] - expiration time in seconds
2223
// ARGV[3] - expiration timestamp in unix milliseconds
@@ -55,6 +56,7 @@ func setWorkflowInstanceExpiration(ctx context.Context, rdb redis.UniversalClien
5556
instanceKey(instance),
5657
pendingEventsKey(instance),
5758
historyKey(instance),
59+
payloadKey(instance),
5860
},
5961
nowStr,
6062
expiration.Seconds(),

0 commit comments

Comments
 (0)