Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 71 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,81 @@ include = ["scheduler"]
[tool.ruff]
line-length = 120
exclude = [
'__pycache__',
'.github',
'.venv',
'scheduler/migrations',
'testproject',
'.venv',
'.github',
'__pycache__',
]

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

[tool.ruff.lint]
select = [
"A", # flake8-builtins
"AIR", # Airflow
"ASYNC", # flake8-async
"C4", # flake8-comprehensions
"C90", # McCabe cyclomatic complexity
"DJ", # flake8-django
"E", # pycodestyle
"EXE", # flake8-executable
"F", # Pyflakes
"FA", # flake8-future-annotations
"FAST", # FastAPI
"FIX", # flake8-fixme
"FLY", # flynt
"FURB", # refurb
"ICN", # flake8-import-conventions
"INP", # flake8-no-pep420
"INT", # flake8-gettext
"ISC", # flake8-implicit-str-concat
"LOG", # flake8-logging
"NPY", # NumPy-specific rules
"PD", # pandas-vet
"PERF", # Perflint
"PIE", # flake8-pie
"RSE", # flake8-raise
"SLOT", # flake8-slots
"T10", # flake8-debugger
"TC", # flake8-type-checking
"TD", # flake8-todos
"W", # pycodestyle
"YTT", # flake8-2020
# "ANN", # flake8-annotations
# "ARG", # flake8-unused-arguments
# "B", # flake8-bugbear
# "BLE", # flake8-blind-except
# "COM", # flake8-commas
# "CPY", # flake8-copyright
# "D", # pydocstyle
# "DOC", # pydoclint
# "DTZ", # flake8-datetimez
# "EM", # flake8-errmsg
# "ERA", # eradicate
# "FBT", # flake8-boolean-trap
# "G", # flake8-logging-format
# "I", # isort
# "N", # pep8-naming
# "PGH", # pygrep-hooks
# "PL", # Pylint
# "PT", # flake8-pytest-style
# "PTH", # flake8-use-pathlib
# "PYI", # flake8-pyi
# "Q", # flake8-quotes
# "RET", # flake8-return
# "RUF", # Ruff-specific rules
# "S", # flake8-bandit
# "SIM", # flake8-simplify
# "SLF", # flake8-self
# "T20", # flake8-print
# "TID", # flake8-tidy-imports
# "TRY", # tryceratops
# "UP", # pyupgrade
]
extend-ignore = ["PIE790"]
mccabe.max-complexity = 13
per-file-ignores."scheduler/models/args.py" = ["DJ012", "INT001"]
per-file-ignores."scheduler/models/ephemeral_models.py" = ["DJ008"]
per-file-ignores."scheduler/models/task.py" = ["DJ001", "DJ012"]

[tool.mypy]
packages = ['scheduler', ]
Expand Down
6 changes: 3 additions & 3 deletions scheduler/helpers/timeouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, timeout, exception=BaseTimeoutException, **kwargs):
def __enter__(self):
self.setup_death_penalty()

def __exit__(self, type, value, traceback):
def __exit__(self, type, value, traceback): # noqa: A002
# Always cancel immediately, since we're done
try:
self.cancel_death_penalty()
Expand All @@ -49,10 +49,10 @@ def __exit__(self, type, value, traceback):
return False

def setup_death_penalty(self):
raise NotImplementedError()
raise NotImplementedError

def cancel_death_penalty(self):
raise NotImplementedError()
raise NotImplementedError


class UnixSignalDeathPenalty(BaseDeathPenalty):
Expand Down
3 changes: 1 addition & 2 deletions scheduler/management/commands/delete_failed_executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def handle(self, *args, **options):
queue = get_queue(options.get("queue", "default"))
job_names = queue.failed_job_registry.all(queue.connection)
jobs = JobModel.get_many(job_names, connection=queue.connection)
func_name = options.get("func", None)
if func_name is not None:
if func_name := options.get("func"):
jobs = [job for job in jobs if job.func_name == func_name]
dry_run = options.get("dry_run", False)
click.echo(f"Found {len(jobs)} failed jobs")
Expand Down
3 changes: 1 addition & 2 deletions scheduler/management/commands/scheduler_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ def handle(self, **options: Any) -> None:
"default",
]
click.echo(f"Starting worker for queues {queues}")
pidfile = options.pop("pidfile")
if pidfile:
if pidfile := options.pop("pidfile"):
with open(os.path.expanduser(pidfile), "w") as fp:
fp.write(str(os.getpid()))

Expand Down
3 changes: 1 addition & 2 deletions scheduler/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ def save(self, **kwargs: Any) -> None:
schedule_job = kwargs.pop("schedule_job", True)
if should_clean:
self.clean()
update_fields = kwargs.get("update_fields", None)
if update_fields is not None:
if update_fields := kwargs.get("update_fields"):
kwargs["update_fields"] = set(update_fields).union({"updated_at"})
super(Task, self).save(**kwargs)
if schedule_job:
Expand Down
4 changes: 2 additions & 2 deletions scheduler/tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ def test_args_kwargs(*args, **kwargs):

def two_seconds_job():
sleep(2)
logging.info(f"Job {_counter}")
logging.info(f"Job {_counter}") # noqa: LOG015


def long_job():
sleep(1000)
logging.info(f"Job {_counter}")
logging.info(f"Job {_counter}") # noqa: LOG015


test_non_callable = "I am a teapot"
Expand Down
6 changes: 3 additions & 3 deletions scheduler/tests/test_mgmt_commands/test_scheduler_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_scheduler_worker__no_queues_params(self):

# enqueue some jobs that will fail
job_names = []
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(failing_job)
job_names.append(job.name)

Expand Down Expand Up @@ -56,7 +56,7 @@ def test_scheduler_worker__run_jobs(self):

# enqueue some jobs that will fail
job_names = []
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(failing_job)
job_names.append(job.name)

Expand All @@ -76,7 +76,7 @@ def test_scheduler_worker__worker_with_two_queues(self):

# enqueue some jobs that will fail
job_names = []
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(failing_job)
job_names.append(job.name)
job = queue2.create_and_enqueue_job(failing_job)
Expand Down
4 changes: 2 additions & 2 deletions scheduler/tests/test_task_types/test_task_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def test_parse_kwargs(self):
taskarg_factory(TaskKwarg, key="key3", arg_type="bool", val=True, content_object=task)
taskarg_factory(TaskKwarg, key="key4", arg_type="datetime", val=date, content_object=task)
kwargs = task.parse_kwargs()
self.assertEqual(kwargs, {'key1': "one", 'key2': 2, 'key3': True, 'key4': date})
self.assertEqual(kwargs, {"key1": "one", "key2": 2, "key3": True, "key4": date})

def test_callable_args_and_kwargs(self):
task = task_factory(self.task_type, callable="scheduler.tests.jobs.test_args_kwargs")
Expand Down Expand Up @@ -418,7 +418,7 @@ def test_admin_single_delete(self):
self.assertTrue(task.is_scheduled())
prev_executions_count = len(_get_executions(task))
url = reverse("admin:scheduler_task_delete", args=[task.pk])
data = {'post': "yes"}
data = {"post": "yes"}
# act
res = self.client.post(url, data=data, follow=True)
# assert
Expand Down
4 changes: 2 additions & 2 deletions scheduler/tests/test_views/test_job_detail_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_single_job_action_enqueue_job(self):
queue = get_queue("django_tasks_scheduler_test")
job_list = []
# enqueue some jobs that depends on other
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(test_job)
job_list.append(job)

Expand All @@ -84,7 +84,7 @@ def test_single_job_action_enqueue_job_sync_queue(self):
queue = get_queue("scheduler_scheduler_active_test")
job_list = []
# enqueue some jobs that depends on other
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(test_job)
job_list.append(job)

Expand Down
6 changes: 3 additions & 3 deletions scheduler/tests/test_views/test_queue_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def test_job_list_action_delete_jobs__with_bad_next_url(self):

# enqueue some jobs
job_names = []
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(test_job, job_info_ttl=0)
job_names.append(job.name)

Expand All @@ -36,7 +36,7 @@ def test_job_list_action_delete_jobs(self):

# enqueue some jobs
job_names = []
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(test_job, job_info_ttl=0)
job_names.append(job.name)

Expand All @@ -57,7 +57,7 @@ def test_job_list_action_requeue_jobs(self):

# enqueue some jobs that will fail
job_names = []
for _ in range(0, 3):
for _ in range(3):
job = queue.create_and_enqueue_job(failing_job)
job_names.append(job.name)

Expand Down
2 changes: 1 addition & 1 deletion scheduler/views/queue_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def get_statistics(run_maintenance_tasks: bool = False) -> Dict[str, List[Dict[s
canceled_jobs=queue.canceled_job_registry.count(queue.connection),
)
queues.append(queue_data)
except ConnectionErrorTypes as e:
except ConnectionErrorTypes as e: # noqa: PERF203
logger.error(f"Could not connect for queue {queue_name}: {e}")
continue

Expand Down
15 changes: 9 additions & 6 deletions scheduler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def _check_for_suspension(self, burst: bool) -> None:
self.log(
INFO, "Suspended in burst mode, exiting, Note: There could still be unfinished jobs on the queue"
)
raise StopRequested()
raise StopRequested

if not notified:
self.log(INFO, "Worker suspended, trigger ResumeCommand")
Expand Down Expand Up @@ -459,7 +459,7 @@ def request_force_stop(self, signum: int, frame: Optional[FrameType]) -> None:
self.log(DEBUG, f"Taking down job execution process {self._model.job_execution_process_pid} with me")
self._kill_job_execution_process()
self._wait_for_job_execution_process()
raise SystemExit()
raise SystemExit

def request_stop(self, signum: int, frame: Optional[FrameType]) -> None:
"""Stops the current worker loop but waits for child processes to end gracefully (warm shutdown).
Expand All @@ -484,7 +484,7 @@ def request_stop(self, signum: int, frame: Optional[FrameType]) -> None:
"Stopping after current job execution process is finished. Press Ctrl+C again for a cold shutdown.",
)
else:
raise StopRequested()
raise StopRequested

def reorder_queues(self, reference_queue: Queue) -> None:
"""Reorder the queues according to the strategy.
Expand All @@ -498,7 +498,8 @@ def reorder_queues(self, reference_queue: Queue) -> None:

if self._dequeue_strategy not in [e.value for e in DequeueStrategy]:
raise ValueError(
f"""[Worker {self.name}/{self._pid}]: Dequeue strategy should be one of {", ".join([e.value for e in DequeueStrategy])}"""
f"[Worker {self.name}/{self._pid}]: Dequeue strategy should be one of "
f"{', '.join([e.value for e in DequeueStrategy])}"
)
if self._dequeue_strategy == DequeueStrategy.DEFAULT:
return
Expand Down Expand Up @@ -589,7 +590,8 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
retpid, ret_val = self._wait_for_job_execution_process()
break
except JobExecutionMonitorTimeoutException:
# job execution process has not exited yet and is still running. Send a heartbeat to keep the worker alive.
# job execution process has not exited yet and is still running. Send a heartbeat to keep the worker
# alive.
if job.started_at is not None:
working_time = (utcnow() - job.started_at).total_seconds()
self._model.set_current_job_working_time(working_time, self.connection)
Expand Down Expand Up @@ -856,7 +858,8 @@ def get_queues(*queue_names: str) -> List[Queue]:
curr_queue_config = get_queue_configuration(queue_name)
if not queue_config.same_connection_params(curr_queue_config):
raise QueueConnectionDiscrepancyError(
f'Queues must have the same broker connection. "{queue_name}" and "{queue_names[0]}" have different connection settings'
f'Queues must have the same broker connection. "{queue_name}" and "{queue_names[0]}" have different '
"connection settings"
)
queue = get_queue(queue_name)
queues.append(queue)
Expand Down
Loading