Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions backend/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
return nil, fmt.Errorf("loading Lua scripts: %w", err)
}

// Ensure default consumer groups exist to prevent NOGROUP errors during startup
if err := rb.ensureDefaultConsumerGroups(ctx); err != nil {
return nil, fmt.Errorf("ensuring default consumer groups: %w", err)
}

return rb, nil
}

Expand Down Expand Up @@ -149,3 +154,22 @@

return true
}

// ensureDefaultConsumerGroups creates the consumer groups for the default and system queues
// to prevent NOGROUP errors when the worker starts and tries to recover abandoned tasks
func (rb *redisBackend) ensureDefaultConsumerGroups(ctx context.Context) error {
// Initialize consumer groups for both default and system queues
queues := []workflow.Queue{workflow.QueueDefault, core.QueueSystem}

Check failure on line 162 in backend/redis/redis.go

View workflow job for this annotation

GitHub Actions / build

undefined: workflow
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
queues := []workflow.Queue{workflow.QueueDefault, core.QueueSystem}
queues := []workflow.Queue{core.QueueDefault, core.QueueSystem}


// Prepare workflow queue consumer groups
if err := rb.workflowQueue.Prepare(ctx, rb.rdb, queues); err != nil {
return fmt.Errorf("preparing workflow queue consumer groups: %w", err)
}

// Prepare activity queue consumer groups
if err := rb.activityQueue.Prepare(ctx, rb.rdb, queues); err != nil {
return fmt.Errorf("preparing activity queue consumer groups: %w", err)
}

return nil
}
Loading