Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ backends are great candidates for community contributions.

## Basic Usage

Start by adding `django_lightweight_queue` to your `INSTALLED_APPS`:

```python
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
...,
"django_lightweight_queue",
]
```

After that, define your task in any file you want:

```python
import time
from django_lightweight_queue import task
Expand Down Expand Up @@ -67,12 +80,12 @@ present in the specified file are inherited from the global configuration.

There are four built-in backends:

| Backend | Type | Description |
| -------------- | ----------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Synchronous | Development | Executes the task inline, without any actual queuing. |
| Redis | Production | Executes tasks at-most-once using [Redis][redis] for storage of the enqueued tasks. |
| Reliable Redis | Production | Executes tasks at-least-once using [Redis][redis] for storage of the enqueued tasks (subject to Redis consistency). Does not guarantee the task _completes_. |
| Debug Web | Debugging | Instead of running jobs it prints the url to a view that can be used to run a task in a transaction which will be rolled back. This is useful for debugging and optimising tasks. |
| Backend | Import Location | Type | Description |
| -------------- |:----------------------------------------------------------------------| ----------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Synchronous | django_lightweight_queue.backends.synchronous.SynchronousBackend | Development | Executes the task inline, without any actual queuing. |
| Redis | django_lightweight_queue.backends.redis.RedisBackend | Production | Executes tasks at-most-once using [Redis][redis] for storage of the enqueued tasks. |
| Reliable Redis | django_lightweight_queue.backends.reliable_redis.ReliableRedisBackend | Production | Executes tasks at-least-once using [Redis][redis] for storage of the enqueued tasks (subject to Redis consistency). Does not guarantee the task _completes_. |
| Debug Web | django_lightweight_queue.backends.debug_web.DebugWebBackend | Debugging | Instead of running jobs it prints the url to a view that can be used to run a task in a transaction which will be rolled back. This is useful for debugging and optimising tasks. |

[redis]: https://redis.io/

Expand Down
195 changes: 160 additions & 35 deletions django_lightweight_queue/app_settings.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,177 @@
from typing import Dict, Union, Mapping, TypeVar, Callable, Optional, Sequence
from typing import Union, Mapping, TypeVar, Callable, Optional, Sequence

from django.conf import settings
from django.conf import settings as django_settings

from . import constants
from .types import Logger, QueueName

T = TypeVar('T')


def setting(suffix: str, default: T) -> T:
attr_name = '{}{}'.format(constants.SETTING_NAME_PREFIX, suffix)
return getattr(settings, attr_name, default)
class Settings():
def _get(self, suffix: str, default: T) -> T:
attr_name = '{}{}'.format(constants.SETTING_NAME_PREFIX, suffix)
return getattr(django_settings, attr_name, default)

# adjustable values at runtime
_workers = None
_backend = None
_logger_factory = None
_backend_overrides = None
_middleware = None
_ignore_apps = None
_redis_host = None
_redis_port = None
_redis_password = None
_redis_prefix = None
_enable_prometheus = None
_prometheus_start_port = None
_atomic_jobs = None

WORKERS = setting('WORKERS', {}) # type: Dict[QueueName, int]
BACKEND = setting(
'BACKEND',
'django_lightweight_queue.backends.synchronous.SynchronousBackend',
) # type: str
@property
def WORKERS(self):
if not self._workers:
self._workers = self._get('WORKERS', {})
return self._workers

LOGGER_FACTORY = setting(
'LOGGER_FACTORY',
'logging.getLogger',
) # type: Union[str, Callable[[str], Logger]]
@WORKERS.setter
def WORKERS(self, value):
self._workers = value

# Allow per-queue overrides of the backend.
BACKEND_OVERRIDES = setting('BACKEND_OVERRIDES', {}) # type: Mapping[QueueName, str]
@property
def BACKEND(self):
if not self._backend:
self._backend = self._get(
'BACKEND',
'django_lightweight_queue.backends.synchronous.SynchronousBackend',
)
return self._backend # type: str

MIDDLEWARE = setting('MIDDLEWARE', (
'django_lightweight_queue.middleware.logging.LoggingMiddleware',
'django_lightweight_queue.middleware.transaction.TransactionMiddleware',
)) # type: Sequence[str]
@BACKEND.setter
def BACKEND(self, value):
self._backend = value

# Apps to ignore when looking for tasks. Apps must be specified as the dotted
# name used in `INSTALLED_APPS`. This is expected to be useful when you need to
# have a file called `tasks.py` within an app, but don't want
# django-lightweight-queue to import that file.
# Note: this _doesn't_ prevent tasks being registered from these apps.
IGNORE_APPS = setting('IGNORE_APPS', ()) # type: Sequence[str]
@property
def LOGGER_FACTORY(self):
if not self._logger_factory:
self._logger_factory = self._get(
'LOGGER_FACTORY',
'logging.getLogger',
)
return self._logger_factory # type: Union[str, Callable[[str], Logger]]

# Backend-specific settings
REDIS_HOST = setting('REDIS_HOST', '127.0.0.1') # type: str
REDIS_PORT = setting('REDIS_PORT', 6379) # type: int
REDIS_PASSWORD = setting('REDIS_PASSWORD', None) # type: Optional[str]
REDIS_PREFIX = setting('REDIS_PREFIX', '') # type: str
@LOGGER_FACTORY.setter
def LOGGER_FACTORY(self, value):
self._logger_factory = value

ENABLE_PROMETHEUS = setting('ENABLE_PROMETHEUS', False) # type: bool
# Workers will export metrics on this port, and ports following it
PROMETHEUS_START_PORT = setting('PROMETHEUS_START_PORT', 9300) # type: int
@property
def BACKEND_OVERRIDES(self):
# Allow per-queue overrides of the backend.
if not self._backend_overrides:
self._backend_overrides = self._get('BACKEND_OVERRIDES', {})
return self._backend_overrides # type: Mapping[QueueName, str]

ATOMIC_JOBS = setting('ATOMIC_JOBS', True) # type: bool
@BACKEND_OVERRIDES.setter
def BACKEND_OVERRIDES(self, value):
self._backend_overrides = value

@property
def MIDDLEWARE(self):
if not self._middleware:
self._middleware = self._get('MIDDLEWARE', (
'django_lightweight_queue.middleware.logging.LoggingMiddleware',
))
return self._middleware # type: Sequence[str]

@MIDDLEWARE.setter
def MIDDLEWARE(self, value):
self._middleware = value

@property
def IGNORE_APPS(self):
# Apps to ignore when looking for tasks. Apps must be specified as the dotted
# name used in `INSTALLED_APPS`. This is expected to be useful when you need to
# have a file called `tasks.py` within an app, but don't want
# django-lightweight-queue to import that file.
# Note: this _doesn't_ prevent tasks being registered from these apps.
if not self._ignore_apps:
self._ignore_apps = self._get('IGNORE_APPS', ())
return self._ignore_apps # type: Sequence[str]

@IGNORE_APPS.setter
def IGNORE_APPS(self, value):
self._ignore_apps = value

@property
def REDIS_HOST(self):
if not self._redis_host:
self._redis_host = self._get('REDIS_HOST', '127.0.0.1')
return self._redis_host # type: str

@REDIS_HOST.setter
def REDIS_HOST(self, value):
self._redis_host = value

@property
def REDIS_PORT(self):
if not self._redis_port:
self._redis_port = self._get('REDIS_PORT', 6379)
return self._redis_port # type: int

@REDIS_PORT.setter
def REDIS_PORT(self, value):
self._redis_port = value

@property
def REDIS_PASSWORD(self):
if not self._redis_password:
self._redis_password = self._get('REDIS_PASSWORD', None)
return self._redis_password # type: Optional[str]

@REDIS_PASSWORD.setter
def REDIS_PASSWORD(self, value):
self._redis_password = value

@property
def REDIS_PREFIX(self):
if not self._redis_prefix:
self._redis_prefix = self._get('REDIS_PREFIX', '')
return self._redis_prefix # type: str

@REDIS_PREFIX.setter
def REDIS_PREFIX(self, value):
self._redis_prefix = value

@property
def ENABLE_PROMETHEUS(self):
if not self._enable_prometheus:
self._enable_prometheus = self._get('ENABLE_PROMETHEUS', False)
return self._enable_prometheus # type: bool

@ENABLE_PROMETHEUS.setter
def ENABLE_PROMETHEUS(self, value):
self._enable_prometheus = value

@property
def PROMETHEUS_START_PORT(self):
# Workers will export metrics on this port, and ports following it
if not self._prometheus_start_port:
self._prometheus_start_port = self._get('PROMETHEUS_START_PORT', 9300)
return self._prometheus_start_port # type: int

@PROMETHEUS_START_PORT.setter
def PROMETHEUS_START_PORT(self, value):
self._prometheus_start_port = value

@property
def ATOMIC_JOBS(self):
if not self._atomic_jobs:
self._atomic_jobs = self._get('ATOMIC_JOBS', True)
return self._atomic_jobs # type: bool

@ATOMIC_JOBS.setter
def ATOMIC_JOBS(self, value):
self._atomic_jobs = value


settings = Settings()
12 changes: 6 additions & 6 deletions django_lightweight_queue/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

import redis

from .. import app_settings
from ..job import Job
from .base import BackendWithPauseResume
from ..types import QueueName, WorkerNumber
from ..utils import block_for_time
from ..app_settings import settings


class RedisBackend(BackendWithPauseResume):
Expand All @@ -17,9 +17,9 @@ class RedisBackend(BackendWithPauseResume):

def __init__(self) -> None:
self.client = redis.StrictRedis(
host=app_settings.REDIS_HOST,
port=app_settings.REDIS_PORT,
password=app_settings.REDIS_PASSWORD,
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=settings.REDIS_PASSWORD,
)

def enqueue(self, job: Job, queue: QueueName) -> None:
Expand Down Expand Up @@ -79,9 +79,9 @@ def is_paused(self, queue: QueueName) -> bool:
return bool(self.client.exists(self._pause_key(queue)))

def _key(self, queue: QueueName) -> str:
if app_settings.REDIS_PREFIX:
if settings.REDIS_PREFIX:
return '{}:django_lightweight_queue:{}'.format(
app_settings.REDIS_PREFIX,
settings.REDIS_PREFIX,
queue,
)

Expand Down
12 changes: 6 additions & 6 deletions django_lightweight_queue/backends/reliable_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

import redis

from .. import app_settings
from ..job import Job
from .base import BackendWithDeduplicate, BackendWithPauseResume
from ..types import QueueName, WorkerNumber
from ..utils import block_for_time, get_worker_numbers
from ..app_settings import settings
from ..progress_logger import ProgressLogger, NULL_PROGRESS_LOGGER

# Work around https://github.com/python/mypy/issues/9914. Name needs to match
Expand Down Expand Up @@ -39,9 +39,9 @@ class ReliableRedisBackend(BackendWithDeduplicate, BackendWithPauseResume):

def __init__(self) -> None:
self.client = redis.StrictRedis(
host=app_settings.REDIS_HOST,
port=app_settings.REDIS_PORT,
password=app_settings.REDIS_PASSWORD,
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=settings.REDIS_PASSWORD,
)

def startup(self, queue: QueueName) -> None:
Expand Down Expand Up @@ -245,9 +245,9 @@ def _processing_key(self, queue: QueueName, worker_number: WorkerNumber) -> str:
return self._prefix_key(key)

def _prefix_key(self, key: str) -> str:
if app_settings.REDIS_PREFIX:
if settings.REDIS_PREFIX:
return '{}:{}'.format(
app_settings.REDIS_PREFIX,
settings.REDIS_PREFIX,
key,
)

Expand Down
6 changes: 3 additions & 3 deletions django_lightweight_queue/exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from prometheus_client.exposition import MetricsHandler

from . import app_settings
from .types import QueueName, WorkerNumber
from .app_settings import settings


def get_config_response(
Expand All @@ -23,7 +23,7 @@ def get_config_response(
"targets": [
"{}:{}".format(
gethostname(),
app_settings.PROMETHEUS_START_PORT + index,
settings.PROMETHEUS_START_PORT + index,
),
],
"labels": {
Expand Down Expand Up @@ -60,7 +60,7 @@ def __init__(self, *args, **kwargs):
super(MetricsServer, self).__init__(*args, **kwargs)

def run(self):
httpd = HTTPServer(('0.0.0.0', app_settings.PROMETHEUS_START_PORT), RequestHandler)
httpd = HTTPServer(('0.0.0.0', settings.PROMETHEUS_START_PORT), RequestHandler)
httpd.timeout = 2

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from django.core.management.base import BaseCommand, CommandParser

from ... import app_settings
from ...utils import get_backend, get_queue_counts, load_extra_config
from ...app_settings import settings
from ...cron_scheduler import get_cron_config


Expand Down Expand Up @@ -37,7 +37,7 @@ def handle(self, **options: Any) -> None:

print("")
print("Middleware:")
for x in app_settings.MIDDLEWARE:
for x in settings.MIDDLEWARE:
print(" * {}".format(x))

print("")
Expand Down
Loading