diff --git a/backend/redis/redis.go b/backend/redis/redis.go index 22d64401..433eb755 100644 --- a/backend/redis/redis.go +++ b/backend/redis/redis.go @@ -86,6 +86,11 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) ( return nil, fmt.Errorf("loading Lua scripts: %w", err) } + // Ensure default consumer groups exist to prevent NOGROUP errors during startup + if err := rb.ensureDefaultConsumerGroups(ctx); err != nil { + return nil, fmt.Errorf("ensuring default consumer groups: %w", err) + } + return rb, nil } @@ -149,3 +154,22 @@ func (rb *redisBackend) FeatureSupported(feature backend.Feature) bool { return true } + +// ensureDefaultConsumerGroups creates the consumer groups for the default and system queues +// to prevent NOGROUP errors when the worker starts and tries to recover abandoned tasks +func (rb *redisBackend) ensureDefaultConsumerGroups(ctx context.Context) error { + // Initialize consumer groups for both default and system queues + queues := []workflow.Queue{workflow.QueueDefault, core.QueueSystem} + + // Prepare workflow queue consumer groups + if err := rb.workflowQueue.Prepare(ctx, rb.rdb, queues); err != nil { + return fmt.Errorf("preparing workflow queue consumer groups: %w", err) + } + + // Prepare activity queue consumer groups + if err := rb.activityQueue.Prepare(ctx, rb.rdb, queues); err != nil { + return fmt.Errorf("preparing activity queue consumer groups: %w", err) + } + + return nil +}