Skip to content

Commit c7d56b3

Browse files
authored
Task tree: backend performance improvements (#255)
1 parent 13857a0 commit c7d56b3

File tree

6 files changed

+146
-36
lines changed

6 files changed

+146
-36
lines changed

dev/logger.Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM python:3.7
1+
FROM python:3.11
22

33
COPY requirements.txt /karton/
44
COPY setup.py /karton/

karton/core/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "5.3.4"
1+
__version__ = "5.4.0"

karton/core/backend.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,58 @@ def get_all_tasks(
532532
self.iter_all_tasks(chunk_size=chunk_size, parse_resources=parse_resources)
533533
)
534534

535+
def _iter_legacy_task_tree(
    self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
    """
    Yields tasks created by <5.4.0 (unrouted from <5.4.0 producers or existing
    before upgrade) that belong to the given task tree.

    Used internally by iter_task_tree.

    :param root_uid: Root identifier of task tree
    :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
    :param parse_resources: Forwarded to :py:meth:`Task.unserialize`
    :return: Iterator with task objects whose root_uid equals ``root_uid``
    """
    # Only keys whose identifier part does not start with "{" are scanned,
    # i.e. keys that do not match the new task id format
    legacy_pattern = f"{KARTON_TASK_NAMESPACE}:[^{{]*"
    key_iterator = self.redis.scan_iter(match=legacy_pattern, count=chunk_size)
    for key_chunk in chunks_iter(key_iterator, chunk_size):
        for serialized in self.redis.mget(key_chunk):
            # MGET reports keys without a value as None: nothing to parse
            if serialized is None:
                continue
            candidate = Task.unserialize(
                serialized, backend=self, parse_resources=parse_resources
            )
            # The key carries no tree information, so filter after parsing
            if candidate.root_uid == root_uid:
                yield candidate
559+
560+
def iter_task_tree(
    self, root_uid: str, chunk_size: int = 1000, parse_resources: bool = True
) -> Iterator[Task]:
    """
    Iterates over every task that is part of one analysis task tree,
    i.e. all tasks sharing the given root_uid

    :param root_uid: Root identifier of task tree
    :param chunk_size: Size of chunks passed to the Redis SCAN and MGET command
    :param parse_resources: If set to False, resources are not parsed.
        It speeds up deserialization. Read :py:meth:`Task.unserialize` documentation
        to learn more.
    :return: Iterator with task objects
    """
    # First pass: <5.4.0 tasks (unrouted from <5.4.0 producers
    # or existing before upgrade)
    legacy_tasks = self._iter_legacy_task_tree(
        root_uid, chunk_size=chunk_size, parse_resources=parse_resources
    )
    yield from legacy_tasks

    # Second pass: >=5.4.0 tasks, whose keys embed the root_uid in braces
    tree_pattern = f"{KARTON_TASK_NAMESPACE}:{{{root_uid}}}:*"
    tree_keys = self.redis.scan_iter(match=tree_pattern, count=chunk_size)
    yield from self._iter_tasks(
        tree_keys, chunk_size=chunk_size, parse_resources=parse_resources
    )
586+
535587
def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
536588
"""
537589
Register or update task in Redis.

karton/core/inspect.py

Lines changed: 66 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from collections import defaultdict
2-
from typing import Dict, List
2+
from typing import Dict, List, Optional
33

44
from .backend import KartonBackend, KartonBind
55
from .task import Task, TaskState
@@ -9,9 +9,9 @@ class KartonQueue:
99
"""
1010
View object representing a Karton queue
1111
12-
:param bind: :py:meth:`KartonBind` object representing the queue bind
12+
:param bind: :class:`KartonBind` object representing the queue bind
1313
:param tasks: List of tasks currently in queue
14-
:param state: :py:meth:`KartonBackend` object to be used
14+
:param state: :class:`KartonState` object to be used
1515
"""
1616

1717
def __init__(
@@ -48,7 +48,7 @@ class KartonAnalysis:
4848
4949
:param root_uid: Analysis root task uid
5050
:param tasks: List of tasks
51-
:param state: :py:meth:`KartonBackend` object to be used
51+
:param state: :class:`KartonState` object to be used
5252
"""
5353

5454
def __init__(self, root_uid: str, tasks: List[Task], state: "KartonState") -> None:
@@ -89,7 +89,7 @@ def get_queues_for_tasks(
8989
Group task objects by their queue name
9090
9191
:param tasks: Task objects to group
92-
:param state: :py:meth:`KartonBackend` to bind to created queues
92+
:param state: :class:`KartonState` object to be used
9393
:return: A dictionary containing the queue names and lists of tasks
9494
"""
9595
tasks_per_queue = defaultdict(list)
@@ -119,30 +119,68 @@ class KartonState:
119119
:param backend: :class:`KartonBackend` object to use for data fetching
120120
"""
121121

122-
def __init__(self, backend: KartonBackend) -> None:
122+
def __init__(self, backend: KartonBackend, parse_resources: bool = False) -> None:
123123
self.backend = backend
124124
self.binds = {bind.identity: bind for bind in backend.get_binds()}
125125
self.replicas = backend.get_online_consumers()
126-
self.tasks = backend.get_all_tasks()
127-
self.pending_tasks = [
128-
task for task in self.tasks if task.status != TaskState.FINISHED
129-
]
130-
131-
# Tasks grouped by root_uid
132-
tasks_per_analysis = defaultdict(list)
133-
134-
for task in self.pending_tasks:
135-
tasks_per_analysis[task.root_uid].append(task)
136-
137-
self.analyses = {
138-
root_uid: KartonAnalysis(root_uid=root_uid, tasks=tasks, state=self)
139-
for root_uid, tasks in tasks_per_analysis.items()
140-
}
141-
queues = get_queues_for_tasks(self.pending_tasks, self)
142-
# Present registered queues without tasks
143-
for bind_name, bind in self.binds.items():
144-
if bind_name not in queues:
145-
queues[bind_name] = KartonQueue(
146-
bind=self.binds[bind_name], tasks=[], state=self
126+
self.parse_resources = parse_resources
127+
128+
self._tasks: Optional[List[Task]] = None
129+
self._pending_tasks: Optional[List[Task]] = None
130+
self._analyses: Optional[Dict[str, KartonAnalysis]] = None
131+
self._queues: Optional[Dict[str, KartonQueue]] = None
132+
133+
@property
def tasks(self) -> List[Task]:
    """
    All tasks returned by the backend, fetched on first access and then
    kept in ``self._tasks`` for subsequent reads.
    """
    fetched = self._tasks
    if fetched is not None:
        return fetched
    fetched = self.backend.get_all_tasks(parse_resources=self.parse_resources)
    self._tasks = fetched
    return fetched
140+
141+
@property
def pending_tasks(self) -> List[Task]:
    """
    Tasks from ``self.tasks`` whose status is not ``TaskState.FINISHED``.

    The list is built on first access and stored in ``self._pending_tasks``.
    """
    if self._pending_tasks is not None:
        return self._pending_tasks
    unfinished = []
    for candidate in self.tasks:
        if candidate.status != TaskState.FINISHED:
            unfinished.append(candidate)
    self._pending_tasks = unfinished
    return unfinished
148+
149+
@property
def analyses(self) -> Dict[str, KartonAnalysis]:
    """
    Pending tasks grouped into :class:`KartonAnalysis` objects, keyed by root_uid.

    The mapping is built on first access and stored in ``self._analyses``.
    """
    if self._analyses is not None:
        return self._analyses

    # Group pending tasks by the root_uid of the tree they belong to
    grouped: Dict[str, List[Task]] = {}
    for pending in self.pending_tasks:
        grouped.setdefault(pending.root_uid, []).append(pending)

    built = {}
    for tree_uid, members in grouped.items():
        built[tree_uid] = KartonAnalysis(
            root_uid=tree_uid, tasks=members, state=self
        )
    self._analyses = built
    return built
163+
164+
@property
def queues(self) -> Dict[str, KartonQueue]:
    """
    Queue views keyed by queue name, built from the pending tasks.

    Every registered bind is represented, including binds that currently
    have no pending tasks. The mapping is built on first access and stored
    in ``self._queues``.
    """
    if self._queues is not None:
        return self._queues

    queue_map = get_queues_for_tasks(self.pending_tasks, self)
    # Present registered queues without tasks
    for identity, bind in self.binds.items():
        if identity in queue_map:
            continue
        queue_map[identity] = KartonQueue(bind=bind, tasks=[], state=self)
    self._queues = queue_map
    return queue_map
176+
177+
def get_analysis(self, root_uid: str) -> KartonAnalysis:
178+
return KartonAnalysis(
179+
root_uid=root_uid,
180+
tasks=list(
181+
self.backend.iter_task_tree(
182+
root_uid, parse_resources=self.parse_resources
147183
)
148-
self.queues = queues
184+
),
185+
state=self,
186+
)

karton/core/task.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,18 @@ def __init__(
106106
raise ValueError("Persistent headers should be an instance of a dict")
107107

108108
if uid is None:
109-
self.uid = str(uuid.uuid4())
109+
task_uid = str(uuid.uuid4())
110+
if root_uid is None:
111+
self.root_uid = task_uid
112+
else:
113+
self.root_uid = root_uid
114+
# New-style UID format introduced in v5.4.0
115+
# {12345678-1234-1234-1234-12345678abcd}:12345678-1234-1234-1234-12345678abcd
116+
self.uid = f"{{{self.root_uid}}}:{task_uid}"
110117
else:
111118
self.uid = uid
112-
113-
if root_uid is None:
114-
self.root_uid = self.uid
115-
else:
119+
if root_uid is None:
120+
raise ValueError("root_uid cannot be None when uid is not None")
116121
self.root_uid = root_uid
117122

118123
self.orig_uid = orig_uid
@@ -137,6 +142,21 @@ def headers_persistent(self) -> Dict[str, Any]:
137142
def receiver(self) -> Optional[str]:
138143
return self.headers.get("receiver")
139144

145+
@property
def task_uid(self) -> str:
    """
    Task uid obtained by passing ``self.uid`` through :py:meth:`fquid_to_uid`.
    """
    full_identifier = self.uid
    return self.fquid_to_uid(full_identifier)
148+
149+
@staticmethod
def fquid_to_uid(fquid: str) -> str:
    """
    Gets task uid from fully-qualified uid (``{root_uid}:task_uid``).

    Identifiers that contain no ``:`` separator are returned unchanged.

    :param fquid: Fully-qualified task identifier or a plain task uid
    :return: Task uid, i.e. the part after the last ``:``
    """
    # rpartition returns the whole string as the last element when the
    # separator is absent, so no separate membership check is needed
    return fquid.rpartition(":")[2]
159+
140160
def fork_task(self) -> "Task":
141161
"""
142162
Fork task to transfer single task to many queues (but use different UID).

karton/system/system.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def gc_collect(self) -> None:
166166

167167
def route_task(self, task: Task, binds: List[KartonBind]) -> None:
168168
# Performs routing of task
169-
self.log.info("[%s] Processing task %s", task.root_uid, task.uid)
169+
self.log.info("[%s] Processing task %s", task.root_uid, task.task_uid)
170170
# store the producer-task relationship in redis for task tracking
171171
self.backend.log_identity_output(
172172
task.headers.get("origin", "unknown"), task.headers

0 commit comments

Comments
 (0)