Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions scheduler/admin/task_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ def get_job_executions_for_task(queue_name: str, scheduled_task: Task) -> List[J
)

res = sorted(
list(filter(lambda j: job_execution_of(j, scheduled_task), job_list)), key=lambda j: j.created_at, reverse=True
filter(lambda j: job_execution_of(j, scheduled_task), job_list), key=lambda j: j.created_at, reverse=True
)
return res


class JobArgInline(GenericStackedInline):
model = TaskArg
extra = 0
fieldsets = ((None, dict(fields=("arg_type", "val"))),)
fieldsets = ((None, {"fields": ("arg_type", "val")}),)


class JobKwargInline(GenericStackedInline):
model = TaskKwarg
extra = 0
fieldsets = ((None, dict(fields=("key", ("arg_type", "val")))),)
fieldsets = ((None, {"fields": ("key", ("arg_type", "val"))}),)


def get_message_bit(rows_updated: int) -> str:
Expand Down Expand Up @@ -86,40 +86,40 @@ class Media:
fieldsets = (
(
None,
dict(
fields=(
{
"fields": (
"name",
"callable",
("enabled", "timeout", "result_ttl"),
"task_type",
)
),
},
),
(
None,
dict(fields=("scheduled_time",), classes=("tasktype-OnceTaskType",)),
{"fields": ("scheduled_time",), "classes": ("tasktype-OnceTaskType",)},
),
(
None,
dict(fields=("cron_string",), classes=("tasktype-CronTaskType",)),
{"fields": ("cron_string",), "classes": ("tasktype-CronTaskType",)},
),
(
None,
dict(
fields=(
{
"fields": (
(
"interval",
"interval_unit",
),
"repeat",
),
classes=("tasktype-RepeatableTaskType",),
),
"classes": ("tasktype-RepeatableTaskType",),
},
),
(_("Queue settings"), dict(fields=(("queue", "at_front"), "job_name"))),
(_("Queue settings"), {"fields": (("queue", "at_front"), "job_name")}),
(
_("Previous runs info"),
dict(fields=(("successful_runs", "last_successful_run"), ("failed_runs", "last_failed_run"))),
{"fields": (("successful_runs", "last_successful_run"), ("failed_runs", "last_failed_run"))},
),
)

Expand Down Expand Up @@ -157,7 +157,7 @@ def change_view(self, request: HttpRequest, object_id, form_url="", extra_contex
execution_list = get_job_executions_for_task(obj.queue, obj)
except ConnectionErrorTypes as e:
logger.warn(f"Could not get job executions: {e}")
execution_list = list()
execution_list = []
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)
page_number = request.GET.get("p", 1)
page_obj = paginator.get_page(page_number)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from scheduler.helpers.callback import Callback
from scheduler.types import ConnectionType

JOB_METHODS_LIST: List[str] = list()
JOB_METHODS_LIST: List[str] = []


class job:
Expand Down
18 changes: 9 additions & 9 deletions scheduler/helpers/queues/queue_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ def queue_perform_job(job_model: JobModel, connection: ConnectionType) -> Any:


class Queue:
REGISTRIES = dict(
finished="finished_job_registry",
failed="failed_job_registry",
scheduled="scheduled_job_registry",
active="active_job_registry",
canceled="canceled_job_registry",
queued="queued_job_registry",
)
REGISTRIES = {
"finished": "finished_job_registry",
"failed": "failed_job_registry",
"scheduled": "scheduled_job_registry",
"active": "active_job_registry",
"canceled": "canceled_job_registry",
"queued": "queued_job_registry",
}

def __init__(self, connection: ConnectionType, name: str, is_async: bool = True) -> None:
"""Initializes a Queue object.
Expand Down Expand Up @@ -150,7 +150,7 @@ def get_registry(self, name: str) -> JobNamesRegistry:
raise NoSuchRegistryError(f"Unknown registry name {name}")

def get_all_job_names(self) -> List[str]:
all_job_names = list()
all_job_names = []
all_job_names.extend(self.queued_job_registry.all(self.connection))
all_job_names.extend(self.finished_job_registry.all(self.connection))
all_job_names.extend(self.active_job_registry.all(self.connection))
Expand Down
4 changes: 1 addition & 3 deletions scheduler/management/commands/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@ def add_arguments(self, parser: CommandParser) -> None:

def handle(self, *args: Any, **options: Any) -> None:
file = open(options.get("filename"), "w") if options.get("filename") else sys.stdout
res = list()

tasks = Task.objects.all()
if options.get("enabled"):
tasks = tasks.filter(enabled=True)
for task in tasks:
res.append(task.to_dict())
res = [task.to_dict() for task in tasks]

if options.get("format") == "json":
import json
Expand Down
6 changes: 2 additions & 4 deletions scheduler/management/commands/import.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ def create_task_from_dict(task_dict: Dict[str, Any], update: bool) -> Optional[T
if not settings.USE_TZ and not timezone.is_naive(target):
target = timezone.make_naive(target)
kwargs["scheduled_time"] = target
model_fields = set(
map(lambda field: field.attname, filter(lambda field: hasattr(field, "attname"), Task._meta.get_fields()))
)
model_fields = {field.attname for field in filter(lambda field: hasattr(field, "attname"), Task._meta.get_fields())}
keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys()))
for k in keys_to_ignore:
del kwargs[k]
Expand Down Expand Up @@ -116,7 +114,7 @@ def add_arguments(self, parser: CommandParser) -> None:

def handle(self, *args: Any, **options: Any) -> None:
file = open(options.get("filename")) if options.get("filename") else sys.stdin # type: ignore[arg-type]
jobs = list()
jobs = []
if options.get("format") == "json":
import json

Expand Down
2 changes: 1 addition & 1 deletion scheduler/management/commands/scheduler_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _print_stats_dashboard(
click.echo(f"| {'Name':<16} | Queued | Active | Finished | Canceled | Workers |")
self._print_separator()
for ind, queue in enumerate(statistics["queues"]):
vals = list((queue[k] for k in KEYS))
vals = [queue[k] for k in KEYS]
# Deal with colors
if not with_color:
colors = ["" for _ in KEYS]
Expand Down
2 changes: 1 addition & 1 deletion scheduler/management/commands/scheduler_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def handle(self, **options: Any) -> None:

# Check whether sentry is enabled
if options.get("sentry_dsn") is not None:
sentry_opts = dict(ca_certs=options.get("sentry_ca_certs"), debug=options.get("sentry_debug"))
sentry_opts = {"ca_certs": options.get("sentry_ca_certs"), "debug": options.get("sentry_debug")}
dsn: str = options.get("sentry_dsn") # type: ignore
register_sentry(dsn, **sentry_opts)

Expand Down
62 changes: 31 additions & 31 deletions scheduler/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def is_scheduled(self) -> bool:
self.rqueue.queued_job_registry.exists(pipeline, self.job_name)
self.rqueue.active_job_registry.exists(pipeline, self.job_name)
results = pipeline.execute()
res = any([item is not None for item in results])
res = any(item is not None for item in results)

# If the job_name is not scheduled/queued/started,
# update the job_id to None. (The job_id belongs to a previous run which is completed)
Expand Down Expand Up @@ -230,14 +230,14 @@ def _enqueue_args(self) -> Dict[str, Any]:
- Set job-id to proper format
- set job meta
"""
res = dict(
meta=dict(),
task_type=self.task_type,
scheduled_task_id=self.id,
on_success=Callback(success_callback),
on_failure=Callback(failure_callback),
name=self._next_job_id(),
)
res = {
"meta": {},
"task_type": self.task_type,
"scheduled_task_id": self.id,
"on_success": Callback(success_callback),
"on_failure": Callback(failure_callback),
"name": self._next_job_id(),
}
if self.at_front:
res["at_front"] = self.at_front
if self.timeout:
Expand Down Expand Up @@ -287,29 +287,29 @@ def _schedule_time(self) -> datetime:
def to_dict(self) -> Dict[str, Any]:
"""Export model to dictionary, so it can be saved as external file backup"""
interval_unit = str(self.interval_unit) if self.interval_unit else None
res = dict(
model=str(self.task_type),
name=self.name,
callable=self.callable,
callable_args=[dict(arg_type=arg.arg_type, val=arg.val) for arg in self.callable_args.all()],
callable_kwargs=[
dict(arg_type=arg.arg_type, key=arg.key, val=arg.val) for arg in self.callable_kwargs.all()
res = {
"model": str(self.task_type),
"name": self.name,
"callable": self.callable,
"callable_args": [{"arg_type": arg.arg_type, "val": arg.val} for arg in self.callable_args.all()],
"callable_kwargs": [
{"arg_type": arg.arg_type, "key": arg.key, "val": arg.val} for arg in self.callable_kwargs.all()
],
enabled=self.enabled,
queue=self.queue,
repeat=getattr(self, "repeat", None),
at_front=self.at_front,
timeout=self.timeout,
result_ttl=self.result_ttl,
cron_string=getattr(self, "cron_string", None),
scheduled_time=self._schedule_time().isoformat(),
interval=getattr(self, "interval", None),
interval_unit=interval_unit,
successful_runs=getattr(self, "successful_runs", None),
failed_runs=getattr(self, "failed_runs", None),
last_successful_run=getattr(self, "last_successful_run", None),
last_failed_run=getattr(self, "last_failed_run", None),
)
"enabled": self.enabled,
"queue": self.queue,
"repeat": getattr(self, "repeat", None),
"at_front": self.at_front,
"timeout": self.timeout,
"result_ttl": self.result_ttl,
"cron_string": getattr(self, "cron_string", None),
"scheduled_time": self._schedule_time().isoformat(),
"interval": getattr(self, "interval", None),
"interval_unit": interval_unit,
"successful_runs": getattr(self, "successful_runs", None),
"failed_runs": getattr(self, "failed_runs", None),
"last_successful_run": getattr(self, "last_successful_run", None),
"last_failed_run": getattr(self, "last_failed_run", None),
}
return res

def get_absolute_url(self) -> str:
Expand Down
2 changes: 1 addition & 1 deletion scheduler/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

logger = logging.getLogger("scheduler")

_QUEUES: Dict[str, QueueConfiguration] = dict()
_QUEUES: Dict[str, QueueConfiguration] = {}
SCHEDULER_CONFIG: SchedulerConfiguration = SchedulerConfiguration()


Expand Down
8 changes: 4 additions & 4 deletions scheduler/tests/test_mgmt_commands/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def tearDown(self) -> None:
os.remove(self.tmpfile.name)

def test_export__should_export_job(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=True))

Expand All @@ -35,7 +35,7 @@ def test_export__should_export_job(self):
self.assertEqual(result[1], tasks[1].to_dict())

def test_export__should_export_enabled_jobs_only(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=False))

Expand All @@ -47,7 +47,7 @@ def test_export__should_export_enabled_jobs_only(self):
self.assertEqual(result[0], tasks[0].to_dict())

def test_export__should_export_job_yaml_without_yaml_lib(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=True))

Expand All @@ -58,7 +58,7 @@ def test_export__should_export_job_yaml_without_yaml_lib(self):
self.assertEqual(cm.exception.code, 1)

def test_export__should_export_job_yaml_green(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=True))
tasks.append(task_factory(TaskType.CRON, enabled=True))
Expand Down
12 changes: 6 additions & 6 deletions scheduler/tests/test_mgmt_commands/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def tearDown(self) -> None:
os.remove(self.tmpfile.name)

def test_import__should_schedule_job(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True, instance_only=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=True, instance_only=True))
res = json.dumps([j.to_dict() for j in tasks])
Expand All @@ -38,7 +38,7 @@ def test_import__should_schedule_job(self):
self.assertEqual(getattr(tasks[0], attr), getattr(db_task, attr))

def test_import__should_schedule_job_yaml(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True, instance_only=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=True, instance_only=True))
res = yaml.dump([j.to_dict() for j in tasks], default_flow_style=False)
Expand All @@ -55,7 +55,7 @@ def test_import__should_schedule_job_yaml(self):
self.assertEqual(getattr(tasks[0], attr), getattr(task, attr))

def test_import__should_schedule_job_yaml_without_yaml_lib(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True, instance_only=True))
tasks.append(task_factory(TaskType.REPEATABLE, enabled=True, instance_only=True))
res = yaml.dump([j.to_dict() for j in tasks], default_flow_style=False)
Expand All @@ -68,7 +68,7 @@ def test_import__should_schedule_job_yaml_without_yaml_lib(self):
self.assertEqual(cm.exception.code, 1)

def test_import__should_schedule_job_reset(self):
tasks = list()
tasks = []
task_factory(TaskType.ONCE, enabled=True)
task_factory(TaskType.ONCE, enabled=True)
tasks.append(task_factory(TaskType.ONCE, enabled=True))
Expand All @@ -91,7 +91,7 @@ def test_import__should_schedule_job_reset(self):
self.assertEqual(getattr(tasks[1], attr), getattr(task, attr))

def test_import__should_schedule_job_update_existing(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True))
tasks.append(task_factory(TaskType.ONCE, enabled=True))
res = json.dumps([j.to_dict() for j in tasks])
Expand All @@ -107,7 +107,7 @@ def test_import__should_schedule_job_update_existing(self):
self.assertEqual(getattr(tasks[0], attr), getattr(task, attr))

def test_import__should_schedule_job_without_update_existing(self):
tasks = list()
tasks = []
tasks.append(task_factory(TaskType.ONCE, enabled=True))
tasks.append(task_factory(TaskType.ONCE, enabled=True))
res = json.dumps([j.to_dict() for j in tasks])
Expand Down
4 changes: 2 additions & 2 deletions scheduler/tests/test_mgmt_commands/test_scheduler_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from scheduler.helpers.queues import get_queue


@override_settings(SCHEDULER_QUEUES=dict(default={"HOST": "localhost", "PORT": 6379, "DB": 0}))
@override_settings(SCHEDULER_QUEUES={"default": {"HOST": "localhost", "PORT": 6379, "DB": 0}})
class SchedulerStatsTest(TestCase):
EXPECTED_OUTPUT = {
"queues": [
Expand All @@ -33,7 +33,7 @@ class SchedulerStatsTest(TestCase):
def setUp(self):
super(SchedulerStatsTest, self).setUp()
SchedulerStatsTest.OLD_QUEUES = settings._QUEUES
settings._QUEUES = dict()
settings._QUEUES = {}
settings.conf_settings()
get_queue("default").connection.flushall()

Expand Down
Loading
Loading