Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions backend/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
return nil, fmt.Errorf("loading Lua scripts: %w", err)
}

// Ensure default consumer groups exist to prevent NOGROUP errors during startup
if err := rb.ensureDefaultConsumerGroups(ctx); err != nil {
return nil, fmt.Errorf("ensuring default consumer groups: %w", err)
}

return rb, nil
}

Expand Down Expand Up @@ -149,3 +154,22 @@

return true
}

// ensureDefaultConsumerGroups creates the consumer groups for the default and system queues
// to prevent NOGROUP errors when the worker starts and tries to recover abandoned tasks
func (rb *redisBackend) ensureDefaultConsumerGroups(ctx context.Context) error {
// Initialize consumer groups for both default and system queues
queues := []workflow.Queue{workflow.QueueDefault, core.QueueSystem}

Check failure on line 162 in backend/redis/redis.go

View workflow job for this annotation

GitHub Actions / build

undefined: workflow

// Prepare workflow queue consumer groups
if err := rb.workflowQueue.Prepare(ctx, rb.rdb, queues); err != nil {
return fmt.Errorf("preparing workflow queue consumer groups: %w", err)
}

// Prepare activity queue consumer groups
if err := rb.activityQueue.Prepare(ctx, rb.rdb, queues); err != nil {
return fmt.Errorf("preparing activity queue consumer groups: %w", err)
}

return nil
}
Loading