Skip to content

Commit 41fe437

Browse files
feat: low queue for mass updates (#1526)
When there is a massive update on the Open Food Facts server, put them in lower priority queues, to keep user submissions in high priorities, and be able to ask them question quickly. fixes: #1519 --------- Co-authored-by: prasanna7codes <prasannalit098@gmail.com>
1 parent 9249361 commit 41fe437

File tree

2 files changed

+27
-7
lines changed

2 files changed

+27
-7
lines changed

robotoff/workers/queues.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ def get_high_queue(product_id: Optional[ProductIdentifier] = None) -> Queue:
5656
return high_queues[queue_idx]
5757

5858

59+
def get_low_queue() -> Queue:
60+
"""Return the low-priority queue."""
61+
return low_queue
62+
63+
5964
def enqueue_in_job(
6065
func: Callable,
6166
queue: Queue,

robotoff/workers/update_listener.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77
from robotoff import settings
88
from robotoff.types import ProductIdentifier, ServerType
99
from robotoff.utils.logger import get_logger
10-
from robotoff.workers.queues import enqueue_in_job, enqueue_job, get_high_queue
10+
from robotoff.workers.queues import (
11+
enqueue_in_job,
12+
enqueue_job,
13+
get_high_queue,
14+
get_low_queue,
15+
)
1116
from robotoff.workers.tasks import delete_product_insights_job
1217
from robotoff.workers.tasks.import_image import run_import_image_job
1318
from robotoff.workers.tasks.product_updated import update_insights_job
@@ -16,8 +21,7 @@
1621

1722

1823
def get_redis_client():
19-
"""Get the Redis client where Product Opener publishes it's product
20-
update."""
24+
"""Get the Redis client where Product Opener publishes its product updates."""
2125
return Redis(
2226
host=settings.REDIS_UPDATE_HOST,
2327
port=settings.REDIS_UPDATE_PORT,
@@ -36,11 +40,22 @@ def process_redis_update(self, redis_update: RedisUpdate):
3640
action = redis_update.action
3741
server_type = ServerType.from_product_type(redis_update.product_type)
3842
product_id = ProductIdentifier(redis_update.code, server_type)
43+
44+
# Check if the update was triggered by scanbot or specific mass update accounts
45+
is_scanbot_or_mass_update = redis_update.user_id in [
46+
"scanbot",
47+
"update_all_products",
48+
]
49+
# Select queue based on triggering actor
50+
selected_queue = (
51+
get_low_queue() if is_scanbot_or_mass_update else get_high_queue(product_id)
52+
)
53+
3954
if action == "deleted":
4055
logger.info("Product %s has been deleted", redis_update.code)
4156
enqueue_job(
4257
delete_product_insights_job,
43-
get_high_queue(product_id),
58+
selected_queue,
4459
job_kwargs={"result_ttl": 0},
4560
product_id=product_id,
4661
)
@@ -69,7 +84,7 @@ def process_redis_update(self, redis_update: RedisUpdate):
6984
)
7085
enqueue_job(
7186
run_import_image_job,
72-
get_high_queue(product_id),
87+
selected_queue,
7388
job_kwargs={"result_ttl": 0},
7489
product_id=product_id,
7590
image_url=image_url,
@@ -79,7 +94,7 @@ def process_redis_update(self, redis_update: RedisUpdate):
7994
logger.info("Product %s has been updated", redis_update.code)
8095
enqueue_in_job(
8196
update_insights_job,
82-
get_high_queue(product_id),
97+
selected_queue,
8398
settings.UPDATED_PRODUCT_WAIT,
8499
job_kwargs={"result_ttl": 0},
85100
product_id=product_id,
@@ -91,7 +106,7 @@ def run_update_listener():
91106
"""Run the update import daemon.
92107
93108
This daemon listens to the Redis stream containing information about
94-
product updates, and triggers
109+
product updates and triggers appropriate actions.
95110
"""
96111
redis_client = get_redis_client()
97112
update_listener = UpdateListener(

0 commit comments

Comments
 (0)