diff --git a/kolibri/conftest.py b/kolibri/conftest.py
index ab843ecd57d..0f06da472bb 100644
--- a/kolibri/conftest.py
+++ b/kolibri/conftest.py
@@ -7,19 +7,6 @@
TEMP_KOLIBRI_HOME = "./.pytest_kolibri_home"
-@pytest.fixture(scope="session")
-def django_db_setup(
- request,
- django_db_setup,
-):
- def dispose_sqlalchemy():
- from kolibri.core.tasks.main import connection
-
- connection.dispose()
-
- request.addfinalizer(dispose_sqlalchemy)
-
-
@pytest.fixture(scope="session", autouse=True)
def global_fixture():
if not os.path.exists(TEMP_KOLIBRI_HOME):
diff --git a/kolibri/core/auth/tasks.py b/kolibri/core/auth/tasks.py
index ca1a0ed727c..44865461260 100644
--- a/kolibri/core/auth/tasks.py
+++ b/kolibri/core/auth/tasks.py
@@ -7,6 +7,8 @@
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils import timezone
+from morango.errors import MorangoError
+from requests.exceptions import HTTPError
from rest_framework import serializers
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.exceptions import ValidationError
@@ -41,9 +43,9 @@
from kolibri.core.tasks.permissions import IsAdminForJob
from kolibri.core.tasks.permissions import IsSuperAdmin
from kolibri.core.tasks.permissions import NotProvisioned
+from kolibri.core.tasks.utils import DatabaseLockedError
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.validation import JobValidator
-from kolibri.utils.time_utils import naive_utc_datetime
from kolibri.utils.translation import gettext as _
@@ -469,7 +471,7 @@ def enqueue_soud_sync_processing():
# Check if there is already an enqueued job
try:
- converted_next_run = naive_utc_datetime(timezone.now() + next_run)
+ converted_next_run = timezone.now() + next_run
orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID)
if (
orm_job.state not in (State.COMPLETED, State.FAILED, State.CANCELED)
@@ -607,6 +609,7 @@ def validate(self, data):
permission_classes=[IsSuperAdmin() | NotProvisioned()],
status_fn=status_fn,
long_running=True,
+ retry_on=[DatabaseLockedError, MorangoError, HTTPError],
)
def peeruserimport(command, **kwargs):
call_command(command, **kwargs)
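# Editor's note (illustration, not part of the patch): retry_on narrows which
# failures are retryable. With the declaration above, a failed peeruserimport
# run is only requeued when the raised exception matches one of the listed
# classes and the job still has retries left, mirroring the storage-layer check:
#
#     retryable = any(
#         isinstance(exc, cls)
#         for cls in (DatabaseLockedError, MorangoError, HTTPError)
#     )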
diff --git a/kolibri/core/auth/test/test_auth_tasks.py b/kolibri/core/auth/test/test_auth_tasks.py
index b907e9e3706..938e4518045 100644
--- a/kolibri/core/auth/test/test_auth_tasks.py
+++ b/kolibri/core/auth/test/test_auth_tasks.py
@@ -37,7 +37,6 @@
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
-from kolibri.utils.time_utils import naive_utc_datetime
DUMMY_PASSWORD = "password"
@@ -64,7 +63,7 @@ def fake_job(**kwargs):
class dummy_orm_job_data(object):
- scheduled_time = datetime.datetime(year=2023, month=1, day=1, tzinfo=None)
+ scheduled_time = datetime.datetime(year=2023, month=1, day=1)
repeat = 5
interval = 8600
retry_interval = 5
@@ -701,7 +700,7 @@ def test_enqueue_soud_sync_processing__future__scheduled(
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=30)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.QUEUED
- mock_job.scheduled_time = naive_utc_datetime(timezone.now())
+ mock_job.scheduled_time = timezone.now()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_not_called()
@@ -714,7 +713,7 @@ def test_enqueue_soud_sync_processing__future__running(
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=1)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.RUNNING
- mock_job.scheduled_time = naive_utc_datetime(timezone.now())
+ mock_job.scheduled_time = timezone.now()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_not_called()
@@ -727,9 +726,7 @@ def test_enqueue_soud_sync_processing__future__reschedule(
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.QUEUED
- mock_job.scheduled_time = naive_utc_datetime(
- timezone.now() + datetime.timedelta(seconds=15)
- )
+ mock_job.scheduled_time = timezone.now() + datetime.timedelta(seconds=15)
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))
@@ -743,9 +740,7 @@ def test_enqueue_soud_sync_processing__completed__enqueue(
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
- mock_job.scheduled_time = naive_utc_datetime(
- timezone.now() - datetime.timedelta(seconds=100)
- )
+ mock_job.scheduled_time = timezone.now() - datetime.timedelta(seconds=100)
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))
@@ -759,9 +754,7 @@ def test_enqueue_soud_sync_processing__race__already_running(
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
- mock_job.scheduled_time = naive_utc_datetime(
- timezone.now() - datetime.timedelta(seconds=100)
- )
+ mock_job.scheduled_time = timezone.now() - datetime.timedelta(seconds=100)
mock_task.enqueue_in.side_effect = JobRunning()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))
diff --git a/kolibri/core/tasks/api.py b/kolibri/core/tasks/api.py
index a2c4303049c..d40af9c0ade 100644
--- a/kolibri/core/tasks/api.py
+++ b/kolibri/core/tasks/api.py
@@ -2,9 +2,7 @@
from django.http.response import Http404
from django.utils.decorators import method_decorator
-from django.utils.timezone import make_aware
from django.views.decorators.csrf import csrf_protect
-from pytz import utc
from rest_framework import decorators
from rest_framework import serializers
from rest_framework import status
@@ -100,8 +98,7 @@ def _job_to_response(self, job):
"args": job.args,
"kwargs": job.kwargs,
"extra_metadata": job.extra_metadata,
- # Output is UTC naive, coerce to UTC aware.
- "scheduled_datetime": make_aware(orm_job.scheduled_time, utc).isoformat(),
+ "scheduled_datetime": orm_job.scheduled_time.isoformat(),
"repeat": orm_job.repeat,
"repeat_interval": orm_job.interval,
"retry_interval": orm_job.retry_interval,
@@ -162,6 +159,7 @@ def _enqueue_job_based_on_enqueue_args(self, registered_task, job, enqueue_args)
interval=enqueue_args.get("repeat_interval", 0),
repeat=enqueue_args.get("repeat", 0),
retry_interval=enqueue_args.get("retry_interval", None),
+ max_retries=enqueue_args.get("max_retries", None),
)
elif enqueue_args.get("enqueue_in"):
return job_storage.enqueue_in(
@@ -172,12 +170,14 @@ def _enqueue_job_based_on_enqueue_args(self, registered_task, job, enqueue_args)
interval=enqueue_args.get("repeat_interval", 0),
repeat=enqueue_args.get("repeat", 0),
retry_interval=enqueue_args.get("retry_interval", None),
+ max_retries=enqueue_args.get("max_retries", None),
)
return job_storage.enqueue_job(
job,
queue=registered_task.queue,
priority=enqueue_args.get("priority", registered_task.priority),
retry_interval=enqueue_args.get("retry_interval", None),
+ max_retries=enqueue_args.get("max_retries", None),
)
def create(self, request):
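# Editor's sketch: max_retries now flows from the API's enqueue args into every
# job_storage entry point. A hypothetical request payload, assuming the
# enqueue-args serializer accepts the new key (its validation is not shown in
# this diff):
#
#     {"type": "...", "enqueue_args": {"retry_interval": 30, "max_retries": 3}}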
diff --git a/kolibri/core/tasks/decorators.py b/kolibri/core/tasks/decorators.py
index 74c115debad..4e4770666b3 100644
--- a/kolibri/core/tasks/decorators.py
+++ b/kolibri/core/tasks/decorators.py
@@ -17,6 +17,7 @@ def register_task(
permission_classes=None,
long_running=False,
status_fn=None,
+ retry_on=None,
):
"""
Registers the decorated function as task.
@@ -36,6 +37,7 @@ def register_task(
permission_classes=permission_classes,
long_running=long_running,
status_fn=status_fn,
+ retry_on=retry_on,
)
return RegisteredTask(
@@ -49,4 +51,5 @@ def register_task(
permission_classes=permission_classes,
long_running=long_running,
status_fn=status_fn,
+ retry_on=retry_on,
)
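# Editor's sketch: opting a task into exception-filtered retries with the new
# keyword. Hypothetical task name; the retry budget itself is supplied at
# enqueue time via job_storage:
#
#     from requests.exceptions import HTTPError
#
#     @register_task(retry_on=[HTTPError])
#     def fetch_remote_data(url):
#         ...
#
#     # job_storage.enqueue_job(Job(fetch_remote_data, args=(url,)),
#     #                         retry_interval=30, max_retries=3)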
diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py
index 6147266d466..8677a81730c 100644
--- a/kolibri/core/tasks/job.py
+++ b/kolibri/core/tasks/job.py
@@ -362,6 +362,8 @@ def execute(self):
args, kwargs = copy.copy(self.args), copy.copy(self.kwargs)
+ exception = None
+
try:
# First check whether the job has been cancelled
self.check_for_cancel()
@@ -370,6 +372,7 @@ def execute(self):
except UserCancelledError:
self.storage.mark_job_as_canceled(self.job_id)
except Exception as e:
+ exception = e
# If any error occurs, mark the job as failed and save the exception
traceback_str = traceback.format_exc()
e.traceback = traceback_str
@@ -377,9 +380,11 @@ def execute(self):
"Job {} raised an exception: {}".format(self.job_id, traceback_str)
)
self.storage.mark_job_as_failed(self.job_id, e, traceback_str)
-
self.storage.reschedule_finished_job_if_needed(
- self.job_id, delay=self._retry_in_delay, **self._retry_in_kwargs
+ self.job_id,
+ delay=self._retry_in_delay,
+ exception=exception,
+ **self._retry_in_kwargs,
)
setattr(current_state_tracker, "job", None)
diff --git a/kolibri/core/tasks/main.py b/kolibri/core/tasks/main.py
index d06f977b484..98c500fca15 100644
--- a/kolibri/core/tasks/main.py
+++ b/kolibri/core/tasks/main.py
@@ -3,7 +3,6 @@
from django.utils.functional import SimpleLazyObject
from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.utils import db_connection
from kolibri.core.tasks.worker import Worker
from kolibri.utils import conf
@@ -11,13 +10,8 @@
logger = logging.getLogger(__name__)
-connection = SimpleLazyObject(db_connection)
-
-
def __job_storage():
- return Storage(
- connection=connection,
- )
+ return Storage()
# This storage instance should be used to access job_storage db.
@@ -28,7 +22,6 @@ def __job_storage():
def initialize_workers(log_queue=None):
logger.info("Starting async task workers.")
return Worker(
- connection=connection,
regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"],
high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"],
log_queue=log_queue,
diff --git a/kolibri/core/tasks/migrations/0001_initial.py b/kolibri/core/tasks/migrations/0001_initial.py
new file mode 100644
index 00000000000..4248ad572ec
--- /dev/null
+++ b/kolibri/core/tasks/migrations/0001_initial.py
@@ -0,0 +1,56 @@
+# Initial migration for the jobs table. This model/index creation should be
+# skipped if the table was previously created by SQLAlchemy.
+from django.db import migrations
+from django.db import models
+
+from kolibri.core.tasks.operations import AddIndexIfNotExists
+from kolibri.core.tasks.operations import CreateModelIfNotExists
+
+
+class Migration(migrations.Migration):
+
+ initial = True
+
+ dependencies = []
+
+ operations = [
+ CreateModelIfNotExists(
+ name="Job",
+ fields=[
+ ("id", models.CharField(max_length=36, primary_key=True)),
+ ("state", models.CharField(max_length=20, db_index=True)),
+ ("func", models.CharField(max_length=200, db_index=True)),
+ ("priority", models.IntegerField(db_index=True)),
+ ("queue", models.CharField(max_length=50, db_index=True)),
+ ("saved_job", models.TextField()),
+ ("time_created", models.DateTimeField(null=True, blank=True)),
+ ("time_updated", models.DateTimeField(null=True, blank=True)),
+ ("interval", models.IntegerField(default=0)),
+ ("retry_interval", models.IntegerField(null=True, blank=True)),
+ ("repeat", models.IntegerField(null=True, blank=True)),
+ ("scheduled_time", models.DateTimeField(null=True, blank=True)),
+ (
+ "worker_host",
+ models.CharField(max_length=100, null=True, blank=True),
+ ),
+ (
+ "worker_process",
+ models.CharField(max_length=50, null=True, blank=True),
+ ),
+ (
+ "worker_thread",
+ models.CharField(max_length=50, null=True, blank=True),
+ ),
+ ("worker_extra", models.TextField(null=True, blank=True)),
+ ],
+ options={
+ "db_table": "jobs",
+ },
+ ),
+ AddIndexIfNotExists(
+ model_name="job",
+ index=models.Index(
+ fields=["queue", "scheduled_time"], name="queue__scheduled_time"
+ ),
+ ),
+ ]
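# Editor's note: KolibriTasksRouter.allow_migrate routes this app exclusively
# to the JOB_STORAGE alias, so this migration is a no-op on the default
# database. A hedged sketch of applying it by hand:
#
#     from django.core.management import call_command
#     from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE
#
#     call_command("migrate", "kolibritasks", database=JOB_STORAGE)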
diff --git a/kolibri/core/tasks/migrations/0002_add_retries_fields.py b/kolibri/core/tasks/migrations/0002_add_retries_fields.py
new file mode 100644
index 00000000000..8c57c0c9820
--- /dev/null
+++ b/kolibri/core/tasks/migrations/0002_add_retries_fields.py
@@ -0,0 +1,23 @@
+# Generated by Django 3.2.25 on 2025-10-15 21:14
+from django.db import migrations
+from django.db import models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("kolibritasks", "0001_initial"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="job",
+ name="max_retries",
+ field=models.IntegerField(blank=True, null=True),
+ ),
+ migrations.AddField(
+ model_name="job",
+ name="retries",
+ field=models.IntegerField(blank=True, null=True),
+ ),
+ ]
diff --git a/kolibri/core/tasks/migrations/__init__.py b/kolibri/core/tasks/migrations/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/kolibri/core/tasks/models.py b/kolibri/core/tasks/models.py
new file mode 100644
index 00000000000..35ac0539f17
--- /dev/null
+++ b/kolibri/core/tasks/models.py
@@ -0,0 +1,108 @@
+from django.db import models
+
+from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE
+
+
+class KolibriTasksRouter(object):
+ """
+ Determine how to route database calls for the kolibritasks app.
+ All other models will be routed to the default database.
+ """
+
+ def db_for_read(self, model, **hints):
+ """Send all read operations on kolibritasks app models to JOB_STORAGE."""
+ if model._meta.app_label == "kolibritasks":
+ return JOB_STORAGE
+ return None
+
+ def db_for_write(self, model, **hints):
+ """Send all write operations on kolibritasks app models to JOB_STORAGE."""
+ if model._meta.app_label == "kolibritasks":
+ return JOB_STORAGE
+ return None
+
+ def allow_relation(self, obj1, obj2, **hints):
+ """Determine if relationship is allowed between two objects."""
+
+ if (
+ obj1._meta.app_label == "kolibritasks"
+ and obj2._meta.app_label == "kolibritasks"
+ ):
+ return True
+ elif "kolibritasks" not in [obj1._meta.app_label, obj2._meta.app_label]:
+ return None
+
+ return False
+
+ def allow_migrate(self, db, app_label, model_name=None, **hints):
+ """Ensure that the kolibritasks app's models get created on the right database."""
+ if app_label == "kolibritasks":
+ # The kolibritasks app should be migrated only on the JOB_STORAGE database.
+ return db == JOB_STORAGE
+ elif db == JOB_STORAGE:
+ # Ensure that all other apps don't get migrated on the JOB_STORAGE database.
+ return False
+
+ # No opinion for all other scenarios
+ return None
+
+
+# The Job model, migrated from SQLAlchemy to the Django ORM.
+class Job(models.Model):
+
+ # The hex UUID given to the job upon first creation.
+ id = models.CharField(max_length=36, primary_key=True)
+
+    # The job's state. Inflated here for easier querying of the job's state.
+ state = models.CharField(max_length=20, db_index=True)
+
+ # The job's function string. Inflated here for easier querying of which task type it is.
+ func = models.CharField(max_length=200, db_index=True)
+
+ # The job's priority. Helps to decide which job to run next.
+ priority = models.IntegerField(db_index=True)
+
+ # The queue name passed to the client when the job is scheduled.
+ queue = models.CharField(max_length=50, db_index=True)
+
+ # The JSON string that represents the job
+ saved_job = models.TextField()
+
+ time_created = models.DateTimeField(null=True, blank=True)
+ time_updated = models.DateTimeField(null=True, blank=True)
+
+ # Repeat interval in seconds.
+ interval = models.IntegerField(default=0)
+
+ # Retry interval in seconds.
+ retry_interval = models.IntegerField(null=True, blank=True)
+
+ # Number of times to repeat - None means repeat forever.
+ repeat = models.IntegerField(null=True, blank=True)
+
+ scheduled_time = models.DateTimeField(null=True, blank=True)
+
+ # Optional references to the worker host, process and thread that are running this job,
+ # and any extra metadata that can be used by specific worker implementations.
+ worker_host = models.CharField(max_length=100, null=True, blank=True)
+ worker_process = models.CharField(max_length=50, null=True, blank=True)
+ worker_thread = models.CharField(max_length=50, null=True, blank=True)
+ worker_extra = models.TextField(null=True, blank=True)
+
+ # Columns for retry logic
+ # Number of times the job has been retried
+ retries = models.IntegerField(null=True, blank=True)
+ # Maximum number of retries allowed for the job
+ max_retries = models.IntegerField(null=True, blank=True)
+
+ class Meta:
+ db_table = "jobs"
+ indexes = [
+ models.Index(
+ fields=["queue", "scheduled_time"], name="queue__scheduled_time"
+ ),
+ ]
+
+ def __str__(self):
+ return f"Job {self.id} - {self.func} ({self.state})"
diff --git a/kolibri/core/tasks/operations.py b/kolibri/core/tasks/operations.py
new file mode 100644
index 00000000000..0b25654daec
--- /dev/null
+++ b/kolibri/core/tasks/operations.py
@@ -0,0 +1,73 @@
+import logging
+
+from django.db import migrations
+
+logger = logging.getLogger(__name__)
+
+
+def _get_db_tables(schema_editor):
+ with schema_editor.connection.cursor() as cursor:
+ return [
+ table.name
+ for table in schema_editor.connection.introspection.get_table_list(cursor)
+ ]
+
+
+class CreateModelIfNotExists(migrations.CreateModel):
+ """
+ Migration operation that creates a model if it does not already exist in the database.
+ This is useful for ensuring compatibility between SQLAlchemy and Django migrations,
+ preventing errors when the model has already been created by SQLAlchemy.
+ """
+
+ def _model_exists(self, schema_editor, table_name):
+ tables = _get_db_tables(schema_editor)
+ return table_name in tables
+
+ def database_forwards(self, app_label, schema_editor, from_state, to_state):
+ to_model = to_state.apps.get_model(app_label, self.name)
+ table_name = to_model._meta.db_table
+
+ if not self._model_exists(schema_editor, table_name):
+ # Let the parent class handle the creation
+ super().database_forwards(app_label, schema_editor, from_state, to_state)
+ else:
+ logger.info(f"Table '{table_name}' already exists. Skipping creation.")
+
+ def describe(self):
+ return f"Creates the model {self.name} if it does not exist."
+
+
+class AddIndexIfNotExists(migrations.AddIndex):
+ """
+ Migration operation that adds an index to a model if it does not already exist in the database.
+ This is useful for ensuring compatibility between SQLAlchemy and Django migrations,
+ preventing errors when the index has already been added by SQLAlchemy.
+ """
+
+ def _index_exists(self, schema_editor, table_name, index_name):
+ # Check existing tables in the database
+ tables = _get_db_tables(schema_editor)
+ if table_name not in tables:
+ return False
+
+ with schema_editor.connection.cursor() as cursor:
+ # Get existing constraints (including indexes) on the table
+ constraints = schema_editor.connection.introspection.get_constraints(
+ cursor, table_name
+ )
+ return constraints.get(index_name) is not None
+
+ def database_forwards(self, app_label, schema_editor, from_state, to_state):
+ model = to_state.apps.get_model(app_label, self.model_name)
+ table_name = model._meta.db_table
+ index_name = self.index.name
+
+ if not self._index_exists(schema_editor, table_name, index_name):
+ # Let the parent class handle the index creation
+ super().database_forwards(app_label, schema_editor, from_state, to_state)
+ else:
+ logger.info(f"Index '{index_name}' already exists. Skipping creation.")
+
+ def describe(self):
+ return f"Adds the index {self.index} to {self.model_name} if it does not exist."
diff --git a/kolibri/core/tasks/registry.py b/kolibri/core/tasks/registry.py
index f47ce34e4f3..b97e7b41333 100644
--- a/kolibri/core/tasks/registry.py
+++ b/kolibri/core/tasks/registry.py
@@ -159,6 +159,7 @@ def __init__( # noqa: C901
permission_classes=None,
long_running=False,
status_fn=None,
+ retry_on=None,
):
"""
:param func: Function to be wrapped as a Registered task
@@ -229,6 +230,7 @@ def __init__( # noqa: C901
self.track_progress = track_progress
self.long_running = long_running
self._status_fn = status_fn
+ self.retry_on = self._validate_retry_on(retry_on)
# Make this wrapper object look seamlessly like the wrapped function
update_wrapper(self, func)
@@ -258,6 +260,17 @@ def _validate_permissions_classes(self, permission_classes):
else:
yield permission_class
+ def _validate_retry_on(self, retry_on):
+ if retry_on is None:
+ return []
+
+ if not isinstance(retry_on, list):
+ raise TypeError("retry_on must be a list of exceptions")
+ for item in retry_on:
+            if not (isinstance(item, type) and issubclass(item, Exception)):
+ raise TypeError("Each item in retry_on must be an Exception subclass")
+ return retry_on
+
def check_job_permissions(self, user, job, view):
for permission in self.permissions:
if not permission.has_permission(user, job, view):
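# Editor's sketch of the validation contract enforced above (RegisteredTask
# construction shown schematically):
#
#     RegisteredTask(func, retry_on=[ValueError])    # accepted
#     RegisteredTask(func, retry_on=ValueError)      # TypeError: must be a list
#     RegisteredTask(func, retry_on=["oops"])        # TypeError: not an Exception subclass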
diff --git a/kolibri/core/tasks/storage.py b/kolibri/core/tasks/storage.py
index 96328ec5854..02ac4b61617 100644
--- a/kolibri/core/tasks/storage.py
+++ b/kolibri/core/tasks/storage.py
@@ -1,23 +1,11 @@
import logging
-from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
-import pytz
-from sqlalchemy import Column
-from sqlalchemy import DateTime
-from sqlalchemy import func as sql_func
-from sqlalchemy import Index
-from sqlalchemy import Integer
-from sqlalchemy import or_
-from sqlalchemy import select
-from sqlalchemy import String
-from sqlalchemy import Table
-from sqlalchemy import text
-from sqlalchemy import update
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import declarative_base
-from sqlalchemy.orm import sessionmaker
+from django.db import connections
+from django.db import transaction
+from django.db.models import Q
+from django.db.utils import OperationalError
from kolibri.core.tasks.constants import DEFAULT_QUEUE
from kolibri.core.tasks.constants import Priority
@@ -27,101 +15,30 @@
from kolibri.core.tasks.hooks import StorageHook
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
+from kolibri.core.tasks.models import Job as ORMJob
+from kolibri.core.tasks.validation import validate_exception
from kolibri.core.tasks.validation import validate_interval
from kolibri.core.tasks.validation import validate_priority
from kolibri.core.tasks.validation import validate_repeat
from kolibri.core.tasks.validation import validate_timedelay
-from kolibri.utils.sql_alchemy import db_matches_schema
from kolibri.utils.time_utils import local_now
-from kolibri.utils.time_utils import naive_utc_datetime
-
-Base = declarative_base()
logger = logging.getLogger(__name__)
-class ORMJob(Base):
- """
- The DB representation of a common.classes.Job object,
- storing the relevant details needed by the job storage
- backend.
- """
-
- __tablename__ = "jobs"
-
- # The hex UUID given to the job upon first creation.
- id = Column(String, primary_key=True, autoincrement=False)
-
- # The job's state. Inflated here for easier querying to the job's state.
- state = Column(String, index=True)
-
- # The job's function string. Inflated here for easier querying of which task type it is.
- func = Column(String, index=True)
-
- # The job's priority. Helps to decide which job to run next.
- priority = Column(Integer, index=True)
-
- # The queue name passed to the client when the job is scheduled.
- queue = Column(String, index=True)
-
- # The JSON string that represents the job
- saved_job = Column(String)
-
- time_created = Column(DateTime(timezone=True), server_default=sql_func.now())
- time_updated = Column(DateTime(timezone=True), onupdate=sql_func.now())
-
- # Repeat interval in seconds.
- interval = Column(Integer, default=0)
-
- # Retry interval in seconds.
- retry_interval = Column(Integer, nullable=True)
-
- # Number of times to repeat - None means repeat forever.
- repeat = Column(Integer, nullable=True)
-
- scheduled_time = Column(DateTime())
-
- # Optional references to the worker host, process and thread that are running this job,
- # and any extra metadata that can be used by specific worker implementations.
- worker_host = Column(String, nullable=True)
- worker_process = Column(String, nullable=True)
- worker_thread = Column(String, nullable=True)
- worker_extra = Column(String, nullable=True)
-
- __table_args__ = (Index("queue__scheduled_time", "queue", "scheduled_time"),)
-
-
NO_VALUE = object()
class Storage(object):
- def __init__(self, connection, Base=Base):
- self.engine = connection
- if self.engine.name == "sqlite":
- self.set_sqlite_pragmas()
- self.Base = Base
- self.Base.metadata.create_all(self.engine)
- self.sessionmaker = sessionmaker(bind=self.engine)
+ def __init__(self):
+ self.set_sqlite_pragmas()
self._hooks = list(StorageHook.registered_hooks)
- @contextmanager
- def session_scope(self):
- session = self.sessionmaker()
- try:
- yield session
- session.commit()
- except Exception:
- session.rollback()
- raise
- finally:
- session.close()
-
def __len__(self):
"""
Returns the number of jobs currently in the storage.
"""
- with self.engine.connect() as conn:
- return conn.execute(sql_func.count(ORMJob.id)).scalar()
+ return ORMJob.objects.count()
def __contains__(self, item):
"""
@@ -131,37 +48,27 @@ def __contains__(self, item):
job_id = item
if isinstance(item, Job):
job_id = item.job_id
- with self.engine.connect() as connection:
- return (
- connection.execute(select(ORMJob).where(ORMJob.id == job_id)).fetchone()
- is not None
- )
-
- @staticmethod
- def recreate_default_tables(engine):
- """
- Recreates the default tables for the job storage backend.
- """
- Base.metadata.drop_all(engine)
- scheduledjobs_base = declarative_base()
- scheduledjobs_table = Table("scheduledjobs", scheduledjobs_base.metadata)
- scheduledjobs_table.drop(engine, checkfirst=True)
- Base.metadata.create_all(engine)
+ return ORMJob.objects.filter(id=job_id).exists()
def set_sqlite_pragmas(self):
"""
- Sets the connection PRAGMAs for the sqlalchemy engine stored in self.engine.
+ Sets the connection PRAGMAs for the sqlite database.
It currently sets:
- journal_mode to WAL
:return: None
"""
- try:
- with self.engine.connect() as conn:
- conn.execute(text("PRAGMA journal_mode = WAL;"))
- except OperationalError:
- pass
+ connection = connections[ORMJob.objects.db]
+ if connection.vendor == "sqlite":
+ try:
+ cursor = connection.cursor()
+ cursor.execute("PRAGMA journal_mode=WAL;")
+ cursor.close()
+ except OperationalError:
+ pass
def _orm_to_job(self, orm_job):
"""
@@ -176,7 +83,12 @@ def _orm_to_job(self, orm_job):
return job
def enqueue_job(
- self, job, queue=DEFAULT_QUEUE, priority=Priority.REGULAR, retry_interval=None
+ self,
+ job,
+ queue=DEFAULT_QUEUE,
+ priority=Priority.REGULAR,
+ retry_interval=None,
+ max_retries=None,
):
"""
Add the job given by j to the job queue.
@@ -193,6 +105,7 @@ def enqueue_job(
interval=0,
repeat=0,
retry_interval=retry_interval,
+ max_retries=max_retries,
)
except JobRunning:
logger.debug(
@@ -203,23 +116,25 @@ def enqueue_job(
return job.job_id
def enqueue_lifo(
- self, job, queue=DEFAULT_QUEUE, priority=Priority.REGULAR, retry_interval=None
+ self,
+ job,
+ queue=DEFAULT_QUEUE,
+ priority=Priority.REGULAR,
+ retry_interval=None,
+ max_retries=None,
):
- naive_utc_now = datetime.utcnow()
- with self.session_scope() as session:
- soonest_job = (
- session.query(ORMJob)
- .filter(ORMJob.state == State.QUEUED)
- .filter(ORMJob.scheduled_time <= naive_utc_now)
- .order_by(ORMJob.scheduled_time)
- .first()
- )
- dt = (
- pytz.timezone("UTC").localize(soonest_job.scheduled_time)
- - timedelta(microseconds=1)
- if soonest_job
- else self._now()
- )
+ now = self._now()
+ soonest_job = (
+ ORMJob.objects.filter(state=State.QUEUED)
+ .filter(scheduled_time__lte=now)
+ .order_by("scheduled_time")
+ .first()
+ )
+ dt = (
+ soonest_job.scheduled_time - timedelta(microseconds=1)
+ if soonest_job
+ else self._now()
+ )
try:
return self.schedule(
dt,
@@ -229,6 +144,7 @@ def enqueue_lifo(
interval=0,
repeat=0,
retry_interval=retry_interval,
+ max_retries=max_retries,
)
except JobRunning:
logger.debug(
@@ -272,16 +188,13 @@ def mark_job_as_canceling(self, job_id):
"""
self._update_job(job_id, State.CANCELING)
- def _filter_next_query(self, query, priority):
- naive_utc_now = datetime.utcnow()
- return (
- query.filter(ORMJob.state == State.QUEUED)
- .filter(ORMJob.scheduled_time <= naive_utc_now)
- .filter(ORMJob.priority <= priority)
- .order_by(ORMJob.priority, ORMJob.scheduled_time, ORMJob.time_created)
- )
+ def _filter_next_query(self, queryset, priority):
+ now = self._now()
+ return queryset.filter(
+ Q(scheduled_time__lte=now), state=State.QUEUED, priority__lte=priority
+ ).order_by("priority", "scheduled_time", "time_created")
- def _postgres_next_queued_job(self, session, priority):
+ def _postgres_next_queued_job(self, priority):
"""
For postgres we are doing our best to ensure that the selected job
is not then also selected by another potentially concurrent worker controller
@@ -289,77 +202,72 @@ def _postgres_next_queued_job(self, session, priority):
This should work as long as our connection uses the default isolation level
of READ_COMMITTED.
More details here: https://dba.stackexchange.com/a/69497
- For SQLAlchemy details here: https://stackoverflow.com/a/25943713
"""
- subquery = (
- self._filter_next_query(session.query(ORMJob.id), priority)
- .limit(1)
- .with_for_update(skip_locked=True)
- )
- return self.engine.execute(
- update(ORMJob)
- .values(state=State.SELECTED)
- .where(ORMJob.id == subquery.scalar_subquery())
- .returning(ORMJob.saved_job)
- ).fetchone()
+ with transaction.atomic():
+ next_job = (
+ self._filter_next_query(ORMJob.objects.all(), priority)
+ .select_for_update(skip_locked=True)
+ .first()
+ )
+
+ if next_job:
+ next_job.state = State.SELECTED
+ next_job.save()
+ return next_job
+ return None
- def _sqlite_next_queued_job(self, session, priority):
+ def _sqlite_next_queued_job(self, priority):
"""
Due to the difficulty in appropriately locking the task row
we do not support multiple task runners potentially duelling
to lock tasks for SQLite, so here we just do a minimal
best effort to mark the job as selected for running.
"""
- orm_job = self._filter_next_query(session.query(ORMJob), priority).first()
+ orm_job = self._filter_next_query(ORMJob.objects.all(), priority).first()
+
if orm_job:
orm_job.state = State.SELECTED
- session.add(orm_job)
+ orm_job.save()
return orm_job
def get_next_queued_job(self, priority=Priority.REGULAR):
- with self.session_scope() as s:
- method = (
- self._sqlite_next_queued_job
- if self.engine.dialect.name == "sqlite"
- else self._postgres_next_queued_job
- )
- orm_job = method(s, priority)
+ db_backend = connections[ORMJob.objects.db].vendor
- if orm_job:
- job = self._orm_to_job(orm_job)
- else:
- job = None
+ if db_backend == "sqlite":
+ orm_job = self._sqlite_next_queued_job(priority)
+ else:
+ orm_job = self._postgres_next_queued_job(priority)
- return job
+ if orm_job:
+ return self._orm_to_job(orm_job)
+ return None
def filter_jobs(
self, queue=None, queues=None, state=None, repeating=None, func=None
):
if queue and queues:
raise ValueError("Cannot specify both queue and queues")
- with self.engine.connect() as conn:
- q = select(ORMJob)
- if queue:
- q = q.where(ORMJob.queue == queue)
+ queryset = ORMJob.objects.all()
- if queues:
- q = q.where(ORMJob.queue.in_(queues))
+ if queue:
+ queryset = queryset.filter(queue=queue)
- if state:
- q = q.where(ORMJob.state == state)
+ if queues:
+ queryset = queryset.filter(queue__in=queues)
- if repeating is True:
- q = q.where(or_(ORMJob.repeat > 0, ORMJob.repeat == None)) # noqa E711
- elif repeating is False:
- q = q.where(ORMJob.repeat == 0)
+ if state:
+ queryset = queryset.filter(state=state)
- if func:
- q = q.where(ORMJob.func == func)
+ if repeating is True:
+ queryset = queryset.filter(Q(repeat__gt=0) | Q(repeat__isnull=True))
+ elif repeating is False:
+ queryset = queryset.filter(repeat=0)
- orm_jobs = conn.execute(q)
+ if func:
+ queryset = queryset.filter(func=func)
- return [self._orm_to_job(o) for o in orm_jobs]
+ return [self._orm_to_job(o) for o in queryset]
def get_canceling_jobs(self, queues=None):
return self.get_jobs_by_state(state=State.CANCELING, queues=queues)
@@ -373,25 +281,17 @@ def get_jobs_by_state(self, state, queues=None):
def get_all_jobs(self, queue=None, repeating=None):
return self.filter_jobs(queue=queue, repeating=repeating)
- def test_table_readable(self):
- # Have to use the self-referential `self.engine.engine` as the inspection
- # used inside this function complains if we use the `self.engine` object
- # as it is a Django SimpleLazyObject and it doesn't like it!
- db_matches_schema({ORMJob.__tablename__: ORMJob}, self.engine.engine)
-
def get_job(self, job_id):
orm_job = self.get_orm_job(job_id)
job = self._orm_to_job(orm_job)
return job
def get_orm_job(self, job_id):
- with self.engine.connect() as connection:
- orm_job = connection.execute(
- select(ORMJob).where(ORMJob.id == job_id)
- ).fetchone()
- if orm_job is None:
+ try:
+ orm_job = ORMJob.objects.get(id=job_id)
+ return orm_job
+ except ORMJob.DoesNotExist:
raise JobNotFound()
- return orm_job
def restart_job(self, job_id):
"""
@@ -471,28 +371,28 @@ def clear(self, queue=None, job_id=None, force=False):
:type force: bool
:param force: If True, clear the job (or jobs), even if it hasn't completed, failed or been cancelled.
"""
- with self.session_scope() as s:
- q = s.query(ORMJob)
+ with transaction.atomic():
+ queryset = ORMJob.objects.all()
if queue:
- q = q.filter_by(queue=queue)
+ queryset = queryset.filter(queue=queue)
if job_id:
- q = q.filter_by(id=job_id)
+ queryset = queryset.filter(id=job_id)
# filter only by the finished jobs, if we are not specified to force
if not force:
- q = q.filter(
- or_(
- ORMJob.state == State.COMPLETED,
- ORMJob.state == State.FAILED,
- ORMJob.state == State.CANCELED,
- )
+ queryset = queryset.filter(
+ Q(state=State.COMPLETED)
+ | Q(state=State.FAILED)
+ | Q(state=State.CANCELED)
)
+
if self._hooks:
- for orm_job in q:
+ for orm_job in queryset:
job = self._orm_to_job(orm_job)
for hook in self._hooks:
hook.clear(job, orm_job)
- q.delete(synchronize_session=False)
+
+ queryset.delete()
def update_job_progress(
self, job_id, progress, total_progress, extra_metadata=None
@@ -556,9 +456,9 @@ def save_worker_info(
# nothing to do
return
- with self.session_scope() as session:
+ with transaction.atomic():
try:
- _, orm_job = self._get_job_and_orm_job(job_id, session)
+ _, orm_job = self._get_job_and_orm_job(job_id)
if host is not None:
orm_job.worker_host = host
if process is not None:
@@ -567,11 +467,7 @@ def save_worker_info(
orm_job.worker_thread = thread
if extra is not None:
orm_job.worker_extra = extra
- session.add(orm_job)
- try:
- session.commit()
- except Exception as e:
- logger.error("Got an error running session.commit(): {}".format(e))
+ orm_job.save()
except JobNotFound:
logger.error(
"Tried to update job with id {} but it was not found".format(job_id)
@@ -587,6 +483,7 @@ def reschedule_finished_job_if_needed( # noqa: C901
interval=None,
repeat=NO_VALUE,
retry_interval=NO_VALUE,
+ exception=None,
):
"""
Because repeat and retry_interval are nullable, None is a semantic value, so we need to use a sentinel value NO_VALUE
@@ -609,6 +506,9 @@ def reschedule_finished_job_if_needed( # noqa: C901
if delay is not None:
validate_timedelay(delay)
+ if exception is not None:
+ validate_exception(exception)
+
orm_job = self.get_orm_job(job_id)
# Only allow this function to be run on a job that is in a finished state.
@@ -626,6 +526,8 @@ def reschedule_finished_job_if_needed( # noqa: C901
retry_interval=retry_interval
if retry_interval is not NO_VALUE
else orm_job.retry_interval,
+ max_retries=orm_job.max_retries,
+ retries=orm_job.retries,
)
# Set a null new_scheduled_time so that we finish processing if none of the cases below pertain.
@@ -637,12 +539,17 @@ def reschedule_finished_job_if_needed( # noqa: C901
# enqueuing changes - so if it is still set to repeat, it will repeat again after the
# delayed rerun.
new_scheduled_time = self._now() + delay
- elif orm_job.state == State.FAILED and kwargs["retry_interval"] is not None:
- # If the task has failed, and a retry interval has been specified (either in the original enqueue,
- # or from the passed in kwargs) then requeue as a retry.
+ elif self._should_retry_on_failed_task(
+ orm_job, exception, kwargs["retry_interval"]
+ ):
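+            # Eligible for retry: reschedule after retry_interval, defaulting to 10 seconds.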
new_scheduled_time = self._now() + timedelta(
seconds=kwargs["retry_interval"]
+ if kwargs["retry_interval"] is not None
+ else 10
)
+ # Increment the retries count.
+ current_retries = orm_job.retries if orm_job.retries is not None else 0
+ kwargs["retries"] = current_retries + 1
elif (
orm_job.state in {State.COMPLETED, State.FAILED, State.CANCELED}
@@ -661,10 +568,32 @@ def reschedule_finished_job_if_needed( # noqa: C901
# Use the schedule method so that any scheduling hooks are run for this next run of the job.
self.schedule(new_scheduled_time, job, **kwargs)
+ def _should_retry_on_failed_task(self, orm_job, exception, retry_interval):
+ """
+ Determine if a job should be retried based on its retry settings and the exception raised.
+ """
+ if orm_job.state != State.FAILED:
+ return False
+
+ if retry_interval is None and orm_job.max_retries is None:
+            # At least one of retry_interval or max_retries must be set to enable retries.
+ return False
+
+ current_retries = orm_job.retries if orm_job.retries is not None else 0
+ if orm_job.max_retries is not None and current_retries >= orm_job.max_retries:
+ return False
+
+ job = self._orm_to_job(orm_job)
+ retry_on = job.task.retry_on
+ if retry_on and exception:
+ return any(isinstance(exception, exc) for exc in retry_on)
+
+ return True
+
def _update_job(self, job_id, state=None, **kwargs):
- with self.session_scope() as session:
+ with transaction.atomic():
try:
- job, orm_job = self._get_job_and_orm_job(job_id, session)
+ job, orm_job = self._get_job_and_orm_job(job_id)
if state is not None:
orm_job.state = job.state = state
for kwarg in kwargs:
@@ -677,11 +606,7 @@ def _update_job(self, job_id, state=None, **kwargs):
)
)
orm_job.saved_job = job.to_json()
- session.add(orm_job)
- try:
- session.commit()
- except Exception as e:
- logger.error("Got an error running session.commit(): {}".format(e))
+ orm_job.save()
for hook in self._hooks:
hook.update(job, orm_job, state=state, **kwargs)
return job, orm_job
@@ -699,9 +624,10 @@ def _update_job(self, job_id, state=None, **kwargs):
)
)
- def _get_job_and_orm_job(self, job_id, session):
- orm_job = session.query(ORMJob).filter_by(id=job_id).one_or_none()
- if orm_job is None:
+ def _get_job_and_orm_job(self, job_id):
+ try:
+ orm_job = ORMJob.objects.get(id=job_id)
+ except ORMJob.DoesNotExist:
raise JobNotFound()
job = self._orm_to_job(orm_job)
return job, orm_job
@@ -715,6 +641,7 @@ def enqueue_at(
interval=0,
repeat=0,
retry_interval=None,
+ max_retries=None,
):
"""
Add the job for the specified time
@@ -727,6 +654,7 @@ def enqueue_at(
interval=interval,
repeat=repeat,
retry_interval=retry_interval,
+ max_retries=max_retries,
)
def enqueue_in(
@@ -738,6 +666,7 @@ def enqueue_in(
interval=0,
repeat=0,
retry_interval=None,
+ max_retries=None,
):
"""
Add the job in the specified time delta
@@ -753,6 +682,7 @@ def enqueue_in(
interval=interval,
repeat=repeat,
retry_interval=retry_interval,
+ max_retries=max_retries,
)
def schedule(
@@ -764,6 +694,8 @@ def schedule(
interval=0,
repeat=0,
retry_interval=None,
+ retries=None,
+ max_retries=None,
):
"""
Add the job for the specified time, interval, and number of repeats.
@@ -784,33 +716,38 @@ def schedule(
if not isinstance(job, Job):
raise ValueError("Job argument must be a Job object.")
- with self.session_scope() as session:
- orm_job = session.get(ORMJob, job.job_id)
+ with transaction.atomic():
+ orm_job = ORMJob.objects.filter(id=job.job_id).first()
if orm_job and orm_job.state == State.RUNNING:
raise JobRunning()
job.state = State.QUEUED
- orm_job = ORMJob(
- id=job.job_id,
- state=job.state,
- func=job.func,
- priority=priority,
- queue=queue,
- interval=interval,
- repeat=repeat,
- retry_interval=retry_interval,
- scheduled_time=naive_utc_datetime(dt),
- saved_job=job.to_json(),
- )
- session.merge(orm_job)
- try:
- session.commit()
- except Exception as e:
- logger.error("Got an error running session.commit(): {}".format(e))
+ orm_job_data = {
+ "id": job.job_id,
+ "state": job.state,
+ "func": job.func,
+ "priority": priority,
+ "queue": queue,
+ "interval": interval,
+ "repeat": repeat,
+ "retry_interval": retry_interval,
+ "retries": retries,
+ "max_retries": max_retries,
+ "scheduled_time": dt,
+ "saved_job": job.to_json(),
+ }
+
+ if orm_job:
+ # Update existing job
+ for key, value in orm_job_data.items():
+ setattr(orm_job, key, value)
+ orm_job.save()
+ else:
+ orm_job = ORMJob.objects.create(**orm_job_data)
self._run_scheduled_hooks(orm_job)
- return job.job_id
+ return job.job_id
def _run_scheduled_hooks(self, orm_job):
job = self._orm_to_job(orm_job)
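# Editor's sketch: the end-to-end retry decision, assuming a task registered
# with retry_on=[ValueError] and a retry budget supplied at enqueue time:
#
#     job_id = job_storage.enqueue_job(job, retry_interval=5, max_retries=2)
#     job_storage.mark_job_as_failed(job_id, ValueError("boom"), "traceback")
#     job_storage.reschedule_finished_job_if_needed(job_id, exception=ValueError("boom"))
#     # -> requeued ~5 seconds out with retries == 1; once the budget is
#     #    exhausted, the job stays FAILED.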
diff --git a/kolibri/core/tasks/test/base.py b/kolibri/core/tasks/test/base.py
deleted file mode 100644
index 05f387e4c43..00000000000
--- a/kolibri/core/tasks/test/base.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import os
-import tempfile
-from contextlib import contextmanager
-
-from kolibri.core.tasks.utils import db_connection
-from kolibri.utils.tests.helpers import override_option
-
-
-@contextmanager
-def connection():
- fd, filepath = tempfile.mkstemp()
- with override_option("Tasks", "JOB_STORAGE_FILEPATH", filepath):
- engine = db_connection()
- yield engine
- engine.dispose()
- os.close(fd)
- try:
- os.remove(filepath)
- except OSError:
- # Don't fail test because of difficulty cleaning up.
- pass
diff --git a/kolibri/core/tasks/test/taskrunner/test_job_running.py b/kolibri/core/tasks/test/taskrunner/test_job_running.py
index b5c6a7c640c..0892ce7888b 100644
--- a/kolibri/core/tasks/test/taskrunner/test_job_running.py
+++ b/kolibri/core/tasks/test/taskrunner/test_job_running.py
@@ -6,8 +6,6 @@
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
-from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.test.base import connection
from kolibri.core.tasks.utils import callable_to_import_path
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.utils import import_path_to_callable
@@ -17,12 +15,11 @@
@pytest.fixture
def storage_fixture():
- with connection() as conn:
- e = Worker(connection=conn)
- b = Storage(conn)
- b.clear(force=True)
- yield b
- e.shutdown()
+ e = Worker()
+ b = e.storage
+ b.clear(force=True)
+ yield b
+ e.shutdown()
@pytest.fixture
@@ -162,6 +159,7 @@ def update_progress_cancelable_job():
return
+@pytest.mark.django_db(databases="__all__", transaction=True)
class TestJobStorage(object):
def test_does_not_enqueue_a_function(self, storage_fixture):
try:
diff --git a/kolibri/core/tasks/test/taskrunner/test_scheduler.py b/kolibri/core/tasks/test/taskrunner/test_scheduler.py
index 3d4b0156679..c7d9735075c 100644
--- a/kolibri/core/tasks/test/taskrunner/test_scheduler.py
+++ b/kolibri/core/tasks/test/taskrunner/test_scheduler.py
@@ -2,27 +2,31 @@
import pytest
+from kolibri.core.tasks.decorators import register_task
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.test.base import connection
from kolibri.utils.time_utils import local_now
-from kolibri.utils.time_utils import naive_utc_datetime
@pytest.fixture
def job_storage():
- with connection() as c:
- s = Storage(connection=c)
- s.clear(force=True)
- yield s
- s.clear(force=True)
+ s = Storage()
+ s.clear(force=True)
+ yield s
+ s.clear(force=True)
+@register_task
+def add(x, y):
+ return x + y
+
+
+@pytest.mark.django_db(databases="__all__")
class TestScheduler(object):
@pytest.fixture
def job(self):
- return Job(id)
+ return Job(add)
def test_enqueue_at_a_function(self, job_storage, job):
job_id = job_storage.enqueue_at(local_now(), job)
@@ -34,10 +38,9 @@ def test_enqueue_at_a_function_sets_time(self, job_storage, job):
now = local_now()
job_id = job_storage.enqueue_at(now, job)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- scheduled_time = scheduled_job.scheduled_time
- assert scheduled_time == naive_utc_datetime(now)
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ scheduled_time = scheduled_job.scheduled_time
+ assert scheduled_time == now
def test_enqueue_at_preserves_extra_metadata(self, job_storage, job):
metadata = {"saved": True}
@@ -59,19 +62,17 @@ def test_enqueue_in_a_function_sets_time(self, job_storage, job):
job_storage._now = lambda: now
job_id = job_storage.enqueue_in(diff, job)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- scheduled_time = scheduled_job.scheduled_time
- assert scheduled_time == naive_utc_datetime(now) + diff
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ scheduled_time = scheduled_job.scheduled_time
+ assert scheduled_time == now + diff
def test_schedule_a_function_sets_time(self, job_storage, job):
now = local_now()
job_id = job_storage.schedule(now, job)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- scheduled_time = scheduled_job.scheduled_time
- assert scheduled_time == naive_utc_datetime(now)
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ scheduled_time = scheduled_job.scheduled_time
+ assert scheduled_time == now
def test_schedule_a_function_gives_value_error_without_datetime(
self, job_storage, job
@@ -112,9 +113,8 @@ def test_scheduled_repeating_function_sets_endless_repeat_new_job(
job_id = job_storage.schedule(now, job, interval=1000, repeat=None)
job_storage.complete_job(job_id)
job_storage.reschedule_finished_job_if_needed(job_id)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- repeat = scheduled_job.repeat
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ repeat = scheduled_job.repeat
assert repeat is None
def test_scheduled_repeating_function_enqueues_job(self, job_storage, job):
@@ -131,9 +131,8 @@ def test_scheduled_repeating_function_sets_new_job_with_one_fewer_repeats(
job_id = job_storage.schedule(now, job, interval=1000, repeat=1)
job_storage.complete_job(job_id)
job_storage.reschedule_finished_job_if_needed(job_id)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- repeat = scheduled_job.repeat
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ repeat = scheduled_job.repeat
assert repeat == 0
def test_scheduled_repeating_function_sets_new_job_at_interval(
@@ -144,12 +143,9 @@ def test_scheduled_repeating_function_sets_new_job_at_interval(
job_storage._now = lambda: now
job_storage.complete_job(job_id)
job_storage.reschedule_finished_job_if_needed(job_id)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- scheduled_time = scheduled_job.scheduled_time
- assert scheduled_time == naive_utc_datetime(now) + datetime.timedelta(
- seconds=1000
- )
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ scheduled_time = scheduled_job.scheduled_time
+ assert scheduled_time == now + datetime.timedelta(seconds=1000)
def test_scheduled_repeating_function_failure_sets_new_job_at_retry_interval(
self, job_storage, job
@@ -161,17 +157,16 @@ def test_scheduled_repeating_function_failure_sets_new_job_at_retry_interval(
job_storage._now = lambda: now
job_storage.mark_job_as_failed(job_id, "Exception", "Traceback")
job_storage.reschedule_finished_job_if_needed(job_id)
- with job_storage.session_scope() as session:
- _, scheduled_job = job_storage._get_job_and_orm_job(job_id, session)
- scheduled_time = scheduled_job.scheduled_time
- assert scheduled_time == naive_utc_datetime(now) + datetime.timedelta(seconds=5)
+ _, scheduled_job = job_storage._get_job_and_orm_job(job_id)
+ scheduled_time = scheduled_job.scheduled_time
+ assert scheduled_time == now + datetime.timedelta(seconds=5)
class TestReschedule(TestScheduler):
@pytest.fixture
def job(self, job_storage):
now = local_now()
- job_id = job_storage.schedule(now, Job(id), interval=1, repeat=123)
+ job_id = job_storage.schedule(now, Job(add), interval=1, repeat=123)
return job_storage.get_job(job_id)
def test_reschedule_a_function_gives_job_running_error(self, job_storage, job):
diff --git a/kolibri/core/tasks/test/taskrunner/test_storage.py b/kolibri/core/tasks/test/taskrunner/test_storage.py
index 873ab439b0e..76155a96565 100644
--- a/kolibri/core/tasks/test/taskrunner/test_storage.py
+++ b/kolibri/core/tasks/test/taskrunner/test_storage.py
@@ -5,6 +5,7 @@
import pytest
import pytz
from mock import patch
+from requests.exceptions import HTTPError
from kolibri.core.tasks.constants import DEFAULT_QUEUE
from kolibri.core.tasks.constants import Priority
@@ -14,7 +15,6 @@
from kolibri.core.tasks.job import State
from kolibri.core.tasks.registry import TaskRegistry
from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.test.base import connection
from kolibri.core.tasks.utils import callable_to_import_path
from kolibri.utils.time_utils import local_now
@@ -24,38 +24,44 @@
@pytest.fixture
def defaultbackend():
- with connection() as c:
- b = Storage(c)
- b.clear(force=True)
- yield b
- b.clear(force=True)
+ b = Storage()
+ b.clear(force=True)
+ yield b
+ b.clear(force=True)
-@pytest.fixture
-def func():
- @register_task
- def add(x, y):
- return x + y
+@register_task(
+ retry_on=[ValueError, TypeError],
+)
+def add(x, y):
+ return x + y
- TaskRegistry["kolibri.core.tasks.test.taskrunner.test_storage.add"] = add
- yield add
- TaskRegistry.clear()
+@pytest.fixture(autouse=True)
+def register_add_task():
+ # register before tests
+ TaskRegistry[callable_to_import_path(add)] = add
+ try:
+ yield
+ finally:
+ # clear after tests
+ TaskRegistry.clear()
@pytest.fixture
-def simplejob(func):
- return Job(func)
+def simplejob():
+ return Job(add)
+@pytest.mark.django_db(databases="__all__")
class TestBackend:
- def test_can_enqueue_single_job(self, defaultbackend, simplejob, func):
+ def test_can_enqueue_single_job(self, defaultbackend, simplejob):
job_id = defaultbackend.enqueue_job(simplejob, QUEUE)
new_job = defaultbackend.get_job(job_id)
# Does the returned job record the function we set to run?
- assert str(new_job.func) == callable_to_import_path(func)
+ assert str(new_job.func) == callable_to_import_path(add)
# Does the job have the right state (QUEUED)?
assert new_job.state == State.QUEUED
@@ -310,6 +316,134 @@ def test_can_reschedule_finished_job(self, defaultbackend, simplejob):
assert requeued_job.state == State.QUEUED
assert requeued_orm_job.scheduled_time > previous_scheduled_time
+ def test_job_retry_on_matching_exception(self, defaultbackend, simplejob):
+ exception = ValueError("Error")
+ job_id = defaultbackend.enqueue_job(
+ simplejob, QUEUE, retry_interval=5, max_retries=3
+ )
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ orm_job = defaultbackend.get_orm_job(job_id)
+ previous_scheduled_time = orm_job.scheduled_time
+
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+ requeued_orm_job = defaultbackend.get_orm_job(job_id)
+ requeued_job = defaultbackend.get_job(job_id)
+
+ assert requeued_job.state == State.QUEUED
+ assert requeued_orm_job.scheduled_time > previous_scheduled_time
+ assert requeued_orm_job.retries == 1
+
+ def test_job_retry_on_matching_exception__no_max_retries(
+ self, defaultbackend, simplejob
+ ):
+ exception = ValueError("Error")
+ job_id = defaultbackend.enqueue_job(simplejob, QUEUE, retry_interval=5)
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ orm_job = defaultbackend.get_orm_job(job_id)
+ previous_scheduled_time = orm_job.scheduled_time
+
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+ requeued_orm_job = defaultbackend.get_orm_job(job_id)
+ requeued_job = defaultbackend.get_job(job_id)
+
+ assert requeued_job.state == State.QUEUED
+ assert requeued_orm_job.scheduled_time > previous_scheduled_time
+ assert requeued_orm_job.retries == 1
+
+ def test_job_retry_on_matching_exception__no_retry_interval(
+ self, defaultbackend, simplejob
+ ):
+ exception = TypeError("Error")
+ job_id = defaultbackend.enqueue_job(simplejob, QUEUE, max_retries=3)
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ orm_job = defaultbackend.get_orm_job(job_id)
+ previous_scheduled_time = orm_job.scheduled_time
+
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+ requeued_orm_job = defaultbackend.get_orm_job(job_id)
+ requeued_job = defaultbackend.get_job(job_id)
+
+ assert requeued_job.state == State.QUEUED
+ assert requeued_orm_job.scheduled_time > previous_scheduled_time
+ assert requeued_orm_job.retries == 1
+
+ def test_job_not_retry_on_matching_exception__no_retry_params(
+ self, defaultbackend, simplejob
+ ):
+ # If job has no retry params, it should not retry even if exception matches
+ exception = ValueError("Error")
+ job_id = defaultbackend.enqueue_job(simplejob, QUEUE)
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ orm_job = defaultbackend.get_orm_job(job_id)
+ previous_scheduled_time = orm_job.scheduled_time
+
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+ requeued_orm_job = defaultbackend.get_orm_job(job_id)
+ requeued_job = defaultbackend.get_job(job_id)
+
+ assert requeued_job.state == State.FAILED
+ assert requeued_orm_job.scheduled_time == previous_scheduled_time
+ assert requeued_orm_job.retries is None
+
+ def test_job_not_retry_on_non_matching_exception(self, defaultbackend, simplejob):
+ exception = HTTPError("Error")
+ job_id = defaultbackend.enqueue_job(
+ simplejob, QUEUE, retry_interval=5, max_retries=3
+ )
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ orm_job = defaultbackend.get_orm_job(job_id)
+ previous_scheduled_time = orm_job.scheduled_time
+
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+ requeued_orm_job = defaultbackend.get_orm_job(job_id)
+ requeued_job = defaultbackend.get_job(job_id)
+
+ assert requeued_job.state == State.FAILED
+ assert requeued_orm_job.scheduled_time == previous_scheduled_time
+ assert requeued_orm_job.retries is None
+
+ def test_job_not_retry_on_limit_max_retries(self, defaultbackend, simplejob):
+ exception = ValueError("Error")
+ job_id = defaultbackend.enqueue_job(simplejob, QUEUE, max_retries=1)
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ # Retry first time
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+ defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
+
+ orm_job = defaultbackend.get_orm_job(job_id)
+ previous_scheduled_time = orm_job.scheduled_time
+
+        # A second retry attempt should not be scheduled, since max_retries has been reached
+ defaultbackend.reschedule_finished_job_if_needed(
+ simplejob.job_id, exception=exception
+ )
+
+ requeued_orm_job = defaultbackend.get_orm_job(job_id)
+ requeued_job = defaultbackend.get_job(job_id)
+
+ retries = requeued_orm_job.retries
+ assert requeued_job.state == State.FAILED
+ assert requeued_orm_job.scheduled_time == previous_scheduled_time
+ assert retries == 1
+
def test_reschedule_finished_job_canceled(self, defaultbackend, simplejob):
# Test case where the job is canceled.
job_id = defaultbackend.enqueue_job(simplejob, QUEUE)
diff --git a/kolibri/core/tasks/test/taskrunner/test_worker.py b/kolibri/core/tasks/test/taskrunner/test_worker.py
index 49d10e0d9dc..965ea0d64ae 100644
--- a/kolibri/core/tasks/test/taskrunner/test_worker.py
+++ b/kolibri/core/tasks/test/taskrunner/test_worker.py
@@ -6,7 +6,6 @@
from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
-from kolibri.core.tasks.test.base import connection
from kolibri.core.tasks.test.taskrunner.test_job_running import EventProxy
from kolibri.core.tasks.worker import Worker
from kolibri.utils import conf
@@ -42,14 +41,14 @@ def toggle_flag(flag_id):
@pytest.fixture
def worker():
- with connection() as c:
- b = Worker(c, regular_workers=1, high_workers=1)
- b.storage.clear(force=True)
- yield b
- b.storage.clear(force=True)
- b.shutdown()
+ b = Worker(regular_workers=1, high_workers=1)
+ b.storage.clear(force=True)
+ yield b
+ b.storage.clear(force=True)
+ b.shutdown()
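+
+
+# These worker tests write job state to the job storage database directly,
+# so they opt into transactional access to all configured databases.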
+@pytest.mark.django_db(databases="__all__", transaction=True)
def test_keyerror_prevention(worker):
# Create a job with the same ID as the one in worker.enqueue_job_runs_job
job = Job(id, args=(9,))
@@ -64,6 +63,7 @@ def test_keyerror_prevention(worker):
assert job.state == "COMPLETED"
+@pytest.mark.django_db(databases="__all__", transaction=True)
def test_keyerror_prevention_multiple_jobs(worker):
# Create multiple jobs with the same ID to trigger the race condition
job1 = Job(id, args=(9,))
@@ -91,6 +91,7 @@ def test_keyerror_prevention_multiple_jobs(worker):
assert job2.state == "COMPLETED"
+@pytest.mark.django_db(databases="__all__", transaction=True)
class TestWorker:
def test_enqueue_job_runs_job(self, worker):
job = Job(id, args=(9,))
@@ -106,7 +107,7 @@ def test_enqueue_job_runs_job_once(self, worker, flag):
# Do conditional check in here, as it seems to not work properly
# inside a pytest.mark.skipIf
if conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "postgres":
- b = Worker(worker.storage.engine, regular_workers=1, high_workers=1)
+ b = Worker(regular_workers=1, high_workers=1)
job = Job(toggle_flag, args=(flag.event_id,))
worker.storage.enqueue_job(job, QUEUE)
diff --git a/kolibri/core/tasks/test/test_api.py b/kolibri/core/tasks/test/test_api.py
index f6e1d33406f..7977f66a0bb 100644
--- a/kolibri/core/tasks/test/test_api.py
+++ b/kolibri/core/tasks/test/test_api.py
@@ -2,11 +2,9 @@
import pytz
from django.urls import reverse
-from django.utils.timezone import make_aware
from mock import call
from mock import Mock
from mock import patch
-from pytz import utc
from rest_framework import serializers
from rest_framework import status
from rest_framework.test import APIClient
@@ -53,13 +51,15 @@ def fake_job(**kwargs):
class dummy_orm_job_data(object):
- scheduled_time = datetime.datetime(year=2023, month=1, day=1, tzinfo=None)
+ scheduled_time = datetime.datetime(year=2023, month=1, day=1)
repeat = 5
interval = 8600
retry_interval = 5
class BaseAPITestCase(APITestCase):
+ databases = "__all__"
+
@classmethod
def setUpTestData(cls):
DeviceSettings.objects.create(is_provisioned=True)
@@ -279,9 +279,7 @@ def add(x, y):
"kwargs": {},
"extra_metadata": {},
"facility_id": None,
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -340,9 +338,7 @@ def add(**kwargs):
"kwargs": {},
"extra_metadata": {},
"facility_id": None,
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -360,9 +356,7 @@ def add(**kwargs):
"kwargs": {},
"extra_metadata": {},
"facility_id": None,
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -446,9 +440,7 @@ def add(**kwargs):
"extra_metadata": {
"facility": "kolibri HQ",
},
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -540,9 +532,7 @@ def add(x, y):
"extra_metadata": {
"facility": "kolibri HQ",
},
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -562,9 +552,7 @@ def add(x, y):
"extra_metadata": {
"facility": "kolibri HQ",
},
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -1228,9 +1216,7 @@ def subtract(x, y):
"args": (),
"kwargs": {},
"extra_metadata": {},
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -1248,9 +1234,7 @@ def subtract(x, y):
"args": (),
"kwargs": {},
"extra_metadata": {},
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -1268,9 +1252,7 @@ def subtract(x, y):
"args": (),
"kwargs": {},
"extra_metadata": {},
- "scheduled_datetime": make_aware(
- dummy_orm_job_data.scheduled_time, utc
- ).isoformat(),
+ "scheduled_datetime": dummy_orm_job_data.scheduled_time.isoformat(),
"repeat": dummy_orm_job_data.repeat,
"repeat_interval": dummy_orm_job_data.interval,
"retry_interval": dummy_orm_job_data.retry_interval,
@@ -1474,6 +1456,8 @@ def test_clear_task_respect_permissions(self, mock_job_storage):
@patch("kolibri.core.tasks.api.job_storage")
class TaskAPIPermissionsTestCase(APITestCase):
+ databases = "__all__"
+
def setUp(self):
DeviceSettings.objects.create(is_provisioned=True)
self.facility = Facility.objects.create(name="facility")
@@ -1487,6 +1471,8 @@ def test_list_permissions(self, job_storage_mock):
class CSRFProtectedTaskTestCase(APITestCase):
+ databases = "__all__"
+
def setUp(self):
self.client_csrf = APIClient(enforce_csrf_checks=True)
diff --git a/kolibri/core/tasks/test/test_decorators.py b/kolibri/core/tasks/test/test_decorators.py
index 17105a7bafe..ac9b2e590ba 100644
--- a/kolibri/core/tasks/test/test_decorators.py
+++ b/kolibri/core/tasks/test/test_decorators.py
@@ -26,6 +26,7 @@ def add(x, y):
cancellable=True,
track_progress=True,
status_fn=status_fn,
+ retry_on=[],
)(add)
MockRegisteredTask.assert_called_once_with(
@@ -39,6 +40,7 @@ def add(x, y):
track_progress=True,
long_running=False,
status_fn=status_fn,
+ retry_on=[],
)
def test_register_decorator_registers_without_args(self):
diff --git a/kolibri/core/tasks/test/test_job.py b/kolibri/core/tasks/test/test_job.py
index 91051a8113d..3d275426c93 100644
--- a/kolibri/core/tasks/test/test_job.py
+++ b/kolibri/core/tasks/test/test_job.py
@@ -3,6 +3,7 @@
import mock
from django.test.testcases import TestCase
+from requests.exceptions import HTTPError
from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.exceptions import JobNotRunning
@@ -17,6 +18,10 @@ def status_fn(job):
pass
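+# Module-level helper that raises a requests HTTPError, for tests that
+# exercise retry_on exception matching.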
+def fn_with_http_error():
+ raise HTTPError("Test exception")
+
+
class JobTest(TestCase):
def setUp(self):
self.job = Job(id, track_progress=True)
diff --git a/kolibri/core/tasks/test/test_no_connection.py b/kolibri/core/tasks/test/test_no_connection.py
index 0edddee1292..a2249ef1bec 100644
--- a/kolibri/core/tasks/test/test_no_connection.py
+++ b/kolibri/core/tasks/test/test_no_connection.py
@@ -1,3 +1,7 @@
+import pytest
+
+
+@pytest.mark.django_db(databases="__all__", transaction=True)
def test_importing_job_storage_no_open_connection():
from kolibri.core.tasks.main import job_storage
diff --git a/kolibri/core/tasks/utils.py b/kolibri/core/tasks/utils.py
index f2d824c17db..65e4649a5e8 100644
--- a/kolibri/core/tasks/utils.py
+++ b/kolibri/core/tasks/utils.py
@@ -7,6 +7,7 @@
from threading import Thread
import click
+from django.db.utils import OperationalError
from django.utils.functional import SimpleLazyObject
from django.utils.module_loading import import_string
from sqlalchemy import create_engine
@@ -385,3 +386,19 @@ def fd_safe_executor(fds_per_task=2):
)
return executor(max_workers=max_workers)
+
+
+class DatabaseLockedError(OperationalError):
+ """
+ OperationalError subclass that can only be constructed when the
+ underlying error message contains 'database is locked'; any other
+ message raises a plain OperationalError instead.
+ """
+
+ def __init__(self, *args, **kwargs):
+ error_message = str(args[0]) if args else ""
+
+ if "database is locked" not in error_message.lower():
+ # Not a lock error: raise a plain OperationalError instead.
+ raise OperationalError(*args, **kwargs)
+
+ super().__init__(*args, **kwargs)
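+
+
+# A minimal usage sketch (illustrative only; `cursor` and `statement` are
+# placeholders, not part of this changeset):
+#
+#     try:
+#         cursor.execute(statement)
+#     except OperationalError as e:
+#         if "database is locked" in str(e).lower():
+#             raise DatabaseLockedError(*e.args) from e
+#         raise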
diff --git a/kolibri/core/tasks/validation.py b/kolibri/core/tasks/validation.py
index b914c3d7f23..dae5f78d126 100644
--- a/kolibri/core/tasks/validation.py
+++ b/kolibri/core/tasks/validation.py
@@ -33,6 +33,11 @@ def validate_timedelay(value):
raise TypeError("time delay must be a datetime.timedelta object")
+def validate_exception(value):
+ if not isinstance(value, BaseException):
+ raise TypeError("exception must be an error object")
+
+
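+# For example, validate_exception(ValueError("boom")) passes, while
+# validate_exception("boom") or validate_exception(ValueError) raises
+# TypeError: only BaseException instances are accepted, not strings
+# or exception classes.
+
+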
class EnqueueArgsSerializer(serializers.Serializer):
"""
A serializer for `enqueue_args` object of incoming user request data.
diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py
index 252792ff1e4..6e1797e65b0 100644
--- a/kolibri/core/tasks/worker.py
+++ b/kolibri/core/tasks/worker.py
@@ -5,7 +5,6 @@
from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.utils import db_connection
from kolibri.core.tasks.utils import InfiniteLoopThread
from kolibri.utils.multiprocessing_compat import PoolExecutor
@@ -25,9 +24,7 @@ def execute_job(
:return: None
"""
- connection = db_connection()
-
- storage = Storage(connection)
+ storage = Storage()
job = storage.get_job(job_id)
@@ -35,8 +32,6 @@ def execute_job(
job.execute()
- connection.dispose()
-
# Close any django connections opened here
django_connection.close()
@@ -60,7 +55,7 @@ def execute_job_with_python_worker(job_id, log_queue=None):
class Worker(object):
- def __init__(self, connection, regular_workers=2, high_workers=1, log_queue=None):
+ def __init__(self, regular_workers=2, high_workers=1, log_queue=None):
# Internally, we use concurrent.future.Future to run and track
# job executions. We need to keep track of which future maps to which
# job they were made from, and we use the job_future_mapping dict to do
@@ -72,7 +67,7 @@ def __init__(self, connection, regular_workers=2, high_workers=1, log_queue=None
# Key: job_id, Value: future object
self.future_job_mapping = {}
- self.storage = Storage(connection)
+ self.storage = Storage()
self.requeue_stalled_jobs()
diff --git a/kolibri/deployment/default/settings/base.py b/kolibri/deployment/default/settings/base.py
index a876ca40c7f..3aa2b7cccc0 100644
--- a/kolibri/deployment/default/settings/base.py
+++ b/kolibri/deployment/default/settings/base.py
@@ -21,6 +21,7 @@
import kolibri
from kolibri.deployment.default.cache import CACHES
from kolibri.deployment.default.sqlite_db_names import ADDITIONAL_SQLITE_DATABASES
+from kolibri.deployment.default.sqlite_db_names import JOB_STORAGE
from kolibri.plugins.utils.settings import apply_settings
from kolibri.utils import conf
from kolibri.utils import i18n
@@ -141,6 +142,12 @@
# https://docs.djangoproject.com/en/3.2/ref/settings/#databases
if conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "sqlite":
+
+ job_storage_path = conf.OPTIONS["Tasks"]["JOB_STORAGE_FILEPATH"]
+ # If job_storage_path is relative, interpret it relative to KOLIBRI_HOME
+ if not os.path.isabs(job_storage_path):
+ job_storage_path = os.path.join(conf.KOLIBRI_HOME, job_storage_path)
+
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
@@ -150,9 +157,18 @@
),
"OPTIONS": {"timeout": 100},
},
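+ # Keep job storage in its own SQLite file so task bookkeeping writes
+ # do not contend for the default database's lock.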
+ JOB_STORAGE: {
+ "ENGINE": "django.db.backends.sqlite3",
+ "NAME": job_storage_path,
+ "OPTIONS": {"timeout": 100},
+ "TEST": {
+ "NAME": os.path.join(conf.KOLIBRI_HOME, "test_job_storage.sqlite3")
+ },
+ },
}
for additional_db in ADDITIONAL_SQLITE_DATABASES:
+ # TODO: add JOB_STORAGE to ADDITIONAL_SQLITE_DATABASES and handle it here
DATABASES[additional_db] = {
"ENGINE": "django.db.backends.sqlite3",
"NAME": os.path.join(conf.KOLIBRI_HOME, "{}.sqlite3".format(additional_db)),
@@ -163,6 +179,7 @@
"kolibri.core.notifications.models.NotificationsRouter",
"kolibri.core.device.models.SyncQueueRouter",
"kolibri.core.discovery.models.NetworkLocationRouter",
+ "kolibri.core.tasks.models.KolibriTasksRouter",
)
elif conf.OPTIONS["Database"]["DATABASE_ENGINE"] == "postgres":
diff --git a/kolibri/deployment/default/sqlite_db_names.py b/kolibri/deployment/default/sqlite_db_names.py
index 3118a4049ff..f4c22f78d22 100644
--- a/kolibri/deployment/default/sqlite_db_names.py
+++ b/kolibri/deployment/default/sqlite_db_names.py
@@ -10,5 +10,6 @@
NOTIFICATIONS = "notifications"
+JOB_STORAGE = "job_storage"
ADDITIONAL_SQLITE_DATABASES = (SYNC_QUEUE, NETWORK_LOCATION, NOTIFICATIONS)
diff --git a/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue b/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue
index b63544532e8..d7967db99e4 100644
--- a/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue
+++ b/kolibri/plugins/setup_wizard/assets/src/views/ImportMultipleUsers.vue
@@ -5,6 +5,7 @@
:step="step"
:steps="steps"
:showBackArrow="true"
+ :backArrowDisabled="learnersBeingImported.length > 0"
:eventOnGoBack="backArrowEvent"
:title="selectAUser$()"
:description="facilityDescription"
@@ -52,6 +53,7 @@
+ <GlobalSnackbar />
@@ -65,8 +67,10 @@
import PaginatedListContainer from 'kolibri-common/components/PaginatedListContainer';
import { lodUsersManagementStrings } from 'kolibri-common/strings/lodUsersManagementStrings';
import { DemographicConstants } from 'kolibri/constants';
- import { TaskStatuses } from 'kolibri-common/utils/syncTaskUtils';
+ import { TaskStatuses, TaskTypes } from 'kolibri-common/utils/syncTaskUtils';
import UserTable from 'kolibri-common/components/UserTable';
+ import useSnackbar from 'kolibri/composables/useSnackbar';
+ import GlobalSnackbar from 'kolibri/components/GlobalSnackbar';
import { FooterMessageTypes, SoudQueue } from '../constants';
import OnboardingStepBase from './OnboardingStepBase';
@@ -85,14 +89,19 @@
OnboardingStepBase,
PaginatedListContainer,
UserTable,
+ GlobalSnackbar,
},
mixins: [commonCoreStrings, commonSyncElements],
setup() {
- const { selectAUser$, importedLabel$ } = lodUsersManagementStrings;
+ const { selectAUser$, importedLabel$, importUserError$ } = lodUsersManagementStrings;
+ const { createSnackbar } = useSnackbar();
return {
+ createSnackbar,
+
selectAUser$,
importedLabel$,
+ importUserError$,
};
},
data() {
@@ -151,6 +160,9 @@
beforeMount() {
this.isPolling = true;
this.pollImportTask();
+ this.learnersBeingImported = this.wizardService.state.context.usersBeingImported.map(
+ u => u.id,
+ );
},
methods: {
importedLearners() {
@@ -159,14 +171,20 @@
pollImportTask() {
TaskResource.list({ queue: SoudQueue }).then(tasks => {
if (tasks.length) {
+ let hasFailedTasks = false;
tasks.forEach(task => {
- if (task.status === TaskStatuses.COMPLETED) {
- // Remove completed user id from 'being imported'
+ if ([TaskStatuses.COMPLETED, TaskStatuses.FAILED].includes(task.status)) {
+ // Remove completed/failed user id from 'being imported'
const taskUserId = task.extra_metadata.user_id;
this.learnersBeingImported = this.learnersBeingImported.filter(
id => id != taskUserId,
);
-
+ this.wizardService.send({
+ type: 'REMOVE_USER_BEING_IMPORTED',
+ value: taskUserId,
+ });
+ }
+ if (task.status === TaskStatuses.COMPLETED) {
// Update the wizard context to know this user has been imported - only if they
// haven't already been added to the list (ie, imported by other means)
const taskUsername = task.extra_metadata.username;
@@ -186,8 +204,14 @@
value: taskUsername,
});
}
+ } else if (task.status === TaskStatuses.FAILED) {
+ hasFailedTasks = true;
}
});
+ if (hasFailedTasks) {
+ this.createSnackbar(this.importUserError$());
+ TaskResource.clearAll(SoudQueue);
+ }
}
});
if (this.isPolling) {
@@ -196,11 +220,11 @@
}, 2000);
}
},
- startImport(learner) {
+ async startImport(learner) {
// Push the learner into being imported, we'll remove it if we get an error later on
this.learnersBeingImported.push(learner.id);
- const task_name = 'kolibri.core.auth.tasks.peeruserimport';
+ const task_name = TaskTypes.IMPORTLODUSER;
const params = {
type: task_name,
...this.wizardService.state.context.remoteAdmin,
@@ -209,6 +233,10 @@
device_id: this.device.id,
user_id: learner.id,
using_admin: true,
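+ // Ask the task runner to retry transient failures: wait 5 seconds
+ // between attempts and give up after 3 retries.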
+ enqueue_args: {
+ retry_interval: 5,
+ max_retries: 3,
+ },
};
if (!this.wizardService.state.context.firstImportedLodUser) {
this.wizardService.send({
@@ -216,9 +244,21 @@
value: { username: learner.username, password: DemographicConstants.NOT_SPECIFIED },
});
}
- TaskResource.startTask(params).catch(() => {
+ try {
+ const newTask = await TaskResource.startTask(params);
+ this.wizardService.send({
+ type: 'ADD_USER_BEING_IMPORTED',
+ value: {
+ id: learner.id,
+ full_name: learner.full_name,
+ username: learner.username,
+ taskId: newTask.id,
+ },
+ });
+ } catch (error) {
+ this.createSnackbar(this.importUserError$());
this.learnersBeingImported = this.learnersBeingImported.filter(id => id != learner.id);
- });
+ }
},
isImported(learner) {
return this.importedLearners().find(u => u === learner.username);
diff --git a/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue b/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue
index 1b27c5a2ba9..79cc7c8bf87 100644
--- a/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue
+++ b/kolibri/plugins/setup_wizard/assets/src/views/OnboardingStepBase.vue
@@ -67,6 +67,7 @@
v-if="showBackArrow"
icon="back"
style="margin-left: -12px"
+ :disabled="backArrowDisabled"
@click="wizardService.send(eventOnGoBack)"
/>
@@ -188,6 +189,10 @@
type: Boolean,
default: false,
},
+ backArrowDisabled: {
+ type: Boolean,
+ default: false,
+ },
noBackAction: {
type: Boolean,
default: false,
diff --git a/kolibri/utils/main.py b/kolibri/utils/main.py
index a32c5ddeaa0..f8b9719c13a 100644
--- a/kolibri/utils/main.py
+++ b/kolibri/utils/main.py
@@ -37,7 +37,6 @@
from kolibri.utils.sanity_checks import check_log_file_location
from kolibri.utils.sanity_checks import DatabaseInaccessible
from kolibri.utils.sanity_checks import DatabaseNotMigrated
-from kolibri.utils.sanity_checks import ensure_job_tables_created
from kolibri.utils.server import get_status
from kolibri.utils.server import NotRunning
@@ -288,16 +287,6 @@ def initialize( # noqa C901
if not skip_update:
_upgrades_before_django_setup(updated, version)
- try:
- ensure_job_tables_created()
- except Exception as e:
- logging.error(
- "The job tables were not fully migrated. Tried to "
- "create them in the database and an error occurred: "
- "{}".format(e)
- )
- raise
-
_setup_django()
_post_django_initialization()
diff --git a/kolibri/utils/sanity_checks.py b/kolibri/utils/sanity_checks.py
index 3a9f4719050..a6a199b7ace 100644
--- a/kolibri/utils/sanity_checks.py
+++ b/kolibri/utils/sanity_checks.py
@@ -6,13 +6,10 @@
from django.apps import apps
from django.db.utils import OperationalError
from django.db.utils import ProgrammingError
-from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-from sqlalchemy.exc import ProgrammingError as SQLAlchemyProgrammingError
from .conf import KOLIBRI_HOME
from .conf import OPTIONS
from .options import generate_empty_options_file
-from kolibri.utils.sql_alchemy import DBSchemaError
logger = logging.getLogger(__name__)
@@ -119,20 +116,6 @@ def check_database_is_migrated():
raise DatabaseInaccessible(db_exception=e)
-def ensure_job_tables_created():
- from kolibri.core.tasks.main import job_storage
- from kolibri.core.tasks.main import connection
- from kolibri.core.tasks.storage import Storage
-
- try:
- job_storage.test_table_readable()
- except (SQLAlchemyOperationalError, SQLAlchemyProgrammingError, DBSchemaError):
- logger.warning("Database table for job storage was not accessible, recreating.")
- Storage.recreate_default_tables(connection)
- except Exception as e:
- raise DatabaseInaccessible(db_exception=e)
-
-
def check_default_options_exist():
options_path = os.path.join(KOLIBRI_HOME, "options.ini")
if not os.path.exists(options_path):
diff --git a/kolibri/utils/tests/test_sanity_check.py b/kolibri/utils/tests/test_sanity_check.py
index b288d814e22..41edac1660e 100644
--- a/kolibri/utils/tests/test_sanity_check.py
+++ b/kolibri/utils/tests/test_sanity_check.py
@@ -4,8 +4,6 @@
from django.db.utils import OperationalError
from django.test import TestCase
from mock import patch
-from sqlalchemy.exc import OperationalError as SQLAlchemyOperationalError
-from sqlalchemy.exc import ProgrammingError as SQLAlchemyProgrammingError
from kolibri.utils import sanity_checks
from kolibri.utils.sanity_checks import DatabaseNotMigrated
@@ -56,21 +54,3 @@ def test_check_database_is_migrated(self):
get_or_create_current_instance.side_effect = OperationalError("Test")
with self.assertRaises(DatabaseNotMigrated):
sanity_checks.check_database_is_migrated()
-
- @patch("kolibri.core.tasks.storage.Storage")
- def test_ensure_job_tables_created_operational_error(self, Storage):
- with patch("kolibri.core.tasks.main.job_storage") as job_storage:
- job_storage.test_table_readable.side_effect = SQLAlchemyOperationalError(
- "Test", "", ""
- )
- sanity_checks.ensure_job_tables_created()
- Storage.recreate_default_tables.assert_called_once()
-
- @patch("kolibri.core.tasks.storage.Storage")
- def test_ensure_job_tables_created_programming_error(self, Storage):
- with patch("kolibri.core.tasks.main.job_storage") as job_storage:
- job_storage.test_table_readable.side_effect = SQLAlchemyProgrammingError(
- "Test", "", ""
- )
- sanity_checks.ensure_job_tables_created()
- Storage.recreate_default_tables.assert_called_once()
diff --git a/kolibri/utils/tests/test_server.py b/kolibri/utils/tests/test_server.py
index 53e63836a6f..9708f0c80c5 100755
--- a/kolibri/utils/tests/test_server.py
+++ b/kolibri/utils/tests/test_server.py
@@ -9,7 +9,6 @@
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.storage import Storage
-from kolibri.core.tasks.test.base import connection
from kolibri.utils import server
from kolibri.utils.constants import installation_types
@@ -81,11 +80,10 @@ def test_whl(self):
@pytest.fixture
def job_storage():
- with connection() as c:
- s = Storage(connection=c)
- s.clear()
- yield s
- s.clear()
+ s = Storage()
+ s.clear()
+ yield s
+ s.clear()
class TestServerServices(object):
@@ -128,6 +126,7 @@ def test_services_shutdown_on_stop(self):
]
+@pytest.mark.django_db(databases="__all__", transaction=True)
class TestServerDefaultScheduledTasks(object):
@mock.patch("kolibri.core.discovery.utils.network.broadcast.KolibriBroadcast")
def test_scheduled_jobs_persist_on_restart(