5
5
import random
6
6
import signal
7
7
import time
8
+ from collections .abc import Mapping
8
9
from multiprocessing import cpu_count
9
10
from typing import Any
10
11
17
18
from sentry .utils .kafka import run_processor_with_signals
18
19
19
20
DEFAULT_BLOCK_SIZE = int (32 * 1e6 )
21
+ logger = logging .getLogger ("sentry.runner.commands.run" )
20
22
21
23
22
24
def _address_validate (
@@ -255,30 +257,30 @@ def taskworker_scheduler(redis_cluster: str, **options: Any) -> None:
255
257
"""
256
258
from django .conf import settings
257
259
258
- from sentry import options as featureflags
260
+ from sentry import options as runtime_options
261
+ from sentry .conf .types .taskworker import ScheduleConfig
259
262
from sentry .taskworker .registry import taskregistry
260
263
from sentry .taskworker .scheduler .runner import RunStorage , ScheduleRunner
261
264
from sentry .utils .redis import redis_clusters
262
265
263
266
for module in settings .TASKWORKER_IMPORTS :
264
267
__import__ (module )
265
268
266
- logger = logging .getLogger ("sentry.runner.commands.run" )
267
-
268
269
run_storage = RunStorage (redis_clusters .get (redis_cluster ))
269
270
270
271
with managed_bgtasks (role = "taskworker-scheduler" ):
271
272
runner = ScheduleRunner (taskregistry , run_storage )
272
- enabled_schedules = set (featureflags .get ("taskworker.scheduler.rollout" , []))
273
- for key , schedule_data in settings .TASKWORKER_SCHEDULES .items ():
274
- if key in enabled_schedules :
275
- runner .add (key , schedule_data )
273
+ schedules : Mapping [str , ScheduleConfig ] = {}
274
+ if runtime_options .get ("taskworker.enabled" ):
275
+ schedules = settings .TASKWORKER_SCHEDULES
276
+
277
+ for key , schedule_data in schedules .items ():
278
+ runner .add (key , schedule_data )
276
279
277
280
logger .info (
278
281
"taskworker.scheduler.schedule_data" ,
279
282
extra = {
280
- "enabled" : enabled_schedules ,
281
- "available" : list (settings .TASKWORKER_SCHEDULES .keys ()),
283
+ "schedule_keys" : list (schedules .keys ()),
282
284
},
283
285
)
284
286
@@ -487,7 +489,7 @@ def cron(**options: Any) -> None:
487
489
"Run periodic task dispatcher."
488
490
from django .conf import settings
489
491
490
- from sentry import options as featureflags
492
+ from sentry import options as runtime_options
491
493
492
494
if settings .CELERY_ALWAYS_EAGER :
493
495
raise click .ClickException (
@@ -496,14 +498,16 @@ def cron(**options: Any) -> None:
496
498
497
499
from sentry .celery import app
498
500
499
- old_schedule = app .conf .CELERYBEAT_SCHEDULE
500
- new_schedule = {}
501
- task_schedules = set (featureflags .get ("taskworker.scheduler.rollout" , []))
502
- for key , schedule_data in old_schedule .items ():
503
- if key not in task_schedules :
504
- new_schedule [key ] = schedule_data
501
+ schedule = app .conf .CELERYBEAT_SCHEDULE
502
+ if runtime_options .get ("taskworker.enabled" ):
503
+ click .secho (
504
+ "You have `taskworker.enabled` active, run `sentry run taskworker-scheduler` instead." ,
505
+ fg = "yellow" ,
506
+ )
507
+ click .secho ("Ignoring all schedules in settings.CELERYBEAT_SCHEDULE" , fg = "yellow" )
508
+ schedule = {}
505
509
506
- app .conf .update (CELERYBEAT_SCHEDULE = new_schedule )
510
+ app .conf .update (CELERYBEAT_SCHEDULE = schedule )
507
511
508
512
with managed_bgtasks (role = "cron" ):
509
513
app .Beat (
0 commit comments