Skip to content

Commit 2d531ae

Browse files
authored
Merge pull request #14 from openaleph/feat/tracer
Feat/tracer
2 parents 7d2a7e0 + 9cbca31 commit 2d531ae

File tree

8 files changed

+220
-37
lines changed

8 files changed

+220
-37
lines changed

openaleph_procrastinate/settings.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import random
2+
from typing import TYPE_CHECKING
23

4+
from anystore.types import Uri
35
from pydantic import AliasChoices, Field
46
from pydantic_settings import BaseSettings, SettingsConfigDict
57

8+
if TYPE_CHECKING:
9+
from openaleph_procrastinate.tracer import Tracer
10+
611
MAX_PRIORITY = 100
712
MIN_PRIORITY = 0
813
DEFAULT_DB_URI = "memory://"
@@ -42,6 +47,12 @@ def get_priority(self, priority: int | None = None) -> int:
4247
max_priority = max(min_priority, self.max_priority)
4348
return random.randint(min_priority, max_priority)
4449

50+
def get_tracer(self, uri: Uri | None = None) -> "Tracer":
    """Return the task status tracer bound to this service's queue and task.

    Args:
        uri: Optional backend uri for the tracer store; falls back to the
            tracer module's own default resolution when omitted.
    """
    # Local import: the tracer module imports this settings module at load
    # time, so importing it lazily here avoids a circular import.
    from openaleph_procrastinate import tracer as _tracer

    return _tracer.get_tracer(self.queue, self.task, uri)
55+
4556

4657
class DeferSettings(BaseSettings):
4758
"""
@@ -100,6 +111,11 @@ class DeferSettings(BaseSettings):
100111
)
101112
"""ftm-assets"""
102113

114+
vectorize: ServiceSettings = ServiceSettings(
115+
queue="vectorize", task="ftm_vectorize.tasks.vectorize", defer=False
116+
)
117+
"""ftm-vectorize"""
118+
103119
# OpenAleph
104120

105121
index: ServiceSettings = ServiceSettings(
@@ -224,6 +240,9 @@ class OpenAlephSettings(BaseSettings):
224240
)
225241
"""FollowTheMoney Fragments store uri"""
226242

243+
redis_url: str | None = Field(default=None, validation_alias="redis_url")
244+
"""Redis instance uri"""
245+
227246
procrastinate_dehydrate_entities: bool = True
228247
"""Dehydrate entity in job payload, jobs need to re-fetch entity from store"""
229248

openaleph_procrastinate/tasks.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import functools
22
import random
3-
from typing import Any, Callable
3+
from typing import Any, Callable, cast
44

55
from anystore.logging import get_logger
6+
from anystore.types import Uri
67
from procrastinate.app import App
78

89
from openaleph_procrastinate.exceptions import ErrorHandler
9-
from openaleph_procrastinate.model import AnyJob, DatasetJob, Job
10+
from openaleph_procrastinate.model import AnyJob, DatasetJob, Job, Status
11+
from openaleph_procrastinate.tracer import get_tracer
1012

1113
log = get_logger(__name__)
1214

@@ -19,13 +21,28 @@ def unpack_job(data: dict[str, Any]) -> AnyJob:
1921
return Job(**data)
2022

2123

24+
def handle_trace(job: AnyJob, status: Status, tracer_uri: Uri | None) -> None:
25+
if tracer_uri is not None and isinstance(job, DatasetJob):
26+
tracer = get_tracer(job.queue, job.task, tracer_uri)
27+
for entity in job.get_entities():
28+
tracer.mark(cast(str, entity.id), status)
29+
30+
2231
def task(app: App, **kwargs):
2332
# https://procrastinate.readthedocs.io/en/stable/howto/advanced/middleware.html
33+
tracer_uri = kwargs.pop("tracer_uri", None)
34+
2435
def wrap(func: Callable[..., None]):
2536
def _inner(*job_args, **job_kwargs):
2637
# turn the json data into the job model instance
2738
job = unpack_job(job_kwargs)
28-
func(*job_args, job)
39+
handle_trace(job, "doing", tracer_uri)
40+
try:
41+
func(*job_args, job)
42+
handle_trace(job, "succeeded", tracer_uri)
43+
except Exception as e:
44+
handle_trace(job, "failed", tracer_uri)
45+
raise e
2946

3047
# need to call to not register tasks twice (procrastinate complains)
3148
wrapped_func = functools.update_wrapper(_inner, func, updated=())

openaleph_procrastinate/tracer.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""
2+
Simple, quick & dirty tracer module to track task status for entities in a
3+
shared redis instance. This is useful to share task status across services for
4+
quick lookup (psql would be too expensive), e.g. to show a loading spinner in
5+
the UI if something is processing. This module currently is a quick shot to
6+
solve the status lookup for specific queues, it should be refactored at one
7+
point into a more general tracer module that e.g. can also store processing
8+
exceptions in postgres. For this, creating a better class-based queue system and
9+
merging the tracing into it should be considered.
10+
11+
The tracer is used in the @task decorator (middleware) when tracer_uri=... is set
12+
in the kwargs. e.g.:
13+
14+
```python
15+
@task(queue="my-queue", trace_uri="redis://localhost")
16+
def process(job) -> None:
17+
pass
18+
```
19+
20+
(See test suite for example)
21+
22+
The tracer backend accepts any uri (sql, file-like, ...), but it is preferable
23+
to use redis for performance reasons.
24+
"""
25+
26+
from functools import cache
27+
28+
from anystore.store import get_store
29+
from anystore.types import Uri
30+
from anystore.util import join_relpaths
31+
32+
from openaleph_procrastinate.model import Status
33+
from openaleph_procrastinate.settings import OpenAlephSettings
34+
35+
36+
class Tracer:
    """Track per-entity task status in a shared key-value backend.

    Keys are namespaced by queue and task so the same entity can be
    tracked independently per processing stage. The backend defaults to
    the configured redis instance and falls back to an in-process memory
    store (fine for tests, but not shared across workers).
    """

    def __init__(self, queue: str, task: str, uri: Uri | None = None) -> None:
        """Create a tracer for *queue*/*task*.

        Args:
            queue: Queue name used to namespace tracer keys.
            task: Task name used to namespace tracer keys.
            uri: Optional store backend uri; defaults to the configured
                `redis_url`, then to `memory://`.
        """
        if uri is None:
            settings = OpenAlephSettings()
            uri = settings.redis_url
        self._store = get_store(uri or "memory://")
        self.queue = queue
        self.task = task

    def _make_key(self, entity_id: str) -> str:
        """Build the namespaced store key for *entity_id*."""
        return join_relpaths(
            "openaleph-procrastinate", "tracer", self.queue, self.task, entity_id
        )

    def mark(self, entity_id: str, status: Status) -> None:
        """Mark an entity status for the given queue and task. If status is
        'succeeded' remove the data from the tracer."""
        key = self._make_key(entity_id)
        if status == "succeeded":
            # Success means "done": drop the key instead of storing a status.
            # `ignore_errors` keeps this idempotent if the key is already gone.
            # (Fix: don't leak the backend's delete() return value from a
            # method declared `-> None`.)
            self._store.delete(key, ignore_errors=True)
            return
        self._store.put(key, status)

    def add(self, entity_id: str) -> None:
        """Mark as todo"""
        self.mark(entity_id, "todo")

    def start(self, entity_id: str) -> None:
        """Mark as doing"""
        self.mark(entity_id, "doing")

    def finish(self, entity_id: str) -> None:
        """Mark done which is actually popping (deleting) from the tracer"""
        self.mark(entity_id, "succeeded")

    def is_processing(self, entity_id: str) -> bool:
        """Check if a task for the entity_id is either pending or doing"""
        key = self._make_key(entity_id)
        # NOTE(review): exists + get is two round trips and racy if another
        # worker deletes the key in between — acceptable for a best-effort
        # UI hint, but worth confirming anystore's get() semantics for a
        # vanished key before hardening.
        if not self._store.exists(key):
            return False
        status = self._store.get(key)
        return status in ("todo", "doing")
77+
78+
79+
@cache
def get_tracer(queue: str, task: str, uri: Uri | None = None) -> Tracer:
    """Return a cached `Tracer` for the queue/task/backend combination.

    Args:
        queue: Queue name used to namespace tracer keys.
        task: Task name used to namespace tracer keys.
        uri: Optional store backend uri (now defaults to `None`, matching
            `Tracer.__init__`, so callers may omit it).

    Caching reuses the underlying store per combination; the key space
    (queues x tasks x backend uris) is small, so an unbounded `@cache`
    is acceptable here.
    """
    return Tracer(queue, task, uri)

0 commit comments

Comments
 (0)