Skip to content

Commit c53a377

Browse files
committed
Fix redis NOGROUP errors when running samples
Running the samples that use redis where workers are separate generates non-stop errors like the following: 2025/09/15 23:01:02 ERROR error polling task error="dequeueing task: NOGROUP No such key 'task-stream:system:activities' or consumer group 'task-workers' in XREADGROUP with GROUP option" The original error occurred because: * The worker tries to recover abandoned tasks during startup using XAUTOCLAIM * This requires existing consumer groups for the Redis streams * Previously, consumer groups were only created when PrepareWorkflowQueues/PrepareActivityQueues were explicitly called * The recovery operations happened before queue preparation, causing NOGROUP errors The fix ensures that consumer groups for the essential queues (default and _system_) are created automatically during Redis backend initialization, eliminating the race condition.
1 parent aa62ce6 commit c53a377

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

backend/redis/redis.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
8787
return nil, fmt.Errorf("loading Lua scripts: %w", err)
8888
}
8989

90+
// Ensure default consumer groups exist to prevent NOGROUP errors during startup
91+
if err := rb.ensureDefaultConsumerGroups(ctx); err != nil {
92+
return nil, fmt.Errorf("ensuring default consumer groups: %w", err)
93+
}
94+
9095
return rb, nil
9196
}
9297

@@ -152,3 +157,22 @@ func (rb *redisBackend) FeatureSupported(feature backend.Feature) bool {
152157

153158
return true
154159
}
160+
161+
// ensureDefaultConsumerGroups creates the consumer groups for the default and system queues
162+
// to prevent NOGROUP errors when the worker starts and tries to recover abandoned tasks
163+
func (rb *redisBackend) ensureDefaultConsumerGroups(ctx context.Context) error {
164+
// Initialize consumer groups for both default and system queues
165+
queues := []workflow.Queue{workflow.QueueDefault, core.QueueSystem}
166+
167+
// Prepare workflow queue consumer groups
168+
if err := rb.workflowQueue.Prepare(ctx, rb.rdb, queues); err != nil {
169+
return fmt.Errorf("preparing workflow queue consumer groups: %w", err)
170+
}
171+
172+
// Prepare activity queue consumer groups
173+
if err := rb.activityQueue.Prepare(ctx, rb.rdb, queues); err != nil {
174+
return fmt.Errorf("preparing activity queue consumer groups: %w", err)
175+
}
176+
177+
return nil
178+
}

0 commit comments

Comments
 (0)