Skip to content
This repository was archived by the owner on Feb 17, 2026. It is now read-only.

Commit 1412308

Browse files
author
Matt Griswold
committed
Merge branch 'FC-6202' into 'prep-release'
Fullctl Running Task Cancellation See merge request fullctl/fullctl!317
2 parents d67b18a + e52ae14 commit 1412308

File tree

4 files changed

+751
-8
lines changed

4 files changed

+751
-8
lines changed

src/fullctl/django/models/concrete/tasks.py

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import fullctl.service_bridge.aaactl as aaactl
2323
import fullctl.service_bridge.auditctl as auditctl
2424
from fullctl.django.models.abstract.base import HandleRefModel
25+
from fullctl.django.tasks.context import task_execution_context
2526
from fullctl.django.tasks.qualifiers import Dynamic
2627
from fullctl.django.tasks.util import worker_id
2728

@@ -32,6 +33,7 @@
3233
"TaskLimitError",
3334
"TaskAlreadyStarted",
3435
"ParentTaskNotFinished",
36+
"TaskCancelledException",
3537
"Task",
3638
"TaskClaim",
3739
"TaskHeartbeat",
@@ -52,9 +54,13 @@ class TaskClaimed(IOError):
5254
def __init__(self, task):
5355
super().__init__(f"Task already claimed by another worker: {task}")
5456

57+
5558
class TaskScheduleClaimed(IOError):
5659
def __init__(self, task_schedule):
57-
super().__init__(f"Task schedule already claimed by another worker: {task_schedule}")
60+
super().__init__(
61+
f"Task schedule already claimed by another worker: {task_schedule}"
62+
)
63+
5864

5965
class WorkerUnqualified(IOError):
6066
def __init__(self, task, qualifier):
@@ -116,6 +122,25 @@ class ParentTaskNotFinished(IOError):
116122
pass
117123

118124

125+
class TaskCancelledException(IOError):
126+
"""
127+
Raised when a task is cancelled during execution.
128+
129+
This allows graceful termination of task execution. The task execution
130+
framework catches this exception and:
131+
- Does NOT mark the task as failed
132+
- Preserves the cancelled status
133+
- Does NOT trigger error notifications
134+
135+
This exception should be raised by task.check_cancelled() when it
136+
detects that the task status has been set to "cancelled".
137+
"""
138+
139+
def __init__(self, task):
140+
super().__init__(f"Task was cancelled: {task.id}")
141+
self.task = task
142+
143+
119144
class ErrorNotificationConfig(pydantic.BaseModel):
120145
subject: str
121146
message: str
@@ -520,6 +545,34 @@ def _cancel(self, reason):
520545
self.status = "cancelled"
521546
self.save()
522547

548+
def check_cancelled(self):
549+
"""
550+
Check if the task has been cancelled and raise TaskCancelledException if so.
551+
552+
This method should be called periodically during long-running task execution
553+
to allow for graceful cancellation. It refreshes the task status from the
554+
database to check for cancellation requests.
555+
556+
Raises:
557+
TaskCancelledException: If the task status is 'cancelled'
558+
559+
Usage example:
560+
def run(self, *args, **kwargs):
561+
for item in large_collection:
562+
self.check_cancelled() # Check before processing each item
563+
process(item)
564+
565+
Note:
566+
In most cases, you should use check_task_cancelled() from
567+
fullctl.django.tasks.context instead of calling this directly.
568+
The context function works anywhere in the call stack.
569+
"""
570+
# Refresh status from database to get latest value
571+
self.refresh_from_db(fields=["status"])
572+
573+
if self.status == "cancelled":
574+
raise TaskCancelledException(self)
575+
523576
def _complete(self, output):
524577

525578
# if result is a dict we need to json encode it
@@ -605,11 +658,29 @@ def _run(self):
605658
try:
606659
param = self.param
607660
extensions.call(self, "before_run", *param["args"], **param["kwargs"])
608-
output = self.run(*param["args"], **param["kwargs"])
661+
662+
# Wrap task execution in context so task can be accessed
663+
# from anywhere in the call stack via check_task_cancelled()
664+
with task_execution_context(self):
665+
output = self.run(*param["args"], **param["kwargs"])
666+
609667
extensions.call(self, "after_run", result=output)
610668
t_end = time.time()
611669
self.time = t_end - t_start
612670
self._complete(output)
671+
except TaskCancelledException:
672+
# Task was cancelled during execution - this is expected behavior
673+
# Status is already set to "cancelled" by the cancel() call
674+
# We don't call _fail() to avoid marking it as an error
675+
t_end = time.time()
676+
self.time = t_end - t_start
677+
self.save()
678+
log.info(
679+
"Task cancelled during execution",
680+
task_id=self.id,
681+
task_op=self.op,
682+
execution_time=self.time,
683+
)
613684
except Exception as exc:
614685
self._fail(traceback.format_exc(), exc)
615686

@@ -763,7 +834,7 @@ def spawn_tasks(self):
763834

764835
if self.are_limited_tasks_pending():
765836
return []
766-
837+
767838
# try to create a claim for the schedule
768839
try:
769840
schedule_claim = TaskScheduleClaim.objects.create(
@@ -775,7 +846,9 @@ def spawn_tasks(self):
775846
raise TaskScheduleClaimed(self)
776847

777848
# clear old claims
778-
TaskScheduleClaim.objects.filter(task_schedule=self).exclude(id=schedule_claim.id).delete()
849+
TaskScheduleClaim.objects.filter(task_schedule=self).exclude(
850+
id=schedule_claim.id
851+
).delete()
779852

780853
for task in self.tasks.all():
781854
if task.status in ["pending", "running"]:
@@ -799,10 +872,9 @@ def spawn_tasks(self):
799872
self.status = "deactivated"
800873
self.save()
801874

802-
803-
804875
return tasks
805876

877+
806878
class TaskScheduleClaim(HandleRefModel):
807879
task_schedule = models.ForeignKey(TaskSchedule, on_delete=models.CASCADE)
808880
worker_id = models.CharField(max_length=255)
@@ -812,11 +884,12 @@ class Meta:
812884
db_table = "fullctl_task_schedule_claim"
813885
verbose_name = _("Task Schedule Claim")
814886
verbose_name_plural = _("Task Schedule Claims")
815-
unique_together = (("task_schedule", "schedule_date"))
887+
unique_together = ("task_schedule", "schedule_date")
816888

817889
class HandleRef:
818890
tag = "task_schedule_claim"
819891

892+
820893
class Monitor(HandleRefModel):
821894
email = models.EmailField(
822895
null=True, blank=True, help_text=_("Primary alert notification email")
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
"""
2+
Task execution context management.
3+
4+
Provides a thread-safe way to access the currently executing task
5+
from anywhere in the call stack without explicit parameter passing.
6+
"""
7+
8+
from contextvars import ContextVar
9+
from typing import Optional
10+
11+
__all__ = [
12+
"task_execution_context",
13+
"get_current_task",
14+
"check_task_cancelled",
15+
]
16+
17+
_current_task: ContextVar = ContextVar("current_task", default=None)
18+
19+
20+
class task_execution_context:
21+
"""
22+
Context manager for tracking the currently executing task.
23+
24+
This sets the task in context-local storage, making it accessible
25+
to any code in the call stack via get_current_task() without
26+
requiring explicit parameter passing.
27+
28+
Usage:
29+
with task_execution_context(task):
30+
# task is now accessible via get_current_task()
31+
do_work()
32+
33+
Example:
34+
class MyTask(Task):
35+
def run(self, *args, **kwargs):
36+
# Context automatically set by Task._run()
37+
process_data() # Can call check_task_cancelled() anywhere
38+
39+
Note:
40+
In most cases, you don't need to use this directly.
41+
Task._run() automatically wraps execution in this context.
42+
"""
43+
44+
def __init__(self, task):
45+
"""
46+
Initialize context manager.
47+
48+
Args:
49+
task: The Task instance to set as current
50+
"""
51+
self.task = task
52+
self.token = None
53+
54+
def __enter__(self):
55+
"""
56+
Enter the context and set current task.
57+
58+
Returns:
59+
The task instance
60+
"""
61+
# Set the task and get a token for cleanup
62+
self.token = _current_task.set(self.task)
63+
return self.task
64+
65+
def __exit__(self, exc_type, exc_val, exc_tb):
66+
"""
67+
Exit the context and restore previous task.
68+
69+
Args:
70+
exc_type: Exception type if an exception occurred
71+
exc_val: Exception value if an exception occurred
72+
exc_tb: Exception traceback if an exception occurred
73+
74+
Returns:
75+
False to propagate exceptions (don't suppress)
76+
"""
77+
# Reset to previous value using the token
78+
_current_task.reset(self.token)
79+
return False # Don't suppress exceptions
80+
81+
82+
def get_current_task() -> Optional["Task"]: # noqa: F821
83+
"""
84+
Get the currently executing task from context.
85+
86+
This can be called from anywhere within a task execution context
87+
to access the task instance without explicit parameter passing.
88+
89+
Returns:
90+
Task instance if within task execution context, None otherwise
91+
92+
Example:
93+
def some_deep_function():
94+
task = get_current_task()
95+
if task:
96+
print(f"Running in task {task.id}")
97+
else:
98+
print("Not in task context")
99+
"""
100+
return _current_task.get()
101+
102+
103+
def check_task_cancelled():
104+
"""
105+
Check if the current task has been cancelled.
106+
107+
This can be called from anywhere within a task execution context
108+
to gracefully handle cancellation. If called outside a task context,
109+
it does nothing (safe to use in code that may run outside tasks).
110+
111+
The function will check the database for the current task's status
112+
and raise TaskCancelledException if the task has been cancelled.
113+
114+
Raises:
115+
TaskCancelledException: If the current task is cancelled
116+
117+
Example:
118+
def process_items(items):
119+
for item in items:
120+
check_task_cancelled() # Check before each item
121+
process(item)
122+
123+
Note:
124+
This is safe to call even if not in a task context - it will
125+
simply do nothing if no task is currently executing.
126+
"""
127+
task = get_current_task()
128+
if task:
129+
task.check_cancelled()

0 commit comments

Comments
 (0)