Skip to content

Commit a3a35b5

Browse files
author
Andrei Neagu
committed
wired task_start via common interface
1 parent 5ca6450 commit a3a35b5

File tree

8 files changed

+136
-85
lines changed

8 files changed

+136
-85
lines changed

packages/service-library/src/servicelib/aiohttp/long_running_tasks/_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pydantic import AnyHttpUrl, TypeAdapter
1010

1111
from ...aiohttp import status
12+
from ...long_running_tasks import http_endpoint_responses
1213
from ...long_running_tasks.constants import (
1314
DEFAULT_STALE_TASK_CHECK_INTERVAL,
1415
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
@@ -55,7 +56,8 @@ async def start_long_running_task(
5556
task_name = _create_task_name_from_request(request_)
5657
task_id = None
5758
try:
58-
task_id = long_running_manager.tasks_manager.start_task(
59+
task_id = await http_endpoint_responses.start_task(
60+
long_running_manager.tasks_manager,
5961
registerd_task_name,
6062
fire_and_forget=fire_and_forget,
6163
task_context=task_context,
Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Any
22

33
from .models import TaskBase, TaskId, TaskStatus
4-
from .task import TaskContext, TasksManager, TrackedTask
4+
from .task import RegisteredTaskName, TaskContext, TasksManager, TrackedTask
55

66

77
def list_tasks(
@@ -16,6 +16,7 @@ def list_tasks(
1616
def get_task_status(
1717
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
1818
) -> TaskStatus:
19+
"""returns the status of a task"""
1920
return tasks_manager.get_task_status(
2021
task_id=task_id, with_task_context=task_context
2122
)
@@ -24,6 +25,7 @@ def get_task_status(
2425
async def get_task_result(
2526
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
2627
) -> Any:
28+
"""retruns the result of a task, which is directly whatever the remove hanlder returned"""
2729
try:
2830
return tasks_manager.get_task_result(
2931
task_id=task_id, with_task_context=task_context
@@ -38,23 +40,50 @@ async def get_task_result(
3840
async def remove_task(
3941
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
4042
) -> None:
43+
"""removes / cancels a task"""
4144
await tasks_manager.remove_task(task_id, with_task_context=task_context)
4245

4346

44-
# TODO reroute start via this with registration of hanlders
47+
async def start_task(
48+
tasks_manager: TasksManager,
49+
registered_task_name: RegisteredTaskName,
50+
*,
51+
unique: bool = False,
52+
task_context: TaskContext | None = None,
53+
task_name: str | None = None,
54+
fire_and_forget: bool = False,
55+
**task_kwargs: Any,
56+
) -> TaskId:
57+
"""
58+
Creates a background task from an async function.
4559
46-
# TODO: to support this we need handler registration capability in order to figure out if they can be started
47-
# this should be done via the final TasksManager
48-
# - also
60+
An asyncio task will be created out of it by injecting a `TaskProgress` as the first
61+
positional argument and adding all `handler_kwargs` as named parameters.
4962
50-
# TODO: this should be used to start everywhere not the client's start_task actually,
51-
# this way we completly isolate everything and are allowed to call it form everywhere
52-
# async def start_task(
53-
# tasks_manager: TasksManager,
54-
# task: TaskContext | None,
55-
# task_context: TaskContext | None = None,
56-
# **task_kwargs: Any,
57-
# ) -> TaskId:
58-
# return tasks_manager.start_task(
59-
# task=task, task_context=task_context, **task_kwargs
60-
# )
63+
NOTE: the progress is automatically bounded between 0 and 1
64+
NOTE: the `task` name must be unique in the module, otherwise when using
65+
the unique parameter is True, it will not be able to distinguish between
66+
them.
67+
68+
Args:
69+
tasks_manager (TasksManager): the tasks manager
70+
task (TaskProtocol): the tasks to be run in the background
71+
unique (bool, optional): If True, then only one such named task may be run. Defaults to False.
72+
task_context (Optional[TaskContext], optional): a task context storage can be retrieved during the task lifetime. Defaults to None.
73+
task_name (Optional[str], optional): optional task name. Defaults to None.
74+
fire_and_forget: if True, then the task will not be cancelled if the status is never called
75+
76+
Raises:
77+
TaskAlreadyRunningError: if unique is True, will raise if more than 1 such named task is started
78+
79+
Returns:
80+
TaskId: the task unique identifier
81+
"""
82+
return tasks_manager.start_task(
83+
registered_task_name,
84+
unique=unique,
85+
task_context=task_context,
86+
task_name=task_name,
87+
fire_and_forget=fire_and_forget,
88+
**task_kwargs,
89+
)

packages/service-library/src/servicelib/long_running_tasks/task.py

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -335,37 +335,12 @@ def start_task(
335335
self,
336336
registered_task_name: RegisteredTaskName,
337337
*,
338-
unique: bool = False,
339-
task_context: TaskContext | None = None,
340-
task_name: str | None = None,
341-
fire_and_forget: bool = False,
338+
unique: bool,
339+
task_context: TaskContext | None,
340+
task_name: str | None,
341+
fire_and_forget: bool,
342342
**task_kwargs: Any,
343343
) -> TaskId:
344-
"""
345-
Creates a background task from an async function.
346-
347-
An asyncio task will be created out of it by injecting a `TaskProgress` as the first
348-
positional argument and adding all `handler_kwargs` as named parameters.
349-
350-
NOTE: the progress is automatically bounded between 0 and 1
351-
NOTE: the `task` name must be unique in the module, otherwise when using
352-
the unique parameter is True, it will not be able to distinguish between
353-
them.
354-
355-
Args:
356-
tasks_manager (TasksManager): the tasks manager
357-
task (TaskProtocol): the tasks to be run in the background
358-
unique (bool, optional): If True, then only one such named task may be run. Defaults to False.
359-
task_context (Optional[TaskContext], optional): a task context storage can be retrieved during the task lifetime. Defaults to None.
360-
task_name (Optional[str], optional): optional task name. Defaults to None.
361-
fire_and_forget: if True, then the task will not be cancelled if the status is never called
362-
363-
Raises:
364-
TaskAlreadyRunningError: if unique is True, will raise if more than 1 such named task is started
365-
366-
Returns:
367-
TaskId: the task unique identifier
368-
"""
369344
if registered_task_name not in TaskRegistry.REGISTERED_TASKS:
370345
raise TaskNotRegisteredError(task_name=registered_task_name)
371346

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
get_long_running_manager,
2626
)
2727
from servicelib.fastapi.long_running_tasks.server import setup as setup_server
28+
from servicelib.long_running_tasks import http_endpoint_responses
2829
from servicelib.long_running_tasks.models import (
2930
TaskGet,
3031
TaskId,
@@ -78,7 +79,8 @@ async def create_string_list_task(
7879
*,
7980
fail: bool = False,
8081
) -> TaskId:
81-
return long_running_manager.tasks_manager.start_task(
82+
return await http_endpoint_responses.start_task(
83+
long_running_manager.tasks_manager,
8284
_string_list_task.__name__,
8385
num_strings=num_strings,
8486
sleep_time=sleep_time,

packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from servicelib.fastapi.long_running_tasks.client import setup as setup_client
1717
from servicelib.fastapi.long_running_tasks.server import get_long_running_manager
1818
from servicelib.fastapi.long_running_tasks.server import setup as setup_server
19+
from servicelib.long_running_tasks import http_endpoint_responses
1920
from servicelib.long_running_tasks.errors import (
2021
TaskClientTimeoutError,
2122
)
@@ -68,16 +69,18 @@ async def create_task_user_defined_route(
6869
FastAPILongRunningManager, Depends(get_long_running_manager)
6970
],
7071
) -> TaskId:
71-
return long_running_manager.tasks_manager.start_task(a_test_task.__name__)
72+
return await http_endpoint_responses.start_task(
73+
long_running_manager.tasks_manager, a_test_task.__name__
74+
)
7275

7376
@router.get("/api/failing", status_code=status.HTTP_200_OK)
7477
async def create_task_which_fails(
7578
long_running_manager: Annotated[
7679
FastAPILongRunningManager, Depends(get_long_running_manager)
7780
],
7881
) -> TaskId:
79-
return long_running_manager.tasks_manager.start_task(
80-
a_failing_test_task.__name__
82+
return await http_endpoint_responses.start_task(
83+
long_running_manager.tasks_manager, a_failing_test_task.__name__
8184
)
8285

8386
return router

packages/service-library/tests/long_running_tasks/test_long_running_tasks_task.py

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import pytest
1414
from faker import Faker
15+
from servicelib.long_running_tasks import http_endpoint_responses
1516
from servicelib.long_running_tasks.errors import (
1617
TaskAlreadyRunningError,
1718
TaskCancelledError,
@@ -84,7 +85,8 @@ async def tasks_manager() -> AsyncIterator[TasksManager]:
8485
async def test_task_is_auto_removed(
8586
tasks_manager: TasksManager, check_task_presence_before: bool
8687
):
87-
task_id = tasks_manager.start_task(
88+
task_id = await http_endpoint_responses.start_task(
89+
tasks_manager,
8890
a_background_task.__name__,
8991
raise_when_finished=False,
9092
total_sleep=10 * TEST_CHECK_STALE_INTERVAL_S,
@@ -110,7 +112,8 @@ async def test_task_is_auto_removed(
110112

111113

112114
async def test_checked_task_is_not_auto_removed(tasks_manager: TasksManager):
113-
task_id = tasks_manager.start_task(
115+
task_id = await http_endpoint_responses.start_task(
116+
tasks_manager,
114117
a_background_task.__name__,
115118
raise_when_finished=False,
116119
total_sleep=5 * TEST_CHECK_STALE_INTERVAL_S,
@@ -124,7 +127,8 @@ async def test_checked_task_is_not_auto_removed(tasks_manager: TasksManager):
124127

125128

126129
async def test_fire_and_forget_task_is_not_auto_removed(tasks_manager: TasksManager):
127-
task_id = tasks_manager.start_task(
130+
task_id = await http_endpoint_responses.start_task(
131+
tasks_manager,
128132
a_background_task.__name__,
129133
raise_when_finished=False,
130134
total_sleep=5 * TEST_CHECK_STALE_INTERVAL_S,
@@ -142,7 +146,8 @@ async def test_fire_and_forget_task_is_not_auto_removed(tasks_manager: TasksMana
142146

143147

144148
async def test_get_result_of_unfinished_task_raises(tasks_manager: TasksManager):
145-
task_id = tasks_manager.start_task(
149+
task_id = await http_endpoint_responses.start_task(
150+
tasks_manager,
146151
a_background_task.__name__,
147152
raise_when_finished=False,
148153
total_sleep=5 * TEST_CHECK_STALE_INTERVAL_S,
@@ -158,11 +163,15 @@ async def unique_task(progress: TaskProgress):
158163

159164
TaskRegistry.register(unique_task)
160165

161-
tasks_manager.start_task(unique_task.__name__, unique=True)
166+
await http_endpoint_responses.start_task(
167+
tasks_manager, unique_task.__name__, unique=True
168+
)
162169

163170
# ensure unique running task regardless of how many times it gets started
164171
with pytest.raises(TaskAlreadyRunningError) as exec_info:
165-
tasks_manager.start_task(unique_task.__name__, unique=True)
172+
await http_endpoint_responses.start_task(
173+
tasks_manager, unique_task.__name__, unique=True
174+
)
166175
assert "must be unique, found: " in f"{exec_info.value}"
167176

168177
TaskRegistry.unregister(unique_task)
@@ -175,7 +184,9 @@ async def not_unique_task(progress: TaskProgress):
175184
TaskRegistry.register(not_unique_task)
176185

177186
for _ in range(5):
178-
tasks_manager.start_task(not_unique_task.__name__)
187+
await http_endpoint_responses.start_task(
188+
tasks_manager, not_unique_task.__name__
189+
)
179190

180191
TaskRegistry.unregister(not_unique_task)
181192

@@ -188,7 +199,8 @@ def test_get_task_id(tasks_manager: TasksManager, faker: Faker, is_unique: bool)
188199

189200

190201
async def test_get_status(tasks_manager: TasksManager):
191-
task_id = tasks_manager.start_task(
202+
task_id = await http_endpoint_responses.start_task(
203+
tasks_manager,
192204
a_background_task.__name__,
193205
raise_when_finished=False,
194206
total_sleep=10,
@@ -208,7 +220,9 @@ async def test_get_status_missing(tasks_manager: TasksManager):
208220

209221

210222
async def test_get_result(tasks_manager: TasksManager):
211-
task_id = tasks_manager.start_task(fast_background_task.__name__)
223+
task_id = await http_endpoint_responses.start_task(
224+
tasks_manager, fast_background_task.__name__
225+
)
212226
await asyncio.sleep(0.1)
213227
result = tasks_manager.get_task_result(task_id, with_task_context=None)
214228
assert result == 42
@@ -221,7 +235,9 @@ async def test_get_result_missing(tasks_manager: TasksManager):
221235

222236

223237
async def test_get_result_finished_with_error(tasks_manager: TasksManager):
224-
task_id = tasks_manager.start_task(failing_background_task.__name__)
238+
task_id = await http_endpoint_responses.start_task(
239+
tasks_manager, failing_background_task.__name__
240+
)
225241
# wait for result
226242
async for attempt in AsyncRetrying(**_RETRY_PARAMS):
227243
with attempt:
@@ -234,7 +250,8 @@ async def test_get_result_finished_with_error(tasks_manager: TasksManager):
234250
async def test_get_result_task_was_cancelled_multiple_times(
235251
tasks_manager: TasksManager,
236252
):
237-
task_id = tasks_manager.start_task(
253+
task_id = await http_endpoint_responses.start_task(
254+
tasks_manager,
238255
a_background_task.__name__,
239256
raise_when_finished=False,
240257
total_sleep=10,
@@ -249,7 +266,8 @@ async def test_get_result_task_was_cancelled_multiple_times(
249266

250267

251268
async def test_remove_task(tasks_manager: TasksManager):
252-
task_id = tasks_manager.start_task(
269+
task_id = await http_endpoint_responses.start_task(
270+
tasks_manager,
253271
a_background_task.__name__,
254272
raise_when_finished=False,
255273
total_sleep=10,
@@ -264,7 +282,8 @@ async def test_remove_task(tasks_manager: TasksManager):
264282

265283
async def test_remove_task_with_task_context(tasks_manager: TasksManager):
266284
TASK_CONTEXT = {"some_context": "some_value"}
267-
task_id = tasks_manager.start_task(
285+
task_id = await http_endpoint_responses.start_task(
286+
tasks_manager,
268287
a_background_task.__name__,
269288
raise_when_finished=False,
270289
total_sleep=10,
@@ -296,7 +315,8 @@ async def test_remove_unknown_task(tasks_manager: TasksManager):
296315

297316
async def test_cancel_task_with_task_context(tasks_manager: TasksManager):
298317
TASK_CONTEXT = {"some_context": "some_value"}
299-
task_id = tasks_manager.start_task(
318+
task_id = await http_endpoint_responses.start_task(
319+
tasks_manager,
300320
a_background_task.__name__,
301321
raise_when_finished=False,
302322
total_sleep=10,
@@ -322,7 +342,8 @@ async def test_list_tasks(tasks_manager: TasksManager):
322342
task_ids = []
323343
for _ in range(NUM_TASKS):
324344
task_ids.append( # noqa: PERF401
325-
tasks_manager.start_task(
345+
await http_endpoint_responses.start_task(
346+
tasks_manager,
326347
a_background_task.__name__,
327348
raise_when_finished=False,
328349
total_sleep=10,
@@ -337,18 +358,21 @@ async def test_list_tasks(tasks_manager: TasksManager):
337358

338359

339360
async def test_list_tasks_filtering(tasks_manager: TasksManager):
340-
tasks_manager.start_task(
361+
await http_endpoint_responses.start_task(
362+
tasks_manager,
341363
a_background_task.__name__,
342364
raise_when_finished=False,
343365
total_sleep=10,
344366
)
345-
tasks_manager.start_task(
367+
await http_endpoint_responses.start_task(
368+
tasks_manager,
346369
a_background_task.__name__,
347370
raise_when_finished=False,
348371
total_sleep=10,
349372
task_context={"user_id": 213},
350373
)
351-
tasks_manager.start_task(
374+
await http_endpoint_responses.start_task(
375+
tasks_manager,
352376
a_background_task.__name__,
353377
raise_when_finished=False,
354378
total_sleep=10,
@@ -376,7 +400,8 @@ async def test_list_tasks_filtering(tasks_manager: TasksManager):
376400

377401
async def test_define_task_name(tasks_manager: TasksManager, faker: Faker):
378402
task_name = faker.name()
379-
task_id = tasks_manager.start_task(
403+
task_id = await http_endpoint_responses.start_task(
404+
tasks_manager,
380405
a_background_task.__name__,
381406
raise_when_finished=False,
382407
total_sleep=10,
@@ -387,4 +412,4 @@ async def test_define_task_name(tasks_manager: TasksManager, faker: Faker):
387412

388413
async def test_start_not_registered_task(tasks_manager: TasksManager):
389414
with pytest.raises(TaskNotRegisteredError):
390-
tasks_manager.start_task("not_registered_task")
415+
await http_endpoint_responses.start_task(tasks_manager, "not_registered_task")

0 commit comments

Comments
 (0)