Skip to content

Commit 7171c6d

Browse files
authored
[Python, TS] Fix: Cancellation Propagation (#2865)
* feat: don't send fail / success event on cancelled * feat: basic bounded dict to handle cancellations without leaking memory * chore: changelog * fix: rm redundant line * chore: ts ver * chore: changelogs * fix: add default * fix: early return
1 parent 1a5ea82 commit 7171c6d

File tree

7 files changed

+58
-11
lines changed

7 files changed

+58
-11
lines changed

sdks/python/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [1.22.11] - 2026-01-27
9+
10+
### Changed
11+
12+
- Improves handling of cancellations for tasks to limit how often tasks receive a cancellation but then are marked as succeeded anyways.
13+
814
## [1.22.10] - 2026-01-26
915

1016
### Added
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from collections import OrderedDict
2+
from typing import TypeVar
3+
4+
K = TypeVar("K")
5+
V = TypeVar("V")
6+
7+
8+
class BoundedDict(OrderedDict[K, V]):
9+
def __init__(self, maxsize: int):
10+
super().__init__()
11+
self.maxsize = maxsize
12+
13+
def __setitem__(self, key: K, value: V) -> None:
14+
if key in self:
15+
self.move_to_end(key)
16+
17+
super().__setitem__(key, value)
18+
19+
if len(self) > self.maxsize:
20+
self.popitem(last=False)

sdks/python/hatchet_sdk/worker/runner/runner.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from hatchet_sdk.runnables.task import Task
5050
from hatchet_sdk.runnables.types import R, TWorkflowInput
5151
from hatchet_sdk.serde import HATCHET_PYDANTIC_SENTINEL
52+
from hatchet_sdk.utils.cache import BoundedDict
5253
from hatchet_sdk.utils.serde import remove_null_unicode_character
5354
from hatchet_sdk.utils.typing import DataclassInstance
5455
from hatchet_sdk.worker.action_listener_process import ActionEvent
@@ -86,6 +87,7 @@ def __init__(
8687
self.slots = slots
8788
self.tasks: dict[ActionKey, asyncio.Task[Any]] = {} # Store run ids and futures
8889
self.contexts: dict[ActionKey, Context] = {} # Store run ids and contexts
90+
self.cancellations = BoundedDict[str, bool](maxsize=1000)
8991
self.action_registry = action_registry or {}
9092

9193
self.event_queue = event_queue
@@ -156,8 +158,9 @@ def step_run_callback(
156158
) -> Callable[[asyncio.Task[Any]], None]:
157159
def inner_callback(task: asyncio.Task[Any]) -> None:
158160
self.cleanup_run_id(action.key)
161+
was_cancelled = self.cancellations.pop(action.key, False)
159162

160-
if task.cancelled():
163+
if was_cancelled or task.cancelled():
161164
return
162165

163166
try:
@@ -348,6 +351,9 @@ def cleanup_run_id(self, key: ActionKey) -> None:
348351
del self.threads[key]
349352

350353
if key in self.contexts:
354+
if self.contexts[key].exit_flag:
355+
self.cancellations[key] = True
356+
351357
del self.contexts[key]
352358

353359
@overload
@@ -467,6 +473,7 @@ async def handle_cancel_action(self, action: Action) -> None:
467473
# call cancel to signal the context to stop
468474
if key in self.contexts:
469475
self.contexts[key]._set_cancellation_flag()
476+
self.cancellations[key] = True
470477

471478
await asyncio.sleep(1)
472479

sdks/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "hatchet-sdk"
3-
version = "1.22.10"
3+
version = "1.22.11"
44
description = "This is the official Python SDK for Hatchet, a distributed, fault-tolerant task queue. The SDK allows you to easily integrate Hatchet's task scheduling and workflow orchestration capabilities into your Python applications."
55
authors = [
66
"Alexander Belanger <[email protected]>",

sdks/typescript/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ All notable changes to Hatchet's TypeScript SDK will be documented in this chang
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
## [1.10.6] - 2026-01-27
9+
10+
### Changed
11+
12+
- Improves handling of cancellations for tasks to limit how often tasks receive a cancellation but then are marked as succeeded anyways.

sdks/typescript/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@hatchet-dev/typescript-sdk",
3-
"version": "1.10.5",
3+
"version": "1.10.6",
44
"description": "Background task orchestration & visibility for developers",
55
"types": "dist/index.d.ts",
66
"files": [

sdks/typescript/src/v1/client/worker/worker-internal.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -550,9 +550,13 @@ export class V1Worker {
550550
};
551551

552552
const success = async (result: any) => {
553-
this.logger.info(`Step run ${action.stepRunId} succeeded`);
554-
555553
try {
554+
if (context.cancelled) {
555+
return;
556+
}
557+
558+
this.logger.info(`Step run ${action.stepRunId} succeeded`);
559+
556560
// Send the action event to the dispatcher
557561
const event = this.getStepActionEvent(
558562
action,
@@ -595,15 +599,19 @@ export class V1Worker {
595599
};
596600

597601
const failure = async (error: any) => {
598-
this.logger.error(`Step run ${action.stepRunId} failed: ${error.message}`);
599-
600-
if (error.stack) {
601-
this.logger.error(error.stack);
602-
}
603-
604602
const shouldNotRetry = error instanceof NonRetryableError;
605603

606604
try {
605+
if (context.cancelled) {
606+
return;
607+
}
608+
609+
this.logger.error(`Step run ${action.stepRunId} failed: ${error.message}`);
610+
611+
if (error.stack) {
612+
this.logger.error(error.stack);
613+
}
614+
607615
// Send the action event to the dispatcher
608616
const event = this.getStepActionEvent(
609617
action,

0 commit comments

Comments
 (0)