Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions backend/redis/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redis

import (
"context"
"fmt"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
Expand Down Expand Up @@ -48,17 +49,36 @@ func (rb *redisBackend) CompleteActivityTask(ctx context.Context, task *backend.
return err
}

p := rb.rdb.TxPipeline()

if err := rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), task.WorkflowInstance, result); err != nil {
// Marshal event data
eventData, payload, err := marshalEvent(result)
if err != nil {
return err
}

// Unlock activity
if _, err := rb.activityQueue.Complete(ctx, p, task.Queue, task.ID); err != nil {
return err
activityQueueKeys := rb.activityQueue.Keys(task.Queue)
workflowQueueKeys := rb.workflowQueue.Keys(workflow.Queue(instanceState.Queue))

err = completeActivityTaskCmd.Run(ctx, rb.rdb,
[]string{
activityQueueKeys.SetKey,
activityQueueKeys.StreamKey,
rb.keys.pendingEventsKey(task.WorkflowInstance),
rb.keys.payloadKey(task.WorkflowInstance),
rb.workflowQueue.queueSetKey,
workflowQueueKeys.SetKey,
workflowQueueKeys.StreamKey,
},
task.ID,
rb.activityQueue.groupName,
result.ID,
eventData,
payload,
rb.workflowQueue.groupName,
instanceSegment(task.WorkflowInstance),
).Err()
if err != nil {
return fmt.Errorf("completing activity task: %w", err)
}

_, err = p.Exec(ctx)
return err
return nil
}
14 changes: 1 addition & 13 deletions backend/redis/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,14 @@ import (
"fmt"

"github.com/cschleiden/go-workflows/core"
redis "github.com/redis/go-redis/v9"
)

// KEYS[1] - instance key
// KEYS[2] - pending events key
// KEYS[3] - history key
// KEYS[4] - payload key
// KEYS[5] - active-instance-execution key
// KEYS[6] - instances-by-creation key
// ARGV[1] - instance segment
var deleteCmd = redis.NewScript(
`redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5])
return redis.call("ZREM", KEYS[6], ARGV[1])`)

// deleteInstance deletes an instance from Redis. It does not attempt to remove any future events or pending
// workflow tasks. It's assumed that the instance is in the finished state.
//
// Note: might want to revisit this in the future if we want to support removing hung instances.
func (rb *redisBackend) deleteInstance(ctx context.Context, instance *core.WorkflowInstance) error {
if err := deleteCmd.Run(ctx, rb.rdb, []string{
if err := deleteInstanceCmd.Run(ctx, rb.rdb, []string{
rb.keys.instanceKey(instance),
rb.keys.pendingEventsKey(instance),
rb.keys.historyKey(instance),
Expand Down
44 changes: 0 additions & 44 deletions backend/redis/events.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package redis

import (
"context"
"encoding/json"
"fmt"

"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
"github.com/redis/go-redis/v9"
)

type eventWithoutAttributes struct {
Expand All @@ -32,43 +28,3 @@ func marshalEventWithoutAttributes(event *history.Event) (string, error) {

return string(data), nil
}

// KEYS[1 - payload key
// ARGV[1..n] - payload values
var addPayloadsCmd = redis.NewScript(`
for i = 1, #ARGV, 2 do
redis.pcall("HSETNX", KEYS[1], ARGV[i], ARGV[i+1])
end

return 0
`)

func (rb *redisBackend) addEventPayloadsP(ctx context.Context, p redis.Pipeliner, instance *core.WorkflowInstance, events []*history.Event) error {
args := make([]interface{}, 0)

for _, event := range events {
payload, err := json.Marshal(event.Attributes)
if err != nil {
return fmt.Errorf("marshaling event payload: %w", err)
}

args = append(args, event.ID, string(payload))
}

return addPayloadsCmd.Run(ctx, p, []string{rb.keys.payloadKey(instance)}, args...).Err()
}

func addEventToStreamP(ctx context.Context, p redis.Pipeliner, streamKey string, event *history.Event) error {
eventData, err := marshalEventWithoutAttributes(event)
if err != nil {
return err
}

return p.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
ID: "*",
Values: map[string]any{
"event": eventData,
},
}).Err()
}
34 changes: 22 additions & 12 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,9 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
return fmt.Errorf("marshaling instance: %w", err)
}

eventData, err := marshalEventWithoutAttributes(event)
eventData, payloadData, err := marshalEvent(event)
if err != nil {
return fmt.Errorf("marshaling event: %w", err)
}

payloadData, err := json.Marshal(event.Attributes)
if err != nil {
return fmt.Errorf("marshaling event payload: %w", err)
return err
}

keyInfo := rb.workflowQueue.Keys(a.Queue)
Expand Down Expand Up @@ -133,12 +128,27 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
return err
}

// Prepare event data
eventData, payloadData, err := marshalEvent(event)
if err != nil {
return err
}

keyInfo := rb.workflowQueue.Keys(workflow.Queue(instanceState.Queue))

// Cancel instance
if _, err := rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
return rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), instance, event)
}); err != nil {
// fmt.Println(cmds)
return fmt.Errorf("adding cancellation event to workflow instance: %w", err)
if err := cancelWorkflowInstanceCmd.Run(ctx, rb.rdb, []string{
rb.keys.payloadKey(instance),
rb.keys.pendingEventsKey(instance),
keyInfo.SetKey,
keyInfo.StreamKey,
},
event.ID,
eventData,
payloadData,
instanceSegment(instance),
).Err(); err != nil {
return fmt.Errorf("canceling workflow instance: %w", err)
}

return nil
Expand Down
26 changes: 10 additions & 16 deletions backend/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ var luaScripts embed.FS
var (
createWorkflowInstanceCmd *redis.Script
completeWorkflowTaskCmd *redis.Script
completeActivityTaskCmd *redis.Script
deleteInstanceCmd *redis.Script
futureEventsCmd *redis.Script
expireWorkflowInstanceCmd *redis.Script
cancelWorkflowInstanceCmd *redis.Script
signalWorkflowCmd *redis.Script
)

func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (*redisBackend, error) {
Expand Down Expand Up @@ -60,26 +64,16 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
activityQueue: activityQueue,
}

// Preload scripts here. Usually redis-go attempts to execute them first, and if redis doesn't know
// them, loads them. This doesn't work when using (transactional) pipelines, so eagerly load them on startup.
cmds := map[string]*redis.StringCmd{
"deleteInstanceCmd": deleteCmd.Load(ctx, rb.rdb),
"addPayloadsCmd": addPayloadsCmd.Load(ctx, rb.rdb),
}
for name, cmd := range cmds {
// fmt.Println(name, cmd.Val())

if cmd.Err() != nil {
return nil, fmt.Errorf("loading redis script: %v %w", name, cmd.Err())
}
}

// Load all Lua scripts
cmdMapping := map[string]**redis.Script{
"create_workflow_instance.lua": &createWorkflowInstanceCmd,
"cancel_workflow_instance.lua": &cancelWorkflowInstanceCmd,
"complete_activity_task.lua": &completeActivityTaskCmd,
"complete_workflow_task.lua": &completeWorkflowTaskCmd,
"schedule_future_events.lua": &futureEventsCmd,
"create_workflow_instance.lua": &createWorkflowInstanceCmd,
"delete_instance.lua": &deleteInstanceCmd,
"expire_workflow_instance.lua": &expireWorkflowInstanceCmd,
"schedule_future_events.lua": &futureEventsCmd,
"signal_workflow.lua": &signalWorkflowCmd,
}

if err := loadScripts(ctx, rb.rdb, cmdMapping); err != nil {
Expand Down
23 changes: 23 additions & 0 deletions backend/redis/scripts/cancel_workflow_instance.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Cancel a workflow instance: record the cancellation event payload, append
-- the event to the instance's pending events stream, and enqueue a workflow
-- task so the cancellation gets picked up by a worker.
--
-- KEYS[1] - payload hash key
-- KEYS[2] - pending events stream key
-- KEYS[3] - workflow task set key
-- KEYS[4] - workflow task stream key
--
-- ARGV[1] - event id
-- ARGV[2] - event data (JSON, without attributes)
-- ARGV[3] - event payload (JSON)
-- ARGV[4] - instance segment

local payloadHashKey = KEYS[1]
local pendingEventsKey = KEYS[2]
local workflowSetKey = KEYS[3]
local workflowStreamKey = KEYS[4]

local eventId = ARGV[1]
local eventData = ARGV[2]
local payload = ARGV[3]
local instanceSegment = ARGV[4]

-- Store the event payload. Use redis.call (not redis.pcall) so any error —
-- e.g. the key holding a non-hash type — propagates to the caller instead of
-- being silently swallowed; a lost payload would leave an unprocessable event.
-- This also matches the error handling of every other operation in this script.
redis.call("HSETNX", payloadHashKey, eventId, payload)

-- Add event to the instance's pending events stream
redis.call("XADD", pendingEventsKey, "*", "event", eventData)

-- Queue a workflow task for the instance; the set membership check prevents
-- enqueueing duplicate tasks for the same instance.
local added = redis.call("SADD", workflowSetKey, instanceSegment)
if added == 1 then
    redis.call("XADD", workflowStreamKey, "*", "id", instanceSegment, "data", "")
end

return true
43 changes: 43 additions & 0 deletions backend/redis/scripts/complete_activity_task.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Complete an activity task, add the result event to the workflow instance,
-- and enqueue a workflow task so the result gets processed.
--
-- KEYS[1] = activity set key
-- KEYS[2] = activity stream key
-- KEYS[3] = pending events stream key
-- KEYS[4] = payload hash key
-- KEYS[5] = workflow queues set key
-- KEYS[6] = workflow set key (for specific queue)
-- KEYS[7] = workflow stream key (for specific queue)
-- ARGV[1] = task id (activity)
-- ARGV[2] = group name (activity group)
-- ARGV[3] = event id
-- ARGV[4] = event data (json, without attributes)
-- ARGV[5] = payload data (json, can be empty)
-- ARGV[6] = workflow queue group name
-- ARGV[7] = workflow instance segment id

-- Complete the activity task (mirrors queue/complete.lua). If the task entry
-- no longer exists (already completed or expired), bail out early.
local task = redis.call("XRANGE", KEYS[2], ARGV[1], ARGV[1])
if #task == 0 then
    return nil
end

-- task[1][2] is the field/value list of the stream entry; [2] is the first value
local id = task[1][2][2]
redis.call("SREM", KEYS[1], id)
-- XACK takes (key, group, id). "NOMKSTREAM" is an XADD-only option and was
-- previously being passed in the group-name position, which would either ack
-- against a nonexistent group or fail on the invalid stream id.
redis.call("XACK", KEYS[2], ARGV[2], ARGV[1])
redis.call("XDEL", KEYS[2], ARGV[1])

-- Add result event to pending events stream for the workflow instance
redis.call("XADD", KEYS[3], "*", "event", ARGV[4])

-- Store payload if provided (only if not empty). Use redis.call so errors
-- propagate, consistent with the rest of this script.
if ARGV[5] ~= "" then
    redis.call("HSETNX", KEYS[4], ARGV[3], ARGV[5])
end

-- Enqueue workflow task (mirrors queue/enqueue.lua); the set membership check
-- prevents duplicate tasks for the same instance.
redis.call("SADD", KEYS[5], KEYS[6])
local added = redis.call("SADD", KEYS[6], ARGV[7])
if added == 1 then
    redis.call("XADD", KEYS[7], "*", "id", ARGV[7], "data", "")
end

return true
14 changes: 14 additions & 0 deletions backend/redis/scripts/delete_instance.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Delete all Redis state for a workflow instance. Assumes the instance is
-- finished; does not touch future events or pending workflow tasks.
--
-- KEYS[1] - instance key
-- KEYS[2] - pending events key
-- KEYS[3] - history key
-- KEYS[4] - payload key
-- KEYS[5] - active-instance-execution key
-- KEYS[6] - instances-by-creation sorted set key
-- ARGV[1] - instance segment

-- Remove every per-instance key in a single DEL call
redis.call("DEL", KEYS[1], KEYS[2], KEYS[3], KEYS[4], KEYS[5])

-- Drop the instance from the creation-time index; returns the number of
-- members removed (0 or 1)
return redis.call("ZREM", KEYS[6], ARGV[1])
35 changes: 35 additions & 0 deletions backend/redis/scripts/signal_workflow.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Signal a workflow instance by adding an event to its pending events stream
-- and queuing a workflow task for it.
--
-- KEYS[1] - payload hash key
-- KEYS[2] - pending events stream key
-- KEYS[3] - workflow task set key
-- KEYS[4] - workflow task stream key
--
-- ARGV[1] - event id
-- ARGV[2] - event data (JSON)
-- ARGV[3] - event payload (JSON)
-- ARGV[4] - instance segment

local payloadHashKey = KEYS[1]
local pendingEventsKey = KEYS[2]
local workflowSetKey = KEYS[3]
local workflowStreamKey = KEYS[4]

local eventId = ARGV[1]
local eventData = ARGV[2]
local payload = ARGV[3]
local instanceSegment = ARGV[4]

-- Store the event payload. Use redis.call (not redis.pcall) so any error —
-- e.g. the key holding a non-hash type — propagates to the caller instead of
-- being silently swallowed; a lost payload would leave an unprocessable event.
-- This also matches the error handling of every other operation in this script.
redis.call("HSETNX", payloadHashKey, eventId, payload)

-- Add event to the instance's pending events stream
redis.call("XADD", pendingEventsKey, "*", "event", eventData)

-- Queue a workflow task for the instance; the set membership check prevents
-- enqueueing duplicate tasks for the same instance.
local added = redis.call("SADD", workflowSetKey, instanceSegment)
if added == 1 then
    redis.call("XADD", workflowStreamKey, "*", "id", instanceSegment, "data", "")
end

return true
Loading
Loading