Skip to content

Commit d0ea561

Browse files
feat: Add Task Processor metrics (#27)
--------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 5550108 commit d0ea561

15 files changed

+289
-149
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,17 @@ Flagsmith uses Prometheus to track performance metrics.
6060

6161
The following default metrics are exposed:
6262

63+
##### Common metrics
64+
6365
- `flagsmith_build_info`: Has the labels `version` and `ci_commit_sha`.
6466
- `http_server_request_duration_seconds`: Histogram labeled with `method`, `route`, and `response_status`.
6567
- `http_server_requests_total`: Counter labeled with `method`, `route`, and `response_status`.
68+
- `task_processor_enqueued_tasks_total`: Counter labeled with `task_identifier`.
69+
70+
##### Task Processor metrics
71+
72+
- `task_processor_finished_tasks_total`: Counter labeled with `task_identifier` and `result` (`"success"`, `"failure"`).
73+
- `task_processor_task_duration_seconds`: Histogram labeled with `task_identifier` and `result` (`"success"`, `"failure"`).
6674

6775
##### Guidelines
6876

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ dependencies = [
1515
"flagsmith-flag-engine",
1616
"gunicorn (>=19.1)",
1717
"prometheus-client (>=0.0.16)",
18-
"simplejson (>=3,<4)",
1918
"psycopg2-binary (>=2.9,<3)",
19+
"simplejson (>=3,<4)",
2020
]
2121
authors = [
2222
{ name = "Matthew Elwell" },

settings/dev.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
TASK_DELETE_RETENTION_DAYS = 15
4747
TASK_DELETE_RUN_EVERY = timedelta(days=1)
4848
TASK_DELETE_RUN_TIME = time(5, 0, 0)
49+
TASK_PROCESSOR_MODE = False
4950
TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR
5051

5152
# Avoid models.W042 warnings

src/common/prometheus/utils.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
import typing
1+
import importlib
22

33
import prometheus_client
44
from django.conf import settings
55
from prometheus_client.metrics import MetricWrapperBase
66
from prometheus_client.multiprocess import MultiProcessCollector
77

8-
T = typing.TypeVar("T", bound=MetricWrapperBase)
9-
108

119
class Histogram(prometheus_client.Histogram):
    # Project-wide Histogram subclass: default bucket boundaries come from
    # Django settings instead of prometheus_client's built-in defaults.
    DEFAULT_BUCKETS = settings.PROMETHEUS_HISTOGRAM_BUCKETS
def get_registry() -> prometheus_client.CollectorRegistry:
    """Return a fresh registry that aggregates metrics across worker processes.

    Wraps a new ``CollectorRegistry`` with ``MultiProcessCollector`` so that
    metrics recorded by multiple processes (e.g. gunicorn workers) are
    collected together.
    """
    registry = prometheus_client.CollectorRegistry()
    MultiProcessCollector(registry)  # type: ignore[no-untyped-call]
    return registry
17+
18+
19+
def reload_metrics(*metric_module_names: str) -> None:
    """Unregister every collector defined in the given modules, then re-import
    the modules so their collectors register again.

    Intended for tests that need to reset metrics-module state (e.g. after
    changing a setting the module reads at import time).
    """
    registry = prometheus_client.REGISTRY

    for name in metric_module_names:
        module = importlib.import_module(name)

        # Remove each metric object the module registered on import...
        for value in list(vars(module).values()):
            if isinstance(value, MetricWrapperBase):
                registry.unregister(value)

        # ...then re-execute the module so the collectors are re-created
        # and re-registered under the current configuration.
        importlib.reload(module)

src/task_processor/decorators.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import os
32
import typing
43
from datetime import datetime, time, timedelta
54
from threading import Thread
@@ -8,7 +7,7 @@
87
from django.db.transaction import on_commit
98
from django.utils import timezone
109

11-
from task_processor import task_registry
10+
from task_processor import metrics, task_registry
1211
from task_processor.exceptions import InvalidArgumentsError, TaskQueueFullError
1312
from task_processor.models import RecurringTask, Task, TaskPriority
1413
from task_processor.task_run_method import TaskRunMethod
@@ -69,7 +68,8 @@ def delay(
6968
args: tuple[typing.Any, ...] = (),
7069
kwargs: dict[str, typing.Any] | None = None,
7170
) -> Task | None:
72-
logger.debug("Request to run task '%s' asynchronously.", self.task_identifier)
71+
task_identifier = self.task_identifier
72+
logger.debug("Request to run task '%s' asynchronously.", task_identifier)
7373

7474
kwargs = kwargs or {}
7575

@@ -84,13 +84,16 @@ def delay(
8484
_validate_inputs(*args, **kwargs)
8585
self.unwrapped(*args, **kwargs)
8686
elif settings.TASK_RUN_METHOD == TaskRunMethod.SEPARATE_THREAD:
87-
logger.debug("Running task '%s' in separate thread", self.task_identifier)
87+
logger.debug("Running task '%s' in separate thread", task_identifier)
8888
self.run_in_thread(args=args, kwargs=kwargs)
8989
else:
90-
logger.debug("Creating task for function '%s'...", self.task_identifier)
90+
logger.debug("Creating task for function '%s'...", task_identifier)
91+
metrics.task_processor_enqueued_tasks_total.labels(
92+
task_identifier=task_identifier
93+
).inc()
9194
try:
9295
task = Task.create(
93-
task_identifier=self.task_identifier,
96+
task_identifier=task_identifier,
9497
scheduled_for=delay_until or timezone.now(),
9598
priority=self.priority,
9699
queue_size=self.queue_size,
@@ -174,7 +177,7 @@ def register_recurring_task(
174177
first_run_time: time | None = None,
175178
timeout: timedelta | None = timedelta(minutes=30),
176179
) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]:
177-
if not os.environ.get("RUN_BY_PROCESSOR"):
180+
if not settings.TASK_PROCESSOR_MODE:
178181
# Do not register recurring tasks if not invoked by task processor
179182
return lambda f: f
180183

src/task_processor/metrics.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import prometheus_client
from django.conf import settings

from common.prometheus import Histogram

# Incremented from the web application whenever a task is queued (see
# task_processor.decorators), so it is defined unconditionally.
task_processor_enqueued_tasks_total = prometheus_client.Counter(
    "task_processor_enqueued_tasks_total",
    "Total number of enqueued tasks",
    labelnames=["task_identifier"],
)

# The metrics below are only recorded while actually executing tasks,
# so they are only created when running in task processor mode.
if settings.TASK_PROCESSOR_MODE:
    task_processor_finished_tasks_total = prometheus_client.Counter(
        "task_processor_finished_tasks_total",
        "Total number of finished tasks",
        labelnames=["task_identifier", "result"],
    )
    task_processor_task_duration_seconds = Histogram(
        "task_processor_task_duration_seconds",
        "Task processor task duration in seconds",
        labelnames=["task_identifier", "result"],
    )

src/task_processor/processor.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
import traceback
33
import typing
44
from concurrent.futures import ThreadPoolExecutor
5+
from contextlib import ExitStack
56
from datetime import timedelta
67

8+
from django.conf import settings
79
from django.utils import timezone
810

11+
from task_processor import metrics
912
from task_processor.models import (
1013
AbstractBaseTask,
1114
RecurringTask,
@@ -101,38 +104,63 @@ def run_recurring_tasks() -> list[RecurringTaskRun]:
101104
def _run_task(
    task: T,
) -> typing.Tuple[T, AnyTaskRun]:
    """Execute a single task, recording its outcome and Prometheus metrics.

    Runs ``task.run`` in a one-worker thread pool (to enforce the task's
    timeout), marks the task success/failure, and returns the task together
    with its populated task-run record.
    """
    # The task-processor-only metrics (duration histogram, finished counter)
    # are only defined when TASK_PROCESSOR_MODE is set, so refuse to run
    # outside a task processor environment.
    assert settings.TASK_PROCESSOR_MODE, (
        "Attempt to run tasks in a non-task-processor environment"
    )

    # Start timing immediately; the timer is held open in an ExitStack so the
    # duration can be labeled with the task's result just before ctx.close()
    # at the bottom of this function.
    ctx = ExitStack()
    timer = metrics.task_processor_task_duration_seconds.time()
    ctx.enter_context(timer)

    task_identifier = task.task_identifier

    logger.debug(
        f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
    )
    task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task)  # type: ignore[attr-defined]
    # Assigned in both the success and failure paths below; feeds the metric
    # labels after the try/except.
    result: str

    try:
        # Run in a dedicated single-worker pool so future.result(timeout=...)
        # can enforce the task's configured timeout.
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(task.run)
            timeout = task.timeout.total_seconds() if task.timeout else None
            future.result(timeout=timeout)  # Wait for completion or timeout

        task_run.result = result = TaskResult.SUCCESS.value
        task_run.finished_at = timezone.now()
        task.mark_success()

        logger.debug(f"Task {task_identifier} id={task.pk} completed")

    except Exception as e:
        # For errors that don't include a default message (e.g., TimeoutError),
        # fall back to using repr.
        err_msg = str(e) or repr(e)

        task.mark_failure()

        task_run.result = result = TaskResult.FAILURE.value
        task_run.error_details = str(traceback.format_exc())

        logger.error(
            "Failed to execute task '%s', with id %d. Exception: %s",
            task_identifier,
            task.pk,
            err_msg,
            exc_info=True,
        )

    # NOTE(review): if mark_success()/mark_failure() itself raises, `result`
    # may be unbound and the timer is never closed — confirm this is
    # acceptable for these model methods.
    result_label_value = result.lower()

    # Apply labels to the open timer, then close the ExitStack, which
    # presumably stops the timer and observes the duration under those labels.
    timer.labels(
        task_identifier=task_identifier,
        result=result_label_value,
    )  # type: ignore[no-untyped-call]
    ctx.close()

    metrics.task_processor_finished_tasks_total.labels(
        task_identifier=task_identifier,
        result=result_label_value,
    ).inc()

    return task, task_run

tests/unit/task_processor/conftest.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,25 @@
1-
import logging
21
import typing
32

43
import pytest
4+
from pytest_django.fixtures import SettingsWrapper
55

6+
from common.prometheus.utils import reload_metrics
67
from task_processor.task_registry import RegisteredTask
78

89

9-
@pytest.fixture
10-
def run_by_processor(monkeypatch: pytest.MonkeyPatch) -> None:
11-
monkeypatch.setenv("RUN_BY_PROCESSOR", "True")
10+
@pytest.fixture()
def task_processor_mode(settings: SettingsWrapper) -> None:
    """Enable task processor mode and re-register its metrics."""
    settings.TASK_PROCESSOR_MODE = True
    # task_processor.metrics defines some collectors only when
    # TASK_PROCESSOR_MODE is truthy at import time, so force a re-import
    # now that the setting has been flipped.
    reload_metrics("task_processor.metrics")
1216

1317

14-
class GetTaskProcessorCaplog(typing.Protocol):
15-
def __call__(
16-
self, log_level: str | int = logging.INFO
17-
) -> pytest.LogCaptureFixture: ...
18-
19-
20-
@pytest.fixture
21-
def get_task_processor_caplog(
22-
caplog: pytest.LogCaptureFixture,
23-
) -> GetTaskProcessorCaplog:
24-
# caplog doesn't allow you to capture logging outputs from loggers that don't
25-
# propagate to root. Quick hack here to get the task_processor logger to
26-
# propagate.
27-
# TODO: look into using loguru.
28-
29-
def _inner(log_level: str | int = logging.INFO) -> pytest.LogCaptureFixture:
30-
task_processor_logger = logging.getLogger("task_processor")
31-
task_processor_logger.propagate = True
32-
# Assume required level for the logger.
33-
task_processor_logger.setLevel(log_level)
34-
caplog.set_level(log_level)
35-
return caplog
36-
37-
return _inner
18+
@pytest.fixture(autouse=True)
def task_processor_mode_marked(request: pytest.FixtureRequest) -> None:
    """Activate the ``task_processor_mode`` fixture for marked tests."""
    marked = any(
        marker.name == "task_processor_mode"
        for marker in request.node.iter_markers()
    )
    if marked:
        # Pulling in the fixture by name triggers its setup for this test.
        request.getfixturevalue("task_processor_mode")
3823

3924

4025
@pytest.fixture(autouse=True)

0 commit comments

Comments
 (0)