diff --git a/pyproject.toml b/pyproject.toml index 476c8dd..afe660a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,19 +77,81 @@ include = ["scheduler"] [tool.ruff] line-length = 120 exclude = [ + '__pycache__', + '.github', + '.venv', 'scheduler/migrations', 'testproject', - '.venv', - '.github', - '__pycache__', ] -[tool.ruff.format] -quote-style = "double" -indent-style = "space" -skip-magic-trailing-comma = false -line-ending = "auto" - +[tool.ruff.lint] +select = [ + "A", # flake8-builtins + "AIR", # Airflow + "ASYNC", # flake8-async + "C4", # flake8-comprehensions + "C90", # McCabe cyclomatic complexity + "DJ", # flake8-django + "E", # pycodestyle + "EXE", # flake8-executable + "F", # Pyflakes + "FA", # flake8-future-annotations + "FAST", # FastAPI + "FIX", # flake8-fixme + "FLY", # flynt + "FURB", # refurb + "ICN", # flake8-import-conventions + "INP", # flake8-no-pep420 + "INT", # flake8-gettext + "ISC", # flake8-implicit-str-concat + "LOG", # flake8-logging + "NPY", # NumPy-specific rules + "PD", # pandas-vet + "PERF", # Perflint + "PIE", # flake8-pie + "RSE", # flake8-raise + "SLOT", # flake8-slots + "T10", # flake8-debugger + "TC", # flake8-type-checking + "TD", # flake8-todos + "W", # pycodestyle + "YTT", # flake8-2020 + # "ANN", # flake8-annotations + # "ARG", # flake8-unused-arguments + # "B", # flake8-bugbear + # "BLE", # flake8-blind-except + # "COM", # flake8-commas + # "CPY", # flake8-copyright + # "D", # pydocstyle + # "DOC", # pydoclint + # "DTZ", # flake8-datetimez + # "EM", # flake8-errmsg + # "ERA", # eradicate + # "FBT", # flake8-boolean-trap + # "G", # flake8-logging-format + # "I", # isort + # "N", # pep8-naming + # "PGH", # pygrep-hooks + # "PL", # Pylint + # "PT", # flake8-pytest-style + # "PTH", # flake8-use-pathlib + # "PYI", # flake8-pyi + # "Q", # flake8-quotes + # "RET", # flake8-return + # "RUF", # Ruff-specific rules + # "S", # flake8-bandit + # "SIM", # flake8-simplify + # "SLF", # flake8-self + # "T20", # flake8-print + # 
"TID", # flake8-tidy-imports + # "TRY", # tryceratops + # "UP", # pyupgrade +] +extend-ignore = ["PIE790"] +mccabe.max-complexity = 13 +per-file-ignores."scheduler/models/args.py" = ["DJ012", "INT001"] +per-file-ignores."scheduler/models/ephemeral_models.py" = ["DJ008"] +per-file-ignores."scheduler/models/task.py" = ["DJ001", "DJ012"] [tool.mypy] packages = ['scheduler', ] diff --git a/scheduler/helpers/timeouts.py b/scheduler/helpers/timeouts.py index 0f539da..298ceb6 100644 --- a/scheduler/helpers/timeouts.py +++ b/scheduler/helpers/timeouts.py @@ -34,7 +34,7 @@ def __init__(self, timeout, exception=BaseTimeoutException, **kwargs): def __enter__(self): self.setup_death_penalty() - def __exit__(self, type, value, traceback): + def __exit__(self, type, value, traceback): # noqa: A002 # Always cancel immediately, since we're done try: self.cancel_death_penalty() @@ -49,10 +49,10 @@ def __exit__(self, type, value, traceback): return False def setup_death_penalty(self): - raise NotImplementedError() + raise NotImplementedError def cancel_death_penalty(self): - raise NotImplementedError() + raise NotImplementedError class UnixSignalDeathPenalty(BaseDeathPenalty): diff --git a/scheduler/management/commands/delete_failed_executions.py b/scheduler/management/commands/delete_failed_executions.py index 0335d7e..45718cf 100644 --- a/scheduler/management/commands/delete_failed_executions.py +++ b/scheduler/management/commands/delete_failed_executions.py @@ -18,8 +18,7 @@ def handle(self, *args, **options): queue = get_queue(options.get("queue", "default")) job_names = queue.failed_job_registry.all(queue.connection) jobs = JobModel.get_many(job_names, connection=queue.connection) - func_name = options.get("func", None) - if func_name is not None: + if func_name := options.get("func"): jobs = [job for job in jobs if job.func_name == func_name] dry_run = options.get("dry_run", False) click.echo(f"Found {len(jobs)} failed jobs") diff --git 
a/scheduler/management/commands/scheduler_worker.py b/scheduler/management/commands/scheduler_worker.py index 1ba5dcd..ee2d81d 100644 --- a/scheduler/management/commands/scheduler_worker.py +++ b/scheduler/management/commands/scheduler_worker.py @@ -129,8 +129,7 @@ def handle(self, **options: Any) -> None: "default", ] click.echo(f"Starting worker for queues {queues}") - pidfile = options.pop("pidfile") - if pidfile: + if pidfile := options.pop("pidfile"): with open(os.path.expanduser(pidfile), "w") as fp: fp.write(str(os.getpid())) diff --git a/scheduler/models/task.py b/scheduler/models/task.py index 30ee6b1..25403dd 100644 --- a/scheduler/models/task.py +++ b/scheduler/models/task.py @@ -344,8 +344,7 @@ def save(self, **kwargs: Any) -> None: schedule_job = kwargs.pop("schedule_job", True) if should_clean: self.clean() - update_fields = kwargs.get("update_fields", None) - if update_fields is not None: + if (update_fields := kwargs.get("update_fields")) is not None: kwargs["update_fields"] = set(update_fields).union({"updated_at"}) super(Task, self).save(**kwargs) if schedule_job: diff --git a/scheduler/tests/jobs.py b/scheduler/tests/jobs.py index 0342e63..23b9362 100644 --- a/scheduler/tests/jobs.py +++ b/scheduler/tests/jobs.py @@ -21,12 +21,12 @@ def test_args_kwargs(*args, **kwargs): def two_seconds_job(): sleep(2) - logging.info(f"Job {_counter}") + logging.info(f"Job {_counter}") # noqa: LOG015 def long_job(): sleep(1000) - logging.info(f"Job {_counter}") + logging.info(f"Job {_counter}") # noqa: LOG015 test_non_callable = "I am a teapot" diff --git a/scheduler/tests/test_mgmt_commands/test_scheduler_worker.py b/scheduler/tests/test_mgmt_commands/test_scheduler_worker.py index 5627ca7..1840fc0 100644 --- a/scheduler/tests/test_mgmt_commands/test_scheduler_worker.py +++ b/scheduler/tests/test_mgmt_commands/test_scheduler_worker.py @@ -17,7 +17,7 @@ def test_scheduler_worker__no_queues_params(self): # enqueue some jobs that will fail job_names = [] - for _ in range(0, 3): 
+ for _ in range(3): job = queue.create_and_enqueue_job(failing_job) job_names.append(job.name) @@ -56,7 +56,7 @@ def test_scheduler_worker__run_jobs(self): # enqueue some jobs that will fail job_names = [] - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(failing_job) job_names.append(job.name) @@ -76,7 +76,7 @@ def test_scheduler_worker__worker_with_two_queues(self): # enqueue some jobs that will fail job_names = [] - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(failing_job) job_names.append(job.name) job = queue2.create_and_enqueue_job(failing_job) diff --git a/scheduler/tests/test_task_types/test_task_model.py b/scheduler/tests/test_task_types/test_task_model.py index fcab1ec..cf5bf25 100644 --- a/scheduler/tests/test_task_types/test_task_model.py +++ b/scheduler/tests/test_task_types/test_task_model.py @@ -231,7 +231,7 @@ def test_parse_kwargs(self): taskarg_factory(TaskKwarg, key="key3", arg_type="bool", val=True, content_object=task) taskarg_factory(TaskKwarg, key="key4", arg_type="datetime", val=date, content_object=task) kwargs = task.parse_kwargs() - self.assertEqual(kwargs, {'key1': "one", 'key2': 2, 'key3': True, 'key4': date}) + self.assertEqual(kwargs, {"key1": "one", "key2": 2, "key3": True, "key4": date}) def test_callable_args_and_kwargs(self): task = task_factory(self.task_type, callable="scheduler.tests.jobs.test_args_kwargs") @@ -418,7 +418,7 @@ def test_admin_single_delete(self): self.assertTrue(task.is_scheduled()) prev_executions_count = len(_get_executions(task)) url = reverse("admin:scheduler_task_delete", args=[task.pk]) - data = {'post': "yes"} + data = {"post": "yes"} # act res = self.client.post(url, data=data, follow=True) # assert diff --git a/scheduler/tests/test_views/test_job_detail_action.py b/scheduler/tests/test_views/test_job_detail_action.py index 7061d26..e84f69a 100644 --- a/scheduler/tests/test_views/test_job_detail_action.py +++ 
b/scheduler/tests/test_views/test_job_detail_action.py @@ -63,7 +63,7 @@ def test_single_job_action_enqueue_job(self): queue = get_queue("django_tasks_scheduler_test") job_list = [] # enqueue some jobs that depends on other - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(test_job) job_list.append(job) @@ -84,7 +84,7 @@ def test_single_job_action_enqueue_job_sync_queue(self): queue = get_queue("scheduler_scheduler_active_test") job_list = [] # enqueue some jobs that depends on other - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(test_job) job_list.append(job) diff --git a/scheduler/tests/test_views/test_queue_actions.py b/scheduler/tests/test_views/test_queue_actions.py index c0a8a77..dbecebc 100644 --- a/scheduler/tests/test_views/test_queue_actions.py +++ b/scheduler/tests/test_views/test_queue_actions.py @@ -14,7 +14,7 @@ def test_job_list_action_delete_jobs__with_bad_next_url(self): # enqueue some jobs job_names = [] - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(test_job, job_info_ttl=0) job_names.append(job.name) @@ -36,7 +36,7 @@ def test_job_list_action_delete_jobs(self): # enqueue some jobs job_names = [] - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(test_job, job_info_ttl=0) job_names.append(job.name) @@ -57,7 +57,7 @@ def test_job_list_action_requeue_jobs(self): # enqueue some jobs that will fail job_names = [] - for _ in range(0, 3): + for _ in range(3): job = queue.create_and_enqueue_job(failing_job) job_names.append(job.name) diff --git a/scheduler/views/queue_views.py b/scheduler/views/queue_views.py index f97547f..be5d2c0 100644 --- a/scheduler/views/queue_views.py +++ b/scheduler/views/queue_views.py @@ -159,7 +159,7 @@ def get_statistics(run_maintenance_tasks: bool = False) -> Dict[str, List[Dict[s canceled_jobs=queue.canceled_job_registry.count(queue.connection), ) queues.append(queue_data) - except 
ConnectionErrorTypes as e: + except ConnectionErrorTypes as e: # noqa: PERF203 logger.error(f"Could not connect for queue {queue_name}: {e}") continue diff --git a/scheduler/worker/worker.py b/scheduler/worker/worker.py index 3a8a031..b664ce6 100644 --- a/scheduler/worker/worker.py +++ b/scheduler/worker/worker.py @@ -311,7 +311,7 @@ def _check_for_suspension(self, burst: bool) -> None: self.log( INFO, "Suspended in burst mode, exiting, Note: There could still be unfinished jobs on the queue" ) - raise StopRequested() + raise StopRequested if not notified: self.log(INFO, "Worker suspended, trigger ResumeCommand") @@ -459,7 +459,7 @@ def request_force_stop(self, signum: int, frame: Optional[FrameType]) -> None: self.log(DEBUG, f"Taking down job execution process {self._model.job_execution_process_pid} with me") self._kill_job_execution_process() self._wait_for_job_execution_process() - raise SystemExit() + raise SystemExit def request_stop(self, signum: int, frame: Optional[FrameType]) -> None: """Stops the current worker loop but waits for child processes to end gracefully (warm shutdown). @@ -484,7 +484,7 @@ def request_stop(self, signum: int, frame: Optional[FrameType]) -> None: "Stopping after current job execution process is finished. Press Ctrl+C again for a cold shutdown.", ) else: - raise StopRequested() + raise StopRequested def reorder_queues(self, reference_queue: Queue) -> None: """Reorder the queues according to the strategy. 
@@ -498,7 +498,8 @@ def reorder_queues(self, reference_queue: Queue) -> None: if self._dequeue_strategy not in [e.value for e in DequeueStrategy]: raise ValueError( - f"""[Worker {self.name}/{self._pid}]: Dequeue strategy should be one of {", ".join([e.value for e in DequeueStrategy])}""" + f"[Worker {self.name}/{self._pid}]: Dequeue strategy should be one of " + f"{', '.join([e.value for e in DequeueStrategy])}" ) if self._dequeue_strategy == DequeueStrategy.DEFAULT: return @@ -589,7 +590,8 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None: retpid, ret_val = self._wait_for_job_execution_process() break except JobExecutionMonitorTimeoutException: - # job execution process has not exited yet and is still running. Send a heartbeat to keep the worker alive. + # job execution process has not exited yet and is still running. Send a heartbeat to keep the worker + # alive. if job.started_at is not None: working_time = (utcnow() - job.started_at).total_seconds() self._model.set_current_job_working_time(working_time, self.connection) @@ -856,7 +858,8 @@ def get_queues(*queue_names: str) -> List[Queue]: curr_queue_config = get_queue_configuration(queue_name) if not queue_config.same_connection_params(curr_queue_config): raise QueueConnectionDiscrepancyError( - f'Queues must have the same broker connection. "{queue_name}" and "{queue_names[0]}" have different connection settings' + f'Queues must have the same broker connection. "{queue_name}" and "{queue_names[0]}" have different ' + "connection settings" ) queue = get_queue(queue_name) queues.append(queue)