Implement a blocking retrieve of data for redis queues. #36467
Conversation
This reduces polling load on Redis by using the BLPOP command instead of LPOP with backoff-based retries. It also adds a test for the workerqueue abstraction via Redis.
Signed-off-by: techknowlogick <[email protected]>
q.mu.Unlock()
...
return data, nil
}
Can the implementation be simplified like this?
res, err := q.client.BLPop(ctx, 0, q.cfg.QueueFullName).Result()
if err != nil {
    return
}
if q.isUnique {
    _ = q.client.SRem(ctx, q.cfg.SetFullName, res[1]).Err() // BLPop returns [key, value]
}
Use 0 to "block indefinitely". I would suppose the Redis client can cancel the request correctly if the context is canceled.
I just tested this and with a timeout of 0 I could not cancel the context reliably for BLPOP, it did hang. I searched and found an issue discussing this: redis/go-redis#2556
OK, maybe that's the reason we didn't use BLPop in the first place: it can't be canceled immediately, which matters when we need to shut the queue down quickly.
So, overall, doesn't this change leave the situation much the same?
Before: one LPop every 2 seconds (backoffUpper = 2 * time.Second)
After: one BLPop every second (q.client.BLPop(ctx, time.Second, ...))
If the timeout cannot be increased (e.g. if cancellation really does need to be immediate), then this change is probably not worth it and we can close the pull request.
I did some more experiments with BLPOP, and the only sure way to cancel one of these blocking calls without polling is to close the underlying Redis connection. In one of my projects I do exactly that: each queue gets its own connection, which is closed when the worker stops. But I am not sure that would be the proper approach for Gitea.
I haven't really looked into the problem yet.
I saw you had some discussions with @techknowlogick on Discord, so they can help dig in and answer.
Pull request overview
This PR updates the Redis-backed queue pop implementation to reduce active polling by using Redis BLPOP for blocking retrieval, while retaining retry/backoff behavior and adjusting uniqueness-set cleanup.
Changes:
- Replace LPOP + backoff polling with a BLPOP-based loop in PopItem.
- Add connection-error backoff handling around BLPOP.
- Ensure uniqueness-set cleanup (SREM) runs with a dedicated timeout context.
res, err := q.client.BLPop(ctx, time.Second, q.cfg.QueueFullName).Result()
if err != nil {
    if err == redis.Nil {
        return true, nil, nil
Copilot AI · Jan 30, 2026
BLPop is executed outside q.mu, but q.mu is documented/used to keep the list+set operations in sync for unique queues. With the current flow, an item can be popped from the list (BLPop returns) while it’s still present in the uniqueness set until the mutex is acquired and SRem runs; a concurrent PushItem can then see SAdd==0 and return ErrAlreadyInQueue even though the item has already been removed from the queue. Consider adjusting the uniqueness strategy to avoid this window (e.g., when SAdd returns 0, verify the item is actually still in the list before returning ErrAlreadyInQueue, and/or clean stale set entries and retry).
// use a separate context for cleanup to ensure it runs even if request context is canceled
cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_ = q.client.SRem(cleanupCtx, q.cfg.SetFullName, data).Err()
cancel()
Copilot AI · Jan 30, 2026
The cleanup timeout 5*time.Second is a new magic duration. Consider extracting it to a named constant (e.g. a package-level redisCleanupTimeout) to make the intent configurable/consistent and easier to tune. Also prefer defer cancel() immediately after WithTimeout(...) to avoid leaking the timer if this block is refactored to have early returns.
res, err := q.client.BLPop(ctx, time.Second, q.cfg.QueueFullName).Result()
if err != nil {
Copilot AI · Jan 30, 2026
BLPop is a blocking call that ties up a Redis connection for up to the timeout duration. Because nosql.Manager shares a single UniversalClient per connection string, each running queue will effectively occupy a pooled connection most of the time when idle; with multiple queues on the same Redis conn string this can lead to connection-pool starvation for PushItem/other Redis users. Consider documenting/tuning this (e.g. ensure the queue Redis URI sets an adequate pool_size, or use a dedicated connection string/client for queues).
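If the shared client is kept, the pool can at least be sized for the number of queues. go-redis's ParseURL accepts a pool_size query parameter, so (assuming the queue connection strings pass through it) a URI like redis://127.0.0.1:6379/0?pool_size=32 should work. A stdlib-only sketch of reading that parameter from a connection string, mirroring in miniature what ParseURL does:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// poolSizeFromURI extracts the pool_size option from a Redis connection
// string, falling back to a default when it is absent.
func poolSizeFromURI(uri string, fallback int) (int, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return 0, err
	}
	raw := u.Query().Get("pool_size")
	if raw == "" {
		return fallback, nil
	}
	return strconv.Atoi(raw)
}

func main() {
	// Hypothetical queue connection string: roughly one connection per idle
	// BLPOP plus headroom for PushItem and other Redis users.
	n, err := poolSizeFromURI("redis://127.0.0.1:6379/0?pool_size=32", 10)
	if err != nil {
		panic(err)
	}
	fmt.Println("pool_size:", n)
}
```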