Skip to content

Commit 521f2ce

Browse files
jwildercschleiden
authored andcommitted
Fix redis NOGROUP errors when running samples
Running the samples that use redis where workers are separate generates non-stop errors like the following: 2025/09/15 23:01:02 ERROR error polling task error="dequeueing task: NOGROUP No such key 'task-stream:system:activities' or consumer group 'task-workers' in XREADGROUP with GROUP option" The original error occurred because: * The worker tries to recover abandoned tasks during startup using XAUTOCLAIM * This requires existing consumer groups for the Redis streams * Previously, consumer groups were only created when PrepareWorkflowQueues/PrepareActivityQueues were explicitly called * The recovery operations happened before queue preparation, causing NOGROUP errors The fix ensures that consumer groups for the essential queues (default and _system_) are created automatically during Redis backend initialization, eliminating the race condition.
1 parent 561137c commit 521f2ce

File tree

1 file changed

+24
-0
lines changed

1 file changed

+24
-0
lines changed

backend/redis/redis.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption) (
8686
return nil, fmt.Errorf("loading Lua scripts: %w", err)
8787
}
8888

89+
// Ensure default consumer groups exist to prevent NOGROUP errors during startup
90+
if err := rb.ensureDefaultConsumerGroups(ctx); err != nil {
91+
return nil, fmt.Errorf("ensuring default consumer groups: %w", err)
92+
}
93+
8994
return rb, nil
9095
}
9196

@@ -149,3 +154,22 @@ func (rb *redisBackend) FeatureSupported(feature backend.Feature) bool {
149154

150155
return true
151156
}
157+
158+
// ensureDefaultConsumerGroups creates the consumer groups for the default and system queues
159+
// to prevent NOGROUP errors when the worker starts and tries to recover abandoned tasks
160+
func (rb *redisBackend) ensureDefaultConsumerGroups(ctx context.Context) error {
161+
// Initialize consumer groups for both default and system queues
162+
queues := []workflow.Queue{workflow.QueueDefault, core.QueueSystem}
163+
164+
// Prepare workflow queue consumer groups
165+
if err := rb.workflowQueue.Prepare(ctx, rb.rdb, queues); err != nil {
166+
return fmt.Errorf("preparing workflow queue consumer groups: %w", err)
167+
}
168+
169+
// Prepare activity queue consumer groups
170+
if err := rb.activityQueue.Prepare(ctx, rb.rdb, queues); err != nil {
171+
return fmt.Errorf("preparing activity queue consumer groups: %w", err)
172+
}
173+
174+
return nil
175+
}

0 commit comments

Comments
 (0)