Skip to content

Commit 47bd33b

Browse files
committed
feat(manual-mode): Add manual model
1 parent abec78b commit 47bd33b

File tree

3 files changed

+52
-26
lines changed

3 files changed

+52
-26
lines changed

settings/dev.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,9 @@
6868
DOCGEN_MODE = env.bool("DOCGEN_MODE", default=False)
6969
TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR
7070

71+
# To be used by test that depend on task processor to run tasks manually by
72+
# calling `run_tasks`
73+
TASK_PROCESSOR_MANUAL_MODE = env.bool("TASK_PROCESSOR_MANUAL_MODE", default=False)
74+
7175
# Avoid models.W042 warnings
7276
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"

src/task_processor/decorators.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def register_recurring_task(
177177
first_run_time: time | None = None,
178178
timeout: timedelta | None = timedelta(minutes=30),
179179
) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]:
180-
if not settings.TASK_PROCESSOR_MODE:
180+
if not settings.TASK_PROCESSOR_MODE or settings.TASK_PROCESSOR_MANUAL_MODE:
181181
# Do not register recurring tasks if not invoked by task processor
182182
return lambda f: f
183183

src/task_processor/processor.py

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import functools
12
import logging
23
import traceback
34
import typing
@@ -109,33 +110,65 @@ def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
109110
return []
110111

111112

113+
def task_metrics(
114+
func: typing.Callable[[T], typing.Tuple[T, AnyTaskRun]],
115+
) -> typing.Callable[[T], typing.Tuple[T, AnyTaskRun]]:
116+
@functools.wraps(func)
117+
def wrapper(
118+
task: T, *args: typing.Any, **kwargs: typing.Any
119+
) -> typing.Tuple[T, AnyTaskRun]:
120+
# Only collect metrics when TASK_PROCESSOR_MODE is True
121+
if not settings.TASK_PROCESSOR_MODE:
122+
return func(task, *args, **kwargs)
123+
124+
ctx = ExitStack()
125+
timer = metrics.flagsmith_task_processor_task_duration_seconds.time()
126+
ctx.enter_context(timer)
127+
128+
task_obj, task_run = func(task, *args, **kwargs)
129+
result = task_run.result
130+
131+
# Get task info for labels
132+
task_identifier = task.task_identifier
133+
registered_task = get_task(task_identifier)
134+
135+
labels = {
136+
"task_identifier": task_identifier,
137+
"task_type": registered_task.task_type.value.lower(),
138+
"result": result.lower(),
139+
} # type: ignore[union-attr]
140+
141+
timer.labels(**labels) # type: ignore[no-untyped-call]
142+
ctx.close()
143+
metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc()
144+
145+
return task_obj, task_run
146+
147+
return wrapper
148+
149+
150+
@task_metrics
112151
def _run_task(
113152
task: T,
114153
) -> typing.Tuple[T, AnyTaskRun]:
115-
assert settings.TASK_PROCESSOR_MODE, (
116-
"Attempt to run tasks in a non-task-processor environment"
117-
)
118-
119-
ctx = ExitStack()
120-
timer = metrics.flagsmith_task_processor_task_duration_seconds.time()
121-
ctx.enter_context(timer)
122-
154+
assert (
155+
settings.TASK_PROCESSOR_MODE or settings.TASK_PROCESSOR_MANUAL_MODE
156+
), "Attempt to run tasks in a non-task-processor environment"
123157
task_identifier = task.task_identifier
124158
registered_task = get_task(task_identifier)
125159

126160
logger.debug(
127161
f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
128162
)
129163
task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined]
130-
result: str
131164

132165
try:
133166
with ThreadPoolExecutor(max_workers=1) as executor:
134167
future = executor.submit(task.run)
135168
timeout = task.timeout.total_seconds() if task.timeout else None
136169
future.result(timeout=timeout) # Wait for completion or timeout
137170

138-
task_run.result = result = TaskResult.SUCCESS.value
171+
task_run.result = TaskResult.SUCCESS.value
139172
task_run.finished_at = timezone.now()
140173
task.mark_success()
141174

@@ -148,7 +181,7 @@ def _run_task(
148181

149182
task.mark_failure()
150183

151-
task_run.result = result = TaskResult.FAILURE.value
184+
task_run.result = TaskResult.FAILURE.value
152185
task_run.error_details = str(traceback.format_exc())
153186

154187
logger.error(
@@ -160,9 +193,9 @@ def _run_task(
160193
)
161194

162195
if isinstance(e, TaskBackoffError):
163-
assert registered_task.task_type == TaskType.STANDARD, (
164-
"Attempt to back off a recurring task (currently not supported)"
165-
)
196+
assert (
197+
registered_task.task_type == TaskType.STANDARD
198+
), "Attempt to back off a recurring task (currently not supported)"
166199
if typing.TYPE_CHECKING:
167200
assert isinstance(task, Task)
168201
if task.num_failures <= 3:
@@ -176,15 +209,4 @@ def _run_task(
176209
delay_until,
177210
)
178211

179-
labels = {
180-
"task_identifier": task_identifier,
181-
"task_type": registered_task.task_type.value.lower(),
182-
"result": result.lower(),
183-
}
184-
185-
timer.labels(**labels) # type: ignore[no-untyped-call]
186-
ctx.close()
187-
188-
metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc()
189-
190212
return task, task_run

0 commit comments

Comments
 (0)