Skip to content

Commit f1f0781

Browse files
authored
feat(celery): add dedicated worker for long-running tasks (#6470)
### 📣 Summary

Introduce a new Celery worker dedicated to handling long-running tasks separately from regular workloads.

### 📖 Description

This change adds a dedicated Celery worker queue for long-running or resource-intensive tasks. The goal is to isolate these jobs from shorter, high-frequency tasks to prevent queue blocking.
1 parent a34bb9d commit f1f0781

File tree

7 files changed

+65
-15
lines changed

7 files changed

+65
-15
lines changed

docker/entrypoint_celery_kobocat_worker.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ exec celery -A kobo worker --loglevel=info \
1414
--logfile=${KPI_LOGS_DIR}/celery_kobocat_worker.log \
1515
--pidfile=/tmp/celery_kobocat_worker.pid \
1616
--queues=kobocat_queue \
17-
--exclude-queues=kpi_low_priority_queue,kpi_queue \
17+
--exclude-queues=kpi_low_priority_queue,kpi_queue,kpi_long_running_tasks_queue \
1818
--uid=${UWSGI_USER} \
1919
--gid=${UWSGI_GROUP} \
2020
--autoscale ${AUTOSCALE_MIN},${AUTOSCALE_MAX}

docker/entrypoint_celery_kpi_worker.bash

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ exec celery -A kobo worker --loglevel=info \
1414
--logfile=${KPI_LOGS_DIR}/celery_kpi_worker.log \
1515
--pidfile=/tmp/celery_kpi_worker.pid \
1616
--queues=kpi_queue \
17-
--exclude-queues=kpi_low_priority_queue,kobocat_queue \
17+
--exclude-queues=kpi_low_priority_queue,kobocat_queue,kpi_long_running_tasks_queue \
1818
--uid=${UWSGI_USER} \
1919
--gid=${UWSGI_GROUP} \
2020
--autoscale ${AUTOSCALE_MIN},${AUTOSCALE_MAX}
docker/entrypoint_celery_kpi_worker_long_running_tasks.bash

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/bash
2+
set -e
3+
source /etc/profile
4+
5+
# Run the Celery worker for long-running jobs ONLY
6+
7+
cd "${KPI_SRC_DIR}"
8+
9+
AUTOSCALE_MIN="${CELERY_AUTOSCALE_MIN:-2}"
10+
AUTOSCALE_MAX="${CELERY_AUTOSCALE_MAX:-6}"
11+
12+
exec celery -A kobo worker --loglevel=info \
13+
--hostname=kpi_worker_long_running_tasks@%h \
14+
--logfile=${KPI_LOGS_DIR}/celery_kpi_worker_long_running_tasks.log \
15+
--pidfile=/tmp/celery_kpi_worker_long_running_tasks.pid \
16+
--queues=kpi_long_running_tasks_queue \
17+
--exclude-queues=kpi_queue,kobocat_queue,kpi_low_priority_queue \
18+
--uid=${UWSGI_USER} \
19+
--gid=${UWSGI_GROUP} \
20+
--autoscale ${AUTOSCALE_MIN},${AUTOSCALE_MAX}

docker/entrypoint_celery_kpi_low_priority_worker.bash renamed to docker/entrypoint_celery_kpi_worker_low_priority.bash

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ AUTOSCALE_MIN="${CELERY_AUTOSCALE_MIN:-2}"
1010
AUTOSCALE_MAX="${CELERY_AUTOSCALE_MAX:-6}"
1111

1212
exec celery -A kobo worker --loglevel=info \
13-
--hostname=kpi_low_priority_worker@%h \
14-
--logfile=${KPI_LOGS_DIR}/celery_kpi_low_priority_worker.log \
15-
--pidfile=/tmp/celery_kpi_low_priority_worker.pid \
13+
--hostname=kpi_worker_low_priority@%h \
14+
--logfile=${KPI_LOGS_DIR}/celery_kpi_worker_low_priority.log \
15+
--pidfile=/tmp/celery_kpi_worker_low_priority.pid \
1616
--queues=kpi_low_priority_queue \
17-
--exclude-queues=kpi_queue,kobocat_queue \
17+
--exclude-queues=kpi_queue,kobocat_queue,kpi_long_running_tasks_queue \
1818
--uid=${UWSGI_USER} \
1919
--gid=${UWSGI_GROUP} \
2020
--autoscale ${AUTOSCALE_MIN},${AUTOSCALE_MAX}

kobo/apps/long_running_migrations/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010

1111
@celery_app.task(
12-
queue='kpi_low_priority_queue',
12+
queue='kpi_long_running_tasks_queue',
1313
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
1414
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
1515
)

kobo/apps/user_reports/tasks.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
from django.conf import settings
24
from django.core.cache import cache
35
from django.utils import timezone
@@ -17,14 +19,14 @@
1719
cleanup_stale_snapshots_and_refresh_mv,
1820
get_or_create_run,
1921
iter_org_chunks_after,
20-
process_chunk,
22+
process_chunk, refresh_user_reports_materialized_view,
2123
)
2224
from kobo.celery import celery_app
2325
from kpi.utils.log import logging
2426

2527

2628
@celery_app.task(
27-
queue='kpi_low_priority_queue',
29+
queue='kpi_long_running_tasks_queue',
2830
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
2931
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
3032
)
@@ -71,38 +73,66 @@ def refresh_user_report_snapshots(**kwargs):
7173
if not lock.acquire(blocking=False, blocking_timeout=0):
7274
logging.info('Nothing to do, task is already running!')
7375
return
76+
else:
77+
logging.info('Starting process, refreshing materialized view!')
7478

7579
# Claim the existing snapshot run or create a new one
7680
run = get_or_create_run()
81+
82+
# Update last heart-beat
83+
BillingAndUsageSnapshotRun.objects.filter(pk=run.pk).update(
84+
date_modified=timezone.now()
85+
)
86+
7787
last_processed_org_id = run.last_processed_org_id or ''
88+
last_time = time.time()
89+
7890
try:
7991
while chunk_qs := iter_org_chunks_after(last_processed_org_id):
92+
logging.info(
93+
f'Processing queue, last_processed_org_id: {last_processed_org_id}'
94+
)
8095
billing_map = get_current_billing_period_dates_by_org(chunk_qs)
96+
logging.info('\tBilling map retrieved')
8197
limits_map = get_organizations_effective_limits(chunk_qs, True, True)
98+
logging.info('\tLimits map retrieved')
8299
usage_map = calc.calculate_usage_batch(chunk_qs, billing_map)
100+
logging.info('\tUsage map retrieved')
83101
last_processed_org_id = process_chunk(
84102
chunk_qs, usage_map, limits_map, run.pk
85103
)
86104

87105
# Update the run progress
106+
logging.info(
107+
f'\tUpdating heartbeat, '
108+
f'new last_processed_org_id: {last_processed_org_id}'
109+
)
88110
BillingAndUsageSnapshotRun.objects.filter(pk=run.pk).update(
89111
last_processed_org_id=last_processed_org_id,
90112
date_modified=timezone.now(),
91113
)
92114

115+
if time.time() - last_time >= 15 * 60:
116+
logging.info('\tRefreshing the materialized view…')
117+
last_time = time.time()
118+
refresh_user_reports_materialized_view()
119+
93120
# All orgs processed: cleanup stale, refresh MV and mark run as completed
121+
logging.info('Clean-up')
94122
cleanup_stale_snapshots_and_refresh_mv(run.pk)
123+
logging.info('Mark run as complete')
95124
BillingAndUsageSnapshotRun.objects.filter(pk=run.pk).update(
96125
status=BillingAndUsageSnapshotStatus.COMPLETED,
97126
date_modified=timezone.now(),
98127
)
99128

100-
# Release the lock
101-
lock.release()
102-
103129
except Exception as ex:
104130
run = BillingAndUsageSnapshotRun.objects.get(pk=run.pk)
105131
details = run.details or {}
106132
details.update({'last_error': str(ex), 'ts': timezone.now().isoformat()})
107133
run.details = details
108134
run.save(update_fields=['details', 'date_modified'])
135+
finally:
136+
# Release the lock
137+
lock.release()
138+
logging.info('Lock released!')

kobo/settings/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,11 +1444,11 @@ def dj_stripe_request_callback_method():
14441444
'schedule': crontab(minute='*/30'),
14451445
'options': {'queue': 'kpi_low_priority_queue'}
14461446
},
1447-
# Schedule every 30 minutes
1447+
# Schedule every 15 minutes
14481448
'refresh-user-report-snapshot': {
14491449
'task': 'kobo.apps.user_reports.tasks.refresh_user_report_snapshots',
1450-
'schedule': crontab(minute='*/30'),
1451-
'options': {'queue': 'kpi_low_priority_queue'},
1450+
'schedule': crontab(minute='*/15'),
1451+
'options': {'queue': 'kpi_long_running_tasks_queue'},
14521452
},
14531453
# Schedule every day at midnight UTC
14541454
'project-ownership-garbage-collector': {

0 commit comments

Comments (0)