File tree Expand file tree Collapse file tree 3 files changed +16
-9
lines changed
deployment/django/celery/worker Expand file tree Collapse file tree 3 files changed +16
-9
lines changed Original file line number Diff line number Diff line change 184184 "schedule" : 1.0 ,
185185 },
186186}
187+ CELERY_WORKER_CONCURRENCY = env .int ("CELERY_WORKER_CONCURRENCY" , default = 1 )
187188
188189DATA_ROOT = os .path .join (BASE_DIR , "data" )
189190
Original file line number Diff line number Diff line change 33set -o errexit
44set -o nounset
55
6- # Allow override via env var , otherwise 4x CPU cores for I/O-bound tasks,
7- # capped at 32 to stay within PostgreSQL's default max_connections (100) with headroom for Django and CeleryBeat.
6+ # Use override if set , otherwise default to nproc (1 worker per core), capped at 32
7+ # to stay within PostgreSQL's default max_connections (100) with headroom for Django and CeleryBeat.
88if [ -z " ${CELERY_WORKER_CONCURRENCY:- } " ]; then
9- CPU_COUNT=$( nproc)
10- CONCURRENCY=$(( CPU_COUNT * 4 ))
9+ CONCURRENCY=$( nproc)
1110 if [ " $CONCURRENCY " -gt 32 ]; then
1211 CONCURRENCY=32
1312 fi
1413else
1514 CONCURRENCY=$CELERY_WORKER_CONCURRENCY
1615fi
1716
17+ export CELERY_WORKER_CONCURRENCY=$CONCURRENCY
1818watchgod celery.__main__.main --args -A configuration.celery_app worker -l INFO -c $CONCURRENCY
Original file line number Diff line number Diff line change @@ -38,8 +38,14 @@ def check_for_updates():
3838 event_ids = list (claimed .values_list ('id' , flat = True ))
3939 ScheduledEvent .objects .filter (id__in = event_ids ).update (enqueued = True )
4040
41- for event_id in event_ids :
42- try :
43- run_event .apply_async (args = [event_id ]) # add event to celery queue for a worker to pick up
44- except Exception as e :
45- logging .error (f"failed to enqueue event { event_id } : { e } " )
41+ if getattr (settings , 'CELERY_WORKER_CONCURRENCY' , 0 ) == 1 :
42+ # Single worker — execute inline to avoid async dispatch overhead
43+ for event_id in event_ids :
44+ run_event .apply (args = [event_id ])
45+ else :
46+ # Multiple workers — dispatch for parallel processing
47+ for event_id in event_ids :
48+ try :
49+ run_event .apply_async (args = [event_id ])
50+ except Exception as e :
51+ logging .error (f"failed to enqueue event { event_id } : { e } " )
You can’t perform that action at this time.
0 commit comments